Skip to content

Commit b5db0e6

Browse files
committed
update all dependencies to latest versions
Code changes to accommodate API updates: - Fixed tablewriter API: SetHeader() -> Header() - Fixed gRPC balancer test mocks to work with v1.73+ requirements by using wrapper pattern instead of direct interface implementation - Simplified test structures by removing unnecessary wrapper layer - Fixed some incorrect usages of legacy `errors` package.
1 parent 830f3d5 commit b5db0e6

File tree

11 files changed

+442
-635
lines changed

11 files changed

+442
-635
lines changed

broker/protocol/dispatcher.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,10 @@ func (d *dispatcher) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
266266
// and terminates the period sweep channel.
267267
func (d *dispatcher) Close() { close(d.sweepDoneCh) }
268268

269+
// ExitIdle is a no-op for dispatcher as it manages SubConn lifecycle directly
270+
// based on dispatch routes rather than connection state.
271+
func (d *dispatcher) ExitIdle() {}
272+
269273
// less defines an ordering over ProcessSpec_ID preferences used by dispatcher.
270274
func (d *dispatcher) less(lhs, rhs ProcessSpec_ID) bool {
271275
// Always prefer a defined ProcessSpec_ID over the zero-valued one

broker/protocol/dispatcher_test.go

Lines changed: 93 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"google.golang.org/grpc/balancer"
88
"google.golang.org/grpc/codes"
99
"google.golang.org/grpc/connectivity"
10+
"google.golang.org/grpc/experimental/stats"
1011
"google.golang.org/grpc/resolver"
1112
"google.golang.org/grpc/status"
1213
gc "gopkg.in/check.v1"
@@ -43,8 +44,8 @@ func (s *DispatcherSuite) TestContextAdapters(c *gc.C) {
4344
}
4445

4546
func (s *DispatcherSuite) TestDispatchCases(c *gc.C) {
46-
var cc mockClientConn
47-
var disp = dispatcherBuilder{zone: "local"}.Build(&cc, balancer.BuildOptions{}).(*dispatcher)
47+
var cc = newMockClientConn()
48+
var disp = dispatcherBuilder{zone: "local"}.Build(cc, balancer.BuildOptions{}).(*dispatcher)
4849
cc.disp = disp
4950
close(disp.sweepDoneCh) // Disable async sweeping.
5051

@@ -68,7 +69,7 @@ func (s *DispatcherSuite) TestDispatchCases(c *gc.C) {
6869
result, err := disp.Pick(balancer.PickInfo{Ctx: ctx})
6970
c.Check(err, gc.IsNil)
7071
c.Check(result.Done, gc.IsNil)
71-
c.Check(result.SubConn, gc.Equals, mockSubConn{Name: "default.addr:80", disp: disp})
72+
c.Check(result.SubConn.(*testSubConnWrapper).name, gc.Equals, "default.addr:80")
7273

7374
// Case: Specific remote peer is dispatched to.
7475
ctx = WithDispatchRoute(context.Background(),
@@ -84,7 +85,7 @@ func (s *DispatcherSuite) TestDispatchCases(c *gc.C) {
8485
result, err = disp.Pick(balancer.PickInfo{Ctx: ctx})
8586
c.Check(err, gc.IsNil)
8687
c.Check(result.Done, gc.IsNil)
87-
c.Check(result.SubConn, gc.Equals, mockSubConn{Name: "remote.addr:80", disp: disp})
88+
c.Check(result.SubConn.(*testSubConnWrapper).name, gc.Equals, "remote.addr:80")
8889

8990
// Case: Route allows for multiple members. A local one is now dialed.
9091
ctx = WithDispatchRoute(context.Background(), buildRouteFixture(), ProcessSpec_ID{})
@@ -99,7 +100,7 @@ func (s *DispatcherSuite) TestDispatchCases(c *gc.C) {
99100
result, err = disp.Pick(balancer.PickInfo{Ctx: ctx})
100101
c.Check(err, gc.IsNil)
101102
c.Check(result.Done, gc.IsNil)
102-
c.Check(result.SubConn, gc.Equals, mockSubConn{Name: "local.addr:80", disp: disp})
103+
c.Check(result.SubConn.(*testSubConnWrapper).name, gc.Equals, "local.addr:80")
103104

104105
// Case: One local addr is marked as failed. Another is dialed.
105106
mockSubConn{Name: "local.addr:80", disp: disp}.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
@@ -114,7 +115,7 @@ func (s *DispatcherSuite) TestDispatchCases(c *gc.C) {
114115
result, err = disp.Pick(balancer.PickInfo{Ctx: ctx})
115116
c.Check(err, gc.IsNil)
116117
c.Check(result.Done, gc.IsNil)
117-
c.Check(result.SubConn, gc.Equals, mockSubConn{Name: "local.otherAddr:80", disp: disp})
118+
c.Check(result.SubConn.(*testSubConnWrapper).name, gc.Equals, "local.otherAddr:80")
118119

119120
// Case: otherAddr is also failed. Expect that an error is returned,
120121
// rather than dispatch to remote addr. (Eg we prefer to wait for a
@@ -151,7 +152,7 @@ func (s *DispatcherSuite) TestDispatchCases(c *gc.C) {
151152
result, err = disp.Pick(balancer.PickInfo{Ctx: ctx})
152153
c.Check(err, gc.IsNil)
153154
c.Check(result.Done, gc.NotNil)
154-
c.Check(result.SubConn, gc.Equals, mockSubConn{Name: "local.addr:80", disp: disp})
155+
c.Check(result.SubConn.(*testSubConnWrapper).name, gc.Equals, "local.addr:80")
155156

156157
// Closure callback with an Unavailable error (only) will trigger an invalidation.
157158
result.Done(balancer.DoneInfo{Err: nil})
@@ -163,8 +164,8 @@ func (s *DispatcherSuite) TestDispatchCases(c *gc.C) {
163164
}
164165

165166
func (s *DispatcherSuite) TestDispatchMarkAndSweep(c *gc.C) {
166-
var cc mockClientConn
167-
var disp = dispatcherBuilder{zone: "local"}.Build(&cc, balancer.BuildOptions{}).(*dispatcher)
167+
var cc = newMockClientConn()
168+
var disp = dispatcherBuilder{zone: "local"}.Build(cc, balancer.BuildOptions{}).(*dispatcher)
168169
cc.disp = disp
169170
defer disp.Close()
170171

@@ -233,45 +234,103 @@ func (s *DispatcherSuite) TestDispatchMarkAndSweep(c *gc.C) {
233234
c.Check(err, gc.IsNil)
234235
}
235236

236-
type mockClientConn struct {
237-
err error
238-
created []mockSubConn
239-
removed []mockSubConn
240-
disp *dispatcher
237+
// testSubConnWrapper wraps a test SubConn to track operations
238+
type testSubConnWrapper struct {
239+
balancer.SubConn
240+
name string
241+
disp *dispatcher
241242
}
242243

244+
// mockSubConn represents a test SubConn for comparisons and state updates
243245
type mockSubConn struct {
244246
Name string
245247
disp *dispatcher
246248
}
247249

248-
func (s1 mockSubConn) Equal(s2 mockSubConn) bool {
249-
return s1.Name == s2.Name
250+
// mockClientConn implements balancer.ClientConn for testing
251+
type mockClientConn struct {
252+
balancer.ClientConn
253+
err error
254+
created []mockSubConn
255+
removed []mockSubConn
256+
disp *dispatcher
257+
subConns map[string]*testSubConnWrapper
258+
target string
250259
}
251260

252-
func (s mockSubConn) UpdateAddresses([]resolver.Address) { panic("deprecated") }
253-
func (s mockSubConn) UpdateState(state balancer.SubConnState) { s.disp.updateSubConnState(s, state) }
254-
func (s mockSubConn) Connect() {}
255-
func (s mockSubConn) GetOrBuildProducer(balancer.ProducerBuilder) (balancer.Producer, func()) {
256-
return nil, func() {}
257-
}
258-
func (s mockSubConn) Shutdown() {
259-
var c = s.disp.cc.(*mockClientConn)
260-
c.removed = append(c.removed, s)
261+
func newMockClientConn() *mockClientConn {
262+
return &mockClientConn{
263+
subConns: make(map[string]*testSubConnWrapper),
264+
target: "default.addr:80", // Default target for tests
265+
}
261266
}
262267

263-
func (c *mockClientConn) NewSubConn(a []resolver.Address, _ balancer.NewSubConnOptions) (balancer.SubConn, error) {
264-
var sc = mockSubConn{Name: a[0].Addr, disp: c.disp}
265-
c.created = append(c.created, sc)
266-
return sc, c.err
268+
func (c *mockClientConn) NewSubConn(a []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
269+
if c.err != nil {
270+
return nil, c.err
271+
}
272+
273+
name := a[0].Addr
274+
sc := &testSubConnWrapper{
275+
name: name,
276+
disp: c.disp,
277+
}
278+
279+
c.subConns[name] = sc
280+
c.created = append(c.created, mockSubConn{Name: name, disp: c.disp})
281+
282+
// StateListener is handled by the gRPC framework
283+
284+
return sc, nil
267285
}
268286

269-
func (c *mockClientConn) UpdateAddresses(balancer.SubConn, []resolver.Address) { panic("deprecated") }
270-
func (c *mockClientConn) UpdateState(balancer.State) {}
271-
func (c *mockClientConn) ResolveNow(resolver.ResolveNowOptions) {}
272-
func (c *mockClientConn) Target() string { return "default.addr:80" }
287+
func (c *mockClientConn) UpdateState(state balancer.State) {}
288+
289+
func (c *mockClientConn) ResolveNow(resolver.ResolveNowOptions) {}
290+
291+
func (c *mockClientConn) Target() string { return c.target }
292+
273293
func (c *mockClientConn) RemoveSubConn(sc balancer.SubConn) {
274-
sc.Shutdown()
294+
if tsc, ok := sc.(*testSubConnWrapper); ok {
295+
c.removed = append(c.removed, mockSubConn{Name: tsc.name, disp: tsc.disp})
296+
delete(c.subConns, tsc.name)
297+
}
298+
}
299+
300+
func (c *mockClientConn) MetricsRecorder() stats.MetricsRecorder { return nil }
301+
302+
// Additional fields for testSubConnWrapper
303+
var _ balancer.SubConn = (*testSubConnWrapper)(nil)
304+
305+
func (s *testSubConnWrapper) UpdateAddresses([]resolver.Address) { panic("deprecated") }
306+
307+
func (s *testSubConnWrapper) UpdateState(state balancer.SubConnState) {
308+
if s.disp != nil {
309+
s.disp.updateSubConnState(s, state)
310+
}
311+
}
312+
313+
func (s *testSubConnWrapper) Connect() {}
314+
315+
func (s *testSubConnWrapper) GetOrBuildProducer(balancer.ProducerBuilder) (balancer.Producer, func()) {
316+
return nil, func() {}
317+
}
318+
319+
func (s *testSubConnWrapper) Shutdown() {
320+
if cc, ok := s.disp.cc.(*mockClientConn); ok {
321+
cc.removed = append(cc.removed, mockSubConn{Name: s.name, disp: s.disp})
322+
}
323+
}
324+
325+
func (s *testSubConnWrapper) RegisterHealthListener(func(balancer.SubConnState)) {}
326+
327+
// Helper to create mockSubConn for UpdateState calls
328+
func (m mockSubConn) UpdateState(state balancer.SubConnState) {
329+
if cc, ok := m.disp.cc.(*mockClientConn); ok {
330+
if sc, found := cc.subConns[m.Name]; found {
331+
sc.UpdateState(state)
332+
}
333+
}
275334
}
276335

277336
type mockRouter struct{ invalidated string }

broker/protocol/journal_spec_extensions.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ func (m *JournalSpec_Fragment) Validate() error {
149149
// Ensure the PathPostfixTemplate parses and evaluates without
150150
// error over a zero-valued struct having the proper shape.
151151
if tpl, err := template.New("postfix").Parse(m.PathPostfixTemplate); err != nil {
152-
return ExtendContext(NewValidationError(err.Error()), "PathPostfixTemplate")
152+
return ExtendContext(&ValidationError{Err: err}, "PathPostfixTemplate")
153153
} else if err = tpl.Execute(io.Discard, struct {
154154
Spool struct {
155155
Fragment
@@ -158,7 +158,7 @@ func (m *JournalSpec_Fragment) Validate() error {
158158
}
159159
JournalSpec
160160
}{}); err != nil {
161-
return ExtendContext(NewValidationError(err.Error()), "PathPostfixTemplate")
161+
return ExtendContext(&ValidationError{Err: err}, "PathPostfixTemplate")
162162
}
163163

164164
// Retention requires no explicit validation (all values permitted).

cmd/gazctl/gazctlcmd/journals_fragments.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ func (cmd *cmdJournalsFragments) outputTable(responses []*pb.FragmentsResponse)
145145
if cmd.SigTTL != 0 {
146146
headers = append(headers, "URL")
147147
}
148-
table.SetHeader(headers)
148+
table.Header(headers)
149149

150150
for _, r := range responses {
151151
if r == nil {

cmd/gazctl/gazctlcmd/journals_list.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ func (cmd *cmdJournalsList) outputTable(resp *pb.ListResponse) {
106106
headers = append(headers, "Stores")
107107
}
108108
headers = append(headers, cmd.Labels...)
109-
table.SetHeader(headers)
109+
table.Header(headers)
110110

111111
for _, j := range resp.Journals {
112112
var primary = "<none>"

cmd/gazctl/gazctlcmd/shards_list.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ func (cmd *cmdShardsList) outputTable(resp *pc.ListResponse) {
9191
rjc = ShardsCfg.Broker.MustRoutedJournalClient(ctx)
9292
}
9393

94-
table.SetHeader(headers)
94+
table.Header(headers)
9595

9696
for _, j := range resp.Shards {
9797
var primary = "<none>"

consumer/recovery.go

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,12 @@ package consumer
33
import (
44
"context"
55
"encoding/json"
6+
"errors"
67
"fmt"
78
"os"
89
"strings"
910
"time"
1011

11-
"github.com/pkg/errors"
1212
log "github.com/sirupsen/logrus"
1313
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
1414
clientv3 "go.etcd.io/etcd/client/v3"
@@ -38,7 +38,7 @@ func fetchHints(ctx context.Context, spec *pc.ShardSpec, etcd *clientv3.Client)
3838
out.log = spec.RecoveryLog()
3939

4040
if out.txnResp, err = etcd.Txn(ctx).If().Then(ops...).Commit(); err != nil {
41-
err = errors.WithMessage(err, "fetching ShardSpec.HintKeys")
41+
err = fmt.Errorf("fetching ShardSpec.HintKeys: %w", err)
4242
return
4343
}
4444

@@ -53,20 +53,20 @@ func fetchHints(ctx context.Context, spec *pc.ShardSpec, etcd *clientv3.Client)
5353
// Sense whether JSON or proto encoding is used by testing for opening '{'.
5454
if kvs[0].Value[0] != '{' {
5555
if err = h.Unmarshal(kvs[0].Value); err != nil {
56-
err = errors.WithMessage(err, "hints.Unmarshal")
56+
err = fmt.Errorf("hints.Unmarshal: %w", err)
5757
}
5858
} else {
5959
if err = json.Unmarshal(kvs[0].Value, h); err != nil {
60-
err = errors.WithMessage(err, "json.Unmarshal(hints)")
60+
err = fmt.Errorf("json.Unmarshal(hints): %w", err)
6161
}
6262
}
6363

6464
if err != nil {
6565
// Pass.
6666
} else if _, err = recoverylog.NewFSM(*h); err != nil {
67-
err = errors.WithMessage(err, "validating FSMHints")
67+
err = fmt.Errorf("validating FSMHints: %w", err)
6868
} else if h.Log != out.log {
69-
err = errors.Errorf("hints.Log %s != ShardSpec.RecoveryLog %s", h.Log, out.log)
69+
err = fmt.Errorf("hints.Log %s != ShardSpec.RecoveryLog %s", h.Log, out.log)
7070
}
7171

7272
if err != nil {
@@ -100,7 +100,7 @@ func storeRecordedHints(s *shard, hints recoverylog.FSMHints) error {
100100

101101
var val, err = json.Marshal(hints)
102102
if err != nil {
103-
return errors.WithMessage(err, "json.Marshal(hints)")
103+
return fmt.Errorf("json.Marshal(hints): %w", err)
104104
}
105105
// TODO(johnny): Switch over to hints.Marshal() when proto decode support is deployed.
106106
/*
@@ -149,12 +149,12 @@ func storeRecoveredHints(s *shard, hints recoverylog.FSMHints) error {
149149
// rotate the current value at backups[1] => backups[2], and so on.
150150
var val []byte
151151
if val, err = json.Marshal(hints); err != nil {
152-
return errors.WithMessage(err, "json.Marshal(hints)")
152+
return fmt.Errorf("json.Marshal(hints): %w", err)
153153
}
154154
// TODO(johnny): Switch over to hints.Marshal() when proto decode support is deployed.
155155
/*
156156
if val, err = hints.Marshal(); err != nil {
157-
return errors.WithMessage(err, "hints.Marshal")
157+
return fmt.Errorf("hints.Marshal(): %w", err)
158158
}
159159
*/
160160

@@ -203,7 +203,7 @@ func beginRecovery(s *shard) error {
203203
&pc.GetHintsRequest{Shard: spec.Id})
204204

205205
if err == nil && s.recovery.hints.Status != pc.Status_OK {
206-
err = fmt.Errorf(s.recovery.hints.Status.String())
206+
err = errors.New(s.recovery.hints.Status.String())
207207
}
208208
if err != nil {
209209
return fmt.Errorf("GetHints: %w", err)
@@ -212,15 +212,15 @@ func beginRecovery(s *shard) error {
212212

213213
// Verify the |pickedHints| recovery log exists, and is of the correct Content-Type.
214214
if logSpec, err := client.GetJournal(s.ctx, s.ajc, pickedHints.Log); err != nil {
215-
return errors.WithMessage(err, "fetching log spec")
215+
return fmt.Errorf("fetching log spec: %w", err)
216216
} else if ct := logSpec.LabelSet.ValueOf(labels.ContentType); ct != labels.ContentType_RecoveryLog {
217-
return errors.Errorf("expected label %s value %s (got %v)", labels.ContentType, labels.ContentType_RecoveryLog, ct)
217+
return fmt.Errorf("expected label %s value %s (got %v)", labels.ContentType, labels.ContentType_RecoveryLog, ct)
218218
}
219219

220220
// Create local temporary directory into which we recover.
221221
var dir string
222222
if dir, err = os.MkdirTemp("", strings.ReplaceAll(spec.Id.String(), "/", "_")+"-"); err != nil {
223-
return errors.WithMessage(err, "creating shard working directory")
223+
return fmt.Errorf("creating shard working directory: %w", err)
224224
}
225225

226226
log.WithFields(log.Fields{
@@ -231,7 +231,7 @@ func beginRecovery(s *shard) error {
231231

232232
// Finally, play back the log.
233233
if err = s.recovery.player.Play(s.ctx, pickedHints, dir, s.ajc); err != nil {
234-
return errors.WithMessagef(err, "playing log %s", pickedHints.Log)
234+
return fmt.Errorf("playing log %s: %w", pickedHints.Log, err)
235235
}
236236
return nil
237237
}
@@ -260,7 +260,7 @@ func completeRecovery(s *shard) (_ pc.Checkpoint, err error) {
260260

261261
var recovered = s.recovery.player.Resolved
262262
if recovered.FSM == nil {
263-
err = errors.Errorf("completeRecovery aborting due to log playback failure")
263+
err = errors.New("completeRecovery aborting due to log playback failure")
264264
return
265265
}
266266

@@ -284,9 +284,9 @@ func completeRecovery(s *shard) (_ pc.Checkpoint, err error) {
284284
// cleanup of the playback directory now.
285285
_ = os.RemoveAll(s.recovery.recorder.Dir())
286286
}
287-
return cp, errors.WithMessage(err, "app.NewStore")
287+
return cp, fmt.Errorf("app.NewStore: %w", err)
288288
} else if cp, err = s.store.RestoreCheckpoint(s); err != nil {
289-
return cp, errors.WithMessage(err, "store.RestoreCheckpoint")
289+
return cp, fmt.Errorf("store.RestoreCheckpoint: %w", err)
290290
}
291291

292292
// Store |recoveredHints| as a backup. We do this _after_ restoring the

examples/bike-share/api.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ func (app *Application) ServeBikeHistory(w http.ResponseWriter, r *http.Request)
6565
}); err != nil {
6666
return
6767
} else if res.Status != pc.Status_OK {
68-
err = fmt.Errorf(res.Status.String())
68+
err = errors.New(res.Status.String())
6969
return
7070
} else if res.Store == nil {
7171
// Shard is assigned to peer.

0 commit comments

Comments (0)