@@ -18,10 +18,11 @@ package sotw
1818import (
1919 "context"
2020 "errors"
21- "reflect"
2221 "strconv"
22+ "sync"
2323 "sync/atomic"
2424
25+ "golang.org/x/sync/errgroup"
2526 "google.golang.org/grpc/codes"
2627 "google.golang.org/grpc/status"
2728
@@ -72,6 +73,24 @@ type lastDiscoveryResponse struct {
7273 resources map [string ]struct {}
7374}
7475
76+ type lastDiscoveryResponses struct {
77+ mu sync.RWMutex
78+ responses map [string ]lastDiscoveryResponse
79+ }
80+
81+ func (l * lastDiscoveryResponses ) Set (key string , value lastDiscoveryResponse ) {
82+ l .mu .Lock ()
83+ l .responses [key ] = value
84+ l .mu .Unlock ()
85+ }
86+
87+ func (l * lastDiscoveryResponses ) Get (key string ) (value lastDiscoveryResponse , ok bool ) {
88+ l .mu .RLock ()
89+ value , ok = l .responses [key ]
90+ l .mu .RUnlock ()
91+ return
92+ }
93+
7594// process handles a bi-di stream request
7695func (s * server ) process (str stream.Stream , reqCh <- chan * discovery.DiscoveryRequest , defaultTypeURL string ) error {
7796 // increment stream count
@@ -82,13 +101,12 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq
82101 var streamNonce int64
83102
84103 streamState := stream .NewStreamState (false , map [string ]string {})
85- lastDiscoveryResponses := map [string ]lastDiscoveryResponse { }
104+ lastDiscoveryResponses := lastDiscoveryResponses { responses : make ( map [string ]lastDiscoveryResponse ) }
86105
87106 // a collection of stack allocated watches per request type
88107 watches := newWatches ()
89108
90109 defer func () {
91- watches .close ()
92110 if s .callbacks != nil {
93111 s .callbacks .OnStreamClosed (streamID )
94112 }
@@ -116,7 +134,7 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq
116134 for _ , r := range resp .GetRequest ().ResourceNames {
117135 lastResponse .resources [r ] = struct {}{}
118136 }
119- lastDiscoveryResponses [ resp .GetRequest ().TypeUrl ] = lastResponse
137+ lastDiscoveryResponses . Set ( resp .GetRequest ().TypeUrl , lastResponse )
120138
121139 if s .callbacks != nil {
122140 s .callbacks .OnStreamResponse (resp .GetContext (), streamID , resp .GetRequest (), out )
@@ -133,103 +151,100 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq
133151 // node may only be set on the first discovery request
134152 var node = & core.Node {}
135153
136- // recompute dynamic channels for this stream
137- watches .recompute (s .ctx , reqCh )
138-
139- for {
140- // The list of select cases looks like this:
141- // 0: <- ctx.Done
142- // 1: <- reqCh
143- // 2...: per type watches
144- index , value , ok := reflect .Select (watches .cases )
145- switch index {
146- // ctx.Done() -> if we receive a value here we return as no further computation is needed
147- case 0 :
148- return nil
149- // Case 1 handles any request inbound on the stream and handles all initialization as needed
150- case 1 :
151- // input stream ended or errored out
152- if ! ok {
153- return nil
154- }
155-
156- req := value .Interface ().(* discovery.DiscoveryRequest )
157- if req == nil {
158- return status .Errorf (codes .Unavailable , "empty request" )
159- }
154+ var resCh = make (chan cache.Response , 1 )
160155
161- // node field in discovery request is delta-compressed
162- if req .Node != nil {
163- node = req .Node
164- } else {
165- req .Node = node
166- }
156+ ctx , cancel := context .WithCancel (s .ctx )
157+ eg , ctx := errgroup .WithContext (ctx )
167158
168- // nonces can be reused across streams; we verify nonce only if nonce is not initialized
169- nonce := req .GetResponseNonce ()
159+ eg .Go (func () error {
160+ defer func () {
161+ watches .close () // this should remove all watches from the cache
162+ close (resCh ) // close resCh and let the second eg.Go drain it
163+ }()
170164
171- // type URL is required for ADS but is implicit for xDS
172- if defaultTypeURL == resource .AnyType {
173- if req .TypeUrl == "" {
174- return status .Errorf (codes .InvalidArgument , "type URL is required for ADS" )
165+ for {
166+ select {
167+ case <- ctx .Done ():
168+ return nil
169+ case req , more := <- reqCh :
170+ if ! more {
171+ return nil
172+ }
173+ if req == nil {
174+ return status .Errorf (codes .Unavailable , "empty request" )
175+ }
176+ // node field in discovery request is delta-compressed
177+ if req .Node != nil {
178+ node = req .Node
179+ } else {
180+ req .Node = node
175181 }
176- } else if req .TypeUrl == "" {
177- req .TypeUrl = defaultTypeURL
178- }
179182
180- if s .callbacks != nil {
181- if err := s .callbacks .OnStreamRequest (streamID , req ); err != nil {
182- return err
183+ // nonces can be reused across streams; we verify nonce only if nonce is not initialized
184+ nonce := req .GetResponseNonce ()
185+
186+ // type URL is required for ADS but is implicit for xDS
187+ if defaultTypeURL == resource .AnyType {
188+ if req .TypeUrl == "" {
189+ return status .Errorf (codes .InvalidArgument , "type URL is required for ADS" )
190+ }
191+ } else if req .TypeUrl == "" {
192+ req .TypeUrl = defaultTypeURL
183193 }
184- }
185194
186- if lastResponse , ok := lastDiscoveryResponses [ req . TypeUrl ]; ok {
187- if lastResponse . nonce == "" || lastResponse . nonce == nonce {
188- // Let's record Resource names that a client has received.
189- streamState . SetKnownResourceNames ( req . TypeUrl , lastResponse . resources )
195+ if s . callbacks != nil {
196+ if err := s . callbacks . OnStreamRequest ( streamID , req ); err != nil {
197+ return err
198+ }
190199 }
191- }
192200
193- typeURL := req .GetTypeUrl ()
194- responder := make (chan cache.Response , 1 )
195- if w , ok := watches .responders [typeURL ]; ok {
196- // We've found a pre-existing watch, lets check and update if needed.
197- // If these requirements aren't satisfied, leave an open watch.
198- if w .nonce == "" || w .nonce == nonce {
199- w .close ()
201+ if lastResponse , ok := lastDiscoveryResponses .Get (req .TypeUrl ); ok {
202+ if lastResponse .nonce == "" || lastResponse .nonce == nonce {
203+ // Let's record Resource names that a client has received.
204+ streamState .SetKnownResourceNames (req .TypeUrl , lastResponse .resources )
205+ }
206+ }
200207
208+ typeURL := req .GetTypeUrl ()
209+ if w := watches .getWatch (typeURL ); w != nil {
210+ // We've found a pre-existing watch, lets check and update if needed.
211+ // If these requirements aren't satisfied, leave an open watch.
212+ if n := w .getNonce (); n == "" || n == nonce {
213+ w .close ()
214+
215+ watches .addWatch (typeURL , & watch {
216+ cancel : s .cache .CreateWatch (req , streamState , resCh ),
217+ })
218+ }
219+ } else {
220+ // No pre-existing watch exists, let's create one.
221+ // We need to precompute the watches first then open a watch in the cache.
201222 watches .addWatch (typeURL , & watch {
202- cancel : s .cache .CreateWatch (req , streamState , responder ),
203- response : responder ,
223+ cancel : s .cache .CreateWatch (req , streamState , resCh ),
204224 })
205225 }
206- } else {
207- // No pre-existing watch exists, let's create one.
208- // We need to precompute the watches first then open a watch in the cache.
209- watches .addWatch (typeURL , & watch {
210- cancel : s .cache .CreateWatch (req , streamState , responder ),
211- response : responder ,
212- })
213226 }
227+ }
228+ })
214229
215- // Recompute the dynamic select cases for this stream.
216- watches .recompute (s .ctx , reqCh )
217- default :
218- // Channel n -> these are the dynamic list of responders that correspond to the stream request typeURL
219- if ! ok {
220- // Receiver channel was closed. TODO(jpeach): probably cancel the watch or something?
221- return status .Errorf (codes .Unavailable , "resource watch %d -> failed" , index )
230+ eg .Go (func () (err error ) {
231+ var nonce string
232+ for res := range resCh {
233+ if res == nil || err != nil {
234+ continue // this loop should not exit until resCh closed
222235 }
223-
224- res := value .Interface ().(cache.Response )
225- nonce , err := send (res )
226- if err != nil {
227- return err
236+ if nonce , err = send (res ); err == nil {
237+ if w := watches .getWatch (res .GetRequest ().TypeUrl ); w != nil {
238+ w .setNonce (nonce )
239+ }
240+ } else {
241+ cancel ()
228242 }
229-
230- watches .responders [res .GetRequest ().TypeUrl ].nonce = nonce
231243 }
232- }
244+ return err
245+ })
246+
247+ return eg .Wait ()
233248}
234249
235250// StreamHandler converts a blocking read call to channels and initiates stream processing
0 commit comments