Skip to content

Commit 1ad66d6

Browse files
committed
shards_prune: skip unhealthy stores when pruning shards
1 parent 5af7417 commit 1ad66d6

File tree

3 files changed

+71
-2
lines changed

3 files changed

+71
-2
lines changed

broker/stores/health_check.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ func checkLoop(stores map[pb.FragmentStore]*ActiveStore, fs pb.FragmentStore, at
6262
"error": err,
6363
"attempt": attempt,
6464
"interval": checkInterval,
65+
"authError": active.IsAuthError(err),
6566
}).Warn("store health check failed")
6667
attempt += 1
6768
}

broker/stores/health_check_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,9 @@ func TestHealthChecks(t *testing.T) {
6161
require.Equal(t, ".test/health-check", path)
6262
return server.URL + "/signed/test", nil
6363
},
64+
IsAuthErrorFunc: func(_ Store, err error) bool {
65+
return false
66+
},
6467
}, nil
6568
}
6669
},
@@ -77,6 +80,9 @@ func TestHealthChecks(t *testing.T) {
7780
GetFunc: func(_ Store, ctx context.Context, path string) (io.ReadCloser, error) {
7881
return nil, errors.New("simulated GET failure")
7982
},
83+
IsAuthErrorFunc: func(_ Store, err error) bool {
84+
return false
85+
},
8086
}, nil
8187
}
8288
},
@@ -99,6 +105,9 @@ func TestHealthChecks(t *testing.T) {
99105
SignGetFunc: func(_ Store, path string, d time.Duration) (string, error) {
100106
return server.URL + "/wrong", nil
101107
},
108+
IsAuthErrorFunc: func(_ Store, err error) bool {
109+
return false
110+
},
102111
}, nil
103112
}
104113
},
@@ -115,6 +124,9 @@ func TestHealthChecks(t *testing.T) {
115124
GetFunc: func(_ Store, ctx context.Context, path string) (io.ReadCloser, error) {
116125
return nil, nil // Return nil reader without error
117126
},
127+
IsAuthErrorFunc: func(_ Store, err error) bool {
128+
return false
129+
},
118130
}, nil
119131
}
120132
},
@@ -132,6 +144,9 @@ func TestHealthChecks(t *testing.T) {
132144
// Return a reader that fails on Read
133145
return io.NopCloser(errorReader{}), nil
134146
},
147+
IsAuthErrorFunc: func(_ Store, err error) bool {
148+
return false
149+
},
135150
}, nil
136151
}
137152
},
@@ -160,6 +175,9 @@ func TestHealthChecks(t *testing.T) {
160175
SignGetFunc: func(_ Store, path string, d time.Duration) (string, error) {
161176
return server.URL + "/signed/test", nil
162177
},
178+
IsAuthErrorFunc: func(_ Store, err error) bool {
179+
return false
180+
},
163181
}, nil
164182
}
165183
},

cmd/gazctl/gazctlcmd/shards_prune.go

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,12 +55,20 @@ func (cmd *cmdShardsPrune) Execute([]string) error {
5555
for _, shard := range listShards(rsc, cmd.Selector).Shards {
5656
metrics.shardsTotal++
5757

58+
var recoveryLog = shard.Spec.RecoveryLog()
59+
60+
// Check if the recovery log's fragment stores are healthy before fetching hints.
61+
// Skip this shard if any store is unhealthy to avoid hanging.
62+
if !checkRecoveryLogStoresHealth(ctx, rjc, recoveryLog) {
63+
skipRecoveryLogs[recoveryLog] = true
64+
continue
65+
}
66+
5867
var allHints, err = consumer.FetchHints(ctx, rsc, &pc.GetHintsRequest{
5968
Shard: shard.Spec.Id,
6069
})
6170
mbp.Must(err, "failed to fetch hints")
6271

63-
var recoveryLog = shard.Spec.RecoveryLog()
6472
rawHintsResponses[recoveryLog] = append(rawHintsResponses[recoveryLog], *allHints)
6573

6674
for _, curHints := range append(allHints.BackupHints, allHints.PrimaryHints) {
@@ -94,7 +102,7 @@ func (cmd *cmdShardsPrune) Execute([]string) error {
94102

95103
for journal, segments := range logSegmentSets {
96104
if skipRecoveryLogs[journal] {
97-
log.WithField("journal", journal).Warn("skipping journal because a shard is missing hints that cover it")
105+
log.WithField("journal", journal).Warn("skipping journal because a shard is missing hints that cover it or has an unhealthy store")
98106
continue
99107
}
100108
log.WithField("journal", journal).Debug("checking fragments of journal")
@@ -162,6 +170,48 @@ func (cmd *cmdShardsPrune) Execute([]string) error {
162170
return nil
163171
}
164172

173+
// checkRecoveryLogStoresHealth fetches the journal spec of recoveryLog and queries the health of each fragment store it lists, logging a warning and stopping at the first failure.
174+
// It returns true if the spec lists no stores or every store reports pb.Status_OK, and false if the spec fetch fails, a health query fails, or any store reports another status.
175+
func checkRecoveryLogStoresHealth(ctx context.Context, jc pb.JournalClient, recoveryLog pb.Journal) bool {
176+
var journalSpec, err = client.GetJournal(ctx, jc, recoveryLog)
177+
if err != nil {
178+
log.WithFields(log.Fields{
179+
"journal": recoveryLog,
180+
"error": err,
181+
}).Warn("failed to fetch journal spec for recovery log")
182+
return false
183+
}
184+
185+
if len(journalSpec.Fragment.Stores) == 0 {
186+
// No stores are configured, so there is nothing to probe: treat the recovery log as healthy.
187+
return true
188+
}
189+
190+
for _, store := range journalSpec.Fragment.Stores {
191+
var resp, err = client.FragmentStoreHealth(ctx, jc, store)
192+
if err != nil {
193+
log.WithFields(log.Fields{
194+
"journal": recoveryLog,
195+
"store": store,
196+
"error": err,
197+
}).Warn("failed to check fragment store health")
198+
return false
199+
}
200+
201+
if resp.Status != pb.Status_OK {
202+
log.WithFields(log.Fields{
203+
"journal": recoveryLog,
204+
"store": store,
205+
"status": resp.Status,
206+
"storeError": resp.StoreHealthError,
207+
}).Warn("fragment store is unhealthy")
208+
return false
209+
}
210+
}
211+
212+
return true
213+
}
214+
165215
func overlapsAnySegment(segments []recoverylog.Segment, fragment pb.Fragment) bool {
166216
for _, seg := range segments {
167217
if (seg.FirstOffset < fragment.End || fragment.End == 0) &&

0 commit comments

Comments
 (0)