@@ -27,7 +27,7 @@ import (
2727 "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
2828)
2929
30- type watches = map [chan Response ]struct {}
30+ type watches = map [chan Response ]stream. StreamState
3131
3232// LinearCache supports collections of opaque resources. This cache has a
3333// single collection indexed by resource names and manages resource versions
@@ -114,24 +114,30 @@ func NewLinearCache(typeURL string, opts ...LinearCacheOption) *LinearCache {
114114}
115115
116116func (cache * LinearCache ) respond (value chan Response , staleResources []string ) {
117- var resources []types.ResourceWithTTL
117+ var (
118+ resources []types.ResourceWithTTL
119+ respondResourceNames []string
120+ )
121+
118122 // TODO: optimize the resources slice creations across different clients
119123 if len (staleResources ) == 0 {
120124 resources = make ([]types.ResourceWithTTL , 0 , len (cache .resources ))
121- for _ , resource := range cache .resources {
125+ for name , resource := range cache .resources {
122126 resources = append (resources , types.ResourceWithTTL {Resource : resource })
127+ respondResourceNames = append (respondResourceNames , name )
123128 }
124129 } else {
125130 resources = make ([]types.ResourceWithTTL , 0 , len (staleResources ))
126131 for _ , name := range staleResources {
127132 resource := cache .resources [name ]
128133 if resource != nil {
129134 resources = append (resources , types.ResourceWithTTL {Resource : resource })
135+ respondResourceNames = append (respondResourceNames , name )
130136 }
131137 }
132138 }
133139 value <- & RawResponse {
134- Request : & Request {TypeUrl : cache .typeURL },
140+ Request : & Request {TypeUrl : cache .typeURL , ResourceNames : respondResourceNames },
135141 Resources : resources ,
136142 Version : cache .getVersion (),
137143 }
@@ -141,11 +147,25 @@ func (cache *LinearCache) notifyAll(modified map[string]struct{}) {
141147 // de-duplicate watches that need to be responded
142148 notifyList := make (map [chan Response ][]string )
143149 for name := range modified {
144- for watch := range cache .watches [name ] {
145- notifyList [watch ] = append (notifyList [watch ], name )
150+ for watch , streamState := range cache .watches [name ] {
151+ resourceNames := streamState .GetKnownResourceNames (cache .typeURL )
152+ modifiedNameInResourceName := false
153+ for resourceName := range resourceNames {
154+ if ! modifiedNameInResourceName && resourceName == name {
155+ modifiedNameInResourceName = true
156+ }
157+ // To avoid the stale in notifyList becomes empty slice.
158+ // Don't skip resource name that has been deleted here.
159+ // It would be filtered out in respond because the corresponding resource has been deleted.
160+ notifyList [watch ] = append (notifyList [watch ], resourceName )
161+ }
162+ if ! modifiedNameInResourceName {
163+ notifyList [watch ] = append (notifyList [watch ], name )
164+ }
146165 }
147166 delete (cache .watches , name )
148167 }
168+
149169 for value , stale := range notifyList {
150170 cache .respond (value , stale )
151171 }
@@ -293,10 +313,16 @@ func (cache *LinearCache) CreateWatch(request *Request, streamState stream.Strea
293313 stale = lastVersion != cache .version
294314 } else {
295315 for _ , name := range request .ResourceNames {
316+ _ , has := streamState .GetKnownResourceNames (request .TypeUrl )[name ]
317+ version , exists := cache .versionVector [name ]
318+
296319 // When a resource is removed, its version defaults 0 and it is not considered stale.
297- if lastVersion < cache . versionVector [ name ] {
320+ if lastVersion < version || ( ! has && exists ) {
298321 stale = true
299- staleResources = append (staleResources , name )
322+
323+ // Here we collect all requested names.
324+ // It would be filtered out in respond if the resource name doesn't appear in cache.
325+ staleResources = request .ResourceNames
300326 }
301327 }
302328 }
@@ -306,7 +332,7 @@ func (cache *LinearCache) CreateWatch(request *Request, streamState stream.Strea
306332 }
307333 // Create open watches since versions are up to date.
308334 if len (request .ResourceNames ) == 0 {
309- cache .watchAll [value ] = struct {}{}
335+ cache .watchAll [value ] = streamState
310336 return func () {
311337 cache .mu .Lock ()
312338 defer cache .mu .Unlock ()
@@ -319,7 +345,7 @@ func (cache *LinearCache) CreateWatch(request *Request, streamState stream.Strea
319345 set = make (watches )
320346 cache .watches [name ] = set
321347 }
322- set [value ] = struct {}{}
348+ set [value ] = streamState
323349 }
324350 return func () {
325351 cache .mu .Lock ()
0 commit comments