
Commit 614dd35

Refactor RunTrafficLoop, RunCompactLoop, and CollectClusterWatchEvents in the robustness tests.

Given the ever-increasing number of configuration options that are passed in today and will be passed in later, refactor these functions to take parameter structs, improving readability and maintainability.

Signed-off-by: Chun-Hung Tseng <[email protected]>
1 parent ab3f19a commit 614dd35
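
Why a parameter struct helps here: call sites name each argument, new options can be added without touching every caller, and omitted fields fall back to zero values. Below is a minimal, self-contained Go sketch of the before/after shape of this change; the Endpoint field and the runCompactLoop* names are illustrative stand-ins, not the actual etcd test code (the real structs, RunTrafficLoopParam, RunCompactLoopParam, and CollectClusterWatchEventsParam, appear in the diffs further down).

package main

import (
	"context"
	"fmt"
	"time"
)

// Before: positional arguments; adding an option means touching every call site.
func runCompactLoopOld(ctx context.Context, endpoint string, period time.Duration, finish <-chan struct{}) {
	fmt.Println("compacting", endpoint, "every", period)
}

// After: a named parameter struct, mirroring the shape of RunCompactLoopParam in the diff.
type runCompactLoopParam struct {
	Endpoint string
	Period   time.Duration
	Finish   <-chan struct{}
}

func runCompactLoopNew(ctx context.Context, p runCompactLoopParam) {
	fmt.Println("compacting", p.Endpoint, "every", p.Period)
}

func main() {
	finish := make(chan struct{})
	runCompactLoopOld(context.Background(), "http://127.0.0.1:2379", time.Minute, finish)
	runCompactLoopNew(context.Background(), runCompactLoopParam{
		Endpoint: "http://127.0.0.1:2379",
		Period:   time.Minute,
		Finish:   finish,
	})
}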

File tree

6 files changed: +119 -55 lines changed

tests/antithesis/test-template/robustness/traffic/main.go

Lines changed: 30 additions & 16 deletions
@@ -120,7 +120,13 @@ func runTraffic(ctx context.Context, lg *zap.Logger, tf traffic.Traffic, hosts [
 	watchSet := client.NewSet(ids, baseTime)
 	defer watchSet.Close()
 	g.Go(func() error {
-		err := client.CollectClusterWatchEvents(ctx, lg, hosts, maxRevisionChan, watchConfig, watchSet)
+		err := client.CollectClusterWatchEvents(ctx, client.CollectClusterWatchEventsParam{
+			Lg:              lg,
+			Endpoints:       hosts,
+			MaxRevisionChan: maxRevisionChan,
+			Cfg:             watchConfig,
+			ClientSet:       watchSet,
+		})
 		return err
 	})
 	if err := g.Wait(); err != nil {
@@ -152,13 +158,15 @@ func simulateTraffic(ctx context.Context, tf traffic.Traffic, hosts []string, cl
 		go func(c *client.RecordingClient) {
 			defer wg.Done()
 			defer c.Close()
-			tf.RunTrafficLoop(ctx, c, limiter,
-				clientSet.IdentityProvider(),
-				storage,
-				concurrencyLimiter,
-				keyStore,
-				finish,
-			)
+			tf.RunTrafficLoop(ctx, traffic.RunTrafficLoopParam{
+				Client:                             c,
+				QPSLimiter:                         limiter,
+				IDs:                                clientSet.IdentityProvider(),
+				LeaseIDStorage:                     storage,
+				NonUniqueRequestConcurrencyLimiter: concurrencyLimiter,
+				KeyStore:                           keyStore,
+				Finish:                             finish,
+			})
 		}(c)
 	}
 	for range profile.ClusterClientCount {
@@ -167,21 +175,27 @@ func simulateTraffic(ctx context.Context, tf traffic.Traffic, hosts []string, cl
 		go func(c *client.RecordingClient) {
 			defer wg.Done()
 			defer c.Close()
-			tf.RunTrafficLoop(ctx, c, limiter,
-				clientSet.IdentityProvider(),
-				storage,
-				concurrencyLimiter,
-				keyStore,
-				finish,
-			)
+			tf.RunTrafficLoop(ctx, traffic.RunTrafficLoopParam{
+				Client:                             c,
+				QPSLimiter:                         limiter,
+				IDs:                                clientSet.IdentityProvider(),
+				LeaseIDStorage:                     storage,
+				NonUniqueRequestConcurrencyLimiter: concurrencyLimiter,
+				KeyStore:                           keyStore,
+				Finish:                             finish,
+			})
 		}(c)
 	}
 	wg.Add(1)
 	compactClient := connect(clientSet, hosts)
 	go func(c *client.RecordingClient) {
 		defer wg.Done()
 		defer c.Close()
-		tf.RunCompactLoop(ctx, c, traffic.DefaultCompactionPeriod, finish)
+		tf.RunCompactLoop(ctx, traffic.RunCompactLoopParam{
+			Client: c,
+			Period: traffic.DefaultCompactionPeriod,
+			Finish: finish,
+		})
	}(compactClient)
 	defragPeriod := traffic.DefaultCompactionPeriod * time.Duration(len(hosts))
 	for _, h := range hosts {

tests/robustness/client/watch.go

Lines changed: 15 additions & 7 deletions
@@ -25,27 +25,35 @@ import (
 	"go.etcd.io/etcd/tests/v3/robustness/report"
 )
 
-func CollectClusterWatchEvents(ctx context.Context, lg *zap.Logger, endpoints []string, maxRevisionChan <-chan int64, cfg WatchConfig, clientSet *ClientSet) error {
+type CollectClusterWatchEventsParam struct {
+	Lg              *zap.Logger
+	Endpoints       []string
+	MaxRevisionChan <-chan int64
+	Cfg             WatchConfig
+	ClientSet       *ClientSet
+}
+
+func CollectClusterWatchEvents(ctx context.Context, param CollectClusterWatchEventsParam) error {
 	var g errgroup.Group
-	reports := make([]report.ClientReport, len(endpoints))
-	memberMaxRevisionChans := make([]chan int64, len(endpoints))
-	for i, endpoint := range endpoints {
+	reports := make([]report.ClientReport, len(param.Endpoints))
+	memberMaxRevisionChans := make([]chan int64, len(param.Endpoints))
+	for i, endpoint := range param.Endpoints {
 		memberMaxRevisionChan := make(chan int64, 1)
 		memberMaxRevisionChans[i] = memberMaxRevisionChan
 		g.Go(func() error {
-			c, err := clientSet.NewClient([]string{endpoint})
+			c, err := param.ClientSet.NewClient([]string{endpoint})
 			if err != nil {
 				return err
 			}
 			defer c.Close()
-			err = watchUntilRevision(ctx, lg, c, memberMaxRevisionChan, cfg)
+			err = watchUntilRevision(ctx, param.Lg, c, memberMaxRevisionChan, param.Cfg)
 			reports[i] = c.Report()
 			return err
 		})
 	}
 
 	g.Go(func() error {
-		maxRevision := <-maxRevisionChan
+		maxRevision := <-param.MaxRevisionChan
 		for _, memberChan := range memberMaxRevisionChans {
 			memberChan <- maxRevision
 		}

tests/robustness/main_test.go

Lines changed: 7 additions & 1 deletion
@@ -167,7 +167,13 @@ func runScenario(ctx context.Context, t *testing.T, s scenarios.TestScenario, lg
 	defer watchSet.Close()
 	g.Go(func() error {
 		endpoints := processEndpoints(clus)
-		err := client.CollectClusterWatchEvents(ctx, lg, endpoints, maxRevisionChan, s.Watch, watchSet)
+		err := client.CollectClusterWatchEvents(ctx, client.CollectClusterWatchEventsParam{
+			Lg:              lg,
+			Endpoints:       endpoints,
+			MaxRevisionChan: maxRevisionChan,
+			Cfg:             s.Watch,
+			ClientSet:       watchSet,
+		})
 		return err
 	})
 	err := g.Wait()

tests/robustness/traffic/etcd.go

Lines changed: 15 additions & 15 deletions
@@ -109,23 +109,23 @@ func (t etcdTraffic) Name() string {
 	return "Etcd"
 }
 
-func (t etcdTraffic) RunTrafficLoop(ctx context.Context, c *client.RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIDStorage, nonUniqueWriteLimiter ConcurrencyLimiter, keyStore *keyStore, finish <-chan struct{}) {
+func (t etcdTraffic) RunTrafficLoop(ctx context.Context, p RunTrafficLoopParam) {
 	lastOperationSucceeded := true
 	var lastRev int64
 	var requestType etcdRequestType
 	client := etcdTrafficClient{
 		etcdTraffic:  t,
-		keyStore:     keyStore,
-		client:       c,
-		limiter:      limiter,
-		idProvider:   ids,
-		leaseStorage: lm,
+		keyStore:     p.KeyStore,
+		client:       p.Client,
+		limiter:      p.QPSLimiter,
+		idProvider:   p.IDs,
+		leaseStorage: p.LeaseIDStorage,
 	}
 	for {
 		select {
 		case <-ctx.Done():
 			return
-		case <-finish:
+		case <-p.Finish:
 			return
 		default:
 		}
@@ -134,7 +134,7 @@ func (t etcdTraffic) RunTrafficLoop(ctx context.Context, c *client.RecordingClie
 		// Avoid multiple failed writes in a row
 		if lastOperationSucceeded {
 			choices := t.requests
-			if shouldReturn = nonUniqueWriteLimiter.Take(); !shouldReturn {
+			if shouldReturn = p.NonUniqueRequestConcurrencyLimiter.Take(); !shouldReturn {
 				choices = filterOutNonUniqueEtcdWrites(choices)
 			}
 			requestType = random.PickRandom(choices)
@@ -143,7 +143,7 @@ func (t etcdTraffic) RunTrafficLoop(ctx context.Context, c *client.RecordingClie
 		}
 		rev, err := client.Request(ctx, requestType, lastRev)
 		if shouldReturn {
-			nonUniqueWriteLimiter.Return()
+			p.NonUniqueRequestConcurrencyLimiter.Return()
 		}
 		lastOperationSucceeded = err == nil
 		if err != nil {
@@ -152,32 +152,32 @@ func (t etcdTraffic) RunTrafficLoop(ctx context.Context, c *client.RecordingClie
 		if rev != 0 {
 			lastRev = rev
 		}
-		limiter.Wait(ctx)
+		p.QPSLimiter.Wait(ctx)
 	}
 }
 
-func (t etcdTraffic) RunCompactLoop(ctx context.Context, c *client.RecordingClient, period time.Duration, finish <-chan struct{}) {
+func (t etcdTraffic) RunCompactLoop(ctx context.Context, param RunCompactLoopParam) {
 	var lastRev int64 = 2
-	ticker := time.NewTicker(period)
+	ticker := time.NewTicker(param.Period)
 	defer ticker.Stop()
 	for {
 		select {
 		case <-ctx.Done():
 			return
-		case <-finish:
+		case <-param.Finish:
 			return
 		case <-ticker.C:
 		}
 		statusCtx, cancel := context.WithTimeout(ctx, RequestTimeout)
-		resp, err := c.Status(statusCtx, c.Endpoints()[0])
+		resp, err := param.Client.Status(statusCtx, param.Client.Endpoints()[0])
 		cancel()
 		if err != nil {
 			continue
 		}
 
 		// Range allows for both revision has been compacted and future revision errors
 		compactRev := random.RandRange(lastRev, resp.Header.Revision+5)
-		_, err = c.Compact(ctx, compactRev)
+		_, err = param.Client.Compact(ctx, compactRev)
 		if err != nil {
 			continue
 		}

tests/robustness/traffic/kubernetes.go

Lines changed: 11 additions & 11 deletions
@@ -88,8 +88,8 @@ func (t kubernetesTraffic) ExpectUniqueRevision() bool {
 	return true
 }
 
-func (t kubernetesTraffic) RunTrafficLoop(ctx context.Context, c *client.RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIDStorage, nonUniqueWriteLimiter ConcurrencyLimiter, keyStore *keyStore, finish <-chan struct{}) {
-	kc := kubernetes.Client{Client: &clientv3.Client{KV: c}}
+func (t kubernetesTraffic) RunTrafficLoop(ctx context.Context, p RunTrafficLoopParam) {
+	kc := kubernetes.Client{Client: &clientv3.Client{KV: p.Client}}
 	s := newStorage()
 	keyPrefix := "/registry/" + t.resource + "/"
 	g := errgroup.Group{}
@@ -99,11 +99,11 @@ func (t kubernetesTraffic) RunTrafficLoop(ctx context.Context, c *client.Recordi
 		select {
 		case <-ctx.Done():
 			return ctx.Err()
-		case <-finish:
+		case <-p.Finish:
 			return nil
 		default:
 		}
-		err := t.Read(ctx, c, s, limiter, keyPrefix)
+		err := t.Read(ctx, p.Client, s, p.QPSLimiter, keyPrefix)
 		if err != nil {
 			continue
 		}
@@ -115,18 +115,18 @@ func (t kubernetesTraffic) RunTrafficLoop(ctx context.Context, c *client.Recordi
 		select {
 		case <-ctx.Done():
 			return ctx.Err()
-		case <-finish:
+		case <-p.Finish:
 			return nil
 		default:
 		}
 		// Avoid multiple failed writes in a row
 		if lastWriteFailed {
-			_, err := t.List(ctx, kc, s, limiter, keyPrefix, t.averageKeyCount, 0)
+			_, err := t.List(ctx, kc, s, p.QPSLimiter, keyPrefix, t.averageKeyCount, 0)
 			if err != nil {
 				continue
 			}
 		}
-		err := t.Write(ctx, kc, ids, s, limiter, nonUniqueWriteLimiter)
+		err := t.Write(ctx, kc, p.IDs, s, p.QPSLimiter, p.NonUniqueRequestConcurrencyLimiter)
 		lastWriteFailed = err != nil
 		if err != nil {
 			continue
@@ -271,21 +271,21 @@ func (t kubernetesTraffic) generateKey() string {
 	return fmt.Sprintf("/registry/%s/%s/%s", t.resource, t.namespace, stringutil.RandString(5))
 }
 
-func (t kubernetesTraffic) RunCompactLoop(ctx context.Context, c *client.RecordingClient, interval time.Duration, finish <-chan struct{}) {
+func (t kubernetesTraffic) RunCompactLoop(ctx context.Context, param RunCompactLoopParam) {
 	// Based on https://github.com/kubernetes/apiserver/blob/7dd4904f1896e11244ba3c5a59797697709de6b6/pkg/storage/etcd3/compact.go#L112-L127
 	var compactTime int64
 	var rev int64
 	var err error
 	for {
 		select {
-		case <-time.After(interval):
+		case <-time.After(param.Period):
 		case <-ctx.Done():
 			return
-		case <-finish:
+		case <-param.Finish:
 			return
 		}
 
-		compactTime, rev, err = compact(ctx, c, compactTime, rev)
+		compactTime, rev, err = compact(ctx, param.Client, compactTime, rev)
 		if err != nil {
 			continue
 		}

tests/robustness/traffic/traffic.go

Lines changed: 41 additions & 5 deletions
@@ -84,7 +84,15 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2
 			defer wg.Done()
 			defer c.Close()
 
-			traffic.RunTrafficLoop(ctx, c, limiter, clientSet.IdentityProvider(), lm, nonUniqueWriteLimiter, keyStore, finish)
+			traffic.RunTrafficLoop(ctx, RunTrafficLoopParam{
+				Client:                             c,
+				QPSLimiter:                         limiter,
+				IDs:                                clientSet.IdentityProvider(),
+				LeaseIDStorage:                     lm,
+				NonUniqueRequestConcurrencyLimiter: nonUniqueWriteLimiter,
+				KeyStore:                           keyStore,
+				Finish:                             finish,
+			})
 		}(c)
 	}
 	for range profile.ClusterClientCount {
@@ -96,7 +104,15 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2
 			defer wg.Done()
 			defer c.Close()
 
-			traffic.RunTrafficLoop(ctx, c, limiter, clientSet.IdentityProvider(), lm, nonUniqueWriteLimiter, keyStore, finish)
+			traffic.RunTrafficLoop(ctx, RunTrafficLoopParam{
+				Client:                             c,
+				QPSLimiter:                         limiter,
+				IDs:                                clientSet.IdentityProvider(),
+				LeaseIDStorage:                     lm,
+				NonUniqueRequestConcurrencyLimiter: nonUniqueWriteLimiter,
+				KeyStore:                           keyStore,
+				Finish:                             finish,
+			})
 		}(c)
 	}
 	if !profile.ForbidCompaction {
@@ -114,7 +130,11 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2
 				compactionPeriod = profile.CompactPeriod
 			}
 
-			traffic.RunCompactLoop(ctx, c, compactionPeriod, finish)
+			traffic.RunCompactLoop(ctx, RunCompactLoopParam{
+				Client: c,
+				Period: compactionPeriod,
+				Finish: finish,
+			})
 		}(c)
 	}
 	var fr *report.FailpointInjection
@@ -287,9 +307,25 @@ func (p Profile) WithCompactionPeriod(cp time.Duration) Profile {
 	return p
 }
 
+type RunTrafficLoopParam struct {
+	Client                             *client.RecordingClient
+	QPSLimiter                         *rate.Limiter
+	IDs                                identity.Provider
+	LeaseIDStorage                     identity.LeaseIDStorage
+	NonUniqueRequestConcurrencyLimiter ConcurrencyLimiter
+	KeyStore                           *keyStore
+	Finish                             <-chan struct{}
+}
+
+type RunCompactLoopParam struct {
+	Client *client.RecordingClient
+	Period time.Duration
+	Finish <-chan struct{}
+}
+
 type Traffic interface {
-	RunTrafficLoop(ctx context.Context, c *client.RecordingClient, qpsLimiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIDStorage, nonUniqueWriteLimiter ConcurrencyLimiter, keyStore *keyStore, finish <-chan struct{})
-	RunCompactLoop(ctx context.Context, c *client.RecordingClient, period time.Duration, finish <-chan struct{})
+	RunTrafficLoop(ctx context.Context, param RunTrafficLoopParam)
+	RunCompactLoop(ctx context.Context, param RunCompactLoopParam)
 	ExpectUniqueRevision() bool
 }

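One practical effect of the refactored Traffic interface above: a future option becomes a new struct field, so neither the interface nor any implementation's method signature has to change. A minimal, self-contained sketch with stand-in names (runTrafficLoopParam, ClientName, QPS, and etcdLikeTraffic are illustrative only, not the real etcd robustness test types):

package main

import (
	"context"
	"fmt"
)

// Stand-in for the real RunTrafficLoopParam; a hypothetical future option
// would be added as another field here without touching the interface below.
type runTrafficLoopParam struct {
	ClientName string
	QPS        int
}

// Stand-in for the refactored Traffic interface.
type traffic interface {
	runTrafficLoop(ctx context.Context, p runTrafficLoopParam)
}

type etcdLikeTraffic struct{}

func (etcdLikeTraffic) runTrafficLoop(ctx context.Context, p runTrafficLoopParam) {
	fmt.Printf("running traffic for %s at %d QPS\n", p.ClientName, p.QPS)
}

func main() {
	var t traffic = etcdLikeTraffic{}
	t.runTrafficLoop(context.Background(), runTrafficLoopParam{ClientName: "client-0", QPS: 100})
}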