4 changes: 4 additions & 0 deletions broker/protocol/dispatcher.go
@@ -266,6 +266,10 @@ func (d *dispatcher) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
// and terminates the periodic sweep channel.
func (d *dispatcher) Close() { close(d.sweepDoneCh) }

// ExitIdle is a no-op for dispatcher as it manages SubConn lifecycle directly
// based on dispatch routes rather than connection state.
func (d *dispatcher) ExitIdle() {}

// less defines an ordering over ProcessSpec_ID preferences used by dispatcher.
func (d *dispatcher) less(lhs, rhs ProcessSpec_ID) bool {
// Always prefer a defined ProcessSpec_ID over the zero-valued one
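A note on the ExitIdle addition above: newer grpc-go releases expect LB policies to implement the balancer.ExitIdler interface, which the channel invokes when it is asked to leave the IDLE state. The no-op is reasonable here because dispatcher dials and shuts down SubConns itself from dispatch routes. A minimal sketch of the contract being satisfied, assuming grpc-go's balancer package (not part of this diff):

// Hypothetical compile-time assertion; a conventional balancer would instead
// call Connect() on its idle SubConns inside ExitIdle().
var _ balancer.ExitIdler = (*dispatcher)(nil)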
127 changes: 93 additions & 34 deletions broker/protocol/dispatcher_test.go
@@ -7,6 +7,7 @@ import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/experimental/stats"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/status"
gc "gopkg.in/check.v1"
@@ -43,8 +44,8 @@ func (s *DispatcherSuite) TestContextAdapters(c *gc.C) {
}

func (s *DispatcherSuite) TestDispatchCases(c *gc.C) {
var cc mockClientConn
var disp = dispatcherBuilder{zone: "local"}.Build(&cc, balancer.BuildOptions{}).(*dispatcher)
var cc = newMockClientConn()
var disp = dispatcherBuilder{zone: "local"}.Build(cc, balancer.BuildOptions{}).(*dispatcher)
cc.disp = disp
close(disp.sweepDoneCh) // Disable async sweeping.

@@ -68,7 +69,7 @@ func (s *DispatcherSuite) TestDispatchCases(c *gc.C) {
result, err := disp.Pick(balancer.PickInfo{Ctx: ctx})
c.Check(err, gc.IsNil)
c.Check(result.Done, gc.IsNil)
c.Check(result.SubConn, gc.Equals, mockSubConn{Name: "default.addr:80", disp: disp})
c.Check(result.SubConn.(*testSubConnWrapper).name, gc.Equals, "default.addr:80")

// Case: Specific remote peer is dispatched to.
ctx = WithDispatchRoute(context.Background(),
@@ -84,7 +85,7 @@ func (s *DispatcherSuite) TestDispatchCases(c *gc.C) {
result, err = disp.Pick(balancer.PickInfo{Ctx: ctx})
c.Check(err, gc.IsNil)
c.Check(result.Done, gc.IsNil)
c.Check(result.SubConn, gc.Equals, mockSubConn{Name: "remote.addr:80", disp: disp})
c.Check(result.SubConn.(*testSubConnWrapper).name, gc.Equals, "remote.addr:80")

// Case: Route allows for multiple members. A local one is now dialed.
ctx = WithDispatchRoute(context.Background(), buildRouteFixture(), ProcessSpec_ID{})
@@ -99,7 +100,7 @@ func (s *DispatcherSuite) TestDispatchCases(c *gc.C) {
result, err = disp.Pick(balancer.PickInfo{Ctx: ctx})
c.Check(err, gc.IsNil)
c.Check(result.Done, gc.IsNil)
c.Check(result.SubConn, gc.Equals, mockSubConn{Name: "local.addr:80", disp: disp})
c.Check(result.SubConn.(*testSubConnWrapper).name, gc.Equals, "local.addr:80")

// Case: One local addr is marked as failed. Another is dialed.
mockSubConn{Name: "local.addr:80", disp: disp}.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
@@ -114,7 +115,7 @@ func (s *DispatcherSuite) TestDispatchCases(c *gc.C) {
result, err = disp.Pick(balancer.PickInfo{Ctx: ctx})
c.Check(err, gc.IsNil)
c.Check(result.Done, gc.IsNil)
c.Check(result.SubConn, gc.Equals, mockSubConn{Name: "local.otherAddr:80", disp: disp})
c.Check(result.SubConn.(*testSubConnWrapper).name, gc.Equals, "local.otherAddr:80")

// Case: otherAddr is also failed. Expect that an error is returned,
// rather than dispatch to remote addr. (Eg we prefer to wait for a
@@ -151,7 +152,7 @@ func (s *DispatcherSuite) TestDispatchCases(c *gc.C) {
result, err = disp.Pick(balancer.PickInfo{Ctx: ctx})
c.Check(err, gc.IsNil)
c.Check(result.Done, gc.NotNil)
c.Check(result.SubConn, gc.Equals, mockSubConn{Name: "local.addr:80", disp: disp})
c.Check(result.SubConn.(*testSubConnWrapper).name, gc.Equals, "local.addr:80")

// Closure callback with an Unavailable error (only) will trigger an invalidation.
result.Done(balancer.DoneInfo{Err: nil})
@@ -163,8 +164,8 @@ func (s *DispatcherSuite) TestDispatchCases(c *gc.C) {
}

func (s *DispatcherSuite) TestDispatchMarkAndSweep(c *gc.C) {
var cc mockClientConn
var disp = dispatcherBuilder{zone: "local"}.Build(&cc, balancer.BuildOptions{}).(*dispatcher)
var cc = newMockClientConn()
var disp = dispatcherBuilder{zone: "local"}.Build(cc, balancer.BuildOptions{}).(*dispatcher)
cc.disp = disp
defer disp.Close()

@@ -233,45 +234,103 @@ func (s *DispatcherSuite) TestDispatchMarkAndSweep(c *gc.C) {
c.Check(err, gc.IsNil)
}

type mockClientConn struct {
err error
created []mockSubConn
removed []mockSubConn
disp *dispatcher
// testSubConnWrapper wraps a test SubConn to track operations
type testSubConnWrapper struct {
balancer.SubConn
name string
disp *dispatcher
}

// mockSubConn represents a test SubConn for comparisons and state updates
type mockSubConn struct {
Name string
disp *dispatcher
}

func (s1 mockSubConn) Equal(s2 mockSubConn) bool {
return s1.Name == s2.Name
// mockClientConn implements balancer.ClientConn for testing
type mockClientConn struct {
balancer.ClientConn
err error
created []mockSubConn
removed []mockSubConn
disp *dispatcher
subConns map[string]*testSubConnWrapper
target string
}

func (s mockSubConn) UpdateAddresses([]resolver.Address) { panic("deprecated") }
func (s mockSubConn) UpdateState(state balancer.SubConnState) { s.disp.updateSubConnState(s, state) }
func (s mockSubConn) Connect() {}
func (s mockSubConn) GetOrBuildProducer(balancer.ProducerBuilder) (balancer.Producer, func()) {
return nil, func() {}
}
func (s mockSubConn) Shutdown() {
var c = s.disp.cc.(*mockClientConn)
c.removed = append(c.removed, s)
func newMockClientConn() *mockClientConn {
return &mockClientConn{
subConns: make(map[string]*testSubConnWrapper),
target: "default.addr:80", // Default target for tests
}
}

func (c *mockClientConn) NewSubConn(a []resolver.Address, _ balancer.NewSubConnOptions) (balancer.SubConn, error) {
var sc = mockSubConn{Name: a[0].Addr, disp: c.disp}
c.created = append(c.created, sc)
return sc, c.err
func (c *mockClientConn) NewSubConn(a []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
if c.err != nil {
return nil, c.err
}

name := a[0].Addr
sc := &testSubConnWrapper{
name: name,
disp: c.disp,
}

c.subConns[name] = sc
c.created = append(c.created, mockSubConn{Name: name, disp: c.disp})

// The StateListener in opts is not stored; tests drive connectivity changes directly via UpdateState.

return sc, nil
}

func (c *mockClientConn) UpdateAddresses(balancer.SubConn, []resolver.Address) { panic("deprecated") }
func (c *mockClientConn) UpdateState(balancer.State) {}
func (c *mockClientConn) ResolveNow(resolver.ResolveNowOptions) {}
func (c *mockClientConn) Target() string { return "default.addr:80" }
func (c *mockClientConn) UpdateState(state balancer.State) {}

func (c *mockClientConn) ResolveNow(resolver.ResolveNowOptions) {}

func (c *mockClientConn) Target() string { return c.target }

func (c *mockClientConn) RemoveSubConn(sc balancer.SubConn) {
sc.Shutdown()
if tsc, ok := sc.(*testSubConnWrapper); ok {
c.removed = append(c.removed, mockSubConn{Name: tsc.name, disp: tsc.disp})
delete(c.subConns, tsc.name)
}
}

func (c *mockClientConn) MetricsRecorder() stats.MetricsRecorder { return nil }

// testSubConnWrapper implements balancer.SubConn.
var _ balancer.SubConn = (*testSubConnWrapper)(nil)

func (s *testSubConnWrapper) UpdateAddresses([]resolver.Address) { panic("deprecated") }

func (s *testSubConnWrapper) UpdateState(state balancer.SubConnState) {
if s.disp != nil {
s.disp.updateSubConnState(s, state)
}
}

func (s *testSubConnWrapper) Connect() {}

func (s *testSubConnWrapper) GetOrBuildProducer(balancer.ProducerBuilder) (balancer.Producer, func()) {
return nil, func() {}
}

func (s *testSubConnWrapper) Shutdown() {
if cc, ok := s.disp.cc.(*mockClientConn); ok {
cc.removed = append(cc.removed, mockSubConn{Name: s.name, disp: s.disp})
}
}

func (s *testSubConnWrapper) RegisterHealthListener(func(balancer.SubConnState)) {}

// UpdateState forwards a state change to the testSubConnWrapper tracked for this address.
func (m mockSubConn) UpdateState(state balancer.SubConnState) {
if cc, ok := m.disp.cc.(*mockClientConn); ok {
if sc, found := cc.subConns[m.Name]; found {
sc.UpdateState(state)
}
}
}

type mockRouter struct{ invalidated string }
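The test-mock rewrite above tracks grpc-go's newer balancer plumbing: connectivity updates are delivered through a per-SubConn StateListener supplied at creation (replacing the Balancer.UpdateSubConnState callback), SubConns are retired with SubConn.Shutdown rather than ClientConn.RemoveSubConn, and balancer.ClientConn gained a MetricsRecorder method. The sketch below shows how a real balancer dials under that API; it illustrates the surface the mocks emulate, is not code from this PR, and assumes the grpc-go balancer and resolver imports already used by the test.

// dialSketch creates and connects a SubConn, receiving its state changes via
// the listener instead of a balancer-wide UpdateSubConnState callback.
func dialSketch(cc balancer.ClientConn, addr resolver.Address) (balancer.SubConn, error) {
	sc, err := cc.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{
		StateListener: func(state balancer.SubConnState) {
			// Route the connectivity update into the policy's own bookkeeping.
		},
	})
	if err != nil {
		return nil, err
	}
	sc.Connect()
	return sc, nil
}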
4 changes: 2 additions & 2 deletions broker/protocol/journal_spec_extensions.go
@@ -149,7 +149,7 @@ func (m *JournalSpec_Fragment) Validate() error {
// Ensure the PathPostfixTemplate parses and evaluates without
// error over a zero-valued struct having the proper shape.
if tpl, err := template.New("postfix").Parse(m.PathPostfixTemplate); err != nil {
return ExtendContext(NewValidationError(err.Error()), "PathPostfixTemplate")
return ExtendContext(&ValidationError{Err: err}, "PathPostfixTemplate")
} else if err = tpl.Execute(io.Discard, struct {
Spool struct {
Fragment
@@ -158,7 +158,7 @@ }
}
JournalSpec
}{}); err != nil {
return ExtendContext(NewValidationError(err.Error()), "PathPostfixTemplate")
return ExtendContext(&ValidationError{Err: err}, "PathPostfixTemplate")
}

// Retention requires no explicit validation (all values permitted).
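The switch from NewValidationError(err.Error()) to &ValidationError{Err: err} keeps the underlying template error rather than flattening it to text, so callers can still match it with errors.Is or errors.As — assuming, as the struct-literal form suggests, that ValidationError unwraps its Err field. A self-contained sketch with a stand-in type (hypothetical; not the protocol package's actual definition):

package main

import (
	"errors"
	"fmt"
)

// validationError mimics the shape used above: a context string plus the wrapped cause.
type validationError struct {
	Context string
	Err     error
}

func (v *validationError) Error() string { return v.Context + ": " + v.Err.Error() }
func (v *validationError) Unwrap() error { return v.Err }

func main() {
	cause := errors.New("template: postfix:1: unexpected EOF")
	err := &validationError{Context: "PathPostfixTemplate", Err: cause}
	fmt.Println(errors.Is(err, cause)) // true; the old err.Error() form severed this link
}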
2 changes: 1 addition & 1 deletion cmd/gazctl/gazctlcmd/journals_fragments.go
@@ -145,7 +145,7 @@ func (cmd *cmdJournalsFragments) outputTable(responses []*pb.FragmentsResponse)
if cmd.SigTTL != 0 {
headers = append(headers, "URL")
}
table.SetHeader(headers)
table.Header(headers)

for _, r := range responses {
if r == nil {
2 changes: 1 addition & 1 deletion cmd/gazctl/gazctlcmd/journals_list.go
@@ -106,7 +106,7 @@ func (cmd *cmdJournalsList) outputTable(resp *pb.ListResponse) {
headers = append(headers, "Stores")
}
headers = append(headers, cmd.Labels...)
table.SetHeader(headers)
table.Header(headers)

for _, j := range resp.Journals {
var primary = "<none>"
2 changes: 1 addition & 1 deletion cmd/gazctl/gazctlcmd/shards_list.go
@@ -91,7 +91,7 @@ func (cmd *cmdShardsList) outputTable(resp *pc.ListResponse) {
rjc = ShardsCfg.Broker.MustRoutedJournalClient(ctx)
}

table.SetHeader(headers)
table.Header(headers)

for _, j := range resp.Shards {
var primary = "<none>"
34 changes: 17 additions & 17 deletions consumer/recovery.go
@@ -3,12 +3,12 @@ package consumer
import (
"context"
"encoding/json"
"errors"
"fmt"
"os"
"strings"
"time"

"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
clientv3 "go.etcd.io/etcd/client/v3"
@@ -38,7 +38,7 @@ func fetchHints(ctx context.Context, spec *pc.ShardSpec, etcd *clientv3.Client)
out.log = spec.RecoveryLog()

if out.txnResp, err = etcd.Txn(ctx).If().Then(ops...).Commit(); err != nil {
err = errors.WithMessage(err, "fetching ShardSpec.HintKeys")
err = fmt.Errorf("fetching ShardSpec.HintKeys: %w", err)
return
}

@@ -53,20 +53,20 @@ func fetchHints(ctx context.Context, spec *pc.ShardSpec, etcd *clientv3.Client)
// Sense whether JSON or proto encoding is used by testing for opening '{'.
if kvs[0].Value[0] != '{' {
if err = h.Unmarshal(kvs[0].Value); err != nil {
err = errors.WithMessage(err, "hints.Unmarshal")
err = fmt.Errorf("hints.Unmarshal: %w", err)
}
} else {
if err = json.Unmarshal(kvs[0].Value, h); err != nil {
err = errors.WithMessage(err, "json.Unmarshal(hints)")
err = fmt.Errorf("json.Unmarshal(hints): %w", err)
}
}

if err != nil {
// Pass.
} else if _, err = recoverylog.NewFSM(*h); err != nil {
err = errors.WithMessage(err, "validating FSMHints")
err = fmt.Errorf("validating FSMHints: %w", err)
} else if h.Log != out.log {
err = errors.Errorf("hints.Log %s != ShardSpec.RecoveryLog %s", h.Log, out.log)
err = fmt.Errorf("hints.Log %s != ShardSpec.RecoveryLog %s", h.Log, out.log)
}

if err != nil {
@@ -100,7 +100,7 @@ func storeRecordedHints(s *shard, hints recoverylog.FSMHints) error {

var val, err = json.Marshal(hints)
if err != nil {
return errors.WithMessage(err, "json.Marshal(hints)")
return fmt.Errorf("json.Marshal(hints): %w", err)
}
// TODO(johnny): Switch over to hints.Marshal() when proto decode support is deployed.
/*
@@ -149,12 +149,12 @@ func storeRecoveredHints(s *shard, hints recoverylog.FSMHints) error {
// rotate the current value at backups[1] => backups[2], and so on.
var val []byte
if val, err = json.Marshal(hints); err != nil {
return errors.WithMessage(err, "json.Marshal(hints)")
return fmt.Errorf("json.Marshal(hints): %w", err)
}
// TODO(johnny): Switch over to hints.Marshal() when proto decode support is deployed.
/*
if val, err = hints.Marshal(); err != nil {
return errors.WithMessage(err, "hints.Marshal")
return fmt.Errorf("hints.Marshal(): %w", err)
}
*/

@@ -203,7 +203,7 @@ func beginRecovery(s *shard) error {
&pc.GetHintsRequest{Shard: spec.Id})

if err == nil && s.recovery.hints.Status != pc.Status_OK {
err = fmt.Errorf(s.recovery.hints.Status.String())
err = errors.New(s.recovery.hints.Status.String())
}
if err != nil {
return fmt.Errorf("GetHints: %w", err)
@@ -212,15 +212,15 @@

// Verify the |pickedHints| recovery log exists, and is of the correct Content-Type.
if logSpec, err := client.GetJournal(s.ctx, s.ajc, pickedHints.Log); err != nil {
return errors.WithMessage(err, "fetching log spec")
return fmt.Errorf("fetching log spec: %w", err)
} else if ct := logSpec.LabelSet.ValueOf(labels.ContentType); ct != labels.ContentType_RecoveryLog {
return errors.Errorf("expected label %s value %s (got %v)", labels.ContentType, labels.ContentType_RecoveryLog, ct)
return fmt.Errorf("expected label %s value %s (got %v)", labels.ContentType, labels.ContentType_RecoveryLog, ct)
}

// Create local temporary directory into which we recover.
var dir string
if dir, err = os.MkdirTemp("", strings.ReplaceAll(spec.Id.String(), "/", "_")+"-"); err != nil {
return errors.WithMessage(err, "creating shard working directory")
return fmt.Errorf("creating shard working directory: %w", err)
}

log.WithFields(log.Fields{
@@ -231,7 +231,7 @@

// Finally, play back the log.
if err = s.recovery.player.Play(s.ctx, pickedHints, dir, s.ajc); err != nil {
return errors.WithMessagef(err, "playing log %s", pickedHints.Log)
return fmt.Errorf("playing log %s: %w", pickedHints.Log, err)
}
return nil
}
@@ -260,7 +260,7 @@ func completeRecovery(s *shard) (_ pc.Checkpoint, err error) {

var recovered = s.recovery.player.Resolved
if recovered.FSM == nil {
err = errors.Errorf("completeRecovery aborting due to log playback failure")
err = errors.New("completeRecovery aborting due to log playback failure")
return
}

@@ -284,9 +284,9 @@
// cleanup of the playback directory now.
_ = os.RemoveAll(s.recovery.recorder.Dir())
}
return cp, errors.WithMessage(err, "app.NewStore")
return cp, fmt.Errorf("app.NewStore: %w", err)
} else if cp, err = s.store.RestoreCheckpoint(s); err != nil {
return cp, errors.WithMessage(err, "store.RestoreCheckpoint")
return cp, fmt.Errorf("store.RestoreCheckpoint: %w", err)
}

// Store |recoveredHints| as a backup. We do this _after_ restoring the
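The recovery.go changes drop github.com/pkg/errors in favor of the standard library: fmt.Errorf with the %w verb wraps the cause just as errors.WithMessage did, keeping it reachable through errors.Is and errors.As. A minimal stand-alone illustration (the sentinel error is invented for the example):

package main

import (
	"errors"
	"fmt"
)

var errEtcdUnavailable = errors.New("etcd unavailable") // hypothetical cause

func main() {
	wrapped := fmt.Errorf("fetching ShardSpec.HintKeys: %w", errEtcdUnavailable)
	fmt.Println(wrapped)                                 // fetching ShardSpec.HintKeys: etcd unavailable
	fmt.Println(errors.Is(wrapped, errEtcdUnavailable)) // true — the cause survives wrapping
}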
2 changes: 1 addition & 1 deletion examples/bike-share/api.go
@@ -65,7 +65,7 @@ func (app *Application) ServeBikeHistory(w http.ResponseWriter, r *http.Request)
}); err != nil {
return
} else if res.Status != pc.Status_OK {
err = fmt.Errorf(res.Status.String())
err = errors.New(res.Status.String())
return
} else if res.Store == nil {
// Shard is assigned to peer.
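Here and in beginRecovery above, fmt.Errorf(res.Status.String()) becomes errors.New(...): the status text is data, not a format string, so any '%' in it would be misinterpreted by fmt, and newer go vet flags non-constant format strings passed to fmt.Errorf. A small illustration (the status text is made up):

package main

import (
	"errors"
	"fmt"
)

func main() {
	status := "ETCD_TRANSACTION_FAILED (50% quorum)" // hypothetical status containing '%'
	fmt.Println(fmt.Errorf(status)) // '%' is parsed as a formatting verb and the text is mangled
	fmt.Println(errors.New(status)) // text preserved verbatim
}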