Skip to content

Commit 4c25b7d

Browse files
committed
Fix the client connection is closing issue in robustness test
Signed-off-by: Chun-Hung Tseng <[email protected]>
1 parent 926f4c1 commit 4c25b7d

File tree

2 files changed

+144
-2
lines changed

2 files changed

+144
-2
lines changed

tests/robustness/client/client.go

Lines changed: 135 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ import (
3030
"go.etcd.io/etcd/tests/v3/robustness/report"
3131
)
3232

33+
var (
34+
errClientIsClosed = errors.New("client is closed")
35+
)
36+
3337
// RecordingClient provides a semi-etcd client (different interface than
3438
// clientv3.Client) that records all the requests and responses made. Doesn't
3539
// allow for concurrent requests to conform to model.AppendableHistory requirements.
@@ -45,6 +49,9 @@ type RecordingClient struct {
4549
// mux ensures order of request appending.
4650
kvMux sync.Mutex
4751
kvOperations *model.AppendableHistory
52+
53+
isClosed bool
54+
sync.Mutex
4855
}
4956

5057
var _ clientv3.KV = (*RecordingClient)(nil)
@@ -69,11 +76,22 @@ func NewRecordingClient(endpoints []string, ids identity.Provider, baseTime time
6976
client: *cc,
7077
kvOperations: model.NewAppendableHistory(ids),
7178
baseTime: baseTime,
79+
isClosed: false,
7280
}, nil
7381
}
7482

7583
func (c *RecordingClient) Close() error {
76-
return c.client.Close()
84+
c.Lock()
85+
defer c.Unlock()
86+
if c.isClosed {
87+
return nil
88+
}
89+
90+
err := c.client.Close()
91+
if err != nil {
92+
c.isClosed = true
93+
}
94+
return err
7795
}
7896

7997
func (c *RecordingClient) Report() report.ClientReport {
@@ -89,11 +107,24 @@ func (c *RecordingClient) Do(ctx context.Context, op clientv3.Op) (clientv3.OpRe
89107
}
90108

91109
func (c *RecordingClient) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) {
110+
c.Lock()
111+
if c.isClosed {
112+
c.Unlock()
113+
return nil, errClientIsClosed
114+
}
115+
c.Unlock()
116+
92117
op := clientv3.OpGet(key, opts...)
93118
return c.Range(ctx, key, string(op.RangeBytes()), op.Rev(), op.Limit())
94119
}
95120

