Skip to content

Commit 9796c4f

Browse files
committed
use a slice and a channel for the requests queue
1 parent 6ff8f6d commit 9796c4f

File tree

1 file changed

+45
-35
lines changed

1 file changed

+45
-35
lines changed

internal/xds/clients/xdsclient/ads_stream.go

Lines changed: 45 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ import (
2828
igrpclog "google.golang.org/grpc/internal/grpclog"
2929
"google.golang.org/grpc/internal/xds/clients"
3030
"google.golang.org/grpc/internal/xds/clients/internal/backoff"
31-
"google.golang.org/grpc/internal/xds/clients/internal/buffer"
3231
"google.golang.org/grpc/internal/xds/clients/internal/pretty"
3332
"google.golang.org/grpc/internal/xds/clients/xdsclient/internal/xdsresource"
3433

@@ -104,15 +103,16 @@ type adsStreamImpl struct {
104103
// The following fields are initialized in the constructor and are not
105104
// written to afterwards, and hence can be accessed without a mutex.
106105
streamCh chan clients.Stream // New ADS streams are pushed here.
107-
requestCh *buffer.Unbounded // Subscriptions and unsubscriptions are pushed here.
108106
runnerDoneCh chan struct{} // Notify completion of runner goroutine.
109107
cancel context.CancelFunc // To cancel the context passed to the runner goroutine.
110108
fc *adsFlowControl // Flow control for ADS stream.
109+
notifySender chan struct{} // To notify the sending goroutine of a pending request.
111110

112111
// Guards access to the below fields (and to the contents of the map).
113112
mu sync.Mutex
114113
resourceTypeState map[ResourceType]*resourceTypeState // Map of resource types to their state.
115114
firstRequest bool // False after the first request is sent out.
115+
pendingRequests []request // Subscriptions and unsubscriptions are pushed here.
116116
}
117117

118118
// adsStreamOpts contains the options for creating a new ADS Stream.
@@ -137,9 +137,9 @@ func newADSStreamImpl(opts adsStreamOpts) *adsStreamImpl {
137137
watchExpiryTimeout: opts.watchExpiryTimeout,
138138

139139
streamCh: make(chan clients.Stream, 1),
140-
requestCh: buffer.NewUnbounded(),
141140
runnerDoneCh: make(chan struct{}),
142141
fc: newADSFlowControl(),
142+
notifySender: make(chan struct{}, 1),
143143
resourceTypeState: make(map[ResourceType]*resourceTypeState),
144144
}
145145

@@ -156,7 +156,6 @@ func newADSStreamImpl(opts adsStreamOpts) *adsStreamImpl {
156156
func (s *adsStreamImpl) Stop() {
157157
s.cancel()
158158
s.fc.stop()
159-
s.requestCh.Close()
160159
<-s.runnerDoneCh
161160
s.logger.Infof("Shutdown ADS stream")
162161
}
@@ -171,8 +170,6 @@ func (s *adsStreamImpl) subscribe(typ ResourceType, name string) {
171170
}
172171

173172
s.mu.Lock()
174-
defer s.mu.Unlock()
175-
176173
state, ok := s.resourceTypeState[typ]
177174
if !ok {
178175
// An entry in the type state map is created as part of the first
@@ -186,7 +183,13 @@ func (s *adsStreamImpl) subscribe(typ ResourceType, name string) {
186183
state.subscribedResources[name] = &xdsresource.ResourceWatchState{State: xdsresource.ResourceWatchStateStarted}
187184

188185
// Send a request for the resource type with updated subscriptions.
189-
s.requestCh.Put(request{typ: typ, resourceNames: resourceNames(state.subscribedResources)})
186+
s.pendingRequests = append(s.pendingRequests, request{typ: typ, resourceNames: resourceNames(state.subscribedResources)})
187+
s.mu.Unlock()
188+
189+
select {
190+
case s.notifySender <- struct{}{}:
191+
default:
192+
}
190193
}
191194

192195
// unsubscribe cancels the subscription to the given resource. It is a no-op if
@@ -199,15 +202,14 @@ func (s *adsStreamImpl) unsubscribe(typ ResourceType, name string) {
199202
}
200203

201204
s.mu.Lock()
202-
defer s.mu.Unlock()
203-
204205
state, ok := s.resourceTypeState[typ]
205206
if !ok {
207+
s.mu.Unlock()
206208
return
207209
}
208-
209210
rs, ok := state.subscribedResources[name]
210211
if !ok {
212+
s.mu.Unlock()
211213
return
212214
}
213215
if rs.ExpiryTimer != nil {
@@ -216,7 +218,13 @@ func (s *adsStreamImpl) unsubscribe(typ ResourceType, name string) {
216218
delete(state.subscribedResources, name)
217219

218220
// Send a request for the resource type with updated subscriptions.
219-
s.requestCh.Put(request{typ: typ, resourceNames: resourceNames(state.subscribedResources)})
221+
s.pendingRequests = append(s.pendingRequests, request{typ: typ, resourceNames: resourceNames(state.subscribedResources)})
222+
s.mu.Unlock()
223+
224+
select {
225+
case s.notifySender <- struct{}{}:
226+
default:
227+
}
220228
}
221229

222230
// runner is a long-running goroutine that handles the lifecycle of the ADS
@@ -280,41 +288,43 @@ func (s *adsStreamImpl) send(ctx context.Context) {
280288
stream = nil
281289
continue
282290
}
283-
case r, ok := <-s.requestCh.Get():
284-
if !ok {
285-
return
291+
case <-s.notifySender:
292+
// If there's no stream yet, skip the request. This request will be resent
293+
// when a new stream is created. If no stream is created, the watcher will
294+
// timeout (same as server not sending response back).
295+
if stream == nil {
296+
continue
286297
}
287-
s.requestCh.Load()
288298

289-
req := r.(request)
290-
if err := s.sendNew(stream, req.typ, req.resourceNames); err != nil {
299+
// Resetting the pendingRequests slice to nil works for both cases:
300+
// - When we successfully sends the requests out on the wire.
301+
// - When sending fails. This can happen only when the stream fails,
302+
// and in this case, we rely on the `sendExisting` to send out
303+
// requests for all subscriptions when the stream is recreated.
304+
s.mu.Lock()
305+
if err := s.sendNewLocked(stream, s.pendingRequests); err != nil {
291306
stream = nil
292-
continue
293307
}
308+
s.pendingRequests = nil
309+
s.mu.Unlock()
294310
}
295311
}
296312
}
297313

298-
// sendNew attempts to send a discovery request based on a new subscription or
314+
// sendNewLocked attempts to send a discovery request based on a new subscription or
299315
// unsubscription. This method also starts the watch expiry timer for resources
300316
// that were sent in the request for the first time, i.e. their watch state is
301317
// `watchStateStarted`.
302-
func (s *adsStreamImpl) sendNew(stream clients.Stream, typ ResourceType, names []string) error {
303-
s.mu.Lock()
304-
defer s.mu.Unlock()
305-
306-
// If there's no stream yet, skip the request. This request will be resent
307-
// when a new stream is created. If no stream is created, the watcher will
308-
// timeout (same as server not sending response back).
309-
if stream == nil {
310-
return nil
311-
}
312-
313-
state := s.resourceTypeState[typ]
314-
if err := s.sendMessageLocked(stream, names, typ.TypeURL, state.version, state.nonce, nil); err != nil {
315-
return err
318+
//
319+
// Caller needs to hold c.mu.
320+
func (s *adsStreamImpl) sendNewLocked(stream clients.Stream, requests []request) error {
321+
for _, req := range requests {
322+
state := s.resourceTypeState[req.typ]
323+
if err := s.sendMessageLocked(stream, req.resourceNames, req.typ.TypeURL, state.version, state.nonce, nil); err != nil {
324+
return err
325+
}
326+
s.startWatchTimersLocked(req.typ, req.resourceNames)
316327
}
317-
s.startWatchTimersLocked(typ, names)
318328
return nil
319329
}
320330

@@ -328,7 +338,7 @@ func (s *adsStreamImpl) sendExisting(stream clients.Stream) error {
328338

329339
// Clear any queued requests. Previously subscribed to resources will be
330340
// resent below.
331-
s.requestCh.Reset()
341+
s.pendingRequests = nil
332342

333343
for typ, state := range s.resourceTypeState {
334344
// Reset only the nonces map when the stream restarts.

0 commit comments

Comments
 (0)