
Commit 50d0f57

Merge branch 'concurrent-collect-loop-work-branch' into yingrong/cache_entries_metric
2 parents 336ce2b + 97722e9 commit 50d0f57

File tree

6 files changed: +294 -301 lines changed

collect/collect.go

Lines changed: 102 additions & 90 deletions
@@ -6,6 +6,7 @@ import (
     "fmt"
     "os"
     "runtime"
+    rtmetrics "runtime/metrics"
     "sync"
     "time"

@@ -37,6 +38,7 @@ const (
     defaultKeptDecisionTickerInterval = 1 * time.Second

     collectorHealthKey = "collector"
+    memMetricName = "/memory/classes/heap/objects:bytes"
 )

 var ErrWouldBlock = errors.New("Dropping span as channel buffer is full. Span will not be processed and will be lost.")

@@ -76,7 +78,7 @@ type sendableTrace struct {
     samplerSelector string
 }

-// InMemCollector is a collector that can use multiple concurrent collection loops.
+// InMemCollector is a collector that can use multiple concurrent workers to make sampling decision.
 type InMemCollector struct {
     Config config.Config `inject:""`
     Logger logger.Logger `inject:""`

@@ -97,22 +99,24 @@ type InMemCollector struct {
     TestMode bool
     BlockOnAddSpan bool

-    // Parallel collection support
-    collectLoops []*CollectLoop
+    // Workers for parallel collection - each worker processes a subset of traces
+    workers []*CollectorWorker

     // mutex must be held whenever non-channel internal fields are accessed.
     mutex sync.RWMutex

     sampleTraceCache cache.TraceSentCache

-    houseKeepingWG sync.WaitGroup
-    collectLoopsWG sync.WaitGroup // Separate WaitGroup for collect loops
-    sendTracesWG sync.WaitGroup
-    reload chan struct{} // Channel for config reload signals
-    outgoingTraces chan sendableTrace
-    done chan struct{}
+    monitorWG sync.WaitGroup // WaitGroup for background monitoring goroutine
+    workersWG sync.WaitGroup // Separate WaitGroup for workers
+    sendTracesWG sync.WaitGroup
+    reload chan struct{} // Channel for config reload signals
+    tracesToSend chan sendableTrace // Channel of traces ready for transmission
+    done chan struct{}

     hostname string
+
+    memMetricSample []rtmetrics.Sample // Memory monitoring using runtime/metrics
 }

 // These are the names of the metrics we use to track the number of events sent to peers through the router.

@@ -176,9 +180,9 @@ func (i *InMemCollector) Start() error {
     defer func() { i.Logger.Debug().Logf("Finished starting InMemCollector") }()
     imcConfig := i.Config.GetCollectionConfig()

-    numLoops := imcConfig.GetNumCollectLoops()
+    numWorkers := imcConfig.GetNumCollectLoops()

-    i.Logger.Info().WithField("num_loops", numLoops).Logf("Starting InMemCollector with %d collection loops", numLoops)
+    i.Logger.Info().WithField("num_workers", numWorkers).Logf("Starting InMemCollector with %d workers", numWorkers)

     i.StressRelief.UpdateFromConfig()

@@ -200,7 +204,7 @@ func (i *InMemCollector) Start() error {
         return err
     }

-    i.outgoingTraces = make(chan sendableTrace, 100_000)
+    i.tracesToSend = make(chan sendableTrace, 100_000)
     i.done = make(chan struct{})
     i.reload = make(chan struct{}, 1)

@@ -210,22 +214,26 @@ func (i *InMemCollector) Start() error {
         }
     }

-    i.collectLoops = make([]*CollectLoop, numLoops)
+    // Initialize runtime/metrics sample for efficient memory monitoring
+    i.memMetricSample = make([]rtmetrics.Sample, 1)
+    i.memMetricSample[0].Name = memMetricName
+
+    i.workers = make([]*CollectorWorker, numWorkers)

-    for loopID := range i.collectLoops {
-        loop := NewCollectLoop(loopID, i, imcConfig.GetIncomingQueueSizePerLoop(), imcConfig.GetPeerQueueSizePerLoop())
-        i.collectLoops[loopID] = loop
+    for loopID := range i.workers {
+        worker := NewCollectorWorker(loopID, i, imcConfig.GetIncomingQueueSizePerLoop(), imcConfig.GetPeerQueueSizePerLoop())
+        i.workers[loopID] = worker

-        // Start the collect goroutine for this loop
-        i.collectLoopsWG.Add(1)
-        go loop.collect()
+        // Start the collect goroutine for this worker
+        i.workersWG.Add(1)
+        go worker.collect()
     }

     i.sendTracesWG.Add(1)
     go i.sendTraces()

-    i.houseKeepingWG.Add(1)
-    go i.houseKeeping()
+    i.monitorWG.Add(1)
+    go i.monitor()

     return nil
 }

@@ -250,73 +258,77 @@ func (i *InMemCollector) reloadConfigs() {

     i.StressRelief.UpdateFromConfig()

-    // Send reload signals to all collect loops to clear their local samplers
+    // Send reload signals to all workers to clear their local samplers
     // so that the new configuration will be propagated
-    for _, loop := range i.collectLoops {
+    for _, worker := range i.workers {
         select {
-        case loop.reload <- struct{}{}:
+        case worker.reload <- struct{}{}:
         default:
             // Channel already has a signal pending, skip
         }
     }
     // TODO add resizing the LRU sent trace cache on config reload
 }

+// checkAlloc performs memory monitoring using runtime/metrics instead of
+// runtime.ReadMemStats. This avoids stop-the-world pauses that can impact performance.
 func (i *InMemCollector) checkAlloc(ctx context.Context) {
-    _, span := otelutil.StartSpan(ctx, i.Tracer, "checkAlloc")
-    defer span.End()
-
     inMemConfig := i.Config.GetCollectionConfig()
     maxAlloc := inMemConfig.GetMaxAlloc()
     i.Metrics.Store("MEMORY_MAX_ALLOC", float64(maxAlloc))

-    var mem runtime.MemStats
-    runtime.ReadMemStats(&mem)
-    i.Metrics.Gauge("memory_heap_allocation", float64(mem.Alloc))
-    if maxAlloc == 0 || mem.Alloc < uint64(maxAlloc) {
+    rtmetrics.Read(i.memMetricSample)
+    currentAlloc := i.memMetricSample[0].Value.Uint64()
+
+    i.Metrics.Gauge("memory_heap_allocation", float64(currentAlloc))
+
+    // Check if we're within memory budget
+    if maxAlloc == 0 || currentAlloc < uint64(maxAlloc) {
         return
     }
+
+    // Memory over budget - trigger cache eviction
     i.Metrics.Increment("collector_cache_eviction")

-    // Figure out what fraction of the total cache we should remove. We'd like it to be
-    // enough to get us below the max capacity, but not TOO much below.
-    // Because our impact numbers are only the data size, reducing by enough to reach
-    // max alloc will actually do more than that.
-    totalToRemove := mem.Alloc - uint64(maxAlloc)
-    perLoopToRemove := int(totalToRemove) / len(i.collectLoops)
+    // Calculate how much to remove from each worker
+    totalToRemove := currentAlloc - uint64(maxAlloc)
+    perWorkerToRemove := int(totalToRemove) / len(i.workers)

+    // Coordinate eviction across all workers
     var wg sync.WaitGroup
     var cacheSizeBefore, cacheSizeAfter int
-    wg.Add(len(i.collectLoops))
-    for _, loop := range i.collectLoops {
-        cacheSizeBefore += loop.GetCacheSize()
-        loop.sendEarly <- sendEarly{
+    wg.Add(len(i.workers))
+    for _, worker := range i.workers {
+        cacheSizeBefore += worker.GetCacheSize()
+        worker.sendEarly <- sendEarly{
             wg: &wg,
-            bytesToSend: perLoopToRemove,
+            bytesToSend: perWorkerToRemove,
         }
     }
     wg.Wait()

-    for _, loop := range i.collectLoops {
-        cacheSizeAfter += loop.GetCacheSize()
+    // Calculate actual eviction results
+    for _, worker := range i.workers {
+        cacheSizeAfter += worker.GetCacheSize()
     }

-    // Treat any MaxAlloc overage as an error so we know it's happening
+    // Log the eviction
     i.Logger.Warn().
-        WithField("alloc", mem.Alloc).
+        WithField("alloc", currentAlloc).
         WithField("old_trace_count", cacheSizeBefore).
         WithField("new_trace_count", cacheSizeAfter).
         Logf("Making some trace decisions early due to memory overrun.")

-    // Manually GC here - without this we can easily end up evicting more than we
-    // need to, since total alloc won't be updated until after a GC pass.
+    // Manually trigger GC to reclaim memory immediately
     runtime.GC()
-    return
 }

-// houseKeeping listens for reload signals and calls reloadConfigs
-func (i *InMemCollector) houseKeeping() {
-    defer i.houseKeepingWG.Done()
+// monitor runs background maintenance tasks including:
+// - Aggregating metrics from all workers
+// - Monitoring memory usage and triggering cache eviction
+// - Handling config reload signals
+func (i *InMemCollector) monitor() {
+    defer i.monitorWG.Done()

     ctx := context.Background()
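
The core change in checkAlloc above is swapping runtime.ReadMemStats for the runtime/metrics package, which samples the live heap without the stop-the-world pause ReadMemStats takes. A minimal, self-contained sketch of that pattern (the package main wrapper and variable names here are illustrative, not the collector's own):

package main

import (
    "fmt"
    rtmetrics "runtime/metrics"
)

func main() {
    // Sample the same metric the diff registers: bytes of live heap objects.
    sample := make([]rtmetrics.Sample, 1)
    sample[0].Name = "/memory/classes/heap/objects:bytes"

    // Read fills in Value for each requested sample; unlike
    // runtime.ReadMemStats it does not require a stop-the-world pause.
    rtmetrics.Read(sample)

    if sample[0].Value.Kind() == rtmetrics.KindUint64 {
        fmt.Printf("heap objects: %d bytes\n", sample[0].Value.Uint64())
    }
}

Reusing a preallocated sample slice, as the diff does with memMetricSample, keeps this check allocation-free on the hot path.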

@@ -334,13 +346,13 @@ func (i *InMemCollector) houseKeeping() {
             var totalWaiting int64
             var totalReceived int64

-            for _, loop := range i.collectLoops {
-                totalIncoming += len(loop.incoming)
-                totalPeer += len(loop.fromPeer)
-                totalCacheSize += loop.GetCacheSize()
+            for _, worker := range i.workers {
+                totalIncoming += len(worker.incoming)
+                totalPeer += len(worker.fromPeer)
+                totalCacheSize += worker.GetCacheSize()

-                totalWaiting += loop.localSpansWaiting.Swap(0)
-                totalReceived += loop.localSpanReceived.Swap(0)
+                totalWaiting += worker.localSpansWaiting.Swap(0)
+                totalReceived += worker.localSpanReceived.Swap(0)
             }

             i.Metrics.Histogram("collector_incoming_queue", float64(totalIncoming))

@@ -349,13 +361,13 @@ func (i *InMemCollector) houseKeeping() {
             i.Metrics.Gauge("collector_incoming_queue_length", float64(totalIncoming))
             i.Metrics.Gauge("collector_peer_queue_length", float64(totalPeer))
             i.Metrics.Gauge("collector_cache_size", float64(totalCacheSize))
-            i.Metrics.Gauge("collector_num_loops", float64(len(i.collectLoops)))
+            i.Metrics.Gauge("collector_num_workers", float64(len(i.workers)))

             // Report aggregated thread-local metrics
             i.Metrics.Count("span_received", totalReceived)
             i.Metrics.Count("spans_waiting", totalWaiting)

-            // Send traces early if we're over memory budget
+            // Check memory and evict if needed (using runtime/metrics - no STW)
             i.checkAlloc(ctx)
         case <-i.reload:
             i.reloadConfigs()
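
The aggregation in the two hunks above relies on each worker keeping its own atomic counters, which the monitor drains with Swap(0) once per tick, so each report is the delta since the previous interval and no shared lock is needed. A small sketch of that pattern, with hypothetical names rather than the collector's:

package main

import (
    "fmt"
    "sync/atomic"
)

// worker keeps counters that only this worker increments.
type worker struct {
    spansReceived atomic.Int64
}

func main() {
    workers := []*worker{{}, {}}

    // Workers bump their own counters with no contention.
    workers[0].spansReceived.Add(3)
    workers[1].spansReceived.Add(5)

    // The monitor drains each counter with Swap(0), capturing the
    // per-interval delta and resetting it in one atomic step.
    var total int64
    for _, w := range workers {
        total += w.spansReceived.Swap(0)
    }
    fmt.Println("spans received this interval:", total) // 8
}
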
@@ -366,33 +378,33 @@ func (i *InMemCollector) houseKeeping() {
     }
 }

-// getLoopForTrace determines which CollectLoop should handle a given trace ID
-// using consistent hashing to ensure all spans for a trace go to the same loop
-func (i *InMemCollector) getLoopForTrace(traceID string) int {
+// getWorkerIDForTrace determines which CollectorWorker should handle a given trace ID
+// using consistent hashing to ensure all spans for a trace go to the same worker
+func (i *InMemCollector) getWorkerIDForTrace(traceID string) int {
     // Hash with a seed so that we don't align with any other hashes of this
     // trace. We use a different algorithm to assign traces to nodes, but we
-    // still want to minimize the risk of any synchronization beteween that
+    // still want to minimize the risk of any synchronization between that
     // distribution and this one.
     hash := wyhash.Hash([]byte(traceID), 7215963184435617557)

-    // Map to loop index
-    loopIndex := int(hash % uint64(len(i.collectLoops)))
+    // Map to worker index
+    workerIndex := int(hash % uint64(len(i.workers)))

-    return loopIndex
+    return workerIndex
 }

 // AddSpan accepts the incoming span to a queue and returns immediately
 func (i *InMemCollector) AddSpan(sp *types.Span) error {
-    // Route to the appropriate loop
-    loopIndex := i.getLoopForTrace(sp.TraceID)
-    return i.collectLoops[loopIndex].addSpan(sp)
+    // Route to the appropriate worker
+    workerIndex := i.getWorkerIDForTrace(sp.TraceID)
+    return i.workers[workerIndex].addSpan(sp)
 }

 // AddSpanFromPeer accepts the incoming span from a peer to a queue and returns immediately
 func (i *InMemCollector) AddSpanFromPeer(sp *types.Span) error {
-    // Route to the appropriate loop
-    loopIndex := i.getLoopForTrace(sp.TraceID)
-    return i.collectLoops[loopIndex].addSpanFromPeer(sp)
+    // Route to the appropriate worker
+    workerIndex := i.getWorkerIDForTrace(sp.TraceID)
+    return i.workers[workerIndex].addSpanFromPeer(sp)
 }

 // Stressed returns true if the collector is undergoing significant stress
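
The routing above hashes the trace ID with a fixed seed and reduces it modulo the worker count, so every span of a trace deterministically lands on the same worker. A minimal sketch of that mapping, using the standard library's FNV hash as a stand-in for the seeded wyhash call in the diff:

package main

import (
    "fmt"
    "hash/fnv"
)

// workerIDForTrace maps a trace ID onto one of n workers. FNV-1a is only a
// stand-in here; the property that matters is that the same trace ID always
// produces the same worker index.
func workerIDForTrace(traceID string, n int) int {
    h := fnv.New64a()
    h.Write([]byte(traceID))
    return int(h.Sum64() % uint64(n))
}

func main() {
    const numWorkers = 8
    for _, id := range []string{"trace-a", "trace-b", "trace-a"} {
        fmt.Printf("%s -> worker %d\n", id, workerIDForTrace(id, numWorkers))
    }
    // "trace-a" maps to the same worker both times, so all of its spans are
    // handled by one goroutine without cross-worker coordination.
}
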
@@ -465,7 +477,7 @@ func (i *InMemCollector) ProcessSpanImmediately(sp *types.Span) (processed bool,
 // dealWithSentTrace handles a span that has arrived after the sampling decision
 // on the trace has already been made, and it obeys that decision by either
 // sending the span immediately or dropping it.
-// This method is made public so CollectLoop can access it.
+// This method is made public so CollectWorker can access it.
 func (i *InMemCollector) dealWithSentTrace(ctx context.Context, tr cache.TraceSentRecord, keptReason string, sp *types.Span) {
     _, span := otelutil.StartSpanMulti(ctx, i.Tracer, "dealWithSentTrace", map[string]interface{}{
         "trace_id": sp.TraceID,

@@ -557,7 +569,7 @@ func mergeTraceAndSpanSampleRates(sp *types.Span, traceSampleRate uint, dryRunMo
 }

 // this is only called when a trace decision is received
-// TODO it may be desirable to move this and sendTraes() into the CollectLoop.
+// TODO it may be desirable to move this and sendTraes() into the CollectWorker.
 func (i *InMemCollector) send(ctx context.Context, trace sendableTrace) {
     if trace.Sent {
         // someone else already sent this so we shouldn't also send it.

@@ -620,7 +632,7 @@ func (i *InMemCollector) send(ctx context.Context, trace sendableTrace) {
         i.Logger.Info().WithFields(logFields).Logf("Sending trace")
     }

-    i.outgoingTraces <- trace
+    i.tracesToSend <- trace
 }

 func (i *InMemCollector) Stop() error {

@@ -633,26 +645,26 @@ func (i *InMemCollector) Stop() error {
     // stop liveness check so that no new traces are accepted
     i.Health.Unregister(collectorHealthKey)

-    // Stop housekeeping first - we want to make sure we don't start a checkAlloc
-    // after shutting down the collect loops.
-    i.houseKeepingWG.Wait()
+    // Stop maintenance first - we want to make sure we don't start a checkAlloc
+    // after shutting down the workers.
+    i.monitorWG.Wait()

-    // Close all loop input channels, which will cause the loops to stop.
-    for idx, loop := range i.collectLoops {
-        i.Logger.Debug().WithField("loop_id", idx).Logf("closing loop channels")
-        close(loop.incoming)
-        close(loop.fromPeer)
+    // Close all worker input channels, which will cause the workers to stop.
+    for idx, worker := range i.workers {
+        i.Logger.Debug().WithField("worker_id", idx).Logf("closing worker channels")
+        close(worker.incoming)
+        close(worker.fromPeer)
     }
-    i.collectLoopsWG.Wait()
+    i.workersWG.Wait()

     // Stop the sample trace cache
     if i.sampleTraceCache != nil {
         i.sampleTraceCache.Stop()
     }

-    // Now it's safe to close the outgoing traces channel
+    // Now it's safe to close the traces to send channel
     // No more traces will be sent to it
-    close(i.outgoingTraces)
+    close(i.tracesToSend)
     i.sendTracesWG.Wait()

     i.Logger.Debug().Logf("InMemCollector shutdown complete")
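
Stop above follows a common Go shutdown ordering: close each worker's input channels so their range loops exit, wait on the workers' WaitGroup, and only then close the downstream channel feeding the sender. A condensed sketch of that ordering with hypothetical names (not the collector's own types):

package main

import (
    "fmt"
    "sync"
)

func main() {
    in := make(chan int, 8)  // stands in for a worker's input channel
    out := make(chan int, 8) // stands in for the tracesToSend channel
    var workersWG, senderWG sync.WaitGroup

    // Worker: drains its input until the channel is closed.
    workersWG.Add(1)
    go func() {
        defer workersWG.Done()
        for v := range in {
            out <- v
        }
    }()

    // Sender: drains the downstream channel until it is closed.
    senderWG.Add(1)
    go func() {
        defer senderWG.Done()
        for v := range out {
            fmt.Println("sent", v)
        }
    }()

    in <- 1
    in <- 2

    // Same ordering as Stop(): close worker inputs, wait for the workers,
    // then close the downstream channel and wait for the sender.
    close(in)
    workersWG.Wait()
    close(out)
    senderWG.Wait()
}
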
@@ -675,9 +687,9 @@ func (i *InMemCollector) addAdditionalAttributes(sp *types.Span) {
 func (i *InMemCollector) sendTraces() {
     defer i.sendTracesWG.Done()

-    for t := range i.outgoingTraces {
-        i.Metrics.Histogram("collector_outgoing_queue", float64(len(i.outgoingTraces)))
-        _, span := otelutil.StartSpanMulti(context.Background(), i.Tracer, "sendTrace", map[string]interface{}{"num_spans": t.DescendantCount(), "outgoingTraces_size": len(i.outgoingTraces)})
+    for t := range i.tracesToSend {
+        i.Metrics.Histogram("collector_outgoing_queue", float64(len(i.tracesToSend)))
+        _, span := otelutil.StartSpanMulti(context.Background(), i.Tracer, "sendTrace", map[string]interface{}{"num_spans": t.DescendantCount(), "tracesToSend_size": len(i.tracesToSend)})

         // if we have a key replacement rule, we should
         // replace the key with the new key