96121
func (c *RecordingClient) Range(ctx context.Context, start, end string, revision, limit int64) (*clientv3.GetResponse, error) {
122+
c.Lock()
123+
defer c.Unlock()
124+
if c.isClosed {
125+
return nil, errClientIsClosed
126+
}
127+
97128
ops := []clientv3.OpOption{}
98129
if end != "" {
99130
ops = append(ops, clientv3.WithRange(end))
@@ -114,6 +145,12 @@ func (c *RecordingClient) Range(ctx context.Context, start, end string, revision
114145
}
115146

116147
func (c *RecordingClient) Put(ctx context.Context, key, value string, _ ...clientv3.OpOption) (*clientv3.PutResponse, error) {
148+
c.Lock()
149+
defer c.Unlock()
150+
if c.isClosed {
151+
return nil, errClientIsClosed
152+
}
153+
117154
c.kvMux.Lock()
118155
defer c.kvMux.Unlock()
119156
callTime := time.Since(c.baseTime)
@@ -124,6 +161,12 @@ func (c *RecordingClient) Put(ctx context.Context, key, value string, _ ...clien
124161
}
125162

126163
func (c *RecordingClient) Delete(ctx context.Context, key string, _ ...clientv3.OpOption) (*clientv3.DeleteResponse, error) {
164+
c.Lock()
165+
defer c.Unlock()
166+
if c.isClosed {
167+
return nil, errClientIsClosed
168+
}
169+
127170
c.kvMux.Lock()
128171
defer c.kvMux.Unlock()
129172
callTime := time.Since(c.baseTime)
@@ -172,10 +215,22 @@ func (w *wrappedTxn) Commit() (*clientv3.TxnResponse, error) {
172215
}
173216

174217
func (c *RecordingClient) Txn(ctx context.Context) clientv3.Txn {
218+
c.Lock()
219+
defer c.Unlock()
220+
if c.isClosed {
221+
return nil
222+
}
223+
175224
return &wrappedTxn{txn: c.client.Txn(ctx), c: c}
176225
}
177226

178227
func (c *RecordingClient) LeaseGrant(ctx context.Context, ttl int64) (*clientv3.LeaseGrantResponse, error) {
228+
c.Lock()
229+
defer c.Unlock()
230+
if c.isClosed {
231+
return nil, errClientIsClosed
232+
}
233+
179234
c.kvMux.Lock()
180235
defer c.kvMux.Unlock()
181236
callTime := time.Since(c.baseTime)
@@ -186,6 +241,12 @@ func (c *RecordingClient) LeaseGrant(ctx context.Context, ttl int64) (*clientv3.
186241
}
187242

188243
func (c *RecordingClient) LeaseRevoke(ctx context.Context, leaseID int64) (*clientv3.LeaseRevokeResponse, error) {
244+
c.Lock()
245+
defer c.Unlock()
246+
if c.isClosed {
247+
return nil, errClientIsClosed
248+
}
249+
189250
c.kvMux.Lock()
190251
defer c.kvMux.Unlock()
191252
callTime := time.Since(c.baseTime)
@@ -196,6 +257,12 @@ func (c *RecordingClient) LeaseRevoke(ctx context.Context, leaseID int64) (*clie
196257
}
197258

198259
func (c *RecordingClient) PutWithLease(ctx context.Context, key string, value string, leaseID int64) (*clientv3.PutResponse, error) {
260+
c.Lock()
261+
defer c.Unlock()
262+
if c.isClosed {
263+
return nil, errClientIsClosed
264+
}
265+
199266
opts := clientv3.WithLease(clientv3.LeaseID(leaseID))
200267
c.kvMux.Lock()
201268
defer c.kvMux.Unlock()
@@ -207,6 +274,12 @@ func (c *RecordingClient) PutWithLease(ctx context.Context, key string, value st
207274
}
208275

209276
func (c *RecordingClient) Defragment(ctx context.Context) (*clientv3.DefragmentResponse, error) {
277+
c.Lock()
278+
defer c.Unlock()
279+
if c.isClosed {
280+
return nil, errClientIsClosed
281+
}
282+
210283
c.kvMux.Lock()
211284
defer c.kvMux.Unlock()
212285
callTime := time.Since(c.baseTime)
@@ -217,6 +290,12 @@ func (c *RecordingClient) Defragment(ctx context.Context) (*clientv3.DefragmentR
217290
}
218291

219292
func (c *RecordingClient) Compact(ctx context.Context, rev int64, _ ...clientv3.CompactOption) (*clientv3.CompactResponse, error) {
293+
c.Lock()
294+
defer c.Unlock()
295+
if c.isClosed {
296+
return nil, errClientIsClosed
297+
}
298+
220299
c.kvMux.Lock()
221300
defer c.kvMux.Unlock()
222301
callTime := time.Since(c.baseTime)
@@ -227,48 +306,90 @@ func (c *RecordingClient) Compact(ctx context.Context, rev int64, _ ...clientv3.
227306
}
228307

229308
func (c *RecordingClient) MemberList(ctx context.Context, opts ...clientv3.OpOption) (*clientv3.MemberListResponse, error) {
309+
c.Lock()
310+
defer c.Unlock()
311+
if c.isClosed {
312+
return nil, errClientIsClosed
313+
}
314+
230315
c.kvMux.Lock()
231316
defer c.kvMux.Unlock()
232317
resp, err := c.client.MemberList(ctx, opts...)
233318
return resp, err
234319
}
235320

236321
func (c *RecordingClient) MemberAdd(ctx context.Context, peerAddrs []string) (*clientv3.MemberAddResponse, error) {
322+
c.Lock()
323+
defer c.Unlock()
324+
if c.isClosed {
325+
return nil, errClientIsClosed
326+
}
327+
237328
c.kvMux.Lock()
238329
defer c.kvMux.Unlock()
239330
resp, err := c.client.MemberAdd(ctx, peerAddrs)
240331
return resp, err
241332
}
242333

243334
func (c *RecordingClient) MemberAddAsLearner(ctx context.Context, peerAddrs []string) (*clientv3.MemberAddResponse, error) {
335+
c.Lock()
336+
defer c.Unlock()
337+
if c.isClosed {
338+
return nil, errClientIsClosed
339+
}
340+
244341
c.kvMux.Lock()
245342
defer c.kvMux.Unlock()
246343
resp, err := c.client.MemberAddAsLearner(ctx, peerAddrs)
247344
return resp, err
248345
}
249346

250347
func (c *RecordingClient) MemberRemove(ctx context.Context, id uint64) (*clientv3.MemberRemoveResponse, error) {
348+
c.Lock()
349+
defer c.Unlock()
350+
if c.isClosed {
351+
return nil, errClientIsClosed
352+
}
353+
251354
c.kvMux.Lock()
252355
defer c.kvMux.Unlock()
253356
resp, err := c.client.MemberRemove(ctx, id)
254357
return resp, err
255358
}
256359

257360
func (c *RecordingClient) MemberUpdate(ctx context.Context, id uint64, peerAddrs []string) (*clientv3.MemberUpdateResponse, error) {
361+
c.Lock()
362+
defer c.Unlock()
363+
if c.isClosed {
364+
return nil, errClientIsClosed
365+
}
366+
258367
c.kvMux.Lock()
259368
defer c.kvMux.Unlock()
260369
resp, err := c.client.MemberUpdate(ctx, id, peerAddrs)
261370
return resp, err
262371
}
263372

264373
func (c *RecordingClient) MemberPromote(ctx context.Context, id uint64) (*clientv3.MemberPromoteResponse, error) {
374+
c.Lock()
375+
defer c.Unlock()
376+
if c.isClosed {
377+
return nil, errClientIsClosed
378+
}
379+
265380
c.kvMux.Lock()
266381
defer c.kvMux.Unlock()
267382
resp, err := c.client.MemberPromote(ctx, id)
268383
return resp, err
269384
}
270385

271386
func (c *RecordingClient) Status(ctx context.Context, endpoint string) (*clientv3.StatusResponse, error) {
387+
c.Lock()
388+
defer c.Unlock()
389+
if c.isClosed {
390+
return nil, errClientIsClosed
391+
}
392+
272393
c.kvMux.Lock()
273394
defer c.kvMux.Unlock()
274395
resp, err := c.client.Status(ctx, endpoint)
@@ -280,6 +401,12 @@ func (c *RecordingClient) Endpoints() []string {
280401
}
281402

282403
func (c *RecordingClient) Watch(ctx context.Context, key string, rev int64, withPrefix bool, withProgressNotify bool, withPrevKV bool) clientv3.WatchChan {
404+
c.Lock()
405+
defer c.Unlock()
406+
if c.isClosed {
407+
return nil
408+
}
409+
283410
request := model.WatchRequest{
284411
Key: key,
285412
Revision: rev,
@@ -333,6 +460,12 @@ func (c *RecordingClient) watch(ctx context.Context, request model.WatchRequest)
333460
}
334461

335462
func (c *RecordingClient) RequestProgress(ctx context.Context) error {
463+
c.Lock()
464+
defer c.Unlock()
465+
if c.isClosed {
466+
return errClientIsClosed
467+
}
468+
336469
return c.client.RequestProgress(ctx)
337470
}
338471

@@ -434,7 +567,7 @@ func (cs *ClientSet) close() {
434567
return
435568
}
436569
for _, c := range cs.clients {
437-
c.Close()
570+
_ = c.Close()
438571
}
439572
cs.closed = true
440573
}

tests/robustness/client/watch.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,9 @@ resetWatch:
103103
return nil
104104
}
105105
watch := c.Watch(ctx, "", lastRevision+1, true, true, false)
106+
if watch == nil {
107+
return nil
108+
}
106109
for {
107110
select {
108111
case revision, ok := <-maxRevisionChan:
@@ -161,13 +164,19 @@ func openWatchPeriodically(ctx context.Context, g *errgroup.Group, c *RecordingC
161164
g.Go(func() error {
162165
resp, err := c.Get(ctx, "/key")
163166
if err != nil {
167+
if errors.Is(err, errClientIsClosed) {
168+
return nil
169+
}
164170
return err
165171
}
166172
rev := resp.Header.Revision + backgroundWatchConfig.RevisionOffset
167173

168174
watchCtx, cancel := context.WithCancel(ctx)
169175
defer cancel()
170176
w := c.Watch(watchCtx, "", rev, true, true, true)
177+
if w == nil {
178+
return nil
179+
}
171180
for {
172181
select {
173182
case <-ctx.Done():

0 commit comments

Comments
 (0)