diff --git a/tests/robustness/client/client.go b/tests/robustness/client/client.go index 2c13b25a3058..3a10b0fbd30b 100644 --- a/tests/robustness/client/client.go +++ b/tests/robustness/client/client.go @@ -30,6 +30,10 @@ import ( "go.etcd.io/etcd/tests/v3/robustness/report" ) +var ( + errClientIsClosed = errors.New("client is closed") +) + // RecordingClient provides a semi-etcd client (different interface than // clientv3.Client) that records all the requests and responses made. Doesn't // allow for concurrent requests to conform to model.AppendableHistory requirements. @@ -45,6 +49,8 @@ type RecordingClient struct { // mux ensures order of request appending. kvMux sync.Mutex kvOperations *model.AppendableHistory + + isClosed bool } var _ clientv3.KV = (*RecordingClient)(nil) @@ -69,11 +75,20 @@ func NewRecordingClient(endpoints []string, ids identity.Provider, baseTime time client: *cc, kvOperations: model.NewAppendableHistory(ids), baseTime: baseTime, + isClosed: false, }, nil } func (c *RecordingClient) Close() error { - return c.client.Close() + c.kvMux.Lock() + defer c.kvMux.Unlock() + if c.isClosed { + return nil + } + + err := c.client.Close() + c.isClosed = true // if we set to true only if there is no error, we need to handle the retry in defer + return err } func (c *RecordingClient) Report() report.ClientReport { @@ -94,6 +109,12 @@ func (c *RecordingClient) Get(ctx context.Context, key string, opts ...clientv3. } func (c *RecordingClient) Range(ctx context.Context, start, end string, revision, limit int64) (*clientv3.GetResponse, error) { + c.kvMux.Lock() + defer c.kvMux.Unlock() + if c.isClosed { + return nil, errClientIsClosed + } + ops := []clientv3.OpOption{} if end != "" { ops = append(ops, clientv3.WithRange(end)) @@ -104,8 +125,6 @@ func (c *RecordingClient) Range(ctx context.Context, start, end string, revision if limit != 0 { ops = append(ops, clientv3.WithLimit(limit)) } - c.kvMux.Lock() - defer c.kvMux.Unlock() callTime := time.Since(c.baseTime) resp, err := c.client.Get(ctx, start, ops...) returnTime := time.Since(c.baseTime) @@ -116,6 +135,10 @@ func (c *RecordingClient) Range(ctx context.Context, start, end string, revision func (c *RecordingClient) Put(ctx context.Context, key, value string, _ ...clientv3.OpOption) (*clientv3.PutResponse, error) { c.kvMux.Lock() defer c.kvMux.Unlock() + if c.isClosed { + return nil, errClientIsClosed + } + callTime := time.Since(c.baseTime) resp, err := c.client.Put(ctx, key, value) returnTime := time.Since(c.baseTime) @@ -126,6 +149,10 @@ func (c *RecordingClient) Put(ctx context.Context, key, value string, _ ...clien func (c *RecordingClient) Delete(ctx context.Context, key string, _ ...clientv3.OpOption) (*clientv3.DeleteResponse, error) { c.kvMux.Lock() defer c.kvMux.Unlock() + if c.isClosed { + return nil, errClientIsClosed + } + callTime := time.Since(c.baseTime) resp, err := c.client.Delete(ctx, key) returnTime := time.Since(c.baseTime) @@ -172,12 +199,22 @@ func (w *wrappedTxn) Commit() (*clientv3.TxnResponse, error) { } func (c *RecordingClient) Txn(ctx context.Context) clientv3.Txn { + c.kvMux.Lock() + defer c.kvMux.Unlock() + if c.isClosed { + return nil + } + return &wrappedTxn{txn: c.client.Txn(ctx), c: c} } func (c *RecordingClient) LeaseGrant(ctx context.Context, ttl int64) (*clientv3.LeaseGrantResponse, error) { c.kvMux.Lock() defer c.kvMux.Unlock() + if c.isClosed { + return nil, errClientIsClosed + } + callTime := time.Since(c.baseTime) resp, err := c.client.Lease.Grant(ctx, ttl) returnTime := time.Since(c.baseTime) @@ -188,6 +225,10 @@ func (c *RecordingClient) LeaseGrant(ctx context.Context, ttl int64) (*clientv3. func (c *RecordingClient) LeaseRevoke(ctx context.Context, leaseID int64) (*clientv3.LeaseRevokeResponse, error) { c.kvMux.Lock() defer c.kvMux.Unlock() + if c.isClosed { + return nil, errClientIsClosed + } + callTime := time.Since(c.baseTime) resp, err := c.client.Lease.Revoke(ctx, clientv3.LeaseID(leaseID)) returnTime := time.Since(c.baseTime) @@ -196,9 +237,13 @@ func (c *RecordingClient) LeaseRevoke(ctx context.Context, leaseID int64) (*clie } func (c *RecordingClient) PutWithLease(ctx context.Context, key string, value string, leaseID int64) (*clientv3.PutResponse, error) { - opts := clientv3.WithLease(clientv3.LeaseID(leaseID)) c.kvMux.Lock() defer c.kvMux.Unlock() + if c.isClosed { + return nil, errClientIsClosed + } + + opts := clientv3.WithLease(clientv3.LeaseID(leaseID)) callTime := time.Since(c.baseTime) resp, err := c.client.Put(ctx, key, value, opts) returnTime := time.Since(c.baseTime) @@ -209,6 +254,10 @@ func (c *RecordingClient) PutWithLease(ctx context.Context, key string, value st func (c *RecordingClient) Defragment(ctx context.Context) (*clientv3.DefragmentResponse, error) { c.kvMux.Lock() defer c.kvMux.Unlock() + if c.isClosed { + return nil, errClientIsClosed + } + callTime := time.Since(c.baseTime) resp, err := c.client.Defragment(ctx, c.client.Endpoints()[0]) returnTime := time.Since(c.baseTime) @@ -219,6 +268,10 @@ func (c *RecordingClient) Defragment(ctx context.Context) (*clientv3.DefragmentR func (c *RecordingClient) Compact(ctx context.Context, rev int64, _ ...clientv3.CompactOption) (*clientv3.CompactResponse, error) { c.kvMux.Lock() defer c.kvMux.Unlock() + if c.isClosed { + return nil, errClientIsClosed + } + callTime := time.Since(c.baseTime) resp, err := c.client.Compact(ctx, rev) returnTime := time.Since(c.baseTime) @@ -229,6 +282,10 @@ func (c *RecordingClient) Compact(ctx context.Context, rev int64, _ ...clientv3. func (c *RecordingClient) MemberList(ctx context.Context, opts ...clientv3.OpOption) (*clientv3.MemberListResponse, error) { c.kvMux.Lock() defer c.kvMux.Unlock() + if c.isClosed { + return nil, errClientIsClosed + } + resp, err := c.client.MemberList(ctx, opts...) return resp, err } @@ -236,6 +293,10 @@ func (c *RecordingClient) MemberList(ctx context.Context, opts ...clientv3.OpOpt func (c *RecordingClient) MemberAdd(ctx context.Context, peerAddrs []string) (*clientv3.MemberAddResponse, error) { c.kvMux.Lock() defer c.kvMux.Unlock() + if c.isClosed { + return nil, errClientIsClosed + } + resp, err := c.client.MemberAdd(ctx, peerAddrs) return resp, err } @@ -243,6 +304,10 @@ func (c *RecordingClient) MemberAdd(ctx context.Context, peerAddrs []string) (*c func (c *RecordingClient) MemberAddAsLearner(ctx context.Context, peerAddrs []string) (*clientv3.MemberAddResponse, error) { c.kvMux.Lock() defer c.kvMux.Unlock() + if c.isClosed { + return nil, errClientIsClosed + } + resp, err := c.client.MemberAddAsLearner(ctx, peerAddrs) return resp, err } @@ -250,6 +315,10 @@ func (c *RecordingClient) MemberAddAsLearner(ctx context.Context, peerAddrs []st func (c *RecordingClient) MemberRemove(ctx context.Context, id uint64) (*clientv3.MemberRemoveResponse, error) { c.kvMux.Lock() defer c.kvMux.Unlock() + if c.isClosed { + return nil, errClientIsClosed + } + resp, err := c.client.MemberRemove(ctx, id) return resp, err } @@ -257,6 +326,10 @@ func (c *RecordingClient) MemberRemove(ctx context.Context, id uint64) (*clientv func (c *RecordingClient) MemberUpdate(ctx context.Context, id uint64, peerAddrs []string) (*clientv3.MemberUpdateResponse, error) { c.kvMux.Lock() defer c.kvMux.Unlock() + if c.isClosed { + return nil, errClientIsClosed + } + resp, err := c.client.MemberUpdate(ctx, id, peerAddrs) return resp, err } @@ -264,6 +337,10 @@ func (c *RecordingClient) MemberUpdate(ctx context.Context, id uint64, peerAddrs func (c *RecordingClient) MemberPromote(ctx context.Context, id uint64) (*clientv3.MemberPromoteResponse, error) { c.kvMux.Lock() defer c.kvMux.Unlock() + if c.isClosed { + return nil, errClientIsClosed + } + resp, err := c.client.MemberPromote(ctx, id) return resp, err } @@ -271,6 +348,10 @@ func (c *RecordingClient) MemberPromote(ctx context.Context, id uint64) (*client func (c *RecordingClient) Status(ctx context.Context, endpoint string) (*clientv3.StatusResponse, error) { c.kvMux.Lock() defer c.kvMux.Unlock() + if c.isClosed { + return nil, errClientIsClosed + } + resp, err := c.client.Status(ctx, endpoint) return resp, err } @@ -280,6 +361,12 @@ func (c *RecordingClient) Endpoints() []string { } func (c *RecordingClient) Watch(ctx context.Context, key string, rev int64, withPrefix bool, withProgressNotify bool, withPrevKV bool) clientv3.WatchChan { + c.kvMux.Lock() + defer c.kvMux.Unlock() + if c.isClosed { + return nil + } + request := model.WatchRequest{ Key: key, Revision: rev, @@ -333,6 +420,12 @@ func (c *RecordingClient) watch(ctx context.Context, request model.WatchRequest) } func (c *RecordingClient) RequestProgress(ctx context.Context) error { + c.kvMux.Lock() + defer c.kvMux.Unlock() + if c.isClosed { + return errClientIsClosed + } + return c.client.RequestProgress(ctx) } @@ -434,7 +527,7 @@ func (cs *ClientSet) close() { return } for _, c := range cs.clients { - c.Close() + _ = c.Close() } cs.closed = true } diff --git a/tests/robustness/client/watch.go b/tests/robustness/client/watch.go index 63a0f261b0ea..0d8f169971e2 100644 --- a/tests/robustness/client/watch.go +++ b/tests/robustness/client/watch.go @@ -99,6 +99,9 @@ resetWatch: return nil } watch := c.Watch(ctx, "", lastRevision+1, true, true, false) + if watch == nil { + return nil + } for { select { case revision, ok := <-maxRevisionChan: @@ -157,6 +160,9 @@ func openWatchPeriodically(ctx context.Context, g *errgroup.Group, c *RecordingC g.Go(func() error { resp, err := c.Get(ctx, "/key") if err != nil { + if errors.Is(err, errClientIsClosed) { + return nil + } return err } rev := resp.Header.Revision + backgroundWatchConfig.RevisionOffset @@ -164,6 +170,9 @@ func openWatchPeriodically(ctx context.Context, g *errgroup.Group, c *RecordingC watchCtx, cancel := context.WithCancel(ctx) defer cancel() w := c.Watch(watchCtx, "", rev, true, true, true) + if w == nil { + return nil + } for { select { case <-ctx.Done():