Commit ff10385

maint: rename CollectLoop related code to improve readability (#1722)
## Which problem is this PR solving?

The current naming for the collector (manager) and collect_loop (worker) doesn't reflect the architecture design, which makes it hard to name metrics and to follow the code logic. We would like to rename the newly added code to reflect the new architecture so that it's easier to understand and work with.

## Short description of the changes

Type & Struct Renames:

- `CollectLoop` → `CollectorWorker`
- `NewCollectLoop()` → `NewCollectorWorker()`

Field & Variable Renames in `InMemCollector`:

- `collectLoops []*CollectLoop` → `workers []*CollectorWorker`
- `collectLoopsWG` → `workersWG`
- `houseKeepingWG` → `monitorWG`
- `outgoingTraces` → `tracesToSend`

Method Renames:

- `houseKeeping()` → `monitor()`
- `getLoopForTrace()` → `getWorkerIDForTrace()`

Constant Renames:

- `collectorLoopHealthPrefix` → `collectWorkerHealthPrefix`

File Renames:

- `collect/collect_loop.go` → `collect/collector_worker.go`
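For readers new to the manager/worker split these names describe, here is a minimal standalone sketch of the shape, not Refinery's actual code: the worker type, channel sizes, FNV hashing (instead of wyhash), and string trace IDs (instead of spans) are all illustrative stand-ins; only the naming mirrors this PR.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

// CollectorWorker owns a subset of traces; every span of a given trace is
// routed to the same worker, so the worker's cache needs no locking.
type CollectorWorker struct {
	id       int
	incoming chan string // trace IDs in this sketch; the real code passes spans
	wg       *sync.WaitGroup
}

func (w *CollectorWorker) collect() {
	defer w.wg.Done()
	for traceID := range w.incoming {
		fmt.Printf("worker %d handling trace %s\n", w.id, traceID)
	}
}

// Collector plays the manager role: it only routes spans and aggregates
// results; per-trace work happens inside the workers.
type Collector struct {
	workers   []*CollectorWorker
	workersWG sync.WaitGroup
}

func NewCollector(numWorkers int) *Collector {
	c := &Collector{workers: make([]*CollectorWorker, numWorkers)}
	for id := range c.workers {
		w := &CollectorWorker{id: id, incoming: make(chan string, 16), wg: &c.workersWG}
		c.workers[id] = w
		c.workersWG.Add(1)
		go w.collect()
	}
	return c
}

// getWorkerIDForTrace hashes the trace ID so spans of one trace always map
// to the same worker index.
func (c *Collector) getWorkerIDForTrace(traceID string) int {
	h := fnv.New64a()
	h.Write([]byte(traceID))
	return int(h.Sum64() % uint64(len(c.workers)))
}

func (c *Collector) AddSpan(traceID string) {
	c.workers[c.getWorkerIDForTrace(traceID)].incoming <- traceID
}

func (c *Collector) Stop() {
	for _, w := range c.workers {
		close(w.incoming)
	}
	c.workersWG.Wait()
}

func main() {
	c := NewCollector(4)
	for _, id := range []string{"trace-a", "trace-b", "trace-a"} {
		c.AddSpan(id)
	}
	c.Stop()
}
```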
1 parent 4c492e5 commit ff10385

File tree

4 files changed (+170, -164 lines)

collect/collect.go

Lines changed: 75 additions & 72 deletions
```diff
@@ -76,7 +76,7 @@ type sendableTrace struct {
 	samplerSelector string
 }
 
-// InMemCollector is a collector that can use multiple concurrent collection loops.
+// InMemCollector is a collector that can use multiple concurrent workers to make sampling decisions.
 type InMemCollector struct {
 	Config config.Config `inject:""`
 	Logger logger.Logger `inject:""`
@@ -97,20 +97,20 @@ type InMemCollector struct {
 	TestMode       bool
 	BlockOnAddSpan bool
 
-	// Parallel collection support
-	collectLoops []*CollectLoop
+	// Workers for parallel collection - each worker processes a subset of traces
+	workers []*CollectorWorker
 
 	// mutex must be held whenever non-channel internal fields are accessed.
 	mutex sync.RWMutex
 
 	sampleTraceCache cache.TraceSentCache
 
-	houseKeepingWG sync.WaitGroup
-	collectLoopsWG sync.WaitGroup // Separate WaitGroup for collect loops
-	sendTracesWG   sync.WaitGroup
-	reload         chan struct{} // Channel for config reload signals
-	outgoingTraces chan sendableTrace
-	done           chan struct{}
+	monitorWG    sync.WaitGroup // WaitGroup for background monitoring goroutine
+	workersWG    sync.WaitGroup // Separate WaitGroup for workers
+	sendTracesWG sync.WaitGroup
+	reload       chan struct{}      // Channel for config reload signals
+	tracesToSend chan sendableTrace // Channel of traces ready for transmission
+	done         chan struct{}
 
 	hostname string
 }
@@ -175,9 +175,9 @@ func (i *InMemCollector) Start() error {
 	defer func() { i.Logger.Debug().Logf("Finished starting InMemCollector") }()
 	imcConfig := i.Config.GetCollectionConfig()
 
-	numLoops := imcConfig.GetNumCollectLoops()
+	numWorkers := imcConfig.GetNumCollectLoops()
 
-	i.Logger.Info().WithField("num_loops", numLoops).Logf("Starting InMemCollector with %d collection loops", numLoops)
+	i.Logger.Info().WithField("num_workers", numWorkers).Logf("Starting InMemCollector with %d workers", numWorkers)
 
 	i.StressRelief.UpdateFromConfig()
 
@@ -199,7 +199,7 @@ func (i *InMemCollector) Start() error {
 		return err
 	}
 
-	i.outgoingTraces = make(chan sendableTrace, 100_000)
+	i.tracesToSend = make(chan sendableTrace, 100_000)
 	i.done = make(chan struct{})
 	i.reload = make(chan struct{}, 1)
 
@@ -209,22 +209,22 @@ func (i *InMemCollector) Start() error {
 		}
 	}
 
-	i.collectLoops = make([]*CollectLoop, numLoops)
+	i.workers = make([]*CollectorWorker, numWorkers)
 
-	for loopID := range i.collectLoops {
-		loop := NewCollectLoop(loopID, i, imcConfig.GetIncomingQueueSizePerLoop(), imcConfig.GetPeerQueueSizePerLoop())
-		i.collectLoops[loopID] = loop
+	for loopID := range i.workers {
+		worker := NewCollectorWorker(loopID, i, imcConfig.GetIncomingQueueSizePerLoop(), imcConfig.GetPeerQueueSizePerLoop())
+		i.workers[loopID] = worker
 
-		// Start the collect goroutine for this loop
-		i.collectLoopsWG.Add(1)
-		go loop.collect()
+		// Start the collect goroutine for this worker
+		i.workersWG.Add(1)
+		go worker.collect()
 	}
 
 	i.sendTracesWG.Add(1)
 	go i.sendTraces()
 
-	i.houseKeepingWG.Add(1)
-	go i.houseKeeping()
+	i.monitorWG.Add(1)
+	go i.monitor()
 
 	return nil
 }
@@ -249,11 +249,11 @@ func (i *InMemCollector) reloadConfigs() {
 
 	i.StressRelief.UpdateFromConfig()
 
-	// Send reload signals to all collect loops to clear their local samplers
+	// Send reload signals to all workers to clear their local samplers
 	// so that the new configuration will be propagated
-	for _, loop := range i.collectLoops {
+	for _, worker := range i.workers {
 		select {
-		case loop.reload <- struct{}{}:
+		case worker.reload <- struct{}{}:
 		default:
 			// Channel already has a signal pending, skip
 		}
@@ -282,22 +282,22 @@ func (i *InMemCollector) checkAlloc(ctx context.Context) {
 	// Because our impact numbers are only the data size, reducing by enough to reach
 	// max alloc will actually do more than that.
 	totalToRemove := mem.Alloc - uint64(maxAlloc)
-	perLoopToRemove := int(totalToRemove) / len(i.collectLoops)
+	perWorkerToRemove := int(totalToRemove) / len(i.workers)
 
 	var wg sync.WaitGroup
 	var cacheSizeBefore, cacheSizeAfter int
-	wg.Add(len(i.collectLoops))
-	for _, loop := range i.collectLoops {
-		cacheSizeBefore += loop.GetCacheSize()
-		loop.sendEarly <- sendEarly{
+	wg.Add(len(i.workers))
+	for _, worker := range i.workers {
+		cacheSizeBefore += worker.GetCacheSize()
+		worker.sendEarly <- sendEarly{
 			wg:          &wg,
-			bytesToSend: perLoopToRemove,
+			bytesToSend: perWorkerToRemove,
 		}
 	}
 	wg.Wait()
 
-	for _, loop := range i.collectLoops {
-		cacheSizeAfter += loop.GetCacheSize()
+	for _, worker := range i.workers {
+		cacheSizeAfter += worker.GetCacheSize()
 	}
 
 	// Treat any MaxAlloc overage as an error so we know it's happening
```
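The checkAlloc change above keeps the same fan-out pattern under the new names: the manager divides the bytes to shed across the workers, pushes one request into each worker's sendEarly channel, and blocks on a shared WaitGroup until every worker has finished evicting. A minimal sketch of that pattern follows; the types and the printed output are illustrative only, and the real request carries the traces to evict rather than just a byte count.

```go
package main

import (
	"fmt"
	"sync"
)

// sendEarlyRequest mirrors the shape of the sendEarly message in the diff:
// the worker evicts roughly bytesToSend worth of traces, then signals wg.
type sendEarlyRequest struct {
	wg          *sync.WaitGroup
	bytesToSend int
}

type worker struct {
	sendEarly chan sendEarlyRequest
}

func (w *worker) run() {
	for req := range w.sendEarly {
		// The real worker would send its oldest/largest traces early here.
		fmt.Printf("evicting ~%d bytes\n", req.bytesToSend)
		req.wg.Done()
	}
}

func main() {
	workers := []*worker{{make(chan sendEarlyRequest, 1)}, {make(chan sendEarlyRequest, 1)}}
	for _, w := range workers {
		go w.run()
	}

	totalToRemove := 10_000
	perWorker := totalToRemove / len(workers)

	// Fan out one request per worker, then block until every worker reports done.
	var wg sync.WaitGroup
	wg.Add(len(workers))
	for _, w := range workers {
		w.sendEarly <- sendEarlyRequest{wg: &wg, bytesToSend: perWorker}
	}
	wg.Wait()
	fmt.Println("memory pressure handled by all workers")
}
```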
```diff
@@ -313,9 +313,12 @@ func (i *InMemCollector) checkAlloc(ctx context.Context) {
 	return
 }
 
-// houseKeeping listens for reload signals and calls reloadConfigs
-func (i *InMemCollector) houseKeeping() {
-	defer i.houseKeepingWG.Done()
+// monitor runs background maintenance tasks including:
+// - Aggregating metrics from all workers
+// - Monitoring memory usage and triggering cache eviction
+// - Handling config reload signals
+func (i *InMemCollector) monitor() {
+	defer i.monitorWG.Done()
 
 	ctx := context.Background()
 
@@ -333,21 +336,21 @@
 			var totalWaiting int64
 			var totalReceived int64
 
-			for _, loop := range i.collectLoops {
-				totalIncoming += len(loop.incoming)
-				totalPeer += len(loop.fromPeer)
-				totalCacheSize += loop.GetCacheSize()
+			for _, worker := range i.workers {
+				totalIncoming += len(worker.incoming)
+				totalPeer += len(worker.fromPeer)
+				totalCacheSize += worker.GetCacheSize()
 
-				totalWaiting += loop.localSpansWaiting.Swap(0)
-				totalReceived += loop.localSpanReceived.Swap(0)
+				totalWaiting += worker.localSpansWaiting.Swap(0)
+				totalReceived += worker.localSpanReceived.Swap(0)
 			}
 
 			i.Metrics.Histogram("collector_incoming_queue", float64(totalIncoming))
 			i.Metrics.Histogram("collector_peer_queue", float64(totalPeer))
 			i.Metrics.Gauge("collector_incoming_queue_length", float64(totalIncoming))
 			i.Metrics.Gauge("collector_peer_queue_length", float64(totalPeer))
 			i.Metrics.Gauge("collector_cache_size", float64(totalCacheSize))
-			i.Metrics.Gauge("collector_num_loops", float64(len(i.collectLoops)))
+			i.Metrics.Gauge("collector_num_workers", float64(len(i.workers)))
 
 			// Report aggregated thread-local metrics
 			i.Metrics.Count("span_received", totalReceived)
```
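The aggregation above is what monitor() now does on each tick: it sums channel lengths for queue-depth metrics and drains each worker's thread-local counters with Swap(0), so counts are reported exactly once. The sketch below shows just that loop shape; the one-second tick, the struct fields, and printing to stdout are assumptions for illustration, and the real loop also drives memory checks and reload handling and reports through the Metrics interface.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// Simplified worker state: a channel whose length is sampled as queue depth,
// plus an atomic counter drained with Swap(0), as in the diff above.
type worker struct {
	incoming          chan struct{}
	localSpanReceived atomic.Int64
}

func monitor(workers []*worker, done <-chan struct{}) {
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-done:
			return
		case <-ticker.C:
			var totalIncoming int
			var totalReceived int64
			for _, w := range workers {
				totalIncoming += len(w.incoming)
				// Swap(0) reads and resets in one step, so counts are neither
				// lost nor double-reported between ticks.
				totalReceived += w.localSpanReceived.Swap(0)
			}
			fmt.Printf("incoming=%d received=%d\n", totalIncoming, totalReceived)
		}
	}
}

func main() {
	workers := []*worker{{incoming: make(chan struct{}, 10)}, {incoming: make(chan struct{}, 10)}}
	workers[0].localSpanReceived.Add(3)
	done := make(chan struct{})
	go monitor(workers, done)
	time.Sleep(1500 * time.Millisecond)
	close(done)
}
```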
```diff
@@ -364,33 +367,33 @@
 	}
 }
 
-// getLoopForTrace determines which CollectLoop should handle a given trace ID
-// using consistent hashing to ensure all spans for a trace go to the same loop
-func (i *InMemCollector) getLoopForTrace(traceID string) int {
+// getWorkerIDForTrace determines which CollectorWorker should handle a given trace ID
+// using consistent hashing to ensure all spans for a trace go to the same worker
+func (i *InMemCollector) getWorkerIDForTrace(traceID string) int {
 	// Hash with a seed so that we don't align with any other hashes of this
 	// trace. We use a different algorithm to assign traces to nodes, but we
-	// still want to minimize the risk of any synchronization beteween that
+	// still want to minimize the risk of any synchronization between that
 	// distribution and this one.
 	hash := wyhash.Hash([]byte(traceID), 7215963184435617557)
 
-	// Map to loop index
-	loopIndex := int(hash % uint64(len(i.collectLoops)))
+	// Map to worker index
+	workerIndex := int(hash % uint64(len(i.workers)))
 
-	return loopIndex
+	return workerIndex
 }
 
 // AddSpan accepts the incoming span to a queue and returns immediately
 func (i *InMemCollector) AddSpan(sp *types.Span) error {
-	// Route to the appropriate loop
-	loopIndex := i.getLoopForTrace(sp.TraceID)
-	return i.collectLoops[loopIndex].addSpan(sp)
+	// Route to the appropriate worker
+	workerIndex := i.getWorkerIDForTrace(sp.TraceID)
+	return i.workers[workerIndex].addSpan(sp)
 }
 
 // AddSpanFromPeer accepts the incoming span from a peer to a queue and returns immediately
 func (i *InMemCollector) AddSpanFromPeer(sp *types.Span) error {
-	// Route to the appropriate loop
-	loopIndex := i.getLoopForTrace(sp.TraceID)
-	return i.collectLoops[loopIndex].addSpanFromPeer(sp)
+	// Route to the appropriate worker
+	workerIndex := i.getWorkerIDForTrace(sp.TraceID)
+	return i.workers[workerIndex].addSpanFromPeer(sp)
 }
 
 // Stressed returns true if the collector is undergoing significant stress
```
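The property this routing relies on is only that the hash is deterministic: the same trace ID always yields the same worker index, whether the span arrives via AddSpan or AddSpanFromPeer, so one worker sees the whole trace. A tiny sketch of that property; FNV-1a stands in for wyhash here purely to keep the example dependency-free, and the trace ID is made up.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// workerIDForTrace stands in for getWorkerIDForTrace: the only property that
// matters is that the same trace ID always yields the same worker index.
func workerIDForTrace(traceID string, numWorkers int) int {
	h := fnv.New64a()
	h.Write([]byte(traceID))
	return int(h.Sum64() % uint64(numWorkers))
}

func main() {
	const numWorkers = 8
	// Spans arriving through either entry point for the same trace land on
	// the same worker, so that worker sees the whole trace.
	a := workerIDForTrace("2f3a9c", numWorkers)
	b := workerIDForTrace("2f3a9c", numWorkers)
	fmt.Println(a == b) // always true: routing is deterministic
}
```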
```diff
@@ -463,7 +466,7 @@ func (i *InMemCollector) ProcessSpanImmediately(sp *types.Span) (processed bool,
 // dealWithSentTrace handles a span that has arrived after the sampling decision
 // on the trace has already been made, and it obeys that decision by either
 // sending the span immediately or dropping it.
-// This method is made public so CollectLoop can access it.
+// This method is made public so CollectorWorker can access it.
 func (i *InMemCollector) dealWithSentTrace(ctx context.Context, tr cache.TraceSentRecord, keptReason string, sp *types.Span) {
 	_, span := otelutil.StartSpanMulti(ctx, i.Tracer, "dealWithSentTrace", map[string]interface{}{
 		"trace_id": sp.TraceID,
@@ -555,7 +558,7 @@ func mergeTraceAndSpanSampleRates(sp *types.Span, traceSampleRate uint, dryRunMo
 }
 
 // this is only called when a trace decision is received
-// TODO it may be desirable to move this and sendTraes() into the CollectLoop.
+// TODO it may be desirable to move this and sendTraces() into the CollectorWorker.
 func (i *InMemCollector) send(ctx context.Context, trace sendableTrace) {
 	if trace.Sent {
 		// someone else already sent this so we shouldn't also send it.
@@ -618,7 +621,7 @@ func (i *InMemCollector) send(ctx context.Context, trace sendableTrace) {
 		i.Logger.Info().WithFields(logFields).Logf("Sending trace")
 	}
 
-	i.outgoingTraces <- trace
+	i.tracesToSend <- trace
 }
 
 func (i *InMemCollector) Stop() error {
@@ -631,26 +634,26 @@ func (i *InMemCollector) Stop() error {
 	// stop liveness check so that no new traces are accepted
 	i.Health.Unregister(collectorHealthKey)
 
-	// Stop housekeeping first - we want to make sure we don't start a checkAlloc
-	// after shutting down the collect loops.
-	i.houseKeepingWG.Wait()
+	// Stop maintenance first - we want to make sure we don't start a checkAlloc
+	// after shutting down the workers.
+	i.monitorWG.Wait()
 
-	// Close all loop input channels, which will cause the loops to stop.
-	for idx, loop := range i.collectLoops {
-		i.Logger.Debug().WithField("loop_id", idx).Logf("closing loop channels")
-		close(loop.incoming)
-		close(loop.fromPeer)
+	// Close all worker input channels, which will cause the workers to stop.
+	for idx, worker := range i.workers {
+		i.Logger.Debug().WithField("worker_id", idx).Logf("closing worker channels")
+		close(worker.incoming)
+		close(worker.fromPeer)
 	}
-	i.collectLoopsWG.Wait()
+	i.workersWG.Wait()
 
 	// Stop the sample trace cache
 	if i.sampleTraceCache != nil {
 		i.sampleTraceCache.Stop()
 	}
 
-	// Now it's safe to close the outgoing traces channel
+	// Now it's safe to close the traces-to-send channel
 	// No more traces will be sent to it
-	close(i.outgoingTraces)
+	close(i.tracesToSend)
 	i.sendTracesWG.Wait()
 
 	i.Logger.Debug().Logf("InMemCollector shutdown complete")
```
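The Stop() hunk above keeps the same shutdown ordering under the new names; the important property is that each channel is closed only after everything that writes to it has finished. A toy sketch of that ordering follows; it is not the real collector, and the monitorDone channel and the string payloads are stand-ins introduced just to make the example self-contained.

```go
package main

import (
	"fmt"
	"sync"
)

// Sketch of the shutdown ordering in Stop(): producers are stopped before
// the channels they write to are closed, so nothing sends on a closed channel.
func main() {
	var monitorWG, workersWG, sendTracesWG sync.WaitGroup

	workerIn := make(chan string)
	tracesToSend := make(chan string)
	monitorDone := make(chan struct{})

	monitorWG.Add(1)
	go func() { defer monitorWG.Done(); <-monitorDone }() // stands in for monitor()

	workersWG.Add(1)
	go func() { // stands in for a worker: drains input, forwards decided traces
		defer workersWG.Done()
		for t := range workerIn {
			tracesToSend <- t
		}
	}()

	sendTracesWG.Add(1)
	go func() { // stands in for sendTraces()
		defer sendTracesWG.Done()
		for t := range tracesToSend {
			fmt.Println("sent", t)
		}
	}()

	workerIn <- "trace-a"

	// 1. Stop background maintenance so it can't touch workers mid-shutdown.
	close(monitorDone)
	monitorWG.Wait()
	// 2. Close worker inputs; workers drain their queues and exit.
	close(workerIn)
	workersWG.Wait()
	// 3. Only now is it safe to close the send channel: no worker writes remain.
	close(tracesToSend)
	sendTracesWG.Wait()
}
```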
```diff
@@ -673,9 +676,9 @@ func (i *InMemCollector) addAdditionalAttributes(sp *types.Span) {
 func (i *InMemCollector) sendTraces() {
 	defer i.sendTracesWG.Done()
 
-	for t := range i.outgoingTraces {
-		i.Metrics.Histogram("collector_outgoing_queue", float64(len(i.outgoingTraces)))
-		_, span := otelutil.StartSpanMulti(context.Background(), i.Tracer, "sendTrace", map[string]interface{}{"num_spans": t.DescendantCount(), "outgoingTraces_size": len(i.outgoingTraces)})
+	for t := range i.tracesToSend {
+		i.Metrics.Histogram("collector_outgoing_queue", float64(len(i.tracesToSend)))
+		_, span := otelutil.StartSpanMulti(context.Background(), i.Tracer, "sendTrace", map[string]interface{}{"num_spans": t.DescendantCount(), "tracesToSend_size": len(i.tracesToSend)})
 
 		// if we have a key replacement rule, we should
 		// replace the key with the new key
```
