Skip to content

Commit 3003c0e

Browse files
committed
shards_prune: skip unhealthy stores when pruning shards
1 parent 5af7417 commit 3003c0e

File tree

2 files changed

+53
-2
lines changed

2 files changed

+53
-2
lines changed

broker/stores/health_check.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ func checkLoop(stores map[pb.FragmentStore]*ActiveStore, fs pb.FragmentStore, at
6262
"error": err,
6363
"attempt": attempt,
6464
"interval": checkInterval,
65+
"authError": active.IsAuthError(err),
6566
}).Warn("store health check failed")
6667
attempt += 1
6768
}

cmd/gazctl/gazctlcmd/shards_prune.go

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,12 +55,20 @@ func (cmd *cmdShardsPrune) Execute([]string) error {
5555
for _, shard := range listShards(rsc, cmd.Selector).Shards {
5656
metrics.shardsTotal++
5757

58+
var recoveryLog = shard.Spec.RecoveryLog()
59+
60+
// Check if the recovery log's fragment stores are healthy before fetching hints.
61+
// Skip this shard if any store is unhealthy to avoid hanging.
62+
if !checkRecoveryLogStoresHealth(ctx, rjc, recoveryLog) {
63+
skipRecoveryLogs[recoveryLog] = true
64+
continue
65+
}
66+
5867
var allHints, err = consumer.FetchHints(ctx, rsc, &pc.GetHintsRequest{
5968
Shard: shard.Spec.Id,
6069
})
6170
mbp.Must(err, "failed to fetch hints")
6271

63-
var recoveryLog = shard.Spec.RecoveryLog()
6472
rawHintsResponses[recoveryLog] = append(rawHintsResponses[recoveryLog], *allHints)
6573

6674
for _, curHints := range append(allHints.BackupHints, allHints.PrimaryHints) {
@@ -94,7 +102,7 @@ func (cmd *cmdShardsPrune) Execute([]string) error {
94102

95103
for journal, segments := range logSegmentSets {
96104
if skipRecoveryLogs[journal] {
97-
log.WithField("journal", journal).Warn("skipping journal because a shard is missing hints that cover it")
105+
log.WithField("journal", journal).Warn("skipping journal because a shard is missing hints that cover it or has unlhealthy store")
98106
continue
99107
}
100108
log.WithField("journal", journal).Debug("checking fragments of journal")
@@ -162,6 +170,48 @@ func (cmd *cmdShardsPrune) Execute([]string) error {
162170
return nil
163171
}
164172

173+
// checkRecoveryLogStoresHealth checks if all fragment stores for the given recovery log journal are healthy.
174+
// Returns true if all stores are healthy, false otherwise.
175+
func checkRecoveryLogStoresHealth(ctx context.Context, jc pb.JournalClient, recoveryLog pb.Journal) bool {
176+
var journalSpec, err = client.GetJournal(ctx, jc, recoveryLog)
177+
if err != nil {
178+
log.WithFields(log.Fields{
179+
"journal": recoveryLog,
180+
"error": err,
181+
}).Warn("failed to fetch journal spec for recovery log")
182+
return false
183+
}
184+
185+
if len(journalSpec.Fragment.Stores) == 0 {
186+
// No stores configured, consider healthy (journal is usable for real-time streaming)
187+
return true
188+
}
189+
190+
for _, store := range journalSpec.Fragment.Stores {
191+
var resp, err = client.FragmentStoreHealth(ctx, jc, store)
192+
if err != nil {
193+
log.WithFields(log.Fields{
194+
"journal": recoveryLog,
195+
"store": store,
196+
"error": err,
197+
}).Warn("failed to check fragment store health")
198+
return false
199+
}
200+
201+
if resp.Status != pb.Status_OK {
202+
log.WithFields(log.Fields{
203+
"journal": recoveryLog,
204+
"store": store,
205+
"status": resp.Status,
206+
"storeError": resp.StoreHealthError,
207+
}).Warn("fragment store is unhealthy")
208+
return false
209+
}
210+
}
211+
212+
return true
213+
}
214+
165215
func overlapsAnySegment(segments []recoverylog.Segment, fragment pb.Fragment) bool {
166216
for _, seg := range segments {
167217
if (seg.FirstOffset < fragment.End || fragment.End == 0) &&

0 commit comments

Comments
 (0)