diff --git a/pkg/cache/v3/delta_test.go b/pkg/cache/v3/delta_test.go index fcdb176a8c..7d26024627 100644 --- a/pkg/cache/v3/delta_test.go +++ b/pkg/cache/v3/delta_test.go @@ -39,7 +39,7 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) { // Make our initial request as a wildcard to get all resources and make sure the wildcard requesting works as intended for _, typ := range testTypes { watches[typ] = make(chan cache.DeltaResponse, 1) - subscriptions[typ] = stream.NewDeltaSubscription(nil, nil, nil) + subscriptions[typ] = stream.NewDeltaSubscription(nil, nil, nil, true) _, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{ Node: &core.Node{ Id: "node", @@ -120,7 +120,7 @@ func TestDeltaRemoveResources(t *testing.T) { for _, typ := range testTypes { watches[typ] = make(chan cache.DeltaResponse, 1) - sub := stream.NewDeltaSubscription(nil, nil, nil) + sub := stream.NewDeltaSubscription(nil, nil, nil, true) subscriptions[typ] = &sub // We don't specify any resource name subscriptions here because we want to make sure we test wildcard // functionality. This means we should receive all resources back without requesting a subscription by name. @@ -210,7 +210,7 @@ func TestConcurrentSetDeltaWatch(t *testing.T) { }, TypeUrl: rsrc.EndpointType, ResourceNamesSubscribe: []string{clusterName}, - }, stream.NewDeltaSubscription([]string{clusterName}, nil, nil), responses) + }, stream.NewDeltaSubscription([]string{clusterName}, nil, nil, true), responses) require.NoError(t, err) defer cancel() @@ -227,7 +227,7 @@ func TestSnapshotDeltaCacheWatchTimeout(t *testing.T) { // Create a non-buffered channel that will block sends. watchCh := make(chan cache.DeltaResponse) - sub := stream.NewDeltaSubscription(names[rsrc.EndpointType], nil, nil) + sub := stream.NewDeltaSubscription(names[rsrc.EndpointType], nil, nil, true) _, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{ Node: &core.Node{ Id: key, @@ -277,7 +277,7 @@ func TestSnapshotCacheDeltaWatchCancel(t *testing.T) { }, TypeUrl: typ, ResourceNamesSubscribe: names[typ], - }, stream.NewDeltaSubscription(names[typ], nil, nil), responses) + }, stream.NewDeltaSubscription(names[typ], nil, nil, true), responses) require.NoError(t, err) // Cancel the watch diff --git a/pkg/cache/v3/linear_test.go b/pkg/cache/v3/linear_test.go index 50a1a96df1..9e8b57367e 100644 --- a/pkg/cache/v3/linear_test.go +++ b/pkg/cache/v3/linear_test.go @@ -223,7 +223,7 @@ func hashResource(t *testing.T, resource types.Resource) string { func createWildcardDeltaWatch(t *testing.T, initialReq bool, c *LinearCache, w chan DeltaResponse) { t.Helper() - sub := stream.NewDeltaSubscription(nil, nil, nil) + sub := stream.NewDeltaSubscription(nil, nil, nil, true) req := &DeltaRequest{TypeUrl: testType} if !initialReq { req.ResponseNonce = "1" @@ -237,7 +237,7 @@ func createWildcardDeltaWatch(t *testing.T, initialReq bool, c *LinearCache, w c } func subFromRequest(req *Request) stream.Subscription { - return stream.NewSotwSubscription(req.GetResourceNames()) + return stream.NewSotwSubscription(req.GetResourceNames(), true) } // This method represents the expected behavior of client and servers regarding the request and the subscription. @@ -250,7 +250,7 @@ func updateFromSotwResponse(resp Response, sub *stream.Subscription, req *Reques } func subFromDeltaRequest(req *DeltaRequest) stream.Subscription { - return stream.NewDeltaSubscription(req.GetResourceNamesSubscribe(), req.GetResourceNamesUnsubscribe(), req.GetInitialResourceVersions()) + return stream.NewDeltaSubscription(req.GetResourceNamesSubscribe(), req.GetResourceNamesUnsubscribe(), req.GetInitialResourceVersions(), true) } func TestLinearInitialResources(t *testing.T) { diff --git a/pkg/cache/v3/simple_test.go b/pkg/cache/v3/simple_test.go index 7d1ebcc364..a1b742c6cd 100644 --- a/pkg/cache/v3/simple_test.go +++ b/pkg/cache/v3/simple_test.go @@ -54,7 +54,7 @@ func (group) ID(node *core.Node) string { } func subFromRequest(req *cache.Request) stream.Subscription { - return stream.NewSotwSubscription(req.GetResourceNames()) + return stream.NewSotwSubscription(req.GetResourceNames(), true) } // This method represents the expected behavior of client and servers regarding the request and the subscription. @@ -601,7 +601,7 @@ func TestAvertPanicForWatchOnNonExistentSnapshot(t *testing.T) { ResourceNames: []string{"rtds"}, TypeUrl: rsrc.RuntimeType, } - ss := stream.NewSotwSubscription([]string{"rtds"}) + ss := stream.NewSotwSubscription([]string{"rtds"}, true) ss.SetReturnedResources(map[string]string{"cluster": "abcdef"}) responder := make(chan cache.Response) _, err := c.CreateWatch(req, ss, responder) diff --git a/pkg/server/config/config.go b/pkg/server/config/config.go index 8dd31a2923..1ae129ed13 100644 --- a/pkg/server/config/config.go +++ b/pkg/server/config/config.go @@ -9,6 +9,12 @@ type Opts struct { Ordered bool Logger log.Logger + + // If true, deactivate legacy wildcard mode for all resource types + legacyWildcardDeactivated bool + + // Deactivate legacy wildcard mode for specific resource types + legacyWildcardDeactivatedTypes map[string]struct{} } func NewOpts() Opts { @@ -18,6 +24,20 @@ func NewOpts() Opts { } } +// IsLegacyWildcardActive returns whether legacy wildcard mode is active for the given resource type. +// Returns true if legacy wildcard mode is active, false if it has been deactivated. +func (o Opts) IsLegacyWildcardActive(typeURL string) bool { + if o.legacyWildcardDeactivated { + return false + } + if len(o.legacyWildcardDeactivatedTypes) > 0 { + if _, found := o.legacyWildcardDeactivatedTypes[typeURL]; found { + return false + } + } + return true +} + // Each xDS implementation should implement their own functional opts. // It is recommended that config values be added in this package specifically, // but the individual opts functions should be in their respective @@ -28,3 +48,19 @@ func NewOpts() Opts { // // this allows for easy inference as to which opt applies to what implementation. type XDSOption func(*Opts) + +func DeactivateLegacyWildcard() XDSOption { + return func(o *Opts) { + o.legacyWildcardDeactivated = true + } +} + +func DeactivateLegacyWildcardForTypes(types []string) XDSOption { + return func(o *Opts) { + typeMap := make(map[string]struct{}, len(types)) + for _, t := range types { + typeMap[t] = struct{}{} + } + o.legacyWildcardDeactivatedTypes = typeMap + } +} diff --git a/pkg/server/delta/v3/server.go b/pkg/server/delta/v3/server.go index 9da050d3d0..3b7c6b977a 100644 --- a/pkg/server/delta/v3/server.go +++ b/pkg/server/delta/v3/server.go @@ -57,6 +57,24 @@ func WithLogger(logger log.Logger) config.XDSOption { } } +// DeactivateLegacyWildcard deactivates legacy wildcard mode for all resource types. +// In legacy wildcard mode, empty requests to a stream, are treated as wildcard requests as long +// as there is no request made with resources or explicit wildcard requests on the same stream. +// When deactivated, empty requests are treated as a request with no subscriptions to any resource. +// This is recommended for when you are using the go-control-plane to serve grpc-xds clients. +// These clients never want to treat an empty request as a wildcard subscription. +func DeactivateLegacyWildcard() config.XDSOption { + return config.DeactivateLegacyWildcard() +} + +// DeactivateLegacyWildcardForTypes deactivates legacy wildcard mode for specific resource types. +// In legacy wildcard mode, empty requests to a stream, are treated as wildcard requests as long +// as there is no request made with resources or explicit wildcard requests on the same stream. +// When deactivated, empty requests are treated as a request with no subscriptions to any resource. +func DeactivateLegacyWildcardForTypes(types []string) config.XDSOption { + return config.DeactivateLegacyWildcardForTypes(types) +} + // NewServer creates a delta xDS specific server which utilizes a ConfigWatcher and delta Callbacks. func NewServer(ctx context.Context, config cache.ConfigWatcher, callbacks Callbacks, opts ...config.XDSOption) Server { s := &server{ @@ -218,7 +236,7 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De // We also set the subscription as wildcard based on its legacy meaning (no resource name sent in resource_names_subscribe). // If the subscription starts with this legacy mode, adding new resources will not unsubscribe from wildcard. // It can still be done by explicitly unsubscribing from "*" - watch.subscription = stream.NewDeltaSubscription(req.GetResourceNamesSubscribe(), req.GetResourceNamesUnsubscribe(), req.GetInitialResourceVersions()) + watch.subscription = stream.NewDeltaSubscription(req.GetResourceNamesSubscribe(), req.GetResourceNamesUnsubscribe(), req.GetInitialResourceVersions(), s.opts.IsLegacyWildcardActive(typeURL)) } else { watch.Cancel() diff --git a/pkg/server/sotw/v3/ads.go b/pkg/server/sotw/v3/ads.go index 23992a6d8f..705a5a0ec2 100644 --- a/pkg/server/sotw/v3/ads.go +++ b/pkg/server/sotw/v3/ads.go @@ -107,7 +107,7 @@ func (s *server) processADS(sw *streamWrapper, reqCh chan *discovery.DiscoveryRe subscription.SetResourceSubscription(req.GetResourceNames()) } else { s.opts.Logger.Debugf("[sotw ads] New subscription for type %s and stream %d", typeURL, sw.ID) - subscription = stream.NewSotwSubscription(req.GetResourceNames()) + subscription = stream.NewSotwSubscription(req.GetResourceNames(), s.opts.IsLegacyWildcardActive(typeURL)) } cancel, err := s.cache.CreateWatch(req, subscription, respChan) diff --git a/pkg/server/sotw/v3/server.go b/pkg/server/sotw/v3/server.go index ec214fba6c..3fa992c65d 100644 --- a/pkg/server/sotw/v3/server.go +++ b/pkg/server/sotw/v3/server.go @@ -74,6 +74,24 @@ func WithLogger(logger log.Logger) config.XDSOption { } } +// DeactivateLegacyWildcard deactivates legacy wildcard mode for all resource types. +// In legacy wildcard mode, empty requests to a stream, are treated as wildcard requests as long +// as there is no request made with resources or explicit wildcard requests on the same stream. +// When deactivated, empty requests are treated as a request with no subscriptions to any resource. +// This is recommended for when you are using the go-control-plane to serve grpc-xds clients. +// These clients never want to treat an empty request as a wildcard subscription. +func DeactivateLegacyWildcard() config.XDSOption { + return config.DeactivateLegacyWildcard() +} + +// DeactivateLegacyWildcardForTypes deactivates legacy wildcard mode for specific resource types. +// In legacy wildcard mode, empty requests to a stream, are treated as wildcard requests as long +// as there is no request made with resources or explicit wildcard requests on the same stream. +// When deactivated, empty requests are treated as a request with no subscriptions to any resource. +func DeactivateLegacyWildcardForTypes(types []string) config.XDSOption { + return config.DeactivateLegacyWildcardForTypes(types) +} + type server struct { cache cache.ConfigWatcher callbacks Callbacks diff --git a/pkg/server/sotw/v3/xds.go b/pkg/server/sotw/v3/xds.go index acbcf32826..c5704a7caa 100644 --- a/pkg/server/sotw/v3/xds.go +++ b/pkg/server/sotw/v3/xds.go @@ -128,7 +128,7 @@ func (s *server) process(str stream.Stream, reqCh chan *discovery.DiscoveryReque subscription.SetResourceSubscription(req.GetResourceNames()) } else { s.opts.Logger.Debugf("[sotw] New subscription for type %s and stream %d", typeURL, sw.ID) - subscription = stream.NewSotwSubscription(req.GetResourceNames()) + subscription = stream.NewSotwSubscription(req.GetResourceNames(), s.opts.IsLegacyWildcardActive(typeURL)) } responder := make(chan cache.Response, 1) diff --git a/pkg/server/stream/v3/subscription.go b/pkg/server/stream/v3/subscription.go index 964ea8b44b..7aebfe4a70 100644 --- a/pkg/server/stream/v3/subscription.go +++ b/pkg/server/stream/v3/subscription.go @@ -24,10 +24,17 @@ type Subscription struct { } // newSubscription initializes a subscription state. -func newSubscription(wildcard bool, initialResourceVersions map[string]string) Subscription { +func newSubscription(emptyRequest, allowLegacyWildcard bool, initialResourceVersions map[string]string) Subscription { + // By default we set the subscription as a wildcard only if the request was empty + // and in legacy mode. Later on, outside of this constructor, when we actually + // process the request, if the request was non-empty, it may have an + // explicit wildcard subscription, in which case + // we will set the wildcard field on the subscription accordingly. + wildcard := emptyRequest && allowLegacyWildcard + state := Subscription{ wildcard: wildcard, - allowLegacyWildcard: wildcard, + allowLegacyWildcard: allowLegacyWildcard, subscribedResourceNames: map[string]struct{}{}, returnedResources: initialResourceVersions, } @@ -39,8 +46,8 @@ func newSubscription(wildcard bool, initialResourceVersions map[string]string) S return state } -func NewSotwSubscription(subscribed []string) Subscription { - sub := newSubscription(len(subscribed) == 0, nil) +func NewSotwSubscription(subscribed []string, allowLegacyWildcard bool) Subscription { + sub := newSubscription(len(subscribed) == 0, allowLegacyWildcard, nil) sub.SetResourceSubscription(subscribed) return sub } @@ -90,8 +97,8 @@ func (s *Subscription) SetResourceSubscription(subscribed []string) { s.subscribedResourceNames = subscribedResources } -func NewDeltaSubscription(subscribed, unsubscribed []string, initialResourceVersions map[string]string) Subscription { - sub := newSubscription(len(subscribed) == 0, initialResourceVersions) +func NewDeltaSubscription(subscribed, unsubscribed []string, initialResourceVersions map[string]string, allowLegacyWildcard bool) Subscription { + sub := newSubscription(len(subscribed) == 0, allowLegacyWildcard, initialResourceVersions) sub.UpdateResourceSubscriptions(subscribed, unsubscribed) return sub } diff --git a/pkg/server/stream/v3/subscription_test.go b/pkg/server/stream/v3/subscription_test.go index 23e29ca7c6..1ae01a08e7 100644 --- a/pkg/server/stream/v3/subscription_test.go +++ b/pkg/server/stream/v3/subscription_test.go @@ -4,11 +4,13 @@ import ( "testing" "github.com/stretchr/testify/assert" + + "github.com/envoyproxy/go-control-plane/pkg/server/config" ) func TestSotwSubscriptions(t *testing.T) { t.Run("legacy mode properly handled", func(t *testing.T) { - sub := NewSotwSubscription([]string{}) + sub := NewSotwSubscription([]string{}, true) assert.True(t, sub.IsWildcard()) // Requests always set empty in legacy mode @@ -35,7 +37,7 @@ func TestSotwSubscriptions(t *testing.T) { t.Run("new wildcard mode from start", func(t *testing.T) { // A resource is provided so the subscription was created in wildcard - sub := NewSotwSubscription([]string{"*"}) + sub := NewSotwSubscription([]string{"*"}, true) assert.True(t, sub.IsWildcard()) assert.Empty(t, sub.SubscribedResources()) @@ -73,7 +75,7 @@ func TestSotwSubscriptions(t *testing.T) { func TestDeltaSubscriptions(t *testing.T) { t.Run("legacy mode properly handled", func(t *testing.T) { - sub := NewDeltaSubscription([]string{}, []string{}, map[string]string{"resource": "version"}) + sub := NewDeltaSubscription([]string{}, []string{}, map[string]string{"resource": "version"}, true) assert.True(t, sub.IsWildcard()) assert.Empty(t, sub.SubscribedResources()) assert.Equal(t, map[string]string{"resource": "version"}, sub.ReturnedResources()) @@ -102,7 +104,7 @@ func TestDeltaSubscriptions(t *testing.T) { t.Run("new wildcard mode", func(t *testing.T) { // A resource is provided so the subscription was created in wildcard - sub := NewDeltaSubscription([]string{"*"}, []string{}, map[string]string{"resource": "version"}) + sub := NewDeltaSubscription([]string{"*"}, []string{}, map[string]string{"resource": "version"}, true) assert.True(t, sub.IsWildcard()) assert.Empty(t, sub.SubscribedResources()) @@ -157,3 +159,112 @@ func TestDeltaSubscriptions(t *testing.T) { assert.Equal(t, map[string]struct{}{"resource": {}}, sub.SubscribedResources()) }) } + +func TestSotwSubscriptionsWithDeactivatedLegacyWildcard(t *testing.T) { + t.Run("deactivate for all types", func(t *testing.T) { + opts := config.NewOpts() + deactivateOpt := config.DeactivateLegacyWildcard() + deactivateOpt(&opts) + + typeURL := "type.googleapis.com/envoy.config.cluster.v3.Cluster" + // Create subscription with empty resource list (would normally be legacy wildcard) + sub := NewSotwSubscription([]string{}, opts.IsLegacyWildcardActive(typeURL)) + + // With deactivated legacy wildcard, subscription should NOT be wildcard initially + // because allowLegacyWildcard=false means empty list doesn't trigger legacy behavior + assert.False(t, sub.IsWildcard()) + + // Set empty resources - should remain non-wildcard + sub.SetResourceSubscription([]string{}) + assert.False(t, sub.IsWildcard()) + + // Can still explicitly subscribe to wildcard + sub.SetResourceSubscription([]string{"*"}) + assert.True(t, sub.IsWildcard()) + }) +} + +func TestSotwSubscriptionsWithDeactivatedLegacyWildcardForTypes(t *testing.T) { + t.Run("deactivate for multiple types", func(t *testing.T) { + opts := config.NewOpts() + clusterType := "type.googleapis.com/envoy.config.cluster.v3.Cluster" + endpointType := "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment" + routeType := "type.googleapis.com/envoy.config.route.v3.RouteConfiguration" + + deactivateOpt := config.DeactivateLegacyWildcardForTypes([]string{clusterType, endpointType}) + deactivateOpt(&opts) + + // Both cluster and endpoint should have legacy wildcard deactivated + subCluster := NewSotwSubscription([]string{}, opts.IsLegacyWildcardActive(clusterType)) + subCluster.SetResourceSubscription([]string{}) + assert.False(t, subCluster.IsWildcard()) + + subEndpoint := NewSotwSubscription([]string{}, opts.IsLegacyWildcardActive(endpointType)) + subEndpoint.SetResourceSubscription([]string{}) + assert.False(t, subEndpoint.IsWildcard()) + + // Can still explicitly subscribe to wildcard + subEndpoint.SetResourceSubscription([]string{"*"}) + assert.True(t, subEndpoint.IsWildcard()) + + // Route should still have legacy wildcard enabled + subRoute := NewSotwSubscription([]string{}, opts.IsLegacyWildcardActive(routeType)) + subRoute.SetResourceSubscription([]string{}) + assert.True(t, subRoute.IsWildcard()) + }) +} + +func TestDeltaSubscriptionsWithDeactivatedLegacyWildcard(t *testing.T) { + t.Run("deactivate for all types", func(t *testing.T) { + opts := config.NewOpts() + deactivateOpt := config.DeactivateLegacyWildcard() + deactivateOpt(&opts) + + typeURL := "type.googleapis.com/envoy.config.cluster.v3.Cluster" + // Create subscription with empty resource list (would normally be legacy wildcard) + sub := NewDeltaSubscription([]string{}, []string{}, map[string]string{"resource": "version"}, opts.IsLegacyWildcardActive(typeURL)) + + // With deactivated legacy wildcard, subscription should NOT be wildcard initially + assert.False(t, sub.IsWildcard()) + assert.Empty(t, sub.SubscribedResources()) + + // New request with no additional subscription + sub.UpdateResourceSubscriptions(nil, nil) + assert.False(t, sub.IsWildcard()) + assert.Empty(t, sub.SubscribedResources()) + + // Can still explicitly subscribe to wildcard + sub.UpdateResourceSubscriptions([]string{"*"}, nil) + assert.True(t, sub.IsWildcard()) + }) +} + +func TestDeltaSubscriptionsWithDeactivatedLegacyWildcardForTypes(t *testing.T) { + t.Run("deactivate for multiple types", func(t *testing.T) { + opts := config.NewOpts() + clusterType := "type.googleapis.com/envoy.config.cluster.v3.Cluster" + endpointType := "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment" + routeType := "type.googleapis.com/envoy.config.route.v3.RouteConfiguration" + + deactivateOpt := config.DeactivateLegacyWildcardForTypes([]string{clusterType, endpointType}) + deactivateOpt(&opts) + + // Both cluster and endpoint should have legacy wildcard deactivated + subCluster := NewDeltaSubscription([]string{}, []string{}, map[string]string{}, opts.IsLegacyWildcardActive(clusterType)) + subCluster.UpdateResourceSubscriptions(nil, nil) + assert.False(t, subCluster.IsWildcard()) + + subEndpoint := NewDeltaSubscription([]string{}, []string{}, map[string]string{}, opts.IsLegacyWildcardActive(endpointType)) + subEndpoint.UpdateResourceSubscriptions(nil, nil) + assert.False(t, subEndpoint.IsWildcard()) + + // Can still explicitly subscribe to wildcard + subEndpoint.UpdateResourceSubscriptions([]string{"*"}, nil) + assert.True(t, subEndpoint.IsWildcard()) + + // Route should still have legacy wildcard enabled + subRoute := NewDeltaSubscription([]string{}, []string{}, map[string]string{}, opts.IsLegacyWildcardActive(routeType)) + subRoute.UpdateResourceSubscriptions(nil, nil) + assert.True(t, subRoute.IsWildcard()) + }) +}