
Commit 9ebcb61

Improve metastore observability (#4580)
* Add basic tracing to metastore endpoints
* Add tracing to Raft and boltdb
* Add tracing to Raft and boltdb (part 2)
* Simplify tracing for Raft handlers
* Fix linter errors
* Add even more tracing
* Minor touch ups
* Make propagation safer
* Add span events for FSM.Propose
* Tidy up some more
* Fix tests
* Refactor context registry
* Refactor metrics
1 parent 13cbb35 commit 9ebcb61

14 files changed: +705 -73 lines changed
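
Only four of the fourteen changed files appear below, but they show the core mechanism: the leader registers the request context under an ID, carries that ID in the Raft log entry's Extensions field, and the FSM looks the context back up when the command is applied, so handler spans join the original request trace. The registry implementation itself is in one of the files not shown; what follows is a minimal sketch of something that would satisfy the ContextRegistry interface declared in fsm.go below (the Register method, type names, and package placement are assumptions of this sketch, not code from the commit):

package tracing

import (
    "context"
    "sync"
)

// Registry is a hypothetical in-memory implementation of the
// fsm.ContextRegistry interface (Retrieve, Delete, Size). Register is an
// assumed leader-side method for parking a context before Propose.
type Registry struct {
    mu   sync.Mutex
    ctxs map[string]context.Context
}

func NewRegistry() *Registry {
    return &Registry{ctxs: make(map[string]context.Context)}
}

// Register stores the request context under the given ID. The FSM later
// retrieves it using the ID carried in raft.Log.Extensions.
func (r *Registry) Register(id string, ctx context.Context) {
    r.mu.Lock()
    defer r.mu.Unlock()
    r.ctxs[id] = ctx
}

func (r *Registry) Retrieve(id string) (context.Context, bool) {
    r.mu.Lock()
    defer r.mu.Unlock()
    ctx, ok := r.ctxs[id]
    return ctx, ok
}

func (r *Registry) Delete(id string) {
    r.mu.Lock()
    defer r.mu.Unlock()
    delete(r.ctxs, id)
}

func (r *Registry) Size() int {
    r.mu.Lock()
    defer r.mu.Unlock()
    return len(r.ctxs)
}

Holding contexts in a map like this is only safe because every successful Retrieve in fsm.applyCommand below is paired with a deferred Delete; Size presumably feeds the refactored metrics so a leaking registry would be visible.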

pkg/metastore/compaction_raft_handler.go

Lines changed: 43 additions & 10 deletions
@@ -1,14 +1,18 @@
 package metastore
 
 import (
+    "context"
+
     "github.com/go-kit/log"
     "github.com/go-kit/log/level"
     "github.com/hashicorp/raft"
+    "github.com/opentracing/opentracing-go/ext"
     "go.etcd.io/bbolt"
 
     metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1"
     "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1/raft_log"
     "github.com/grafana/pyroscope/pkg/metastore/compaction"
+    "github.com/grafana/pyroscope/pkg/metastore/tracing"
 )
 
 type IndexReplacer interface {
@@ -43,8 +47,20 @@ func NewCompactionCommandHandler(
 }
 
 func (h *CompactionCommandHandler) GetCompactionPlanUpdate(
-    tx *bbolt.Tx, cmd *raft.Log, req *raft_log.GetCompactionPlanUpdateRequest,
-) (*raft_log.GetCompactionPlanUpdateResponse, error) {
+    ctx context.Context, tx *bbolt.Tx, cmd *raft.Log, req *raft_log.GetCompactionPlanUpdateRequest,
+) (resp *raft_log.GetCompactionPlanUpdateResponse, err error) {
+    span, _ := tracing.StartSpanFromContext(ctx, "raft.GetCompactionPlanUpdate")
+    span.SetTag("status_updates", len(req.StatusUpdates))
+    span.SetTag("assign_jobs_max", req.AssignJobsMax)
+    span.SetTag("raft_log_index", cmd.Index)
+    span.SetTag("raft_log_term", cmd.Term)
+    defer func() {
+        if err != nil {
+            ext.LogError(span, err)
+        }
+        span.Finish()
+    }()
+
     // We need to generate a plan of the update caused by the new status
     // report from the worker. The plan will be used to update the schedule
     // after the Raft consensus is reached.
@@ -132,12 +148,26 @@ func (h *CompactionCommandHandler) GetCompactionPlanUpdate(
         })
     }
 
+    span.SetTag("assigned_jobs", len(p.AssignedJobs))
+    span.SetTag("new_jobs", len(p.NewJobs))
+    span.SetTag("evicted_jobs", len(p.EvictedJobs))
     return &raft_log.GetCompactionPlanUpdateResponse{Term: cmd.Term, PlanUpdate: p}, nil
 }
 
 func (h *CompactionCommandHandler) UpdateCompactionPlan(
-    tx *bbolt.Tx, cmd *raft.Log, req *raft_log.UpdateCompactionPlanRequest,
-) (*raft_log.UpdateCompactionPlanResponse, error) {
+    ctx context.Context, tx *bbolt.Tx, cmd *raft.Log, req *raft_log.UpdateCompactionPlanRequest,
+) (resp *raft_log.UpdateCompactionPlanResponse, err error) {
+    span, _ := tracing.StartSpanFromContext(ctx, "raft.UpdateCompactionPlan")
+    span.SetTag("raft_log_index", cmd.Index)
+    span.SetTag("raft_log_term", cmd.Term)
+    span.SetTag("request_term", req.Term)
+    defer func() {
+        if err != nil {
+            ext.LogError(span, err)
+        }
+        span.Finish()
+    }()
+
     if req.Term != cmd.Term || req.GetPlanUpdate() == nil {
         level.Warn(h.logger).Log(
             "msg", "rejecting compaction plan update; term mismatch: leader has changed",
@@ -147,18 +177,18 @@ func (h *CompactionCommandHandler) UpdateCompactionPlan(
         return new(raft_log.UpdateCompactionPlanResponse), nil
     }
 
-    if err := h.planner.UpdatePlan(tx, req.PlanUpdate); err != nil {
+    if err = h.planner.UpdatePlan(tx, req.PlanUpdate); err != nil {
         level.Error(h.logger).Log("msg", "failed to update compaction planner", "err", err)
         return nil, err
     }
 
-    if err := h.scheduler.UpdateSchedule(tx, req.PlanUpdate); err != nil {
+    if err = h.scheduler.UpdateSchedule(tx, req.PlanUpdate); err != nil {
         level.Error(h.logger).Log("msg", "failed to update compaction schedule", "err", err)
         return nil, err
     }
 
     for _, job := range req.PlanUpdate.NewJobs {
-        if err := h.tombstones.DeleteTombstones(tx, cmd, job.Plan.Tombstones...); err != nil {
+        if err = h.tombstones.DeleteTombstones(tx, cmd, job.Plan.Tombstones...); err != nil {
             level.Error(h.logger).Log("msg", "failed to delete tombstones", "err", err)
             return nil, err
         }
@@ -170,22 +200,25 @@ func (h *CompactionCommandHandler) UpdateCompactionPlan(
             level.Warn(h.logger).Log("msg", "compacted blocks are missing; skipping", "job", job.State.Name)
             continue
         }
-        if err := h.tombstones.AddTombstones(tx, cmd, blockTombstonesForCompletedJob(job)); err != nil {
+        if err = h.tombstones.AddTombstones(tx, cmd, blockTombstonesForCompletedJob(job)); err != nil {
             level.Error(h.logger).Log("msg", "failed to add tombstones", "err", err)
             return nil, err
         }
         for _, block := range compacted.NewBlocks {
-            if err := h.compactor.Compact(tx, compaction.NewBlockEntry(cmd, block)); err != nil {
+            if err = h.compactor.Compact(tx, compaction.NewBlockEntry(cmd, block)); err != nil {
                 level.Error(h.logger).Log("msg", "failed to compact block", "err", err)
                 return nil, err
             }
         }
-        if err := h.index.ReplaceBlocks(tx, compacted); err != nil {
+        if err = h.index.ReplaceBlocks(tx, compacted); err != nil {
             level.Error(h.logger).Log("msg", "failed to replace blocks", "err", err)
             return nil, err
         }
     }
 
+    span.SetTag("new_jobs", len(req.PlanUpdate.NewJobs))
+    span.SetTag("completed_jobs", len(req.PlanUpdate.CompletedJobs))
+    span.SetTag("updated_jobs", len(req.PlanUpdate.UpdatedJobs))
     return &raft_log.UpdateCompactionPlanResponse{PlanUpdate: req.PlanUpdate}, nil
 }
 
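All of the handler changes above follow one idiom: the results become named return values (resp, err), a single deferred closure tags the span with the error and finishes it on every return path, and the inner `if err := ...` short declarations become plain assignments (`if err = ...`) so failures actually reach the named return that the deferred closure inspects. A stripped-down, self-contained sketch of the idiom (doSomething and doWork are hypothetical stand-ins for a handler and its body):

package main

import (
    "errors"
    "fmt"

    "github.com/opentracing/opentracing-go"
    "github.com/opentracing/opentracing-go/ext"
)

// doSomething demonstrates the deferred error-tagging idiom used by the
// Raft handlers above: the named return err is captured by the deferred
// closure, so every return path both records the error on the span and
// finishes the span exactly once.
func doSomething() (err error) {
    span := opentracing.StartSpan("example.doSomething")
    defer func() {
        if err != nil {
            ext.LogError(span, err)
        }
        span.Finish()
    }()

    // Assignment, not :=, so the named return is set for the deferred closure.
    if err = doWork(); err != nil {
        return err
    }
    return nil
}

// doWork is a hypothetical stand-in for the real handler body.
func doWork() error { return errors.New("boom") }

func main() { fmt.Println(doSomething()) }
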
pkg/metastore/compaction_service.go

Lines changed: 22 additions & 6 deletions
@@ -6,6 +6,8 @@ import (
 
     "github.com/go-kit/log"
     "github.com/go-kit/log/level"
+    "github.com/opentracing/opentracing-go"
+    "github.com/opentracing/opentracing-go/ext"
     "google.golang.org/grpc/codes"
     "google.golang.org/grpc/status"
 
@@ -34,9 +36,20 @@ func NewCompactionService(
 }
 
 func (svc *CompactionService) PollCompactionJobs(
-    _ context.Context,
+    ctx context.Context,
     req *metastorev1.PollCompactionJobsRequest,
-) (*metastorev1.PollCompactionJobsResponse, error) {
+) (resp *metastorev1.PollCompactionJobsResponse, err error) {
+    span, ctx := opentracing.StartSpanFromContext(ctx, "CompactionService.PollCompactionJobs")
+    defer func() {
+        if err != nil {
+            ext.LogError(span, err)
+        }
+        span.Finish()
+    }()
+
+    span.SetTag("status_updates", len(req.GetStatusUpdates()))
+    span.SetTag("job_capacity", req.GetJobCapacity())
+
     // This is a two-step process. To commit changes to the compaction plan,
     // we need to ensure that all replicas apply exactly the same changes.
     // Instead of relying on identical behavior across replicas and a
@@ -77,14 +90,14 @@ func (svc *CompactionService) PollCompactionJobs(
     }
 
     cmd := fsm.RaftLogEntryType(raft_log.RaftCommand_RAFT_COMMAND_GET_COMPACTION_PLAN_UPDATE)
-    resp, err := svc.raft.Propose(cmd, req)
+    proposeResp, err := svc.raft.Propose(ctx, cmd, req)
     if err != nil {
         if !raftnode.IsRaftLeadershipError(err) {
             level.Error(svc.logger).Log("msg", "failed to prepare compaction plan", "err", err)
         }
         return nil, err
     }
-    prepared := resp.(*raft_log.GetCompactionPlanUpdateResponse)
+    prepared := proposeResp.(*raft_log.GetCompactionPlanUpdateResponse)
     planUpdate := prepared.GetPlanUpdate()
 
     // Copy plan updates to the worker response. The job plan is only sent for
@@ -143,19 +156,22 @@ func (svc *CompactionService) PollCompactionJobs(
     // scenario, and we don't want to stop the node/cluster). Instead, an
     // empty response would indicate that the plan is rejected.
     proposal := &raft_log.UpdateCompactionPlanRequest{Term: prepared.Term, PlanUpdate: planUpdate}
-    if resp, err = svc.raft.Propose(cmd, proposal); err != nil {
+    if proposeResp, err = svc.raft.Propose(ctx, cmd, proposal); err != nil {
         if !raftnode.IsRaftLeadershipError(err) {
             level.Error(svc.logger).Log("msg", "failed to update compaction plan", "err", err)
         }
         return nil, err
     }
-    accepted := resp.(*raft_log.UpdateCompactionPlanResponse).GetPlanUpdate()
+    accepted := proposeResp.(*raft_log.UpdateCompactionPlanResponse).GetPlanUpdate()
     if accepted == nil {
         level.Warn(svc.logger).Log("msg", "compaction plan update rejected")
         return nil, status.Error(codes.FailedPrecondition, "failed to update compaction plan")
     }
 
     // As of now, accepted plan always matches the proposed one,
     // so our prepared worker response is still valid.
+
+    span.SetTag("assigned_jobs", len(workerResp.GetCompactionJobs()))
+    span.SetTag("assignment_updates", len(workerResp.GetAssignments()))
     return workerResp, nil
 }
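
Propose now takes the caller's context, and fsm.applyCommand (below) reads a context ID out of cmd.Extensions, so the raftnode side presumably registers the context and stamps the ID onto the log entry before applying it; that file is among those not shown in this excerpt. A hedged sketch of what the leader-side plumbing could look like (the Register method, the ID scheme, and the timeout are all assumptions of this sketch):

package raftnode

import (
    "context"
    "time"

    "github.com/google/uuid"
    "github.com/hashicorp/raft"
)

// registrar is the leader-side counterpart of fsm.ContextRegistry;
// the Register method is assumed, not shown in this excerpt.
type registrar interface {
    Register(id string, ctx context.Context)
}

// proposeWithContext sketches how Propose(ctx, cmd, req) could thread the
// request context through Raft: park it in the registry under a fresh ID
// and carry the ID in raft.Log.Extensions, which is exactly where
// fsm.applyCommand reads it back from on apply.
func proposeWithContext(ctx context.Context, r *raft.Raft, reg registrar, data []byte) raft.ApplyFuture {
    id := uuid.NewString() // hypothetical ID scheme
    reg.Register(id, ctx)
    return r.ApplyLog(raft.Log{Data: data, Extensions: []byte(id)}, 5*time.Second) // timeout is an assumption
}

On followers the ID misses the local registry, so handlers run with context.Background(); that matches the new RaftHandler doc comment stating the context is only available on the leader.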

pkg/metastore/fsm/fsm.go

Lines changed: 43 additions & 15 deletions
@@ -13,16 +13,26 @@ import (
     "github.com/go-kit/log"
     "github.com/go-kit/log/level"
     "github.com/hashicorp/raft"
+    "github.com/opentracing/opentracing-go"
     "github.com/prometheus/client_golang/prometheus"
     "go.etcd.io/bbolt"
     "go.etcd.io/bbolt/errors"
     "golang.org/x/sync/errgroup"
     "google.golang.org/protobuf/proto"
+
+    "github.com/grafana/pyroscope/pkg/metastore/tracing"
 )
 
+type ContextRegistry interface {
+    Retrieve(id string) (context.Context, bool)
+    Delete(id string)
+    Size() int
+}
+
 // RaftHandler is a function that processes a Raft command.
 // The implementation MUST be idempotent.
-type RaftHandler[Req, Resp proto.Message] func(*bbolt.Tx, *raft.Log, Req) (Resp, error)
+// The context parameter is used for tracing purposes and is only available on the leader.
+type RaftHandler[Req, Resp proto.Message] func(context.Context, *bbolt.Tx, *raft.Log, Req) (Resp, error)
 
 // StateRestorer is called during the FSM initialization
 // to restore the state from a snapshot.
@@ -56,9 +66,10 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
 
 // FSM implements the raft.FSM interface.
 type FSM struct {
-    logger  log.Logger
-    config  Config
-    metrics *metrics
+    logger          log.Logger
+    config          Config
+    contextRegistry ContextRegistry
+    metrics         *metrics
 
     mu   sync.RWMutex
     txns sync.WaitGroup
@@ -71,14 +82,15 @@ type FSM struct {
     appliedIndex uint64
 }
 
-type handler func(tx *bbolt.Tx, cmd *raft.Log, raw []byte) (proto.Message, error)
+type handler func(ctx context.Context, tx *tracingTx, cmd *raft.Log, raw []byte) (proto.Message, error)
 
-func New(logger log.Logger, reg prometheus.Registerer, config Config) (*FSM, error) {
+func New(logger log.Logger, reg prometheus.Registerer, config Config, contextRegistry ContextRegistry) (*FSM, error) {
     fsm := FSM{
-        logger:   logger,
-        config:   config,
-        metrics:  newMetrics(reg),
-        handlers: make(map[RaftLogEntryType]handler),
+        logger:          logger,
+        config:          config,
+        contextRegistry: contextRegistry,
+        metrics:         newMetrics(reg),
+        handlers:        make(map[RaftLogEntryType]handler),
     }
     db := newDB(logger, fsm.metrics, config)
     if err := db.open(false); err != nil {
@@ -93,12 +105,12 @@ func (fsm *FSM) RegisterRestorer(r ...StateRestorer) {
 }
 
 func RegisterRaftCommandHandler[Req, Resp proto.Message](fsm *FSM, t RaftLogEntryType, handler RaftHandler[Req, Resp]) {
-    fsm.handlers[t] = func(tx *bbolt.Tx, cmd *raft.Log, raw []byte) (proto.Message, error) {
+    fsm.handlers[t] = func(ctx context.Context, tx *tracingTx, cmd *raft.Log, raw []byte) (proto.Message, error) {
         req, err := unmarshal[Req](raw)
         if err != nil {
             return nil, err
         }
-        return handler(tx, cmd, req)
+        return handler(ctx, tx.Tx, cmd, req)
     }
 }
 
@@ -234,6 +246,18 @@ func (fsm *FSM) applyCommand(cmd *raft.Log) any {
     if err := e.UnmarshalBinary(cmd.Data); err != nil {
         return errResponse(cmd, err)
     }
+
+    ctx := context.Background()
+    if ctxID := string(cmd.Extensions); ctxID != "" {
+        var found bool
+        if ctx, found = fsm.contextRegistry.Retrieve(ctxID); found {
+            defer fsm.contextRegistry.Delete(ctxID)
+        }
+    }
+
+    span, ctx := tracing.StartSpanFromContext(ctx, "fsm.applyCommand")
+    defer span.Finish()
+
     if cmd.Index <= fsm.appliedIndex {
         // Skip already applied commands at WAL restore.
         // Note that the 0 index is a noop and is never applied to FSM.
@@ -253,20 +277,24 @@ func (fsm *FSM) applyCommand(cmd *raft.Log) any {
 
     // Apply is never called concurrently with Restore, so we don't need
     // to lock the FSM: db.boltdb is guaranteed to be in a consistent state.
-    tx, err := fsm.db.boltdb.Begin(true)
+    rawTx, err := fsm.db.boltdb.Begin(true)
     if err != nil {
         panic(fmt.Sprint("failed to begin write transaction:", err))
     }
 
-    data, err := handle(tx, cmd, e.Data)
+    txSpan, ctx := opentracing.StartSpanFromContext(ctx, "boltdb.transaction")
+    txSpan.SetTag("writable", rawTx.Writable())
+    tx := newTracingTx(rawTx, txSpan, ctx)
+
+    data, err := handle(ctx, tx, cmd, e.Data)
     if err != nil {
         _ = tx.Rollback()
         // NOTE(kolesnikovae): This has to be a hard failure as we assume
         // that the in-memory state might have not been rolled back properly.
         panic(fmt.Sprint("failed to apply command:", err))
     }
 
-    if err = fsm.storeAppliedIndex(tx, cmd.Term, cmd.Index); err != nil {
+    if err = fsm.storeAppliedIndex(tx.Tx, cmd.Term, cmd.Index); err != nil {
         panic(fmt.Sprint("failed to store applied index: %w", err))
     }
 
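Since the handler methods in compaction_raft_handler.go now match the context-aware RaftHandler signature exactly, they can be registered directly, with the generic parameters inferred from the method values. An illustrative registration sketch (the function below is hypothetical wiring, not part of this excerpt, and the second enum value name is inferred by analogy with the one used in compaction_service.go):

package metastore

import (
    "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1/raft_log"
    "github.com/grafana/pyroscope/pkg/metastore/fsm"
)

// registerCompactionHandlers is hypothetical wiring code: the FSM calls
// each handler with the leader's retrieved context, the traced
// transaction's underlying *bbolt.Tx, the Raft log entry, and the
// decoded request.
func registerCompactionHandlers(f *fsm.FSM, h *CompactionCommandHandler) {
    fsm.RegisterRaftCommandHandler(f,
        fsm.RaftLogEntryType(raft_log.RaftCommand_RAFT_COMMAND_GET_COMPACTION_PLAN_UPDATE),
        h.GetCompactionPlanUpdate)
    fsm.RegisterRaftCommandHandler(f,
        fsm.RaftLogEntryType(raft_log.RaftCommand_RAFT_COMMAND_UPDATE_COMPACTION_PLAN), // enum name inferred by analogy
        h.UpdateCompactionPlan)
}
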
pkg/metastore/fsm/tracing_tx.go

Lines changed: 48 additions & 0 deletions
@@ -0,0 +1,48 @@
+package fsm
+
+import (
+    "context"
+
+    "github.com/opentracing/opentracing-go"
+    "go.etcd.io/bbolt"
+)
+
+// tracingTx wraps a BoltDB transaction to automatically trace transaction lifecycle.
+// It holds a span that encompasses the entire transaction, providing visibility into
+// transaction timing without requiring manual instrumentation.
+//
+// The span should be created by the caller and will be finished when the transaction
+// is committed or rolled back.
+type tracingTx struct {
+    *bbolt.Tx
+    span    opentracing.Span
+    spanCtx context.Context // Context with the span, for child operations
+}
+
+// newTracingTx creates a tracing transaction wrapper.
+// The span parameter can be nil if no tracing is desired (e.g., on follower nodes).
+func newTracingTx(tx *bbolt.Tx, span opentracing.Span, spanCtx context.Context) *tracingTx {
+    return &tracingTx{
+        Tx:      tx,
+        span:    span,
+        spanCtx: spanCtx,
+    }
+}
+
+// Commit commits the transaction and finishes the span.
+func (t *tracingTx) Commit() error {
+    if t.span != nil {
+        defer t.span.Finish()
+        t.span.LogKV("operation", "commit")
+    }
+    return t.Tx.Commit()
+}
+
+// Rollback rolls back the transaction and finishes the span.
+func (t *tracingTx) Rollback() error {
+    if t.span != nil {
+        defer t.span.Finish()
+        t.span.LogKV("operation", "rollback")
+    }
+    return t.Tx.Rollback()
+}
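
Putting the pieces together: applyCommand opens the span before Begin, and tracingTx finishes it at Commit or Rollback, so the boltdb.transaction span brackets the whole write. A self-contained helper showing that lifecycle (tracedUpdate is illustrative, not part of the commit):

package fsm

import (
    "context"

    "github.com/opentracing/opentracing-go"
    "go.etcd.io/bbolt"
)

// tracedUpdate is a hypothetical helper demonstrating the tracingTx
// lifecycle end to end: the span opened before Begin is finished by
// Commit or Rollback, so "boltdb.transaction" covers the entire write.
func tracedUpdate(ctx context.Context, db *bbolt.DB, fn func(*tracingTx) error) error {
    rawTx, err := db.Begin(true)
    if err != nil {
        return err
    }
    span, spanCtx := opentracing.StartSpanFromContext(ctx, "boltdb.transaction")
    span.SetTag("writable", rawTx.Writable())
    tx := newTracingTx(rawTx, span, spanCtx)
    if err := fn(tx); err != nil {
        _ = tx.Rollback() // logs "operation"="rollback", finishes the span
        return err
    }
    return tx.Commit() // logs "operation"="commit", finishes the span
}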
