21 changes: 21 additions & 0 deletions internal/xds/clients/internal/buffer/unbounded.go
@@ -83,6 +83,7 @@ func (b *Unbounded) Load() {
default:
}
} else if b.closing && !b.closed {
b.closed = true
close(b.c)
}
}
@@ -114,3 +115,23 @@ func (b *Unbounded) Close() {
close(b.c)
}
}

// Reset clears all buffered data in the unbounded buffer. This does not close
// the buffer, and new data may be Put() into it after a call to this method.
//
// It's expected to be used in scenarios where the buffered data is no longer
// relevant, and needs to be cleared.
func (b *Unbounded) Reset() {
Member:

This is inherently racy with other calls to Put() (and Load(), of course). I hope we are using it correctly! :) And if we have an external lock that is used for all things that call Put and Reset, I wonder if a different data structure might be more efficient?

Contributor:

This is inherently racy with other calls to Put() (and Load(), of course). I hope we are using it correctly!

I had the same question. My understanding is that Reset acts as an optimization to avoid redundant requests, but it isn't required for correctness.

Even so, you're right about the potential for races. I think adding a godoc comment to warn about this is a good idea.

Contributor (author):

I've ended up using a list type from the stdlib for the implementation of the request queue, which will now be guarded by the adsStreamImpls mutex.

While this eliminates the possibility of the race (which existed in the previous approach), the need for a condition variable to know when the request queue is empty (and to block until it becomes non-empty), combined with the fact that sync.Cond cannot take part in a select alongside other ways of getting unblocked, has made the code a bit uglier and more complicated in my opinion.

Another option would be to create a new type like SynchronizedUnboundedBuffer that takes a sync.Locker and guards access to all of its methods with the given lock. But even then, since Get returns a channel, I don't believe it is a viable option.

Another reason I went with the stdlib list instead of trying to synchronize access to the unbounded buffer (while also supporting a Reset method) was that the unbounded buffer is currently thread-safe and adding a new method which moves the responsibility of synchronization to the caller didn't seem very appealing.

Would like to hear your thoughts on these.

Contributor (@arjan-bal, Oct 31, 2025):

I thought of a similar approach that uses a channel instead of a sync.Cond.

Add the following fields in adsStreamImpl:

  1. notifySender: make(chan struct{}, 1). This channel will be used by subscribe and unsubscribe to inform the send goroutine that there "may" be buffered requests in the queue.
  2. pendingRequests: []request{}. This will be protected by mu.

subscribe() and unsubscribe() will do the following:

  1. Acquire the mu lock.
  2. Append to the pendingRequests slice.
  3. Release the mu lock.
  4. Send to the notifySender channel (with a default case to avoid blocking).

The send goroutine will do the following:

  1. Run a select statement waiting on s.streamCh and s.notifySender.
    1. If the s.streamCh case is selected: Call sendExisting, which will acquire the mu lock and swap out pendingRequests with an empty slice.
    2. If the s.notifySender case is selected: Acquire the mu lock, swap out pendingRequests keeping the original slice, release the mu lock, and then call sendNew for each request in the original slice.

Notice that sending on the notifySender channel is done after appending to the pendingRequests slice. This ensures that the send goroutine doesn't miss any notifications. This is because if there's an object in the notifySender channel, it's guaranteed that at some point in the future, the send goroutine will read from the notifySender channel, acquire mu, and see the new elements in the pendingRequests slice.

Do you think this makes the implementation simpler, and still correct?

Contributor (author):

Thanks for the idea; this is great. It will definitely work and will make things simpler. The godoc on the sync package does say that for most simple cases, you can get away with a channel instead of a condition variable. When I was trying to make things work with a channel instead of a condition variable, I had already changed the structure of the sending goroutine quite a bit (compared to its earlier version), and I couldn't find a way to use the channel without holding the lock.

b.mu.Lock()
defer b.mu.Unlock()

if b.closing {
Contributor:

would we also need to consider (return early) for the case when b.closed is true?
Although this method is exported, the buffer package is internal, so I think there would be little room for misuse.

Contributor (author):

I'm getting rid of this method since it is not needed anymore.

return
}

b.backlog = b.backlog[:0]
select {
case <-b.c:
default:
}
}
35 changes: 35 additions & 0 deletions internal/xds/clients/internal/buffer/unbounded_test.go
@@ -21,6 +21,7 @@ import (
"sort"
"sync"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/internal/grpctest"
@@ -146,3 +147,37 @@ func (s) TestClose(t *testing.T) {
}
ub.Close() // ignored
}

// TestReset resets the buffer and makes sure that the buffer can be used after
// the reset.
func (s) TestReset(t *testing.T) {
ub := NewUnbounded()
if err := ub.Put(1); err != nil {
t.Fatalf("Unbounded.Put() = %v; want nil", err)
}
if err := ub.Put(2); err != nil {
t.Fatalf("Unbounded.Put() = %v; want nil", err)
}
if v, ok := <-ub.Get(); !ok {
t.Errorf("Unbounded.Get() = %v, %v, want %v, %v", v, ok, 1, true)
}
ub.Load()
ub.Reset()

// Make sure the buffer is empty after the reset. Wait for a short duration
// to make sure that no value is received on the read channel.
select {
case v, ok := <-ub.Get():
t.Errorf("Unbounded.Get() = %v, %v; want no value", v, ok)
case <-time.After(10 * time.Millisecond):
}

// Make sure the buffer can be used after the reset.
if err := ub.Put(1); err != nil {
t.Fatalf("Unbounded.Put() = %v; want nil", err)
}
if v, ok := <-ub.Get(); !ok {
t.Errorf("Unbounded.Get() = %v, %v, want %v, %v", v, ok, 1, true)
}
ub.Load()
}