Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions pkg/cache/v3/delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) {
// Make our initial request as a wildcard to get all resources and make sure the wildcard requesting works as intended
for _, typ := range testTypes {
watches[typ] = make(chan cache.DeltaResponse, 1)
subscriptions[typ] = stream.NewDeltaSubscription(nil, nil, nil)
subscriptions[typ] = stream.NewDeltaSubscription(nil, nil, nil, true)
_, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
Node: &core.Node{
Id: "node",
Expand Down Expand Up @@ -120,7 +120,7 @@ func TestDeltaRemoveResources(t *testing.T) {

for _, typ := range testTypes {
watches[typ] = make(chan cache.DeltaResponse, 1)
sub := stream.NewDeltaSubscription(nil, nil, nil)
sub := stream.NewDeltaSubscription(nil, nil, nil, true)
subscriptions[typ] = &sub
// We don't specify any resource name subscriptions here because we want to make sure we test wildcard
// functionality. This means we should receive all resources back without requesting a subscription by name.
Expand Down Expand Up @@ -210,7 +210,7 @@ func TestConcurrentSetDeltaWatch(t *testing.T) {
},
TypeUrl: rsrc.EndpointType,
ResourceNamesSubscribe: []string{clusterName},
}, stream.NewDeltaSubscription([]string{clusterName}, nil, nil), responses)
}, stream.NewDeltaSubscription([]string{clusterName}, nil, nil, true), responses)

require.NoError(t, err)
defer cancel()
Expand All @@ -227,7 +227,7 @@ func TestSnapshotDeltaCacheWatchTimeout(t *testing.T) {

// Create a non-buffered channel that will block sends.
watchCh := make(chan cache.DeltaResponse)
sub := stream.NewDeltaSubscription(names[rsrc.EndpointType], nil, nil)
sub := stream.NewDeltaSubscription(names[rsrc.EndpointType], nil, nil, true)
_, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
Node: &core.Node{
Id: key,
Expand Down Expand Up @@ -277,7 +277,7 @@ func TestSnapshotCacheDeltaWatchCancel(t *testing.T) {
},
TypeUrl: typ,
ResourceNamesSubscribe: names[typ],
}, stream.NewDeltaSubscription(names[typ], nil, nil), responses)
}, stream.NewDeltaSubscription(names[typ], nil, nil, true), responses)
require.NoError(t, err)

