103 changes: 98 additions & 5 deletions tests/robustness/client/client.go
@@ -30,6 +30,10 @@ import (
"go.etcd.io/etcd/tests/v3/robustness/report"
)

var (
errClientIsClosed = errors.New("client is closed")
)

// RecordingClient provides a semi-etcd client (different interface than
// clientv3.Client) that records all the requests and responses made. Doesn't
// allow for concurrent requests to conform to model.AppendableHistory requirements.
@@ -45,6 +49,8 @@ type RecordingClient struct {
// mux ensures order of request appending.
kvMux sync.Mutex
kvOperations *model.AppendableHistory

isClosed bool
}

var _ clientv3.KV = (*RecordingClient)(nil)
@@ -69,11 +75,20 @@ func NewRecordingClient(endpoints []string, ids identity.Provider, baseTime time
client: *cc,
kvOperations: model.NewAppendableHistory(ids),
baseTime: baseTime,
isClosed: false,
}, nil
}

func (c *RecordingClient) Close() error {
return c.client.Close()
c.kvMux.Lock()
defer c.kvMux.Unlock()
if c.isClosed {
return nil
}

err := c.client.Close()
c.isClosed = true // mark as closed even when Close returns an error; setting it only on success would require handling retries in a defer
return err
}

func (c *RecordingClient) Report() report.ClientReport {
@@ -94,6 +109,12 @@ func (c *RecordingClient) Get(ctx context.Context, key string, opts ...clientv3.
}

func (c *RecordingClient) Range(ctx context.Context, start, end string, revision, limit int64) (*clientv3.GetResponse, error) {
c.kvMux.Lock()
defer c.kvMux.Unlock()
if c.isClosed {
return nil, errClientIsClosed
}

ops := []clientv3.OpOption{}
if end != "" {
ops = append(ops, clientv3.WithRange(end))
@@ -104,8 +125,6 @@ func (c *RecordingClient) Range(ctx context.Context, start, end string, revision
if limit != 0 {
ops = append(ops, clientv3.WithLimit(limit))
}
c.kvMux.Lock()
defer c.kvMux.Unlock()
callTime := time.Since(c.baseTime)
resp, err := c.client.Get(ctx, start, ops...)
returnTime := time.Since(c.baseTime)
@@ -116,6 +135,10 @@ func (c *RecordingClient) Range(ctx context.Context, start, end string, revision
func (c *RecordingClient) Put(ctx context.Context, key, value string, _ ...clientv3.OpOption) (*clientv3.PutResponse, error) {
c.kvMux.Lock()
defer c.kvMux.Unlock()
if c.isClosed {
return nil, errClientIsClosed
}

callTime := time.Since(c.baseTime)
resp, err := c.client.Put(ctx, key, value)
returnTime := time.Since(c.baseTime)
@@ -126,6 +149,10 @@ func (c *RecordingClient) Put(ctx context.Context, key, value string, _ ...clien
func (c *RecordingClient) Delete(ctx context.Context, key string, _ ...clientv3.OpOption) (*clientv3.DeleteResponse, error) {
c.kvMux.Lock()
defer c.kvMux.Unlock()
if c.isClosed {
return nil, errClientIsClosed
}

callTime := time.Since(c.baseTime)
resp, err := c.client.Delete(ctx, key)
returnTime := time.Since(c.baseTime)
@@ -172,12 +199,22 @@ func (w *wrappedTxn) Commit() (*clientv3.TxnResponse, error) {
}

func (c *RecordingClient) Txn(ctx context.Context) clientv3.Txn {
c.kvMux.Lock()
defer c.kvMux.Unlock()
if c.isClosed {
return nil
Contributor (author): This is wrong too.

Member: Please don't return nil, as it breaks the contract. Instead, implement an interface that returns an error when we execute Txn.

}

return &wrappedTxn{txn: c.client.Txn(ctx), c: c}
}
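One possible shape for the Member's suggestion above, given only as a sketch: a stub implementation of clientv3.Txn whose builder methods are no-ops and whose Commit reports errClientIsClosed. The closedTxn name is illustrative and not part of this PR.

```go
// closedTxn satisfies clientv3.Txn for a client that has already been closed:
// If/Then/Else are no-ops and Commit fails with errClientIsClosed.
type closedTxn struct{}

func (t closedTxn) If(...clientv3.Cmp) clientv3.Txn  { return t }
func (t closedTxn) Then(...clientv3.Op) clientv3.Txn { return t }
func (t closedTxn) Else(...clientv3.Op) clientv3.Txn { return t }
func (t closedTxn) Commit() (*clientv3.TxnResponse, error) {
	return nil, errClientIsClosed
}
```

Txn could then return closedTxn{} instead of nil in the closed branch, which keeps the clientv3.Txn contract intact while still surfacing the closed client as an error at commit time.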

func (c *RecordingClient) LeaseGrant(ctx context.Context, ttl int64) (*clientv3.LeaseGrantResponse, error) {
c.kvMux.Lock()
defer c.kvMux.Unlock()
if c.isClosed {
return nil, errClientIsClosed
}

callTime := time.Since(c.baseTime)
resp, err := c.client.Lease.Grant(ctx, ttl)
returnTime := time.Since(c.baseTime)
@@ -188,6 +225,10 @@ func (c *RecordingClient) LeaseGrant(ctx context.Context, ttl int64) (*clientv3.
func (c *RecordingClient) LeaseRevoke(ctx context.Context, leaseID int64) (*clientv3.LeaseRevokeResponse, error) {
c.kvMux.Lock()
defer c.kvMux.Unlock()
if c.isClosed {
return nil, errClientIsClosed
}

callTime := time.Since(c.baseTime)
resp, err := c.client.Lease.Revoke(ctx, clientv3.LeaseID(leaseID))
returnTime := time.Since(c.baseTime)
@@ -196,9 +237,13 @@ func (c *RecordingClient) LeaseRevoke(ctx context.Context, leaseID int64) (*clie
}

func (c *RecordingClient) PutWithLease(ctx context.Context, key string, value string, leaseID int64) (*clientv3.PutResponse, error) {
opts := clientv3.WithLease(clientv3.LeaseID(leaseID))
c.kvMux.Lock()
defer c.kvMux.Unlock()
if c.isClosed {
return nil, errClientIsClosed
}

opts := clientv3.WithLease(clientv3.LeaseID(leaseID))
callTime := time.Since(c.baseTime)
resp, err := c.client.Put(ctx, key, value, opts)
returnTime := time.Since(c.baseTime)
@@ -209,6 +254,10 @@ func (c *RecordingClient) PutWithLease(ctx context.Context, key string, value st
func (c *RecordingClient) Defragment(ctx context.Context) (*clientv3.DefragmentResponse, error) {
c.kvMux.Lock()
defer c.kvMux.Unlock()
if c.isClosed {
return nil, errClientIsClosed
}

callTime := time.Since(c.baseTime)
resp, err := c.client.Defragment(ctx, c.client.Endpoints()[0])
returnTime := time.Since(c.baseTime)
@@ -219,6 +268,10 @@ func (c *RecordingClient) Defragment(ctx context.Context) (*clientv3.DefragmentR
func (c *RecordingClient) Compact(ctx context.Context, rev int64, _ ...clientv3.CompactOption) (*clientv3.CompactResponse, error) {
c.kvMux.Lock()
defer c.kvMux.Unlock()
if c.isClosed {
return nil, errClientIsClosed
}

callTime := time.Since(c.baseTime)
resp, err := c.client.Compact(ctx, rev)
returnTime := time.Since(c.baseTime)
@@ -229,48 +282,76 @@ func (c *RecordingClient) Compact(ctx context.Context, rev int64, _ ...clientv3.
func (c *RecordingClient) MemberList(ctx context.Context, opts ...clientv3.OpOption) (*clientv3.MemberListResponse, error) {
c.kvMux.Lock()
defer c.kvMux.Unlock()
if c.isClosed {
return nil, errClientIsClosed
}

resp, err := c.client.MemberList(ctx, opts...)
return resp, err
}

func (c *RecordingClient) MemberAdd(ctx context.Context, peerAddrs []string) (*clientv3.MemberAddResponse, error) {
c.kvMux.Lock()
defer c.kvMux.Unlock()
if c.isClosed {
return nil, errClientIsClosed
}

resp, err := c.client.MemberAdd(ctx, peerAddrs)
return resp, err
}

func (c *RecordingClient) MemberAddAsLearner(ctx context.Context, peerAddrs []string) (*clientv3.MemberAddResponse, error) {
c.kvMux.Lock()
defer c.kvMux.Unlock()
if c.isClosed {
return nil, errClientIsClosed
}

resp, err := c.client.MemberAddAsLearner(ctx, peerAddrs)
return resp, err
}

func (c *RecordingClient) MemberRemove(ctx context.Context, id uint64) (*clientv3.MemberRemoveResponse, error) {
c.kvMux.Lock()
defer c.kvMux.Unlock()
if c.isClosed {
return nil, errClientIsClosed
}

resp, err := c.client.MemberRemove(ctx, id)
return resp, err
}

func (c *RecordingClient) MemberUpdate(ctx context.Context, id uint64, peerAddrs []string) (*clientv3.MemberUpdateResponse, error) {
c.kvMux.Lock()
defer c.kvMux.Unlock()
if c.isClosed {
return nil, errClientIsClosed
}

resp, err := c.client.MemberUpdate(ctx, id, peerAddrs)
return resp, err
}

func (c *RecordingClient) MemberPromote(ctx context.Context, id uint64) (*clientv3.MemberPromoteResponse, error) {
c.kvMux.Lock()
defer c.kvMux.Unlock()
if c.isClosed {
return nil, errClientIsClosed
}

resp, err := c.client.MemberPromote(ctx, id)
return resp, err
}

func (c *RecordingClient) Status(ctx context.Context, endpoint string) (*clientv3.StatusResponse, error) {
c.kvMux.Lock()
defer c.kvMux.Unlock()
if c.isClosed {
return nil, errClientIsClosed
}

resp, err := c.client.Status(ctx, endpoint)
return resp, err
}
@@ -280,6 +361,12 @@ func (c *RecordingClient) Endpoints() []string {
}

func (c *RecordingClient) Watch(ctx context.Context, key string, rev int64, withPrefix bool, withProgressNotify bool, withPrevKV bool) clientv3.WatchChan {
Member: Please return an error.

c.kvMux.Lock()
defer c.kvMux.Unlock()
if c.isClosed {
return nil
}

request := model.WatchRequest{
Key: key,
Revision: rev,
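Regarding the Member's request above that Watch return an error: one option is to change the signature so callers can distinguish a closed client from a live watch channel. The sketch below assumes that signature change and is not the PR's code; the request fields beyond Key and Revision and the recorded-watch plumbing in c.watch are elided and assumed unchanged.

```go
// Sketch: Watch reports errClientIsClosed instead of handing back a nil channel.
func (c *RecordingClient) Watch(ctx context.Context, key string, rev int64, withPrefix, withProgressNotify, withPrevKV bool) (clientv3.WatchChan, error) {
	c.kvMux.Lock()
	defer c.kvMux.Unlock()
	if c.isClosed {
		return nil, errClientIsClosed
	}
	request := model.WatchRequest{
		Key:      key,
		Revision: rev,
		// ... remaining fields populated as in the current implementation ...
	}
	return c.watch(ctx, request), nil
}
```

Callers would then branch on the returned error instead of testing the channel for nil, as sketched for watch.go below.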
@@ -333,6 +420,12 @@ func (c *RecordingClient) watch(ctx context.Context, request model.WatchRequest)
}

func (c *RecordingClient) RequestProgress(ctx context.Context) error {
c.kvMux.Lock()
defer c.kvMux.Unlock()
if c.isClosed {
return errClientIsClosed
}

return c.client.RequestProgress(ctx)
}

@@ -434,7 +527,7 @@ func (cs *ClientSet) close() {
return
}
for _, c := range cs.clients {
c.Close()
_ = c.Close()
}
cs.closed = true
}
9 changes: 9 additions & 0 deletions tests/robustness/client/watch.go
@@ -99,6 +99,9 @@ resetWatch:
return nil
}
watch := c.Watch(ctx, "", lastRevision+1, true, true, false)
if watch == nil {
return nil
Member (@serathius, Sep 23, 2025): This is not correct. Please read how this loop exits.

}
for {
select {
case revision, ok := <-maxRevisionChan:
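Tying this to the Member's comment above about how the resetWatch loop exits: with an error-returning Watch (see the client.go sketch earlier), the caller could handle the closed client explicitly rather than checking the channel for nil. Whether that case should end the goroutine cleanly or be propagated depends on how this loop is meant to terminate, which is precisely the reviewer's point; the snippet below only shows the error-propagation shape under the assumed Watch signature and is not the PR's code.

```go
watch, err := c.Watch(ctx, "", lastRevision+1, true, true, false)
if err != nil {
	// errClientIsClosed could be treated as a clean shutdown here, but only if
	// the loop's exit conditions allow it; otherwise return the error as-is.
	return err
}
```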
@@ -157,13 +160,19 @@ func openWatchPeriodically(ctx context.Context, g *errgroup.Group, c *RecordingC
g.Go(func() error {
resp, err := c.Get(ctx, "/key")
if err != nil {
if errors.Is(err, errClientIsClosed) {
return nil
}
return err
}
rev := resp.Header.Revision + backgroundWatchConfig.RevisionOffset

watchCtx, cancel := context.WithCancel(ctx)
defer cancel()
w := c.Watch(watchCtx, "", rev, true, true, true)
if w == nil {
return nil
}
for {
select {
case <-ctx.Done():