-
Notifications
You must be signed in to change notification settings - Fork 4.6k
xdsclient: stop batching writes on the ADS stream #8627
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
defa96b
abe69aa
6ff8f6d
9796c4f
d1fba3c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -83,6 +83,7 @@ func (b *Unbounded) Load() { | |
| default: | ||
| } | ||
| } else if b.closing && !b.closed { | ||
| b.closed = true | ||
| close(b.c) | ||
| } | ||
| } | ||
|
|
@@ -114,3 +115,23 @@ func (b *Unbounded) Close() { | |
| close(b.c) | ||
| } | ||
| } | ||
|
|
||
| // Reset clears all buffered data in the unbounded buffer. This does not close | ||
| // the buffer, and new data may be Put() into it after a call to this method. | ||
| // | ||
| // It's expected to be used in scenarios where the buffered data is no longer | ||
| // relevant, and needs to be cleared. | ||
| func (b *Unbounded) Reset() { | ||
| b.mu.Lock() | ||
| defer b.mu.Unlock() | ||
|
|
||
| if b.closing { | ||
|
||
| return | ||
| } | ||
|
|
||
| b.backlog = b.backlog[:0] | ||
| select { | ||
| case <-b.c: | ||
| default: | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is inherently racy with other calls to
Put()(andLoad(), of course). I hope we are using it correctly! :) And if we have an external lock that is used for all things that call Put and Reset, I wonder if a different data structure might be more efficient?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had the same question. My understanding is that Reset acts as an optimization to avoid redundant requests, but it isn't required for correctness.
Even so, you're right about the potential for races. I think adding a godoc comment to warn about this is a good idea.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've ended up using a list type from the stdlib for the implementation of the request queue, which will now be guarded by the
adsStreamImpls mutex.While this eliminates the possibility of the race (which existed in the previous approach), the use of a condition variable to know when the request queue is empty (and blocking until it becomes non-empty), and the fact that
sync.Conddoesn't allow blocking in aselectwith other ways of getting unblocked, the code has become a bit uglier and more complicated in my opinion.Another option would be to create a new type like
SynchronizedUnboundedBufferthat takes async.Lockerand guards access to all of its methods with given lock. But even in that case, sinceGetreturns a channel, it is not a viable option I believe.Another reason I went with the stdlib list instead of trying to synchronize access to the unbounded buffer (while also supporting a
Resetmethod) was that the unbounded buffer is currently thread-safe and adding a new method which moves the responsibility of synchronization to the caller didn't seem very appealing.Would like to hear your thoughts on these.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought of a similar approach that uses a channel instead of a
sync.Cond.Add the following fields in
adsStreamImpl:notifySender: make(chan struct{}, 1). This channel will be used bysubscribeandunsubscribeto inform thesendgoroutine that there "may" be buffered requests in the queue.pendingRequests: []request{}. This will be protected bymu.subscribe()andunsubscribewill do the following:mulock.pendingRequestsslice.mulock.notifySenderchannel (with adefaultcase to avoid blocking).The
sendgoroutine will do the following:selectstatement waiting ons.streamChands.notifySender.s.streamChcase is selected: CallsendExisting, which will acquire themulock and swap outpendingRequestswith an empty slice.s.notifySendercase is selected: Acquire themulock, swap outpendingRequestskeeping the original slice, release themulock, and then callsendNewfor each request in the original slice.Notice that sending on the
notifySenderchannel is done after appending to thependingRequestsslice. This ensures that thesendgoroutine doesn't miss any notifications. This is because if there's an object in thenotifySenderchannel, it's guaranteed that at some point in the future, thesendgoroutine will read from thenotifySenderchannel, acquiremu, and see the new elements in thependingRequestsslice.Do you think this makes the implementation simpler, and still correct?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the idea, this is great. This will definitely work and will make things simpler. The godoc on the sync package does say that for most simple cases, you can get away with a channel instead of a condition variable. And when I was trying to make things work with a channel instead of a condition variable, I had already changed the structure of the sending goroutine quite a bit (compared to its earlier version) and I couldn't find a way to use the channel without holding the lock.