-
Notifications
You must be signed in to change notification settings - Fork 10.2k
Fix the client connection is closing issue in robustness test #20689
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -30,6 +30,10 @@ import ( | |
| "go.etcd.io/etcd/tests/v3/robustness/report" | ||
| ) | ||
|
|
||
| var ( | ||
| errClientIsClosed = errors.New("client is closed") | ||
| ) | ||
|
|
||
| // RecordingClient provides a semi-etcd client (different interface than | ||
| // clientv3.Client) that records all the requests and responses made. Doesn't | ||
| // allow for concurrent requests to conform to model.AppendableHistory requirements. | ||
|
|
@@ -45,6 +49,8 @@ type RecordingClient struct { | |
| // mux ensures order of request appending. | ||
| kvMux sync.Mutex | ||
| kvOperations *model.AppendableHistory | ||
|
|
||
| isClosed bool | ||
| } | ||
|
|
||
| var _ clientv3.KV = (*RecordingClient)(nil) | ||
|
|
@@ -69,11 +75,20 @@ func NewRecordingClient(endpoints []string, ids identity.Provider, baseTime time | |
| client: *cc, | ||
| kvOperations: model.NewAppendableHistory(ids), | ||
| baseTime: baseTime, | ||
| isClosed: false, | ||
| }, nil | ||
| } | ||
|
|
||
| func (c *RecordingClient) Close() error { | ||
| return c.client.Close() | ||
| c.kvMux.Lock() | ||
| defer c.kvMux.Unlock() | ||
| if c.isClosed { | ||
| return nil | ||
| } | ||
|
|
||
| err := c.client.Close() | ||
| c.isClosed = true // if we set to true only if there is no error, we need to handle the retry in defer | ||
| return err | ||
| } | ||
|
|
||
| func (c *RecordingClient) Report() report.ClientReport { | ||
|
|
@@ -94,6 +109,12 @@ func (c *RecordingClient) Get(ctx context.Context, key string, opts ...clientv3. | |
| } | ||
|
|
||
| func (c *RecordingClient) Range(ctx context.Context, start, end string, revision, limit int64) (*clientv3.GetResponse, error) { | ||
| c.kvMux.Lock() | ||
| defer c.kvMux.Unlock() | ||
| if c.isClosed { | ||
| return nil, errClientIsClosed | ||
| } | ||
|
|
||
| ops := []clientv3.OpOption{} | ||
| if end != "" { | ||
| ops = append(ops, clientv3.WithRange(end)) | ||
|
|
@@ -104,8 +125,6 @@ func (c *RecordingClient) Range(ctx context.Context, start, end string, revision | |
| if limit != 0 { | ||
| ops = append(ops, clientv3.WithLimit(limit)) | ||
| } | ||
| c.kvMux.Lock() | ||
| defer c.kvMux.Unlock() | ||
| callTime := time.Since(c.baseTime) | ||
| resp, err := c.client.Get(ctx, start, ops...) | ||
| returnTime := time.Since(c.baseTime) | ||
|
|
@@ -116,6 +135,10 @@ func (c *RecordingClient) Range(ctx context.Context, start, end string, revision | |
| func (c *RecordingClient) Put(ctx context.Context, key, value string, _ ...clientv3.OpOption) (*clientv3.PutResponse, error) { | ||
| c.kvMux.Lock() | ||
| defer c.kvMux.Unlock() | ||
| if c.isClosed { | ||
| return nil, errClientIsClosed | ||
| } | ||
|
|
||
| callTime := time.Since(c.baseTime) | ||
| resp, err := c.client.Put(ctx, key, value) | ||
| returnTime := time.Since(c.baseTime) | ||
|
|
@@ -126,6 +149,10 @@ func (c *RecordingClient) Put(ctx context.Context, key, value string, _ ...clien | |
| func (c *RecordingClient) Delete(ctx context.Context, key string, _ ...clientv3.OpOption) (*clientv3.DeleteResponse, error) { | ||
| c.kvMux.Lock() | ||
| defer c.kvMux.Unlock() | ||
| if c.isClosed { | ||
| return nil, errClientIsClosed | ||
| } | ||
|
|
||
| callTime := time.Since(c.baseTime) | ||
| resp, err := c.client.Delete(ctx, key) | ||
| returnTime := time.Since(c.baseTime) | ||
|
|
@@ -172,12 +199,22 @@ func (w *wrappedTxn) Commit() (*clientv3.TxnResponse, error) { | |
| } | ||
|
|
||
| func (c *RecordingClient) Txn(ctx context.Context) clientv3.Txn { | ||
| c.kvMux.Lock() | ||
| defer c.kvMux.Unlock() | ||
| if c.isClosed { | ||
| return nil | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please don't return nil as it breaks contract. Instead implement interface that returns error when we execute Txn. |
||
| } | ||
|
|
||
| return &wrappedTxn{txn: c.client.Txn(ctx), c: c} | ||
| } | ||
|
|
||
| func (c *RecordingClient) LeaseGrant(ctx context.Context, ttl int64) (*clientv3.LeaseGrantResponse, error) { | ||
| c.kvMux.Lock() | ||
| defer c.kvMux.Unlock() | ||
| if c.isClosed { | ||
| return nil, errClientIsClosed | ||
| } | ||
|
|
||
| callTime := time.Since(c.baseTime) | ||
| resp, err := c.client.Lease.Grant(ctx, ttl) | ||
| returnTime := time.Since(c.baseTime) | ||
|
|
@@ -188,6 +225,10 @@ func (c *RecordingClient) LeaseGrant(ctx context.Context, ttl int64) (*clientv3. | |
| func (c *RecordingClient) LeaseRevoke(ctx context.Context, leaseID int64) (*clientv3.LeaseRevokeResponse, error) { | ||
| c.kvMux.Lock() | ||
| defer c.kvMux.Unlock() | ||
| if c.isClosed { | ||
| return nil, errClientIsClosed | ||
| } | ||
|
|
||
| callTime := time.Since(c.baseTime) | ||
| resp, err := c.client.Lease.Revoke(ctx, clientv3.LeaseID(leaseID)) | ||
| returnTime := time.Since(c.baseTime) | ||
|
|
@@ -196,9 +237,13 @@ func (c *RecordingClient) LeaseRevoke(ctx context.Context, leaseID int64) (*clie | |
| } | ||
|
|
||
| func (c *RecordingClient) PutWithLease(ctx context.Context, key string, value string, leaseID int64) (*clientv3.PutResponse, error) { | ||
| opts := clientv3.WithLease(clientv3.LeaseID(leaseID)) | ||
| c.kvMux.Lock() | ||
| defer c.kvMux.Unlock() | ||
| if c.isClosed { | ||
| return nil, errClientIsClosed | ||
| } | ||
|
|
||
| opts := clientv3.WithLease(clientv3.LeaseID(leaseID)) | ||
| callTime := time.Since(c.baseTime) | ||
| resp, err := c.client.Put(ctx, key, value, opts) | ||
| returnTime := time.Since(c.baseTime) | ||
|
|
@@ -209,6 +254,10 @@ func (c *RecordingClient) PutWithLease(ctx context.Context, key string, value st | |
| func (c *RecordingClient) Defragment(ctx context.Context) (*clientv3.DefragmentResponse, error) { | ||
| c.kvMux.Lock() | ||
| defer c.kvMux.Unlock() | ||
| if c.isClosed { | ||
| return nil, errClientIsClosed | ||
| } | ||
|
|
||
| callTime := time.Since(c.baseTime) | ||
| resp, err := c.client.Defragment(ctx, c.client.Endpoints()[0]) | ||
| returnTime := time.Since(c.baseTime) | ||
|
|
@@ -219,6 +268,10 @@ func (c *RecordingClient) Defragment(ctx context.Context) (*clientv3.DefragmentR | |
| func (c *RecordingClient) Compact(ctx context.Context, rev int64, _ ...clientv3.CompactOption) (*clientv3.CompactResponse, error) { | ||
| c.kvMux.Lock() | ||
| defer c.kvMux.Unlock() | ||
| if c.isClosed { | ||
| return nil, errClientIsClosed | ||
| } | ||
|
|
||
| callTime := time.Since(c.baseTime) | ||
| resp, err := c.client.Compact(ctx, rev) | ||
| returnTime := time.Since(c.baseTime) | ||
|
|
@@ -229,48 +282,76 @@ func (c *RecordingClient) Compact(ctx context.Context, rev int64, _ ...clientv3. | |
| func (c *RecordingClient) MemberList(ctx context.Context, opts ...clientv3.OpOption) (*clientv3.MemberListResponse, error) { | ||
| c.kvMux.Lock() | ||
| defer c.kvMux.Unlock() | ||
| if c.isClosed { | ||
| return nil, errClientIsClosed | ||
| } | ||
|
|
||
| resp, err := c.client.MemberList(ctx, opts...) | ||
| return resp, err | ||
| } | ||
|
|
||
| func (c *RecordingClient) MemberAdd(ctx context.Context, peerAddrs []string) (*clientv3.MemberAddResponse, error) { | ||
| c.kvMux.Lock() | ||
| defer c.kvMux.Unlock() | ||
| if c.isClosed { | ||
| return nil, errClientIsClosed | ||
| } | ||
|
|
||
| resp, err := c.client.MemberAdd(ctx, peerAddrs) | ||
| return resp, err | ||
| } | ||
|
|
||
| func (c *RecordingClient) MemberAddAsLearner(ctx context.Context, peerAddrs []string) (*clientv3.MemberAddResponse, error) { | ||
| c.kvMux.Lock() | ||
| defer c.kvMux.Unlock() | ||
| if c.isClosed { | ||
| return nil, errClientIsClosed | ||
| } | ||
|
|
||
| resp, err := c.client.MemberAddAsLearner(ctx, peerAddrs) | ||
| return resp, err | ||
| } | ||
|
|
||
| func (c *RecordingClient) MemberRemove(ctx context.Context, id uint64) (*clientv3.MemberRemoveResponse, error) { | ||
| c.kvMux.Lock() | ||
| defer c.kvMux.Unlock() | ||
| if c.isClosed { | ||
| return nil, errClientIsClosed | ||
| } | ||
|
|
||
| resp, err := c.client.MemberRemove(ctx, id) | ||
| return resp, err | ||
| } | ||
|
|
||
| func (c *RecordingClient) MemberUpdate(ctx context.Context, id uint64, peerAddrs []string) (*clientv3.MemberUpdateResponse, error) { | ||
| c.kvMux.Lock() | ||
| defer c.kvMux.Unlock() | ||
| if c.isClosed { | ||
| return nil, errClientIsClosed | ||
| } | ||
|
|
||
| resp, err := c.client.MemberUpdate(ctx, id, peerAddrs) | ||
| return resp, err | ||
| } | ||
|
|
||
| func (c *RecordingClient) MemberPromote(ctx context.Context, id uint64) (*clientv3.MemberPromoteResponse, error) { | ||
| c.kvMux.Lock() | ||
| defer c.kvMux.Unlock() | ||
| if c.isClosed { | ||
| return nil, errClientIsClosed | ||
| } | ||
|
|
||
| resp, err := c.client.MemberPromote(ctx, id) | ||
| return resp, err | ||
| } | ||
|
|
||
| func (c *RecordingClient) Status(ctx context.Context, endpoint string) (*clientv3.StatusResponse, error) { | ||
| c.kvMux.Lock() | ||
| defer c.kvMux.Unlock() | ||
| if c.isClosed { | ||
| return nil, errClientIsClosed | ||
| } | ||
|
|
||
| resp, err := c.client.Status(ctx, endpoint) | ||
| return resp, err | ||
| } | ||
|
|
@@ -280,6 +361,12 @@ func (c *RecordingClient) Endpoints() []string { | |
| } | ||
|
|
||
| func (c *RecordingClient) Watch(ctx context.Context, key string, rev int64, withPrefix bool, withProgressNotify bool, withPrevKV bool) clientv3.WatchChan { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please return error |
||
| c.kvMux.Lock() | ||
| defer c.kvMux.Unlock() | ||
| if c.isClosed { | ||
| return nil | ||
| } | ||
|
|
||
| request := model.WatchRequest{ | ||
| Key: key, | ||
| Revision: rev, | ||
|
|
@@ -333,6 +420,12 @@ func (c *RecordingClient) watch(ctx context.Context, request model.WatchRequest) | |
| } | ||
|
|
||
| func (c *RecordingClient) RequestProgress(ctx context.Context) error { | ||
| c.kvMux.Lock() | ||
| defer c.kvMux.Unlock() | ||
| if c.isClosed { | ||
| return errClientIsClosed | ||
| } | ||
|
|
||
| return c.client.RequestProgress(ctx) | ||
| } | ||
|
|
||
|
|
@@ -434,7 +527,7 @@ func (cs *ClientSet) close() { | |
| return | ||
| } | ||
| for _, c := range cs.clients { | ||
| c.Close() | ||
| _ = c.Close() | ||
| } | ||
| cs.closed = true | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -99,6 +99,9 @@ resetWatch: | |
| return nil | ||
| } | ||
| watch := c.Watch(ctx, "", lastRevision+1, true, true, false) | ||
| if watch == nil { | ||
| return nil | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is not correct. Please read how this loop exists. |
||
| } | ||
| for { | ||
| select { | ||
| case revision, ok := <-maxRevisionChan: | ||
|
|
@@ -157,13 +160,19 @@ func openWatchPeriodically(ctx context.Context, g *errgroup.Group, c *RecordingC | |
| g.Go(func() error { | ||
| resp, err := c.Get(ctx, "/key") | ||
| if err != nil { | ||
| if errors.Is(err, errClientIsClosed) { | ||
| return nil | ||
| } | ||
| return err | ||
| } | ||
| rev := resp.Header.Revision + backgroundWatchConfig.RevisionOffset | ||
|
|
||
| watchCtx, cancel := context.WithCancel(ctx) | ||
| defer cancel() | ||
| w := c.Watch(watchCtx, "", rev, true, true, true) | ||
| if w == nil { | ||
| return nil | ||
| } | ||
| for { | ||
| select { | ||
| case <-ctx.Done(): | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is wrong too.