// Cancel the watch
Expand Down
6 changes: 3 additions & 3 deletions pkg/cache/v3/linear_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ func hashResource(t *testing.T, resource types.Resource) string {

func createWildcardDeltaWatch(t *testing.T, initialReq bool, c *LinearCache, w chan DeltaResponse) {
t.Helper()
sub := stream.NewDeltaSubscription(nil, nil, nil)
sub := stream.NewDeltaSubscription(nil, nil, nil, true)
req := &DeltaRequest{TypeUrl: testType}
if !initialReq {
req.ResponseNonce = "1"
Expand All @@ -237,7 +237,7 @@ func createWildcardDeltaWatch(t *testing.T, initialReq bool, c *LinearCache, w c
}

func subFromRequest(req *Request) stream.Subscription {
return stream.NewSotwSubscription(req.GetResourceNames())
return stream.NewSotwSubscription(req.GetResourceNames(), true)
}

// This method represents the expected behavior of client and servers regarding the request and the subscription.
Expand All @@ -250,7 +250,7 @@ func updateFromSotwResponse(resp Response, sub *stream.Subscription, req *Reques
}

func subFromDeltaRequest(req *DeltaRequest) stream.Subscription {
return stream.NewDeltaSubscription(req.GetResourceNamesSubscribe(), req.GetResourceNamesUnsubscribe(), req.GetInitialResourceVersions())
return stream.NewDeltaSubscription(req.GetResourceNamesSubscribe(), req.GetResourceNamesUnsubscribe(), req.GetInitialResourceVersions(), true)
}

func TestLinearInitialResources(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/cache/v3/simple_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (group) ID(node *core.Node) string {
}

func subFromRequest(req *cache.Request) stream.Subscription {
return stream.NewSotwSubscription(req.GetResourceNames())
return stream.NewSotwSubscription(req.GetResourceNames(), true)
}

// This method represents the expected behavior of client and servers regarding the request and the subscription.
Expand Down Expand Up @@ -601,7 +601,7 @@ func TestAvertPanicForWatchOnNonExistentSnapshot(t *testing.T) {
ResourceNames: []string{"rtds"},
TypeUrl: rsrc.RuntimeType,
}
ss := stream.NewSotwSubscription([]string{"rtds"})
ss := stream.NewSotwSubscription([]string{"rtds"}, true)
ss.SetReturnedResources(map[string]string{"cluster": "abcdef"})
responder := make(chan cache.Response)
_, err := c.CreateWatch(req, ss, responder)
Expand Down
36 changes: 36 additions & 0 deletions pkg/server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@ type Opts struct {
Ordered bool

Logger log.Logger

// If true, deactivate legacy wildcard mode for all resource types
legacyWildcardDeactivated bool

// Deactivate legacy wildcard mode for specific resource types
legacyWildcardDeactivatedTypes map[string]struct{}
}

func NewOpts() Opts {
Expand All @@ -18,6 +24,20 @@ func NewOpts() Opts {
}
}

// IsLegacyWildcardActive returns whether legacy wildcard mode is active for the given resource type.
// Returns true if legacy wildcard mode is active, false if it has been deactivated.
func (o Opts) IsLegacyWildcardActive(typeURL string) bool {
if o.legacyWildcardDeactivated {
return false
}
if len(o.legacyWildcardDeactivatedTypes) > 0 {
if _, found := o.legacyWildcardDeactivatedTypes[typeURL]; found {
return false
}
}
return true
}

// Each xDS implementation should implement their own functional opts.
// It is recommended that config values be added in this package specifically,
// but the individual opts functions should be in their respective
Expand All @@ -28,3 +48,19 @@ func NewOpts() Opts {
//
// this allows for easy inference as to which opt applies to what implementation.
type XDSOption func(*Opts)

func DeactivateLegacyWildcard() XDSOption {
return func(o *Opts) {
o.legacyWildcardDeactivated = true
}
}

func DeactivateLegacyWildcardForTypes(types []string) XDSOption {
return func(o *Opts) {
typeMap := make(map[string]struct{}, len(types))
for _, t := range types {
typeMap[t] = struct{}{}
}
o.legacyWildcardDeactivatedTypes = typeMap
}
}
20 changes: 19 additions & 1 deletion pkg/server/delta/v3/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,24 @@ func WithLogger(logger log.Logger) config.XDSOption {
}
}

// DeactivateLegacyWildcard deactivates legacy wildcard mode for all resource types.
// In legacy wildcard mode, empty requests to a stream, are treated as wildcard requests as long
// as there is no request made with resources or explicit wildcard requests on the same stream.
// When deactivated, empty requests are treated as a request with no subscriptions to any resource.
// This is recommended for when you are using the go-control-plane to serve grpc-xds clients.
// These clients never want to treat an empty request as a wildcard subscription.
func DeactivateLegacyWildcard() config.XDSOption {
return config.DeactivateLegacyWildcard()
}

// DeactivateLegacyWildcardForTypes deactivates legacy wildcard mode for specific resource types.
// In legacy wildcard mode, empty requests to a stream, are treated as wildcard requests as long
// as there is no request made with resources or explicit wildcard requests on the same stream.
// When deactivated, empty requests are treated as a request with no subscriptions to any resource.
func DeactivateLegacyWildcardForTypes(types []string) config.XDSOption {
return config.DeactivateLegacyWildcardForTypes(types)
}

// NewServer creates a delta xDS specific server which utilizes a ConfigWatcher and delta Callbacks.
func NewServer(ctx context.Context, config cache.ConfigWatcher, callbacks Callbacks, opts ...config.XDSOption) Server {
s := &server{
Expand Down Expand Up @@ -218,7 +236,7 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De
// We also set the subscription as wildcard based on its legacy meaning (no resource name sent in resource_names_subscribe).
// If the subscription starts with this legacy mode, adding new resources will not unsubscribe from wildcard.
// It can still be done by explicitly unsubscribing from "*"
watch.subscription = stream.NewDeltaSubscription(req.GetResourceNamesSubscribe(), req.GetResourceNamesUnsubscribe(), req.GetInitialResourceVersions())
watch.subscription = stream.NewDeltaSubscription(req.GetResourceNamesSubscribe(), req.GetResourceNamesUnsubscribe(), req.GetInitialResourceVersions(), s.opts.IsLegacyWildcardActive(typeURL))
} else {
watch.Cancel()

Expand Down
2 changes: 1 addition & 1 deletion pkg/server/sotw/v3/ads.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (s *server) processADS(sw *streamWrapper, reqCh chan *discovery.DiscoveryRe
subscription.SetResourceSubscription(req.GetResourceNames())
} else {
s.opts.Logger.Debugf("[sotw ads] New subscription for type %s and stream %d", typeURL, sw.ID)
subscription = stream.NewSotwSubscription(req.GetResourceNames())
subscription = stream.NewSotwSubscription(req.GetResourceNames(), s.opts.IsLegacyWildcardActive(typeURL))
}

cancel, err := s.cache.CreateWatch(req, subscription, respChan)
Expand Down
18 changes: 18 additions & 0 deletions pkg/server/sotw/v3/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,24 @@ func WithLogger(logger log.Logger) config.XDSOption {
}
}

// DeactivateLegacyWildcard deactivates legacy wildcard mode for all resource types.
// In legacy wildcard mode, empty requests to a stream, are treated as wildcard requests as long
// as there is no request made with resources or explicit wildcard requests on the same stream.
// When deactivated, empty requests are treated as a request with no subscriptions to any resource.
// This is recommended for when you are using the go-control-plane to serve grpc-xds clients.
// These clients never want to treat an empty request as a wildcard subscription.
func DeactivateLegacyWildcard() config.XDSOption {
return config.DeactivateLegacyWildcard()
}

// DeactivateLegacyWildcardForTypes deactivates legacy wildcard mode for specific resource types.
// In legacy wildcard mode, empty requests to a stream, are treated as wildcard requests as long
// as there is no request made with resources or explicit wildcard requests on the same stream.
// When deactivated, empty requests are treated as a request with no subscriptions to any resource.
func DeactivateLegacyWildcardForTypes(types []string) config.XDSOption {
return config.DeactivateLegacyWildcardForTypes(types)
}

type server struct {
cache cache.ConfigWatcher
callbacks Callbacks
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/sotw/v3/xds.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (s *server) process(str stream.Stream, reqCh chan *discovery.DiscoveryReque
subscription.SetResourceSubscription(req.GetResourceNames())
} else {
s.opts.Logger.Debugf("[sotw] New subscription for type %s and stream %d", typeURL, sw.ID)
subscription = stream.NewSotwSubscription(req.GetResourceNames())
subscription = stream.NewSotwSubscription(req.GetResourceNames(), s.opts.IsLegacyWildcardActive(typeURL))
}

responder := make(chan cache.Response, 1)
Expand Down
19 changes: 13 additions & 6 deletions pkg/server/stream/v3/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,17 @@ type Subscription struct {
}

// newSubscription initializes a subscription state.
func newSubscription(wildcard bool, initialResourceVersions map[string]string) Subscription {
func newSubscription(emptyRequest, allowLegacyWildcard bool, initialResourceVersions map[string]string) Subscription {
// By default we set the subscription as a wildcard only if the request was empty
// and in legacy mode. Later on, outside of this constructor, when we actually
// process the request, if the request was non-empty, it may have an
// explicit wildcard subscription, in which case
// we will set the wildcard field on the subscription accordingly.
wildcard := emptyRequest && allowLegacyWildcard

state := Subscription{
wildcard: wildcard,
allowLegacyWildcard: wildcard,
allowLegacyWildcard: allowLegacyWildcard,
subscribedResourceNames: map[string]struct{}{},
returnedResources: initialResourceVersions,
}
Expand All @@ -39,8 +46,8 @@ func newSubscription(wildcard bool, initialResourceVersions map[string]string) S
return state
}

func NewSotwSubscription(subscribed []string) Subscription {
sub := newSubscription(len(subscribed) == 0, nil)
func NewSotwSubscription(subscribed []string, allowLegacyWildcard bool) Subscription {
sub := newSubscription(len(subscribed) == 0, allowLegacyWildcard, nil)
sub.SetResourceSubscription(subscribed)
return sub
}
Expand Down Expand Up @@ -90,8 +97,8 @@ func (s *Subscription) SetResourceSubscription(subscribed []string) {
s.subscribedResourceNames = subscribedResources
}

func NewDeltaSubscription(subscribed, unsubscribed []string, initialResourceVersions map[string]string) Subscription {
sub := newSubscription(len(subscribed) == 0, initialResourceVersions)
func NewDeltaSubscription(subscribed, unsubscribed []string, initialResourceVersions map[string]string, allowLegacyWildcard bool) Subscription {
sub := newSubscription(len(subscribed) == 0, allowLegacyWildcard, initialResourceVersions)
sub.UpdateResourceSubscriptions(subscribed, unsubscribed)
return sub
}
Expand Down
Loading