diff --git a/apis/kueue/v1beta1/clusterqueue_types.go b/apis/kueue/v1beta1/clusterqueue_types.go index 8a5e9975b43..d5326715073 100644 --- a/apis/kueue/v1beta1/clusterqueue_types.go +++ b/apis/kueue/v1beta1/clusterqueue_types.go @@ -142,6 +142,24 @@ type ClusterQueueSpec struct { // admissionScope indicates whether ClusterQueue uses the Admission Fair Sharing // +optional AdmissionScope *AdmissionScope `json:"admissionScope,omitempty"` + + // excludeResourcePrefixes defines which resources should be ignored by + // Kueue for quota management in this ClusterQueue. Resources matching any + // of the prefixes will be excluded from quota calculations. + // When specified, this list is combined (union) with the global + // excludeResourcePrefixes from the Kueue Configuration. + // The prefix matching follows the same semantics as the global configuration: + // - An exact match of the resource name + // - A prefix match followed by a slash (e.g., "example.com" matches "example.com/gpu") + // + // Example: ["ephemeral-storage", "hugepages-", "example.com"] + // + // +listType=set + // +kubebuilder:validation:MaxItems=64 + // +kubebuilder:validation:items:MaxLength=256 + // +kubebuilder:validation:items:MinLength=1 + // +optional + ExcludeResourcePrefixes []string `json:"excludeResourcePrefixes,omitempty"` } // AdmissionChecksStrategy defines a strategy for a AdmissionCheck. 
diff --git a/apis/kueue/v1beta1/zz_generated.conversion.go b/apis/kueue/v1beta1/zz_generated.conversion.go index 7fb80707c5c..88e7adadd42 100644 --- a/apis/kueue/v1beta1/zz_generated.conversion.go +++ b/apis/kueue/v1beta1/zz_generated.conversion.go @@ -1253,6 +1253,7 @@ func autoConvert_v1beta1_ClusterQueueSpec_To_v1beta2_ClusterQueueSpec(in *Cluste out.StopPolicy = (*v1beta2.StopPolicy)(unsafe.Pointer(in.StopPolicy)) out.FairSharing = (*v1beta2.FairSharing)(unsafe.Pointer(in.FairSharing)) out.AdmissionScope = (*v1beta2.AdmissionScope)(unsafe.Pointer(in.AdmissionScope)) + out.ExcludeResourcePrefixes = *(*[]string)(unsafe.Pointer(&in.ExcludeResourcePrefixes)) return nil } @@ -1268,6 +1269,7 @@ func autoConvert_v1beta2_ClusterQueueSpec_To_v1beta1_ClusterQueueSpec(in *v1beta out.StopPolicy = (*StopPolicy)(unsafe.Pointer(in.StopPolicy)) out.FairSharing = (*FairSharing)(unsafe.Pointer(in.FairSharing)) out.AdmissionScope = (*AdmissionScope)(unsafe.Pointer(in.AdmissionScope)) + out.ExcludeResourcePrefixes = *(*[]string)(unsafe.Pointer(&in.ExcludeResourcePrefixes)) return nil } diff --git a/apis/kueue/v1beta1/zz_generated.deepcopy.go b/apis/kueue/v1beta1/zz_generated.deepcopy.go index 7277dd8b717..89118b871f1 100644 --- a/apis/kueue/v1beta1/zz_generated.deepcopy.go +++ b/apis/kueue/v1beta1/zz_generated.deepcopy.go @@ -457,6 +457,11 @@ func (in *ClusterQueueSpec) DeepCopyInto(out *ClusterQueueSpec) { *out = new(AdmissionScope) **out = **in } + if in.ExcludeResourcePrefixes != nil { + in, out := &in.ExcludeResourcePrefixes, &out.ExcludeResourcePrefixes + *out = make([]string, len(*in)) + copy(*out, *in) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ClusterQueueSpec. 
diff --git a/apis/kueue/v1beta2/clusterqueue_types.go b/apis/kueue/v1beta2/clusterqueue_types.go index 9f749757bf6..6f70121336c 100644 --- a/apis/kueue/v1beta2/clusterqueue_types.go +++ b/apis/kueue/v1beta2/clusterqueue_types.go @@ -142,6 +142,24 @@ type ClusterQueueSpec struct { // admissionScope indicates whether ClusterQueue uses the Admission Fair Sharing // +optional AdmissionScope *AdmissionScope `json:"admissionScope,omitempty"` + + // excludeResourcePrefixes defines which resources should be ignored by + // Kueue for quota management in this ClusterQueue. Resources matching any + // of the prefixes will be excluded from quota calculations. + // When specified, this list is combined (union) with the global + // excludeResourcePrefixes from the Kueue Configuration. + // The prefix matching follows the same semantics as the global configuration: + // - An exact match of the resource name + // - A prefix match followed by a slash (e.g., "example.com" matches "example.com/gpu") + // + // Example: ["ephemeral-storage", "hugepages-", "example.com"] + // + // +listType=set + // +kubebuilder:validation:MaxItems=64 + // +kubebuilder:validation:items:MaxLength=256 + // +kubebuilder:validation:items:MinLength=1 + // +optional + ExcludeResourcePrefixes []string `json:"excludeResourcePrefixes,omitempty"` } // AdmissionChecksStrategy defines a strategy for a AdmissionCheck. 
diff --git a/apis/kueue/v1beta2/zz_generated.deepcopy.go b/apis/kueue/v1beta2/zz_generated.deepcopy.go index 281deaf91f6..dde72517972 100644 --- a/apis/kueue/v1beta2/zz_generated.deepcopy.go +++ b/apis/kueue/v1beta2/zz_generated.deepcopy.go @@ -457,6 +457,11 @@ func (in *ClusterQueueSpec) DeepCopyInto(out *ClusterQueueSpec) { *out = new(AdmissionScope) **out = **in } + if in.ExcludeResourcePrefixes != nil { + in, out := &in.ExcludeResourcePrefixes, &out.ExcludeResourcePrefixes + *out = make([]string, len(*in)) + copy(*out, *in) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ClusterQueueSpec. diff --git a/client-go/applyconfiguration/kueue/v1beta1/clusterqueuespec.go b/client-go/applyconfiguration/kueue/v1beta1/clusterqueuespec.go index 62a02cc5ba4..6aa647d7aec 100644 --- a/client-go/applyconfiguration/kueue/v1beta1/clusterqueuespec.go +++ b/client-go/applyconfiguration/kueue/v1beta1/clusterqueuespec.go @@ -36,6 +36,7 @@ type ClusterQueueSpecApplyConfiguration struct { StopPolicy *kueuev1beta1.StopPolicy `json:"stopPolicy,omitempty"` FairSharing *FairSharingApplyConfiguration `json:"fairSharing,omitempty"` AdmissionScope *AdmissionScopeApplyConfiguration `json:"admissionScope,omitempty"` + ExcludeResourcePrefixes []string `json:"excludeResourcePrefixes,omitempty"` } // ClusterQueueSpecApplyConfiguration constructs a declarative configuration of the ClusterQueueSpec type for use with @@ -138,3 +139,13 @@ func (b *ClusterQueueSpecApplyConfiguration) WithAdmissionScope(value *Admission b.AdmissionScope = value return b } + +// WithExcludeResourcePrefixes adds the given value to the ExcludeResourcePrefixes field in the declarative configuration +// and returns the receiver, so that objects can be build by chaining "With" function invocations. +// If called multiple times, values provided by each call will be appended to the ExcludeResourcePrefixes field. 
+func (b *ClusterQueueSpecApplyConfiguration) WithExcludeResourcePrefixes(values ...string) *ClusterQueueSpecApplyConfiguration { + for i := range values { + b.ExcludeResourcePrefixes = append(b.ExcludeResourcePrefixes, values[i]) + } + return b +} diff --git a/client-go/applyconfiguration/kueue/v1beta2/clusterqueuespec.go b/client-go/applyconfiguration/kueue/v1beta2/clusterqueuespec.go index b76cd0f2426..c018caf1f24 100644 --- a/client-go/applyconfiguration/kueue/v1beta2/clusterqueuespec.go +++ b/client-go/applyconfiguration/kueue/v1beta2/clusterqueuespec.go @@ -36,6 +36,7 @@ type ClusterQueueSpecApplyConfiguration struct { StopPolicy *kueuev1beta2.StopPolicy `json:"stopPolicy,omitempty"` FairSharing *FairSharingApplyConfiguration `json:"fairSharing,omitempty"` AdmissionScope *AdmissionScopeApplyConfiguration `json:"admissionScope,omitempty"` + ExcludeResourcePrefixes []string `json:"excludeResourcePrefixes,omitempty"` } // ClusterQueueSpecApplyConfiguration constructs a declarative configuration of the ClusterQueueSpec type for use with @@ -138,3 +139,13 @@ func (b *ClusterQueueSpecApplyConfiguration) WithAdmissionScope(value *Admission b.AdmissionScope = value return b } + +// WithExcludeResourcePrefixes adds the given value to the ExcludeResourcePrefixes field in the declarative configuration +// and returns the receiver, so that objects can be build by chaining "With" function invocations. +// If called multiple times, values provided by each call will be appended to the ExcludeResourcePrefixes field. 
+func (b *ClusterQueueSpecApplyConfiguration) WithExcludeResourcePrefixes(values ...string) *ClusterQueueSpecApplyConfiguration { + for i := range values { + b.ExcludeResourcePrefixes = append(b.ExcludeResourcePrefixes, values[i]) + } + return b +} diff --git a/config/components/crd/bases/kueue.x-k8s.io_clusterqueues.yaml b/config/components/crd/bases/kueue.x-k8s.io_clusterqueues.yaml index e42809e8f55..45083b192db 100644 --- a/config/components/crd/bases/kueue.x-k8s.io_clusterqueues.yaml +++ b/config/components/crd/bases/kueue.x-k8s.io_clusterqueues.yaml @@ -131,6 +131,25 @@ spec: maxLength: 253 pattern: ^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$ type: string + excludeResourcePrefixes: + description: |- + excludeResourcePrefixes defines which resources should be ignored by + Kueue for quota management in this ClusterQueue. Resources matching any + of the prefixes will be excluded from quota calculations. + When specified, this list is combined (union) with the global + excludeResourcePrefixes from the Kueue Configuration. + The prefix matching follows the same semantics as the global configuration: + - An exact match of the resource name + - A prefix match followed by a slash (e.g., "example.com" matches "example.com/gpu") + + Example: ["ephemeral-storage", "hugepages-", "example.com"] + items: + maxLength: 256 + minLength: 1 + type: string + maxItems: 64 + type: array + x-kubernetes-list-type: set fairSharing: description: |- fairSharing defines the properties of the ClusterQueue when @@ -894,6 +913,25 @@ spec: maxLength: 253 pattern: ^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$ type: string + excludeResourcePrefixes: + description: |- + excludeResourcePrefixes defines which resources should be ignored by + Kueue for quota management in this ClusterQueue. Resources matching any + of the prefixes will be excluded from quota calculations. 
+ When specified, this list is combined (union) with the global + excludeResourcePrefixes from the Kueue Configuration. + The prefix matching follows the same semantics as the global configuration: + - An exact match of the resource name + - A prefix match followed by a slash (e.g., "example.com" matches "example.com/gpu") + + Example: ["ephemeral-storage", "hugepages-", "example.com"] + items: + maxLength: 256 + minLength: 1 + type: string + maxItems: 64 + type: array + x-kubernetes-list-type: set fairSharing: description: |- fairSharing defines the properties of the ClusterQueue when diff --git a/pkg/cache/queue/cluster_queue.go b/pkg/cache/queue/cluster_queue.go index 48ce5e128a7..bd06a6bcf21 100644 --- a/pkg/cache/queue/cluster_queue.go +++ b/pkg/cache/queue/cluster_queue.go @@ -114,12 +114,24 @@ type ClusterQueue struct { localQueuesInClusterQueue map[utilqueue.LocalQueueReference]bool sw *stickyWorkload + + // excludeResourcePrefixes contains the resource prefixes to exclude from quota management + // at the ClusterQueue level. These are merged with global exclusions. + excludeResourcePrefixes []string } func (c *ClusterQueue) GetName() kueue.ClusterQueueReference { return c.name } +// GetExcludeResourcePrefixes returns the ClusterQueue-specific resource exclusion prefixes. +// These should be merged with global exclusions when creating workload Info. 
+func (c *ClusterQueue) GetExcludeResourcePrefixes() []string { + c.rwm.RLock() + defer c.rwm.RUnlock() + return slices.Clone(c.excludeResourcePrefixes) +} + func workloadKey(i *workload.Info) workload.Reference { return workload.Key(i.Obj) } @@ -162,6 +174,7 @@ func (c *ClusterQueue) Update(apiCQ *kueue.ClusterQueue) error { } c.namespaceSelector = nsSelector c.active = apimeta.IsStatusConditionTrue(apiCQ.Status.Conditions, kueue.ClusterQueueActive) + c.excludeResourcePrefixes = slices.Clone(apiCQ.Spec.ExcludeResourcePrefixes) return nil } diff --git a/pkg/cache/queue/cluster_queue_test.go b/pkg/cache/queue/cluster_queue_test.go index 1c55073024b..ad3d0b26eb5 100644 --- a/pkg/cache/queue/cluster_queue_test.go +++ b/pkg/cache/queue/cluster_queue_test.go @@ -1138,3 +1138,111 @@ func TestFsAdmission(t *testing.T) { }) } } + +func TestClusterQueueGetExcludeResourcePrefixes(t *testing.T) { + ctx, _ := utiltesting.ContextWithLog(t) + + cases := map[string]struct { + cqExclusions []string + want []string + }{ + "no exclusions": { + cqExclusions: []string{}, + want: []string{}, + }, + "single exclusion": { + cqExclusions: []string{"example.com/"}, + want: []string{"example.com/"}, + }, + "multiple exclusions": { + cqExclusions: []string{"example.com/", "foo.io/", "bar.io/"}, + want: []string{"example.com/", "foo.io/", "bar.io/"}, + }, + } + + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + cq := newClusterQueueImpl(ctx, nil, defaultOrdering, testingclock.NewFakeClock(time.Now()), nil, false, nil) + apiCQ := utiltestingapi.MakeClusterQueue("test-cq"). + ExcludeResourcePrefixes(tc.cqExclusions). 
+ Obj() + + err := cq.Update(apiCQ) + if err != nil { + t.Fatalf("Failed to update ClusterQueue: %v", err) + } + + got := cq.GetExcludeResourcePrefixes() + if diff := cmp.Diff(tc.want, got); diff != "" { + t.Errorf("Unexpected exclude resource prefixes (-want,+got):\n%s", diff) + } + + // Verify that the returned slice is a copy (not the original) + if len(got) > 0 { + got[0] = "modified/" + unmodified := cq.GetExcludeResourcePrefixes() + if diff := cmp.Diff(tc.want, unmodified); diff != "" { + t.Errorf("Modifying returned slice affected internal state (-want,+got):\n%s", diff) + } + } + }) + } +} + +func TestClusterQueueUpdateExclusions(t *testing.T) { + ctx, _ := utiltesting.ContextWithLog(t) + + cases := map[string]struct { + initialExclusions []string + updatedExclusions []string + }{ + "empty to non-empty": { + initialExclusions: []string{}, + updatedExclusions: []string{"example.com/"}, + }, + "non-empty to empty": { + initialExclusions: []string{"example.com/"}, + updatedExclusions: []string{}, + }, + "update existing": { + initialExclusions: []string{"example.com/"}, + updatedExclusions: []string{"foo.io/", "bar.io/"}, + }, + "no change": { + initialExclusions: []string{"example.com/"}, + updatedExclusions: []string{"example.com/"}, + }, + } + + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + cq := newClusterQueueImpl(ctx, nil, defaultOrdering, testingclock.NewFakeClock(time.Now()), nil, false, nil) + + // Set initial exclusions + apiCQ := utiltestingapi.MakeClusterQueue("test-cq"). + ExcludeResourcePrefixes(tc.initialExclusions). 
+ Obj() + err := cq.Update(apiCQ) + if err != nil { + t.Fatalf("Failed initial update: %v", err) + } + + got := cq.GetExcludeResourcePrefixes() + if diff := cmp.Diff(tc.initialExclusions, got); diff != "" { + t.Errorf("Initial exclusions incorrect (-want,+got):\n%s", diff) + } + + // Update exclusions + apiCQ.Spec.ExcludeResourcePrefixes = tc.updatedExclusions + err = cq.Update(apiCQ) + if err != nil { + t.Fatalf("Failed to update ClusterQueue: %v", err) + } + + got = cq.GetExcludeResourcePrefixes() + if diff := cmp.Diff(tc.updatedExclusions, got); diff != "" { + t.Errorf("Updated exclusions incorrect (-want,+got):\n%s", diff) + } + }) + } +} diff --git a/pkg/cache/queue/manager.go b/pkg/cache/queue/manager.go index d853d7455a0..ed16e9adc60 100644 --- a/pkg/cache/queue/manager.go +++ b/pkg/cache/queue/manager.go @@ -77,6 +77,7 @@ func WithPodsReadyRequeuingTimestamp(ts config.RequeuingTimestamp) Option { // WithExcludedResourcePrefixes sets the list of excluded resource prefixes func WithExcludedResourcePrefixes(excludedPrefixes []string) Option { return func(m *Manager) { + m.globalExcludedResourcePrefixes = excludedPrefixes m.workloadInfoOptions = append(m.workloadInfoOptions, workload.WithExcludedResourcePrefixes(excludedPrefixes)) } } @@ -88,6 +89,65 @@ func WithResourceTransformations(transforms []config.ResourceTransformation) Opt } } +// workloadInfoOptionsWithCQExclusions creates workload info options by merging +// global excludeResourcePrefixes with ClusterQueue-specific ones when the feature gate is enabled. +// The result never shares spare capacity with m.workloadInfoOptions, so a later call cannot overwrite its entries. +func (m *Manager) workloadInfoOptionsWithCQExclusions(cq *ClusterQueue, additionalOpts ...workload.InfoOption) []workload.InfoOption { + if !features.Enabled(features.ClusterQueueExcludeResources) { + // Feature gate disabled, use only global exclusions; capacity is clamped so append never writes into the shared backing array + return append(m.workloadInfoOptions[:len(m.workloadInfoOptions):len(m.workloadInfoOptions)], additionalOpts...) 
+ } + + cqExclusions := cq.GetExcludeResourcePrefixes() + if len(cqExclusions) == 0 { + // No CQ-specific exclusions, use only global ones (capacity clamped for the same reason as above) + return append(m.workloadInfoOptions[:len(m.workloadInfoOptions):len(m.workloadInfoOptions)], additionalOpts...) + } + + // Merge global and CQ-specific exclusions + globalExclusions := m.extractGlobalExclusions() + mergedExclusions := mergeExclusions(globalExclusions, cqExclusions) + + // Build new options: all global options first, then merged exclusions, then additional opts + // We start with global options (which includes global exclusions) + // Then we override with merged exclusions + // The last WithExcludedResourcePrefixes call will take precedence in workload.NewInfo + newOptions := make([]workload.InfoOption, 0, len(m.workloadInfoOptions)+len(additionalOpts)+1) + newOptions = append(newOptions, m.workloadInfoOptions...) + newOptions = append(newOptions, workload.WithExcludedResourcePrefixes(mergedExclusions)) + newOptions = append(newOptions, additionalOpts...) + + return newOptions +} + +// extractGlobalExclusions returns the global excluded resource prefixes. +func (m *Manager) extractGlobalExclusions() []string { + return m.globalExcludedResourcePrefixes +} + +// mergeExclusions combines global and ClusterQueue-specific exclusions. +// Returns a deduplicated list containing all unique prefixes from both sources. +func mergeExclusions(global, cq []string) []string { + seen := make(map[string]bool, len(global)+len(cq)) + result := make([]string, 0, len(global)+len(cq)) + + for _, prefix := range global { + if !seen[prefix] { + seen[prefix] = true + result = append(result, prefix) + } + } + + for _, prefix := range cq { + if !seen[prefix] { + seen[prefix] = true + result = append(result, prefix) + } + } + + return result +} + // SetDRAReconcileChannel sets the DRA reconcile channel after manager creation. 
func (m *Manager) SetDRAReconcileChannel(ch chan<- event.TypedGenericEvent[*kueue.Workload]) { m.draReconcileChannel = ch @@ -115,6 +175,9 @@ type Manager struct { workloadOrdering workload.Ordering workloadInfoOptions []workload.InfoOption + // globalExcludedResourcePrefixes stores the global excluded resource prefixes + // for easier merging with ClusterQueue-specific exclusions + globalExcludedResourcePrefixes []string hm hierarchy.Manager[*ClusterQueue, *cohort] @@ -335,7 +398,14 @@ func (m *Manager) AddLocalQueue(ctx context.Context, q *kueue.LocalQueue) error } workload.AdjustResources(ctx, m.client, &w) - qImpl.AddOrUpdate(workload.NewInfo(&w, m.workloadInfoOptions...)) + // Get ClusterQueue to merge exclusions + cq := m.hm.ClusterQueue(qImpl.ClusterQueue) + if cq != nil { + options := m.workloadInfoOptionsWithCQExclusions(cq) + qImpl.AddOrUpdate(workload.NewInfo(&w, options...)) + } else { + qImpl.AddOrUpdate(workload.NewInfo(&w, m.workloadInfoOptions...)) + } } cq := m.hm.ClusterQueue(qImpl.ClusterQueue) if cq != nil && cq.AddFromLocalQueue(qImpl) { @@ -450,13 +520,17 @@ func (m *Manager) AddOrUpdateWorkloadWithoutLock(w *kueue.Workload, opts ...work if q == nil { return ErrLocalQueueDoesNotExistOrInactive } - allOptions := append(m.workloadInfoOptions, opts...) - wInfo := workload.NewInfo(w, allOptions...) - q.AddOrUpdate(wInfo) + + // Get ClusterQueue to access its exclude resource prefixes cq := m.hm.ClusterQueue(q.ClusterQueue) if cq == nil { return ErrClusterQueueDoesNotExist } + + // Merge global and ClusterQueue-specific exclusions + allOptions := m.workloadInfoOptionsWithCQExclusions(cq, opts...) + wInfo := workload.NewInfo(w, allOptions...) 
+ q.AddOrUpdate(wInfo) cq.PushOrUpdate(wInfo) if features.Enabled(features.LocalQueueMetrics) { m.reportLQPendingWorkloads(q) @@ -833,7 +907,20 @@ func (m *Manager) queueSecondPass(ctx context.Context, w *kueue.Workload, iterat defer m.Unlock() log := ctrl.LoggerFrom(ctx) - wInfo := workload.NewInfo(w, m.workloadInfoOptions...) + + // Get ClusterQueue for merged exclusions + var wInfo *workload.Info + qKey := queue.KeyFromWorkload(w) + if q, ok := m.localQueues[qKey]; ok { + if cq := m.hm.ClusterQueue(q.ClusterQueue); cq != nil { + options := m.workloadInfoOptionsWithCQExclusions(cq) + wInfo = workload.NewInfo(w, options...) + } + } + if wInfo == nil { + wInfo = workload.NewInfo(w, m.workloadInfoOptions...) + } + wInfo.SecondPassIteration = iteration if m.secondPassQueue.queue(wInfo) { log.V(3).Info("Workload queued for second pass of scheduling", "workload", workload.Key(w)) diff --git a/pkg/cache/queue/manager_test.go b/pkg/cache/queue/manager_test.go index 7674d5d69f5..424477f250d 100644 --- a/pkg/cache/queue/manager_test.go +++ b/pkg/cache/queue/manager_test.go @@ -33,6 +33,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta2" + "sigs.k8s.io/kueue/pkg/features" "sigs.k8s.io/kueue/pkg/util/queue" utiltesting "sigs.k8s.io/kueue/pkg/util/testing" utiltestingapi "sigs.k8s.io/kueue/pkg/util/testing/v1beta2" @@ -1353,3 +1354,142 @@ func TestQueueSecondPassIfNeeded(t *testing.T) { }) } } + +func TestMergeExclusions(t *testing.T) { + cases := map[string]struct { + global []string + cq []string + want []string + }{ + "both empty": { + global: []string{}, + cq: []string{}, + want: []string{}, + }, + "only global": { + global: []string{"example.com/"}, + cq: []string{}, + want: []string{"example.com/"}, + }, + "only cq": { + global: []string{}, + cq: []string{"foo.io/"}, + want: []string{"foo.io/"}, + }, + "no overlap": { + global: []string{"example.com/", "bar.io/"}, + cq: []string{"foo.io/", "baz.io/"}, + want: 
[]string{"example.com/", "bar.io/", "foo.io/", "baz.io/"}, + }, + "with overlap": { + global: []string{"example.com/", "bar.io/"}, + cq: []string{"example.com/", "baz.io/"}, + want: []string{"example.com/", "bar.io/", "baz.io/"}, + }, + "all same": { + global: []string{"example.com/"}, + cq: []string{"example.com/"}, + want: []string{"example.com/"}, + }, + } + + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + got := mergeExclusions(tc.global, tc.cq) + if diff := cmp.Diff(tc.want, got, cmpopts.SortSlices(func(a, b string) bool { return a < b })); diff != "" { + t.Errorf("Unexpected merged exclusions (-want,+got):\n%s", diff) + } + }) + } +} + +func TestWorkloadInfoOptionsWithCQExclusions(t *testing.T) { + ctx, _ := utiltesting.ContextWithLog(t) + + cases := map[string]struct { + globalExclusions []string + cqExclusions []string + featureEnabled bool + wantExclusions []string + }{ + "feature disabled, only global": { + globalExclusions: []string{"example.com/"}, + cqExclusions: []string{"foo.io/"}, + featureEnabled: false, + wantExclusions: []string{"example.com/"}, + }, + "feature enabled, only global": { + globalExclusions: []string{"example.com/"}, + cqExclusions: []string{}, + featureEnabled: true, + wantExclusions: []string{"example.com/"}, + }, + "feature enabled, both present": { + globalExclusions: []string{"example.com/"}, + cqExclusions: []string{"foo.io/"}, + featureEnabled: true, + wantExclusions: []string{"example.com/", "foo.io/"}, + }, + "feature enabled, with overlap": { + globalExclusions: []string{"example.com/", "bar.io/"}, + cqExclusions: []string{"example.com/", "baz.io/"}, + featureEnabled: true, + wantExclusions: []string{"example.com/", "bar.io/", "baz.io/"}, + }, + } + + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + features.SetFeatureGateDuringTest(t, features.ClusterQueueExcludeResources, tc.featureEnabled) + + manager := NewManager(utiltesting.NewFakeClient(), nil, 
WithExcludedResourcePrefixes(tc.globalExclusions)) + + cq := newClusterQueueImpl(ctx, nil, defaultOrdering, testingclock.NewFakeClock(time.Now()), nil, false, nil) + apiCQ := utiltestingapi.MakeClusterQueue("test-cq"). + ExcludeResourcePrefixes(tc.cqExclusions). + Obj() + err := cq.Update(apiCQ) + if err != nil { + t.Fatalf("Failed to update ClusterQueue: %v", err) + } + + opts := manager.workloadInfoOptionsWithCQExclusions(cq) + + // Create a workload with resources that should be excluded + wl := utiltestingapi.MakeWorkload("test-wl", "default"). + Request("example.com/gpu", "1"). + Request("foo.io/special", "2"). + Request("bar.io/resource", "3"). + Request("baz.io/resource", "4"). + Request(corev1.ResourceCPU, "1"). + Obj() + + wInfo := workload.NewInfo(wl, opts...) + + // Check which resources were excluded + for _, prefix := range tc.wantExclusions { + for _, podSetRes := range wInfo.TotalRequests { + for resourceName := range podSetRes.Requests { + if strings.HasPrefix(string(resourceName), prefix) { + t.Errorf("Resource %s with prefix %s should have been excluded but was found in TotalRequests", resourceName, prefix) + } + } + } + } + + // CPU should never be excluded + cpuFound := false + for _, podSetRes := range wInfo.TotalRequests { + for resourceName := range podSetRes.Requests { + if resourceName == corev1.ResourceCPU { + cpuFound = true + break + } + } + } + if !cpuFound { + t.Error("CPU resource should not be excluded") + } + }) + } +} diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 230badd3277..129ac7a09aa 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -198,6 +198,12 @@ const ( // // Enable all updates to Workload objects to use Patch Merge instead of Patch Apply. 
WorkloadRequestUseMergePatch featuregate.Feature = "WorkloadRequestUseMergePatch" + + // owner: @kehannon + // kep: https://github.com/kubernetes-sigs/kueue/tree/main/keps/5800-clusterqueue-exclude-resources + // + // Enable ClusterQueue-level resource exclusion via excludeResourcePrefixes field. + ClusterQueueExcludeResources featuregate.Feature = "ClusterQueueExcludeResources" ) func init() { @@ -312,6 +318,9 @@ var defaultVersionedFeatureGates = map[featuregate.Feature]featuregate.Versioned WorkloadRequestUseMergePatch: { {Version: version.MustParse("0.14"), Default: false, PreRelease: featuregate.Alpha}, }, + ClusterQueueExcludeResources: { + {Version: version.MustParse("0.15"), Default: false, PreRelease: featuregate.Alpha}, + }, } func SetFeatureGateDuringTest(tb testing.TB, f featuregate.Feature, value bool) { diff --git a/pkg/util/testing/v1beta2/wrappers.go b/pkg/util/testing/v1beta2/wrappers.go index 47a3472a400..46bbd5a3701 100644 --- a/pkg/util/testing/v1beta2/wrappers.go +++ b/pkg/util/testing/v1beta2/wrappers.go @@ -866,6 +866,12 @@ func (c *ClusterQueueWrapper) Cohort(cohort kueue.CohortReference) *ClusterQueue return c } +// ExcludeResourcePrefixes sets the resource prefixes to exclude from quota management. +func (c *ClusterQueueWrapper) ExcludeResourcePrefixes(prefixes []string) *ClusterQueueWrapper { + c.Spec.ExcludeResourcePrefixes = prefixes + return c +} + func (c *ClusterQueueWrapper) AdmissionCheckStrategy(acs ...kueue.AdmissionCheckStrategyRule) *ClusterQueueWrapper { if c.Spec.AdmissionChecksStrategy == nil { c.Spec.AdmissionChecksStrategy = &kueue.AdmissionChecksStrategy{} diff --git a/site/content/en/docs/reference/kueue.v1beta1.md b/site/content/en/docs/reference/kueue.v1beta1.md index c6e40fb2955..531ebae0edd 100644 --- a/site/content/en/docs/reference/kueue.v1beta1.md +++ b/site/content/en/docs/reference/kueue.v1beta1.md @@ -1103,6 +1103,23 @@ if FairSharing is enabled in the Kueue configuration.

admissionScope indicates whether ClusterQueue uses the Admission Fair Sharing

+excludeResourcePrefixes
+[]string + + +

excludeResourcePrefixes defines which resources should be ignored by +Kueue for quota management in this ClusterQueue. Resources matching any +of the prefixes will be excluded from quota calculations. +When specified, this list is combined (union) with the global +excludeResourcePrefixes from the Kueue Configuration. +The prefix matching follows the same semantics as the global configuration:

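+- An exact match of the resource name
+- A prefix match followed by a slash (e.g., "example.com" matches "example.com/gpu")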

Example: ["ephemeral-storage", "hugepages-", "example.com"]

+ + diff --git a/site/content/en/docs/reference/kueue.v1beta2.md b/site/content/en/docs/reference/kueue.v1beta2.md index 563503ba4fd..1d167f97174 100644 --- a/site/content/en/docs/reference/kueue.v1beta2.md +++ b/site/content/en/docs/reference/kueue.v1beta2.md @@ -1103,6 +1103,23 @@ if FairSharing is enabled in the Kueue configuration.

admissionScope indicates whether ClusterQueue uses the Admission Fair Sharing

+excludeResourcePrefixes
+[]string + + +

excludeResourcePrefixes defines which resources should be ignored by +Kueue for quota management in this ClusterQueue. Resources matching any +of the prefixes will be excluded from quota calculations. +When specified, this list is combined (union) with the global +excludeResourcePrefixes from the Kueue Configuration. +The prefix matching follows the same semantics as the global configuration:

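+- An exact match of the resource name
+- A prefix match followed by a slash (e.g., "example.com" matches "example.com/gpu")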

Example: ["ephemeral-storage", "hugepages-", "example.com"]

+
+
diff --git a/test/integration/singlecluster/scheduler/clusterqueue_exclude_resources_test.go b/test/integration/singlecluster/scheduler/clusterqueue_exclude_resources_test.go
new file mode 100644
index 00000000000..65afee5659d
--- /dev/null
+++ b/test/integration/singlecluster/scheduler/clusterqueue_exclude_resources_test.go
@@ -0,0 +1,176 @@
+/*
+Copyright The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package scheduler
+
+import (
+	"github.com/onsi/ginkgo/v2"
+	"github.com/onsi/gomega"
+	corev1 "k8s.io/api/core/v1"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+
+	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta2"
+	"sigs.k8s.io/kueue/pkg/features"
+	utiltestingapi "sigs.k8s.io/kueue/pkg/util/testing/v1beta2"
+	"sigs.k8s.io/kueue/test/util"
+)
+
+// These specs check that ClusterQueue.Spec.ExcludeResourcePrefixes survives a
+// create/read and an update/read round trip through k8sClient, may be left
+// unset, and is still accepted while ClusterQueueExcludeResources is disabled.
+var _ = ginkgo.Describe("ClusterQueue ExcludeResourcePrefixes", func() {
+	// Prefixes used as exclusions by the specs below.
+	const (
+		excludedPrefix1 = "example.com/"
+		excludedPrefix2 = "foo.io/"
+		excludedPrefix3 = "bar.io/"
+	)
+
+	var (
+		ns *corev1.Namespace     // per-spec namespace, created in BeforeEach
+		rf *kueue.ResourceFlavor // "default" flavor referenced by every ClusterQueue below
+	)
+
+	// Enable the feature gate and create a fresh namespace plus the "default"
+	// ResourceFlavor before every spec.
+	ginkgo.BeforeEach(func() {
+		features.SetFeatureGateDuringTest(ginkgo.GinkgoTB(), features.ClusterQueueExcludeResources, true)
+		ns = util.CreateNamespaceFromPrefixWithLog(ctx, k8sClient, "cq-exclude-")
+
+		rf = utiltestingapi.MakeResourceFlavor("default").Obj()
+		util.MustCreate(ctx, k8sClient, rf)
+	})
+
+	// Tear down the namespace and ResourceFlavor that BeforeEach created.
+	ginkgo.AfterEach(func() {
+		gomega.Expect(util.DeleteNamespace(ctx, k8sClient, ns)).To(gomega.Succeed())
+		util.ExpectObjectToBeDeleted(ctx, k8sClient, rf, true)
+	})
+
+	ginkgo.When("ClusterQueue has excludeResourcePrefixes", func() {
+		ginkgo.It("Should store and retrieve exclude resource prefixes", func() {
+			cq := utiltestingapi.MakeClusterQueue("cq-with-exclusions").
+				ResourceGroup(
+					*utiltestingapi.MakeFlavorQuotas("default").
+						Resource(corev1.ResourceCPU, "10").
+						Obj(),
+				).
+				ExcludeResourcePrefixes([]string{excludedPrefix1, excludedPrefix2}).
+				Obj()
+
+			ginkgo.By("Verifying test object was created with prefixes")
+			gomega.Expect(cq.Spec.ExcludeResourcePrefixes).To(gomega.ConsistOf(excludedPrefix1, excludedPrefix2))
+
+			util.MustCreate(ctx, k8sClient, cq)
+			// defer (rather than an AfterEach) removes the ClusterQueue as soon as
+			// this spec body returns, i.e. before AfterEach deletes the flavor it
+			// references. NOTE(review): presumably the flavor cannot go away while
+			// a ClusterQueue still uses it - confirm before reordering cleanup.
+			defer func() {
+				util.ExpectObjectToBeDeleted(ctx, k8sClient, cq, true)
+			}()
+
+			ginkgo.By("Checking the ClusterQueue has the correct exclude prefixes")
+			var createdCQ kueue.ClusterQueue
+			gomega.Eventually(func(g gomega.Gomega) {
+				g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(cq), &createdCQ)).To(gomega.Succeed())
+				g.Expect(createdCQ.Spec.ExcludeResourcePrefixes).To(gomega.ConsistOf(excludedPrefix1, excludedPrefix2))
+			}, util.Timeout, util.Interval).Should(gomega.Succeed())
+		})
+
+		ginkgo.It("Should update exclude resource prefixes", func() {
+			cq := utiltestingapi.MakeClusterQueue("cq-update-exclusions").
+				ResourceGroup(
+					*utiltestingapi.MakeFlavorQuotas("default").
+						Resource(corev1.ResourceCPU, "10").
+						Obj(),
+				).
+				ExcludeResourcePrefixes([]string{excludedPrefix1}).
+				Obj()
+			util.MustCreate(ctx, k8sClient, cq)
+			defer func() {
+				util.ExpectObjectToBeDeleted(ctx, k8sClient, cq, true)
+			}()
+
+			ginkgo.By("Updating the ClusterQueue to change exclusions")
+			// Get and Update live in the same Eventually body so that a failed
+			// Update is retried against a freshly read copy of the object.
+			gomega.Eventually(func(g gomega.Gomega) {
+				var updatedCQ kueue.ClusterQueue
+				g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(cq), &updatedCQ)).To(gomega.Succeed())
+				updatedCQ.Spec.ExcludeResourcePrefixes = []string{excludedPrefix2, excludedPrefix3}
+				g.Expect(k8sClient.Update(ctx, &updatedCQ)).To(gomega.Succeed())
+			}, util.Timeout, util.Interval).Should(gomega.Succeed())
+
+			ginkgo.By("Checking the updated exclusions")
+			var updatedCQ kueue.ClusterQueue
+			gomega.Eventually(func(g gomega.Gomega) {
+				g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(cq), &updatedCQ)).To(gomega.Succeed())
+				g.Expect(updatedCQ.Spec.ExcludeResourcePrefixes).To(gomega.ConsistOf(excludedPrefix2, excludedPrefix3))
+			}, util.Timeout, util.Interval).Should(gomega.Succeed())
+		})
+
+		ginkgo.It("Should handle empty exclude resource prefixes", func() {
+			// No ExcludeResourcePrefixes call here: the field is left unset.
+			cq := utiltestingapi.MakeClusterQueue("cq-no-exclusions").
+				ResourceGroup(
+					*utiltestingapi.MakeFlavorQuotas("default").
+						Resource(corev1.ResourceCPU, "10").
+						Obj(),
+				).
+				Obj()
+			util.MustCreate(ctx, k8sClient, cq)
+			defer func() {
+				util.ExpectObjectToBeDeleted(ctx, k8sClient, cq, true)
+			}()
+
+			ginkgo.By("Checking the ClusterQueue has no exclude prefixes")
+			var createdCQ kueue.ClusterQueue
+			gomega.Eventually(func(g gomega.Gomega) {
+				g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(cq), &createdCQ)).To(gomega.Succeed())
+				g.Expect(createdCQ.Spec.ExcludeResourcePrefixes).To(gomega.BeEmpty())
+			}, util.Timeout, util.Interval).Should(gomega.Succeed())
+		})
+	})
+
+	ginkgo.When("Feature gate is disabled", func() {
+		ginkgo.It("Should still accept exclude resource prefixes field", func() {
+			// BeforeEach turned the gate on; switch it off for this spec to show
+			// that the API field is accepted regardless of the gate.
+			features.SetFeatureGateDuringTest(ginkgo.GinkgoTB(), features.ClusterQueueExcludeResources, false)
+
+			cq := utiltestingapi.MakeClusterQueue("cq-feature-disabled").
+				ResourceGroup(
+					*utiltestingapi.MakeFlavorQuotas("default").
+						Resource(corev1.ResourceCPU, "10").
+						Obj(),
+				).
+				ExcludeResourcePrefixes([]string{excludedPrefix1}).
+				Obj()
+			util.MustCreate(ctx, k8sClient, cq)
+			defer func() {
+				util.ExpectObjectToBeDeleted(ctx, k8sClient, cq, true)
+			}()
+
+			ginkgo.By("Checking the ClusterQueue was created with prefixes (field is always accepted)")
+			var createdCQ kueue.ClusterQueue
+			gomega.Eventually(func(g gomega.Gomega) {
+				g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(cq), &createdCQ)).To(gomega.Succeed())
+				g.Expect(createdCQ.Spec.ExcludeResourcePrefixes).To(gomega.ConsistOf(excludedPrefix1))
+			}, util.Timeout, util.Interval).Should(gomega.Succeed())
+		})
+	})
+})
diff --git a/test/integration/singlecluster/scheduler/suite_test.go b/test/integration/singlecluster/scheduler/suite_test.go
index ea8eaa9c961..0efc97600e4 100644
--- a/test/integration/singlecluster/scheduler/suite_test.go
+++ b/test/integration/singlecluster/scheduler/suite_test.go
@@ -81,8 +81,12 @@ func managerAndSchedulerSetup(ctx context.Context, mgr manager.Manager) {
 			Outputs: corev1.ResourceList{corev1.ResourceCPU: resourcev1.MustParse("2")},
 		},
 	}
-	cCache := schdcache.New(mgr.GetClient())
-	queues := qcache.NewManager(mgr.GetClient(), cCache, qcache.WithResourceTransformations(transformations))
+
+	// Prefixes excluded for both the cache and the queue manager; the same
+	// values as the excludedPrefix constants in clusterqueue_exclude_resources_test.go.
+	excludedPrefixes := []string{"example.com/", "foo.io/", "bar.io/"}
+	cCache := schdcache.New(mgr.GetClient(), schdcache.WithExcludedResourcePrefixes(excludedPrefixes))
+	queues := qcache.NewManager(mgr.GetClient(), cCache, qcache.WithResourceTransformations(transformations), qcache.WithExcludedResourcePrefixes(excludedPrefixes))
 
 	configuration := &config.Configuration{}
 	mgr.GetScheme().Default(configuration)