From e284ce4a12d6d0c98aef31e06011f0f4a01d3e22 Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Mon, 17 Nov 2025 17:06:17 -0500 Subject: [PATCH] prototype of custom resource scheduling Platforms other than Linux, especially non-Unix platforms, may have a different view of resources that don't model well as `resources.cpu` or `resources.memory`. And users with unusual deployment environments have asked about the possibility of scheduling resources specific to those environments. This PR is a prototype of the kind of interface we might want to be able to support. Nodes get a `client.custom_resource` block where they can define a resource the node will make available in its fingerprint. Resources are one of five types, designed to make it possible to model all our existing resources as custom resources: * `ratio`: The resource is used as a value relative to other tasks with the same resource. An example of this would be Linux `cpu.weight` as implemented by a systemd unit file. * `capped-ratio`: Like a ratio, except all the quantity used by tasks has a maximum total value. An example of this would be Linux `cpu.weight` as currently implemented in Nomad, where the "cap" is derived from the total MHz of CPUs on the host. * `countable`: The resource has a fixed but fungible amount. An example of this would be memory allocation (ignoring NUMA), where there's a certain amount of memory available on the host and it's "used up" by allocations, but we don't care about identity of the individual blocks of memory. * `dynamic`: The resource has a fixed set of items with exclusive access, but the job doesn't care which ones it gets. An example of this would be Nomad's current dynamic port assignment or `resource.cores`. * `static`: The resource has a fixed set of items with exclusive access, and the job wants a specific one of those items. An example of this would be Nomad's current static port assignment. 
We can use this prototype to anchor discussions about how we might implement a set of custom resource scheduling features but it's nowhere near production-ready. I've included enough plumbing to implement these five resource types, fingerprint them on clients via config files, and schedule them for allocations. What's not included, all of which we'd want to solve in any productionized version of this work: * Support for exposing the custom resource allocation to a task driver. We'd need to thread custom resources into the protobufs we send via `go-plugin` and then the user's task driver would need to consume that data. * Support for preemption * Support for quotas * Support for dynamically adding custom resources to a node Ref: https://github.com/hashicorp/nomad/issues/1081 --- api/allocations.go | 2 + api/nodes.go | 13 + api/resources.go | 30 ++ client/client.go | 4 + client/config/config.go | 3 + command/agent/agent.go | 2 + command/agent/config.go | 9 + command/agent/config_parse.go | 5 + command/agent/job_endpoint.go | 16 + command/node_status.go | 30 ++ nomad/structs/funcs.go | 3 +- nomad/structs/funcs_test.go | 184 ++++++++++++ nomad/structs/resources.go | 393 +++++++++++++++++++++++++ nomad/structs/resources_test.go | 323 ++++++++++++++++++++ nomad/structs/structs.go | 56 +++- scheduler/feasible/custom_resources.go | 26 ++ scheduler/feasible/rank.go | 21 ++ 17 files changed, 1117 insertions(+), 3 deletions(-) create mode 100644 nomad/structs/resources.go create mode 100644 nomad/structs/resources_test.go create mode 100644 scheduler/feasible/custom_resources.go diff --git a/api/allocations.go b/api/allocations.go index 08dd3fbed21..76d60ce23c7 100644 --- a/api/allocations.go +++ b/api/allocations.go @@ -436,12 +436,14 @@ type AllocatedTaskResources struct { Memory AllocatedMemoryResources Networks []*NetworkResource Devices []*AllocatedDeviceResource + Custom []*CustomResource } type AllocatedSharedResources struct { DiskMB int64 Networks []*NetworkResource 
Ports []PortMapping + Custom []*CustomResource } type PortMapping struct { diff --git a/api/nodes.go b/api/nodes.go index fda4763f7df..6359a9a6b2f 100644 --- a/api/nodes.go +++ b/api/nodes.go @@ -587,6 +587,7 @@ type NodeResources struct { Disk NodeDiskResources Networks []*NetworkResource Devices []*NodeDeviceResource + Custom []*NodeCustomResource MinDynamicPort int MaxDynamicPort int @@ -629,6 +630,18 @@ type NodeReservedNetworkResources struct { ReservedHostPorts string } +type NodeCustomResource struct { + Name string `hcl:"name,label"` + Version uint64 `hcl:"version,optional"` + Type CustomResourceType `hcl:"type,optional"` + Scope CustomResourceScope `hcl:"scope,optional"` + + Quantity int64 `hcl:"quantity,optional"` + Range string `hcl:"range,optional"` + Items []any `hcl:"items,optional"` + Meta map[string]string `hcl:"meta,block"` +} + type CSITopologyRequest struct { Required []*CSITopology `hcl:"required"` Preferred []*CSITopology `hcl:"preferred"` diff --git a/api/resources.go b/api/resources.go index f45a4615145..573981ccc6f 100644 --- a/api/resources.go +++ b/api/resources.go @@ -20,6 +20,7 @@ type Resources struct { Devices []*RequestedDevice `hcl:"device,block"` NUMA *NUMAResource `hcl:"numa,block"` SecretsMB *int `mapstructure:"secrets" hcl:"secrets,optional"` + Custom []*CustomResource `mapstructure:"custom" hcl:"custom,block"` // COMPAT(0.10) // XXX Deprecated. Please do not use. 
The field will be removed in Nomad @@ -330,3 +331,32 @@ func (d *RequestedDevice) Canonicalize() { a.Canonicalize() } } + +type CustomResource struct { + Name string `hcl:"name,label"` + Version uint64 `hcl:"version,optional"` + Type CustomResourceType `hcl:"type,optional"` + Scope CustomResourceScope `hcl:"scope,optional"` + + Quantity int64 `hcl:"quantity,optional"` + Range string `hcl:"range,optional"` + Items []any `hcl:"items,optional"` + Constraints []*Constraint `hcl:"constraint,block"` +} + +type CustomResourceScope string + +const ( + CustomResourceScopeGroup CustomResourceScope = "group" + CustomResourceScopeTask CustomResourceScope = "task" +) + +type CustomResourceType string + +const ( + CustomResourceTypeRatio CustomResourceType = "ratio" // ex. weight + CustomResourceTypeCappedRatio CustomResourceType = "capped-ratio" // ex. resource.cpu + CustomResourceTypeCountable CustomResourceType = "countable" // ex. memory, disk + CustomResourceTypeDynamicInstance CustomResourceType = "dynamic" // ex. ports, cores + CustomResourceTypeStaticInstance CustomResourceType = "static" // ex. 
ports, devices +) diff --git a/client/client.go b/client/client.go index 587d70b6a09..d428c84f1da 100644 --- a/client/client.go +++ b/client/client.go @@ -1649,12 +1649,14 @@ func (c *Client) setupNode() error { node.NodeResources.MinDynamicPort = newConfig.MinDynamicPort node.NodeResources.MaxDynamicPort = newConfig.MaxDynamicPort node.NodeResources.Processors = newConfig.Node.NodeResources.Processors + node.NodeResources.Custom = newConfig.CustomResources if node.NodeResources.Processors.Empty() { node.NodeResources.Processors = structs.NodeProcessorResources{ Topology: &numalib.Topology{}, } } + node.NodeResources.Custom = newConfig.CustomResources } if node.ReservedResources == nil { node.ReservedResources = &structs.NodeReservedResources{} @@ -1813,6 +1815,8 @@ func (c *Client) updateNodeFromFingerprint(response *fingerprint.FingerprintResp if cpu := response.NodeResources.Processors.TotalCompute(); cpu > 0 { newConfig.CpuCompute = cpu } + + response.NodeResources.Custom = newConfig.CustomResources } if nodeHasChanged { diff --git a/client/config/config.go b/client/config/config.go index 5b6525d1803..ad6bb25d7fd 100644 --- a/client/config/config.go +++ b/client/config/config.go @@ -395,6 +395,8 @@ type Config struct { // LogFile is used by MonitorExport to stream a server's log file LogFile string `hcl:"log_file"` + + CustomResources structs.CustomResources } type APIListenerRegistrar interface { @@ -896,6 +898,7 @@ func (c *Config) Copy() *Config { nc.ReservableCores = slices.Clone(c.ReservableCores) nc.Artifact = c.Artifact.Copy() nc.Users = c.Users.Copy() + nc.CustomResources = c.CustomResources.Copy() return &nc } diff --git a/command/agent/agent.go b/command/agent/agent.go index 56dc7089e36..9ca75bd8791 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -158,6 +158,7 @@ func NewAgent(config *Config, logger log.InterceptLogger, logOutput io.Writer, i if err := a.setupServer(); err != nil { return nil, err } + if err := a.setupClient(); err 
!= nil { return nil, err } @@ -936,6 +937,7 @@ func convertClientConfig(agentConfig *Config) (*clientconfig.Config, error) { conf.Node.Meta = agentConfig.Client.Meta conf.Node.NodeClass = agentConfig.Client.NodeClass conf.Node.NodePool = agentConfig.Client.NodePool + conf.CustomResources = agentConfig.Client.CustomResources // Set up the HTTP advertise address conf.Node.HTTPAddr = agentConfig.AdvertiseAddrs.HTTP diff --git a/command/agent/config.go b/command/agent/config.go index 6aeb1d9b388..fbcd1655586 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -443,6 +443,10 @@ type ClientConfig struct { // LogFile is used by MonitorExport to stream a client's log file LogFile string `hcl:"log_file"` + + // CustomResources allows the user to define custom schedulable resources on + // this node + CustomResources structs.CustomResources `hcl:"custom_resource,block"` } func (c *ClientConfig) Copy() *ClientConfig { @@ -466,6 +470,7 @@ func (c *ClientConfig) Copy() *ClientConfig { nc.Drain = c.Drain.Copy() nc.Users = c.Users.Copy() nc.ExtraKeysHCL = slices.Clone(c.ExtraKeysHCL) + nc.CustomResources = c.CustomResources.Copy() return &nc } @@ -2883,6 +2888,10 @@ func (c *ClientConfig) Merge(b *ClientConfig) *ClientConfig { result.IntroToken = b.IntroToken } + if b.CustomResources != nil { + result.CustomResources.Merge(b.CustomResources) + } + return &result } diff --git a/command/agent/config_parse.go b/command/agent/config_parse.go index afd023c667f..ee5cd45a17c 100644 --- a/command/agent/config_parse.go +++ b/command/agent/config_parse.go @@ -330,6 +330,11 @@ func extraKeys(c *Config) error { helper.RemoveEqualFold(&c.Client.ExtraKeysHCL, "host_network") } + for _, cr := range c.Client.CustomResources { + helper.RemoveEqualFold(&c.Client.ExtraKeysHCL, "custom_resource") + helper.RemoveEqualFold(&c.Client.ExtraKeysHCL, cr.Name) + } + // Remove Template extra keys for _, t := range []string{"function_denylist", "disable_file_sandbox", "max_stale", "wait", 
"wait_bounds", "block_query_wait", "consul_retry", "vault_retry", "nomad_retry"} { helper.RemoveEqualFold(&c.Client.ExtraKeysHCL, t) diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index df055515ed7..d4b4f5d4774 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -1646,6 +1646,22 @@ func ApiResourcesToStructs(in *api.Resources) *structs.Resources { out.SecretsMB = *in.SecretsMB } + if len(in.Custom) > 0 { + out.Custom = []*structs.CustomResource{} + for _, apiCr := range in.Custom { + out.Custom = append(out.Custom, &structs.CustomResource{ + Name: apiCr.Name, + Version: apiCr.Version, + Type: structs.CustomResourceType(apiCr.Type), + Scope: structs.CustomResourceScope(apiCr.Scope), + Quantity: apiCr.Quantity, + Range: apiCr.Range, + Items: apiCr.Items, + Constraints: ApiConstraintsToStructs(apiCr.Constraints), + }) + } + } + return out } diff --git a/command/node_status.go b/command/node_status.go index df31b3c02fd..463325db148 100644 --- a/command/node_status.go +++ b/command/node_status.go @@ -569,6 +569,7 @@ func (c *NodeStatusCommand) formatNode(client *api.Client, node *api.Node) int { c.outputNodeNetworkInfo(node) c.outputNodeCSIVolumeInfo(client, node, runningAllocs) c.outputNodeDriverInfo(node) + c.outputNodeCustomResourceInfo(node) } // Emit node events @@ -788,6 +789,35 @@ func (c *NodeStatusCommand) outputNodeDriverInfo(node *api.Node) { c.Ui.Output(formatList(nodeDrivers)) } +func (c *NodeStatusCommand) outputNodeCustomResourceInfo(node *api.Node) { + c.Ui.Output(c.Colorize().Color("\n[bold]Custom Resources")) + + resources := node.NodeResources.Custom + if len(resources) == 0 { + c.Ui.Output("") + return + } + + out := make([]string, 0, len(resources)+1) + out = append(out, "Resource|Version|Type|Available") + + sort.Slice(resources, func(i int, j int) bool { return resources[i].Name < resources[j].Name }) + for _, resource := range resources { + switch resource.Type { + case 
api.CustomResourceTypeRatio, api.CustomResourceTypeCappedRatio, + api.CustomResourceTypeCountable: + out = append(out, fmt.Sprintf("%s|%d|%s|%d", + resource.Name, resource.Version, resource.Type, resource.Quantity)) + + case api.CustomResourceTypeStaticInstance, api.CustomResourceTypeDynamicInstance: + out = append(out, fmt.Sprintf("%s|%d|%s|%v", + resource.Name, resource.Version, resource.Type, resource.Items)) + + } + } + c.Ui.Output(formatList(out)) +} + func (c *NodeStatusCommand) outputNodeStatusEvents(node *api.Node) { c.Ui.Output(c.Colorize().Color("\n[bold]Node Events")) c.outputNodeEvent(node.Events) diff --git a/nomad/structs/funcs.go b/nomad/structs/funcs.go index 59b6b3d38c7..4f6f51512c2 100644 --- a/nomad/structs/funcs.go +++ b/nomad/structs/funcs.go @@ -163,7 +163,8 @@ func AllocsFit(node *Node, allocs []*Allocation, netIdx *NetworkIndex, checkDevi cr := alloc.AllocatedResources.Comparable() used.Add(cr) - // Adding the comparable resource unions reserved core sets, need to check if reserved cores overlap + // Adding the comparable resource unions reserved core sets, need to + // check if reserved cores overlap for _, core := range cr.Flattened.Cpu.ReservedCores { if _, ok := reservedCores[core]; ok { coreOverlap = true diff --git a/nomad/structs/funcs_test.go b/nomad/structs/funcs_test.go index 79db2d1bf07..aeffa2ada5a 100644 --- a/nomad/structs/funcs_test.go +++ b/nomad/structs/funcs_test.go @@ -638,6 +638,190 @@ func TestAllocsFit_MemoryOversubscription(t *testing.T) { must.Eq(t, 12000, used.Flattened.Memory.MemoryMaxMB) } +func TestAllocsFit_CustomResources(t *testing.T) { + ci.Parallel(t) + + n0 := node2k() + n0.NodeResources.Memory.MemoryMB = 2048 + n0.ReservedResources = nil + n0.NodeResources.Custom = []*CustomResource{ + { + Name: "foo_dynamic", + Version: 1, + Type: CustomResourceTypeDynamicInstance, + Scope: CustomResourceScopeTask, + Range: "1-3,7-10", + Items: []any{1, 2, 3, 7, 8, 9}, + }, + { + Name: "bar_countable", + Type: 
CustomResourceTypeCountable, + Scope: CustomResourceScopeGroup, + Quantity: 10_000, + }, + } + + a1 := &Allocation{ + AllocatedResources: &AllocatedResources{ + Tasks: map[string]*AllocatedTaskResources{ + "web": { + Cpu: AllocatedCpuResources{CpuShares: 100}, + Memory: AllocatedMemoryResources{MemoryMB: 500}, + Custom: []*CustomResource{ + { + Name: "foo_dynamic", + Version: 1, + Type: CustomResourceTypeDynamicInstance, + Scope: CustomResourceScopeTask, + Items: []any{3}, + }, + }, + }, + }, + }, + } + + ok, msg, used, err := AllocsFit(n0, []*Allocation{a1}, nil, false) + test.True(t, ok) + test.Eq(t, "", msg) + test.Eq(t, &ComparableResources{ + Flattened: AllocatedTaskResources{ + Cpu: AllocatedCpuResources{CpuShares: 100, ReservedCores: []uint16{}}, + Memory: AllocatedMemoryResources{MemoryMB: 500, MemoryMaxMB: 500}, + Custom: CustomResources{{ + Name: "foo_dynamic", + Version: 1, + Type: CustomResourceTypeDynamicInstance, + Scope: CustomResourceScopeTask, + Items: []any{3}, + }}, + }, + Shared: AllocatedSharedResources{}, + }, used) + test.NoError(t, err) + + a2 := &Allocation{ + AllocatedResources: &AllocatedResources{ + Tasks: map[string]*AllocatedTaskResources{ + "web": { + Cpu: AllocatedCpuResources{CpuShares: 100}, + Memory: AllocatedMemoryResources{MemoryMB: 500}, + Custom: []*CustomResource{ + { + Name: "foo_dynamic", + Version: 1, + Type: CustomResourceTypeDynamicInstance, + Scope: CustomResourceScopeTask, + Items: []any{5}, + }, + }, + }, + }, + }, + } + ok, msg, used, err = AllocsFit(n0, []*Allocation{a1, a2}, nil, false) + test.False(t, ok) + test.Eq(t, "custom resource: foo_dynamic", msg) + test.Eq(t, &ComparableResources{ + Flattened: AllocatedTaskResources{ + Cpu: AllocatedCpuResources{CpuShares: 200, ReservedCores: []uint16{}}, + Memory: AllocatedMemoryResources{MemoryMB: 1000, MemoryMaxMB: 1000}, + Custom: CustomResources{{ + Name: "foo_dynamic", + Version: 1, + Type: CustomResourceTypeDynamicInstance, + Scope: CustomResourceScopeTask, + Items: 
[]any{3, 5}, // TODO(tgross): shows as used here even if not available? + }}, + }, + Shared: AllocatedSharedResources{}, + }, used) + test.NoError(t, err) + + a2.AllocatedResources.Tasks["web"].Custom[0].Items = []any{7} + ok, msg, used, err = AllocsFit(n0, []*Allocation{a1, a2}, nil, false) + test.True(t, ok) + test.Eq(t, "", msg) + test.Eq(t, &ComparableResources{ + Flattened: AllocatedTaskResources{ + Cpu: AllocatedCpuResources{CpuShares: 200, ReservedCores: []uint16{}}, + Memory: AllocatedMemoryResources{MemoryMB: 1000, MemoryMaxMB: 1000}, + Custom: CustomResources{{ + Name: "foo_dynamic", + Version: 1, + Type: CustomResourceTypeDynamicInstance, + Scope: CustomResourceScopeTask, + Items: []any{3, 7}, + }}, + }, + Shared: AllocatedSharedResources{}, + }, used) + test.NoError(t, err) + + a2.AllocatedResources.Shared.Custom = CustomResources{{ + Name: "bar_countable", + Version: 2, + Type: CustomResourceTypeCountable, + Scope: CustomResourceScopeGroup, + Quantity: 10_000, + }} + ok, msg, used, err = AllocsFit(n0, []*Allocation{a1, a2}, nil, false) + test.False(t, ok) + test.Eq(t, "custom resources could not be compared: resource request 2 for \"bar_countable\" is newer than available version 0", msg) + test.Eq(t, &ComparableResources{ + Flattened: AllocatedTaskResources{ + Cpu: AllocatedCpuResources{CpuShares: 200, ReservedCores: []uint16{}}, + Memory: AllocatedMemoryResources{MemoryMB: 1000, MemoryMaxMB: 1000}, + Custom: CustomResources{{ + Name: "foo_dynamic", + Version: 1, + Type: CustomResourceTypeDynamicInstance, + Scope: CustomResourceScopeTask, + Items: []any{3, 7}, + }}, + }, + Shared: AllocatedSharedResources{ + Custom: CustomResources{{ + Name: "bar_countable", + Version: 2, + Type: CustomResourceTypeCountable, + Scope: CustomResourceScopeGroup, + Quantity: 10_000, + }}, + }, + }, used) + test.NoError(t, err) + + a2.AllocatedResources.Shared.Custom[0].Version = 0 + ok, msg, used, err = AllocsFit(n0, []*Allocation{a1, a2}, nil, false) + test.True(t, ok) + 
test.Eq(t, "", msg) + test.Eq(t, &ComparableResources{ + Flattened: AllocatedTaskResources{ + Cpu: AllocatedCpuResources{CpuShares: 200, ReservedCores: []uint16{}}, + Memory: AllocatedMemoryResources{MemoryMB: 1000, MemoryMaxMB: 1000}, + Custom: CustomResources{{ + Name: "foo_dynamic", + Version: 1, + Type: CustomResourceTypeDynamicInstance, + Scope: CustomResourceScopeTask, + Items: []any{3, 7}, + }}, + }, + Shared: AllocatedSharedResources{ + Custom: CustomResources{{ + Name: "bar_countable", + Version: 2, + Type: CustomResourceTypeCountable, + Scope: CustomResourceScopeGroup, + Quantity: 10_000, + }}, + }, + }, used) + test.NoError(t, err) + +} + func TestScoreFitBinPack(t *testing.T) { ci.Parallel(t) diff --git a/nomad/structs/resources.go b/nomad/structs/resources.go new file mode 100644 index 00000000000..9aba4e6cbb3 --- /dev/null +++ b/nomad/structs/resources.go @@ -0,0 +1,393 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package structs + +import ( + "cmp" + "errors" + "fmt" + "maps" + "math/rand" + "slices" + "strings" + + "github.com/hashicorp/go-set/v3" + "github.com/hashicorp/nomad/helper" +) + +type CustomResource struct { + // TODO(tgross): we need this old ",key" construction for HCL v1 parsing of + // the client config; should this even be the same struct as the config? + Name string `hcl:",key"` + Version uint64 // optional + Type CustomResourceType + Scope CustomResourceScope + + Quantity int64 // for countable or capped-ratio + Range string // for dynamic or static + Items []any // for dynamic or static + Meta map[string]string + + Constraints []*Constraint +} + +type CustomResourceScope string + +const ( + CustomResourceScopeGroup CustomResourceScope = "group" + CustomResourceScopeTask CustomResourceScope = "task" +) + +type CustomResourceType string + +const ( + CustomResourceTypeRatio CustomResourceType = "ratio" // ex. weight + CustomResourceTypeCappedRatio CustomResourceType = "capped-ratio" // ex. 
resource.cpu + CustomResourceTypeCountable CustomResourceType = "countable" // ex. memory, disk + CustomResourceTypeDynamicInstance CustomResourceType = "dynamic" // ex. ports, cores + CustomResourceTypeStaticInstance CustomResourceType = "static" // ex. ports, devices +) + +// Copy returns a deep clone of the CustomResource +func (cr *CustomResource) Copy() *CustomResource { + ncr := new(CustomResource) + *ncr = *cr + + ncr.Items = slices.Clone(cr.Items) + ncr.Meta = maps.Clone(cr.Meta) + ncr.Constraints = helper.CopySlice(cr.Constraints) + return ncr +} + +func (cr *CustomResource) Validate() error { + // TODO(tgross): what do we need to do to validate these? + return nil +} + +func (cr *CustomResource) Equal(or *CustomResource) bool { + if cr.Name != or.Name || + cr.Quantity != or.Quantity || + cr.Type != or.Type || + cr.Scope != or.Scope || + cr.Range != or.Range || + len(cr.Items) != len(or.Items) || + len(cr.Constraints) != len(or.Constraints) || + !slices.Equal(cr.Items, or.Items) || + !maps.Equal(cr.Meta, or.Meta) || + !slices.EqualFunc(cr.Constraints, or.Constraints, func(l, r *Constraint) bool { + return l.Equal(r) + }) { + return false + } + + return true +} + +var ( + ErrInvalidCustomResourceComparison = errors.New("custom resources could not be compared") + ErrCustomResourceExhausted = errors.New("custom resources exhausted") + ErrSubtractFromNothing = errors.New("custom resource request cannot be subtracted from non-existant base") +) + +func (cr *CustomResource) Superset(other *CustomResource) (bool, string) { + + if cr == nil || other == nil { + return false, fmt.Sprintf("custom resource: %s", cr.Name) + } + err := cr.compatible(other) + if err != nil { + return false, err.Error() // not ideal, but this matches the other Comparable APIs + } + + switch cr.Type { + case CustomResourceTypeRatio: + // fallthrough: ratios always fit? 
+ + case CustomResourceTypeCappedRatio, CustomResourceTypeCountable: + if cr.Quantity-other.Quantity < 0 { + return false, fmt.Sprintf("custom resource: %s", cr.Name) + } + + case CustomResourceTypeDynamicInstance, CustomResourceTypeStaticInstance: + items := set.From(cr.Items) + if !items.ContainsSlice(other.Items) { + return false, fmt.Sprintf("custom resource: %s", cr.Name) + } + } + + return true, "" +} + +// Add mutates the CustomResource by the delta +func (cr *CustomResource) Add(delta *CustomResource) error { + if cr == nil || delta == nil { + return nil + } + err := cr.compatible(delta) + if err != nil { + return err + } + + switch cr.Type { + case CustomResourceTypeRatio: + return nil // ratios don't sum up + + case CustomResourceTypeCappedRatio, CustomResourceTypeCountable: + cr.Quantity += delta.Quantity + + case CustomResourceTypeDynamicInstance, CustomResourceTypeStaticInstance: + items := set.From(cr.Items) + cr.Items = items.Union(set.From(delta.Items)).Slice() + if len(cr.Items) > 1 { + slices.SortFunc(cr.Items, func(a, b any) int { + switch a.(type) { + case string: + if _, ok := b.(string); !ok { + return 0 + } + return strings.Compare(a.(string), b.(string)) + case int: + if _, ok := b.(int); !ok { + return 0 + } + return cmp.Compare(a.(int), b.(int)) + } + return 0 + }) + } + } + + return nil +} + +// Subtract mutates the CustomResource by the delta +func (cr *CustomResource) Subtract(delta *CustomResource) error { + if cr == nil || delta == nil { + return nil + } + err := cr.compatible(delta) + if err != nil { + return err + } + + switch cr.Type { + case CustomResourceTypeRatio: + return nil // ratios don't sum up + + case CustomResourceTypeCappedRatio, CustomResourceTypeCountable: + quantity := cr.Quantity - delta.Quantity + cr.Quantity = max(quantity, 0) + + case CustomResourceTypeDynamicInstance, CustomResourceTypeStaticInstance: + items := set.From(cr.Items) + items.RemoveSet(set.From(delta.Items)) + cr.Items = items.Slice() + } + + return 
nil +} + +func (cr *CustomResource) compatible(delta *CustomResource) error { + // TODO(tgross): these are programmer errors, I think? + if cr.Name != delta.Name { + return fmt.Errorf("%w: resource names %q and %q mismatch", + ErrInvalidCustomResourceComparison, cr.Name, delta.Name) + } + if cr.Version < delta.Version && delta.Version > 0 { + // requests for version 0 (default) can be considered compatible with + // any instance of the resource + return fmt.Errorf("%w: resource request %d for %q is newer than available version %d", + ErrInvalidCustomResourceComparison, delta.Version, delta.Name, cr.Version) + } + if cr.Type != delta.Type { + return fmt.Errorf("%w: resource types %q and %q for %q are not the same", + ErrInvalidCustomResourceComparison, cr.Type, delta.Type, delta.Name) + } + if cr.Scope != delta.Scope { + return fmt.Errorf("%w: resource scopes %q and %q for %q are not the same", + ErrInvalidCustomResourceComparison, cr.Scope, delta.Scope, delta.Name) + } + + return nil +} + +// CustomResources is a convenience wrapper around a slice of CustomResources +type CustomResources []*CustomResource + +func (cr CustomResources) Copy() CustomResources { + return helper.CopySlice(cr) +} + +func (cr *CustomResources) Select(available CustomResources) error { +NEXT: + for _, r := range *cr { + for _, base := range available { + if base.Name == r.Name && base.Version == r.Version { + if r.Type != CustomResourceTypeDynamicInstance { + if ok, exhausted := base.Superset(r); !ok { + return fmt.Errorf("%w: %s", ErrCustomResourceExhausted, exhausted) + } + } else { + // for dynamic instances, we need to select items for the + // quantity and mutate the Items of this CustomResource with + // the selected items + if r.Quantity > int64(len(base.Items)) { + return fmt.Errorf("%w: %s", ErrCustomResourceExhausted, r.Name) + } + + // selecting randomly by shuffling and slicing can be + // expensive, so this tries random picks similar to how we + // do port selection if we know 
the ratio of quantity to + // items is sparse. these values are selected by hunch, so + // this is a place for fine-tuning for sure + if len(base.Items) > 64 && r.Quantity < 8 { + r.Items = make([]any, 0, r.Quantity) + for int64(len(r.Items)) < r.Quantity { + item := base.Items[rand.Intn(len(base.Items))] + if !slices.Contains(r.Items, item) { + r.Items = append(r.Items, item) + } + // TODO(tgross): should we cap attempts? + } + } else { + baseItems := slices.Clone(base.Items) + rand.Shuffle(len(baseItems), func(i, j int) { + baseItems[i], baseItems[j] = baseItems[j], baseItems[i] + }) + items := base.Items[:r.Quantity] + r.Items = items + } + } + + continue NEXT + } + } + } + + return nil +} + +func (cr CustomResources) CopySharedOnly() CustomResources { + out := make([]*CustomResource, 0, len(cr)) + for _, r := range cr { + if r.Scope == CustomResourceScopeGroup { + out = append(out, r) + } + } + + return out +} + +func (cr CustomResources) CopyTaskOnly() CustomResources { + out := make([]*CustomResource, 0, len(cr)) + for _, r := range cr { + if r.Scope == CustomResourceScopeTask || r.Scope == "" { + out = append(out, r) + } + } + + return out +} + +// Merge combines other custom resources, overwriting the resources with +// matching names and versions +func (cr *CustomResources) Merge(other CustomResources) { + out := *cr + +NEXT: + for _, ocr := range other { + for i, base := range *cr { + if ocr.Name == base.Name && ocr.Version == base.Version { + out[i] = ocr + continue NEXT + } + } + out = append(out, ocr) + } + + *cr = out +} + +func (cr *CustomResources) Add(delta *CustomResources) error { + if cr == nil || delta == nil { + return nil + } + + // it's possible to get an error after we've mutated one *CR in the slice, + // so copy to make the function succeed entirely or not at all + out := cr.Copy() + seen := []int{} +NEXT: + for _, r := range out { + for i, o := range *delta { + if r.Name == o.Name { + seen = append(seen, i) + err := r.Add(o) + if err != 
nil { + return err + } + continue NEXT + } + } + } + for i, o := range *delta { + if !slices.Contains(seen, i) { + out = append(out, o) + } + } + *cr = out + return nil +} + +func (cr CustomResources) Superset(other CustomResources) (bool, string) { + if other == nil { + return true, "" + } + if cr == nil { + return false, ErrInvalidCustomResourceComparison.Error() + } + for _, ocr := range other { + for _, base := range cr { + if ocr.Name != base.Name { + continue + } + if ok, name := base.Superset(ocr); !ok { + return false, name + } + } + } + + return true, "" +} + +func (cr *CustomResources) Subtract(delta *CustomResources) error { + if cr == nil || delta == nil { + return nil + } + + // it's possible to get an error after we've mutated one *CR in the slice, + // so copy to make the function succeed entirely or not at all + out := cr.Copy() + seen := []int{} +NEXT: + for _, r := range out { + for i, o := range *delta { + if r.Name == o.Name { + seen = append(seen, i) + err := r.Subtract(o) + if err != nil { + return err + } + continue NEXT + } + } + } + for i := range *delta { + if !slices.Contains(seen, i) { + return ErrSubtractFromNothing + } + } + *cr = out + return nil +} diff --git a/nomad/structs/resources_test.go b/nomad/structs/resources_test.go new file mode 100644 index 00000000000..d7c796a0a7d --- /dev/null +++ b/nomad/structs/resources_test.go @@ -0,0 +1,323 @@ +// Copyright (c) HashiCorp, Inc. 
+// SPDX-License-Identifier: BUSL-1.1 + +package structs + +import ( + "testing" + + "github.com/hashicorp/nomad/ci" + "github.com/shoenig/test" + "github.com/shoenig/test/must" +) + +func TestCustomResourcesMath(t *testing.T) { + ci.Parallel(t) + + testCases := []struct { + name string + have *CustomResource + delta *CustomResource + expect *CustomResource + method func(*CustomResource, *CustomResource) error + expectErr string + }{ + { + name: "incompatible", + have: &CustomResource{Name: "foo"}, + delta: &CustomResource{Name: "bar"}, + method: (*CustomResource).Add, + expectErr: `custom resources could not be compared: resource names "foo" and "bar" mismatch`, + }, + { + name: "countable add", + have: &CustomResource{ + Name: "disk", + Type: CustomResourceTypeCountable, + Quantity: 100_000, + }, + delta: &CustomResource{ + Name: "disk", + Type: CustomResourceTypeCountable, + Quantity: 10_000, + }, + method: (*CustomResource).Add, + expect: &CustomResource{ + Name: "disk", + Type: CustomResourceTypeCountable, + Quantity: 110_000, + }, + }, + { + name: "countable subtract floor", + have: &CustomResource{ + Name: "disk", + Type: CustomResourceTypeCountable, + Quantity: 10_000, + }, + delta: &CustomResource{ + Name: "disk", + Type: CustomResourceTypeCountable, + Quantity: 20_000, + }, + method: (*CustomResource).Subtract, + expect: &CustomResource{ + Name: "disk", + Type: CustomResourceTypeCountable, + Quantity: 0, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + + got := tc.have.Copy() + delta := tc.delta.Copy() + err := tc.method(got, delta) + + if tc.expectErr == "" { + must.NoError(t, err) + must.Eq(t, tc.expect, got) + } else { + must.EqError(t, err, tc.expectErr) + must.Eq(t, got, tc.have, must.Sprint("expected unchanged on error")) + } + must.Eq(t, tc.delta, delta, must.Sprint("expected delta to be unchanged")) + }) + } + +} + +func TestCustomResourcesSuperset(t *testing.T) { + ci.Parallel(t) + + testCases := []struct { 
+ name string + have *CustomResource + want *CustomResource + expectOk bool + expectMsg string + }{ + { + name: "incompatible", + have: &CustomResource{Name: "foo"}, + want: &CustomResource{Name: "bar"}, + expectMsg: `custom resources could not be compared: resource names "foo" and "bar" mismatch`, + }, + { + name: "countable ok", + have: &CustomResource{ + Name: "disk", + Type: CustomResourceTypeCountable, + Quantity: 100_000, + }, + want: &CustomResource{ + Name: "disk", + Type: CustomResourceTypeCountable, + Quantity: 10_000, + }, + expectOk: true, + }, + { + name: "countable exhausted", + have: &CustomResource{ + Name: "disk", + Type: CustomResourceTypeCountable, + Quantity: 100_000, + }, + want: &CustomResource{ + Name: "disk", + Type: CustomResourceTypeCountable, + Quantity: 200_000, + }, + expectMsg: "custom resource: disk", + }, + { + name: "dynamic ok", + have: &CustomResource{ + Name: "ports", + Type: CustomResourceTypeDynamicInstance, + Items: []any{8001, 8002, 8003}, + }, + want: &CustomResource{ + Name: "ports", + Type: CustomResourceTypeDynamicInstance, + Items: []any{8002}, + }, + expectOk: true, + }, + { + name: "dynamic exhausted", + have: &CustomResource{ + Name: "ports", + Type: CustomResourceTypeDynamicInstance, + Items: []any{8001, 8002, 8003}, + }, + want: &CustomResource{ + Name: "ports", + Type: CustomResourceTypeDynamicInstance, + Items: []any{8006}, + }, + expectMsg: "custom resource: ports", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ok, msg := tc.have.Superset(tc.want) + test.Eq(t, tc.expectOk, ok) + test.Eq(t, tc.expectMsg, msg) + }) + } + +} + +func TestCustomResourcesMerge(t *testing.T) { + ci.Parallel(t) + + base := CustomResources{ + { // should be updated + Name: "foo_dynamic", + Version: 1, + Type: CustomResourceTypeDynamicInstance, + Scope: CustomResourceScopeTask, + Range: "1-3", + Items: []any{1, 2, 3}, + }, + { // different version, should be ignored + Name: "bar_countable", + Type: 
CustomResourceTypeCountable, + Scope: CustomResourceScopeGroup, + Quantity: 10_000, + }, + { // should be ignored + Name: "quuz_ratio", + Type: CustomResourceTypeRatio, + Scope: CustomResourceScopeTask, + Quantity: 100, + }, + } + + other := CustomResources{ + { // should update + Name: "foo_dynamic", + Version: 1, + Type: CustomResourceTypeDynamicInstance, + Scope: CustomResourceScopeTask, + Range: "2-4,7-8", + Items: []any{2, 3, 4, 7, 8}, + }, + { // different version, should be added + Name: "bar_countable", + Version: 2, + Type: CustomResourceTypeCountable, + Scope: CustomResourceScopeGroup, + Quantity: 20_000, + }, + { // new resource, should be added + Name: "baz_static", + Type: CustomResourceTypeStaticInstance, + Scope: CustomResourceScopeGroup, + Items: []any{10, 20, 30}, + }, + } + + got := base.Copy() + got.Merge(other) + must.Eq(t, + CustomResources{ + { + Name: "foo_dynamic", + Version: 1, + Type: CustomResourceTypeDynamicInstance, + Scope: CustomResourceScopeTask, + Range: "2-4,7-8", + Items: []any{2, 3, 4, 7, 8}, + }, + { + Name: "bar_countable", + Type: CustomResourceTypeCountable, + Scope: CustomResourceScopeGroup, + Quantity: 10_000, + }, + { + Name: "quuz_ratio", + Type: CustomResourceTypeRatio, + Scope: CustomResourceScopeTask, + Quantity: 100, + }, + { + Name: "bar_countable", + Version: 2, + Type: CustomResourceTypeCountable, + Scope: CustomResourceScopeGroup, + Quantity: 20_000, + }, + { + Name: "baz_static", + Type: CustomResourceTypeStaticInstance, + Scope: CustomResourceScopeGroup, + Items: []any{10, 20, 30}, + }, + }, + got) + +} + +func TestCustomResources_Select(t *testing.T) { + + ci.Parallel(t) + + available := CustomResources{ + { + Name: "foo_countable", + Type: CustomResourceTypeCountable, + Scope: CustomResourceScopeGroup, + Quantity: 10_000, + }, + { + Name: "bar_dynamic", + Version: 1, + Type: CustomResourceTypeDynamicInstance, + Scope: CustomResourceScopeTask, + Range: "1-10", + Items: []any{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, + }, 
+ } + + request := CustomResources{ + { + Name: "foo_countable", + Type: CustomResourceTypeCountable, + Scope: CustomResourceScopeGroup, + Quantity: 0, + }, + { + Name: "bar_dynamic", + Version: 1, + Type: CustomResourceTypeDynamicInstance, + Scope: CustomResourceScopeTask, + Quantity: 0, + }, + } + + request[0].Quantity = 10 + request[1].Quantity = 20 + err := request.Select(available) + must.EqError(t, err, "custom resources exhausted: bar_dynamic") + + request[0].Quantity = 10 + request[1].Quantity = 4 + err = request.Select(available) + must.NoError(t, err) + must.Len(t, 4, request[1].Items) + + available[1].Items = []any{} + for i := range 100 { + available[1].Items = append(available[1].Items, i) + } + err = request.Select(available) + must.NoError(t, err) + must.Len(t, 4, request[1].Items) +} diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 615ea398d55..89206d5b8cc 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -2386,6 +2386,7 @@ type Resources struct { Devices ResourceDevices NUMA *NUMA SecretsMB int + Custom CustomResources `hcl:"custom,block"` } const ( @@ -2480,6 +2481,13 @@ func (r *Resources) Validate() error { mErr.Errors = append(mErr.Errors, fmt.Errorf("SecretsMB value (%d) cannot be negative", r.SecretsMB)) } + for _, resource := range r.Custom { + err := resource.Validate() + if err != nil { + mErr.Errors = append(mErr.Errors, err) + } + } + return mErr.ErrorOrNil() } @@ -2510,6 +2518,10 @@ func (r *Resources) Merge(other *Resources) { if other.SecretsMB != 0 { r.SecretsMB = other.SecretsMB } + if len(other.Custom) != 0 { + // TODO(tgross): this looks wrong but matches devices behavior + r.Custom = other.Custom + } } // Equal Resources. 
@@ -2530,7 +2542,11 @@ func (r *Resources) Equal(o *Resources) bool { r.IOPS == o.IOPS && r.Networks.Equal(&o.Networks) && r.Devices.Equal(&o.Devices) && - r.SecretsMB == o.SecretsMB + r.SecretsMB == o.SecretsMB && + slices.EqualFunc(r.Custom, o.Custom, + func(l, r *CustomResource) bool { + return l.Equal(r) + }) } // ResourceDevices are part of Resources. @@ -2619,6 +2635,7 @@ func (r *Resources) Copy() *Resources { if r == nil { return nil } + return &Resources{ CPU: r.CPU, Cores: r.Cores, @@ -2630,6 +2647,7 @@ func (r *Resources) Copy() *Resources { Devices: r.Devices.Copy(), NUMA: r.NUMA.Copy(), SecretsMB: r.SecretsMB, + Custom: helper.CopySlice(r.Custom), } } @@ -3188,6 +3206,12 @@ type NodeResources struct { // to select dynamic ports from across all networks. MinDynamicPort int MaxDynamicPort int + + // Custom defines any custom resources this node advertises + // + // TODO(tgross): we may want to split this out to a CustomNodeResources if + // we want more ability to define a schema for the resource request? 
+ Custom CustomResources } func (n *NodeResources) Copy() *NodeResources { @@ -3216,6 +3240,8 @@ func (n *NodeResources) Copy() *NodeResources { } } + newN.Custom = n.Custom.Copy() + // COMPAT remove in 1.10+ // apply compatibility fixups covering node topology newN.Compatibility() @@ -3245,9 +3271,11 @@ func (n *NodeResources) Comparable() *ComparableResources { MemoryMB: n.Memory.MemoryMB, }, Networks: n.Networks, + Custom: n.Custom.CopyTaskOnly(), }, Shared: AllocatedSharedResources{ DiskMB: n.Disk.DiskMB, + Custom: n.Custom.CopySharedOnly(), }, } return c @@ -3280,6 +3308,8 @@ func (n *NodeResources) Merge(o *NodeResources) { } } + n.Custom.Merge(o.Custom) + // COMPAT remove in 1.10+ // apply compatibility fixups covering node topology n.Compatibility() @@ -3887,6 +3917,7 @@ type AllocatedTaskResources struct { Memory AllocatedMemoryResources Networks Networks Devices []*AllocatedDeviceResource + Custom CustomResources } func (a *AllocatedTaskResources) Copy() *AllocatedTaskResources { @@ -3908,6 +3939,8 @@ func (a *AllocatedTaskResources) Copy() *AllocatedTaskResources { } } + newA.Custom = a.Custom.Copy() + return newA } @@ -3943,6 +3976,8 @@ func (a *AllocatedTaskResources) Add(delta *AllocatedTaskResources) { a.Devices[idx].Add(d) } } + + _ = a.Custom.Add(&delta.Custom) } func (a *AllocatedTaskResources) Max(other *AllocatedTaskResources) { @@ -3972,6 +4007,9 @@ func (a *AllocatedTaskResources) Max(other *AllocatedTaskResources) { a.Devices[idx].Add(d) } } + + // TODO(tgross): what does this even mean here? + // a.CustomResources.Max(delta.CustomResources) } // Comparable turns AllocatedTaskResources into ComparableResources @@ -4002,6 +4040,7 @@ func (a *AllocatedTaskResources) Subtract(delta *AllocatedTaskResources) { a.Cpu.Subtract(&delta.Cpu) a.Memory.Subtract(&delta.Memory) + _ = a.Custom.Subtract(&delta.Custom) } // AllocatedSharedResources are the set of resources allocated to a task group. 
@@ -4009,6 +4048,7 @@ type AllocatedSharedResources struct { Networks Networks DiskMB int64 Ports AllocatedPorts + Custom CustomResources } func (a AllocatedSharedResources) Copy() AllocatedSharedResources { @@ -4016,6 +4056,7 @@ func (a AllocatedSharedResources) Copy() AllocatedSharedResources { Networks: a.Networks.Copy(), DiskMB: a.DiskMB, Ports: a.Ports, + Custom: a.Custom.Copy(), } } @@ -4025,7 +4066,7 @@ func (a *AllocatedSharedResources) Add(delta *AllocatedSharedResources) { } a.Networks = append(a.Networks, delta.Networks...) a.DiskMB += delta.DiskMB - + _ = a.Custom.Add(&delta.Custom) } func (a *AllocatedSharedResources) Subtract(delta *AllocatedSharedResources) { @@ -4045,6 +4086,7 @@ func (a *AllocatedSharedResources) Subtract(delta *AllocatedSharedResources) { } a.Networks = nets a.DiskMB -= delta.DiskMB + _ = a.Custom.Subtract(&delta.Custom) } func (a *AllocatedSharedResources) Canonicalize() { @@ -4276,6 +4318,16 @@ func (c *ComparableResources) Superset(other *ComparableResources) (bool, string if c.Shared.DiskMB < other.Shared.DiskMB { return false, "disk" } + + ok, exhaustedResource := c.Flattened.Custom.Superset(other.Flattened.Custom) + if !ok { + return false, exhaustedResource + } + ok, exhaustedResource = c.Shared.Custom.Superset(other.Shared.Custom) + if !ok { + return false, exhaustedResource + } + return true, "" } diff --git a/scheduler/feasible/custom_resources.go b/scheduler/feasible/custom_resources.go new file mode 100644 index 00000000000..c611c308837 --- /dev/null +++ b/scheduler/feasible/custom_resources.go @@ -0,0 +1,26 @@ +// Copyright (c) HashiCorp, Inc. 
+// SPDX-License-Identifier: BUSL-1.1 + +package feasible + +import "github.com/hashicorp/nomad/nomad/structs" + +type customResourceChecker struct { + ask *structs.CustomResources + proposed *structs.CustomResources + available *structs.CustomResources +} + +func (crc *customResourceChecker) addProposed(proposed []*structs.Allocation) { + for _, alloc := range proposed { + for _, task := range alloc.AllocatedResources.Tasks { + proposedResources := []*structs.CustomResource(*crc.proposed) + proposedResources = append(proposedResources, task.Custom...) + } + } +} + +func (crc *customResourceChecker) Select(ask *structs.CustomResources) error { + crc.available.Subtract(crc.proposed) + return ask.Select(*crc.available) +} diff --git a/scheduler/feasible/rank.go b/scheduler/feasible/rank.go index e2c94161961..49720ab8ad2 100644 --- a/scheduler/feasible/rank.go +++ b/scheduler/feasible/rank.go @@ -8,6 +8,7 @@ import ( "math" "slices" + "github.com/davecgh/go-spew/spew" "github.com/hashicorp/go-set/v3" "github.com/hashicorp/nomad/client/lib/idset" "github.com/hashicorp/nomad/client/lib/numalib/hw" @@ -258,6 +259,12 @@ NEXTNODE: devAllocator := newDeviceAllocator(iter.ctx, option.Node) devAllocator.AddAllocs(proposed) + customAllocator := customResourceChecker{ + available: &option.Node.NodeResources.Custom, + } + + customAllocator.addProposed(proposed) + // Track the affinities of the devices totalDeviceAffinityWeight := 0.0 sumMatchingAffinities := 0.0 @@ -379,7 +386,21 @@ NEXTNODE: MemoryMB: safemath.Add( int64(task.Resources.MemoryMB), int64(task.Resources.SecretsMB)), }, + // TODO(tgross): how do we populate the AllocatedSharedResources here? + Custom: task.Resources.Custom.CopyTaskOnly(), + } + + if len(taskResources.Custom) > 0 { + // TODO(tgross): probably need to check option.AllocResources for group scope too? 
+ spew.Dump(option) + err := customAllocator.Select(&taskResources.Custom) + + if err != nil { + iter.ctx.Metrics().ExhaustedNode(option.Node, err.Error()) + continue NEXTNODE + } } + if iter.memoryOversubscription { taskResources.Memory.MemoryMaxMB = safemath.Add( int64(task.Resources.MemoryMaxMB), int64(task.Resources.SecretsMB))