
Commit 12103e6

Merge branch 'concurrent-collect-loop-work-branch' into yingrong/optimize_memoize_fields
2 parents: 381744b + b178b80

3 files changed: +60 −21 lines

app/app_test.go

Lines changed: 28 additions & 0 deletions
@@ -76,6 +76,34 @@ func newTestAPIServer(t testing.TB) *testAPIServer {

	server.server = httptest.NewServer(mux)
	t.Cleanup(server.server.Close)
+
+	// Health check to ensure the server is ready before returning
+	require.Eventually(t, func() bool {
+		testReq, err := http.NewRequest(
+			"POST",
+			server.server.URL+"/1/batch/test",
+			strings.NewReader(`[{"data":{"test":"ready"}}]`),
+		)
+		if err != nil {
+			return false
+		}
+		testReq.Header.Set("Content-Type", "application/json")
+
+		resp, err := server.server.Client().Do(testReq)
+		if err != nil {
+			return false
+		}
+		defer resp.Body.Close()
+		io.Copy(io.Discard, resp.Body)
+
+		return resp.StatusCode == http.StatusOK
+	}, 100*time.Millisecond, 10*time.Millisecond, "Test server failed to become ready")
+
+	// Clear the health check events from the events array
+	server.mutex.Lock()
+	server.events = nil
+	server.mutex.Unlock()
+
	return server
}
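The added health check polls the freshly started httptest server with require.Eventually until it actually answers a request, rather than assuming it is usable the moment NewServer returns, and then clears the probe events so tests start from an empty slate. A minimal standalone sketch of the same polling pattern, using a hypothetical /ping endpoint and a trivial handler rather than this repo's testAPIServer:

package apptest

import (
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
)

// waitForServer polls srv until it returns 200 OK, failing the test if it
// never becomes ready within the deadline.
func waitForServer(t *testing.T, srv *httptest.Server) {
	require.Eventually(t, func() bool {
		req, err := http.NewRequest("POST", srv.URL+"/ping", strings.NewReader("{}"))
		if err != nil {
			return false
		}
		resp, err := srv.Client().Do(req)
		if err != nil {
			return false
		}
		defer resp.Body.Close()
		io.Copy(io.Discard, resp.Body) // drain the body so the connection can be reused
		return resp.StatusCode == http.StatusOK
	}, 100*time.Millisecond, 10*time.Millisecond, "test server never became ready")
}

func TestServerReady(t *testing.T) {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
	}))
	t.Cleanup(srv.Close)
	waitForServer(t, srv)
}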

collect/cache/cache.go

Lines changed: 3 additions & 3 deletions
@@ -54,7 +54,7 @@ type DefaultInMemCache struct {
const DefaultInMemCacheCapacity = 10000

var collectCacheMetrics = []metrics.Metadata{
-	{Name: "collect_cache_entries", Type: metrics.Histogram, Unit: metrics.Dimensionless, Description: "The number of traces currently stored in the cache"},
+	{Name: "worker_cache_entries", Type: metrics.Histogram, Unit: metrics.Dimensionless, Description: "The number of traces currently stored in the trace cache per worker"},
}

func NewInMemCache(
@@ -114,7 +114,7 @@ func (d *DefaultInMemCache) GetCacheCapacity() int {
// It removes and returns them.
// If a filter is provided, it will be called with each trace to determine if it should be skipped.
func (d *DefaultInMemCache) TakeExpiredTraces(now time.Time, max int, filter func(*types.Trace) bool) []*types.Trace {
-	d.Metrics.Histogram("collect_cache_entries", float64(len(d.cache)))
+	d.Metrics.Histogram("worker_cache_entries", float64(len(d.cache)))

	var expired, skipped []*types.Trace
	for !d.pq.IsEmpty() && (max <= 0 || len(expired) < max) {
@@ -158,7 +158,7 @@ func (d *DefaultInMemCache) TakeExpiredTraces(now time.Time, max int, filter fun
// RemoveTraces accepts a set of trace IDs and removes any matching ones from
// the insertion list. This is used in the case of a cache overrun.
func (d *DefaultInMemCache) RemoveTraces(toDelete generics.Set[string]) {
-	d.Metrics.Histogram("collect_cache_entries", float64(len(d.cache)))
+	d.Metrics.Histogram("worker_cache_entries", float64(len(d.cache)))

	for _, traceID := range toDelete.Members() {
		delete(d.cache, traceID)
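With this rename, each worker's cache reports its own size as worker_cache_entries, while the collector sums the workers into the collect_cache_entries histogram added in collect.go below. A toy sketch of that relationship, with a made-up worker type standing in for CollectorWorker and plain printing standing in for the real metrics calls:

package main

import "fmt"

// worker stands in for a CollectorWorker; only its cache size matters here.
type worker struct{ cacheEntries int }

func (w worker) GetCacheSize() int { return w.cacheEntries }

func main() {
	workers := []worker{{cacheEntries: 120}, {cacheEntries: 95}, {cacheEntries: 210}}

	total := 0
	for _, w := range workers {
		// Each worker records its own size as "worker_cache_entries".
		fmt.Printf("worker_cache_entries: %d\n", w.GetCacheSize())
		total += w.GetCacheSize()
	}
	// The collector records the sum across workers as "collect_cache_entries".
	fmt.Printf("collect_cache_entries: %d\n", total)
}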

collect/collect.go

Lines changed: 29 additions & 18 deletions
@@ -6,6 +6,7 @@ import (
	"fmt"
	"os"
	"runtime"
+	rtmetrics "runtime/metrics"
	"sync"
	"time"

@@ -37,6 +38,7 @@ const (
	defaultKeptDecisionTickerInterval = 1 * time.Second

	collectorHealthKey = "collector"
+	memMetricName      = "/memory/classes/heap/objects:bytes"
)

var ErrWouldBlock = errors.New("Dropping span as channel buffer is full. Span will not be processed and will be lost.")
@@ -113,6 +115,8 @@ type InMemCollector struct {
	done chan struct{}

	hostname string
+
+	memMetricSample []rtmetrics.Sample // Memory monitoring using runtime/metrics
}

// These are the names of the metrics we use to track the number of events sent to peers through the router.
@@ -130,6 +134,7 @@ var inMemCollectorMetrics = []metrics.Metadata{
	{Name: "collector_incoming_queue_length", Type: metrics.Gauge, Unit: metrics.Dimensionless, Description: "number of spans in the incoming queue"},
	{Name: "collector_peer_queue", Type: metrics.Histogram, Unit: metrics.Dimensionless, Description: "number of spans currently in the peer queue"},
	{Name: "collector_cache_size", Type: metrics.Gauge, Unit: metrics.Dimensionless, Description: "number of traces currently stored in the trace cache"},
+	{Name: "collect_cache_entries", Type: metrics.Histogram, Unit: metrics.Dimensionless, Description: "Total number of traces currently stored in the cache from all workers"},
	{Name: "memory_heap_allocation", Type: metrics.Gauge, Unit: metrics.Bytes, Description: "current heap allocation"},
	{Name: "span_received", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of spans received by the collector"},
	{Name: "span_processed", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of spans processed by the collector"},
@@ -209,6 +214,10 @@ func (i *InMemCollector) Start() error {
		}
	}

+	// Initialize runtime/metrics sample for efficient memory monitoring
+	i.memMetricSample = make([]rtmetrics.Sample, 1)
+	i.memMetricSample[0].Name = memMetricName
+
	i.workers = make([]*CollectorWorker, numWorkers)

	for loopID := range i.workers {
@@ -261,29 +270,31 @@ func (i *InMemCollector) reloadConfigs() {
	// TODO add resizing the LRU sent trace cache on config reload
}

+// checkAlloc performs memory monitoring using runtime/metrics instead of
+// runtime.ReadMemStats. This avoids stop-the-world pauses that can impact performance.
func (i *InMemCollector) checkAlloc(ctx context.Context) {
-	_, span := otelutil.StartSpan(ctx, i.Tracer, "checkAlloc")
-	defer span.End()
-
	inMemConfig := i.Config.GetCollectionConfig()
	maxAlloc := inMemConfig.GetMaxAlloc()
	i.Metrics.Store("MEMORY_MAX_ALLOC", float64(maxAlloc))

-	var mem runtime.MemStats
-	runtime.ReadMemStats(&mem)
-	i.Metrics.Gauge("memory_heap_allocation", float64(mem.Alloc))
-	if maxAlloc == 0 || mem.Alloc < uint64(maxAlloc) {
+	rtmetrics.Read(i.memMetricSample)
+	currentAlloc := i.memMetricSample[0].Value.Uint64()
+
+	i.Metrics.Gauge("memory_heap_allocation", float64(currentAlloc))
+
+	// Check if we're within memory budget
+	if maxAlloc == 0 || currentAlloc < uint64(maxAlloc) {
		return
	}
+
+	// Memory over budget - trigger cache eviction
	i.Metrics.Increment("collector_cache_eviction")

-	// Figure out what fraction of the total cache we should remove. We'd like it to be
-	// enough to get us below the max capacity, but not TOO much below.
-	// Because our impact numbers are only the data size, reducing by enough to reach
-	// max alloc will actually do more than that.
-	totalToRemove := mem.Alloc - uint64(maxAlloc)
+	// Calculate how much to remove from each worker
+	totalToRemove := currentAlloc - uint64(maxAlloc)
	perWorkerToRemove := int(totalToRemove) / len(i.workers)

+	// Coordinate eviction across all workers
	var wg sync.WaitGroup
	var cacheSizeBefore, cacheSizeAfter int
	wg.Add(len(i.workers))
@@ -296,21 +307,20 @@ func (i *InMemCollector) checkAlloc(ctx context.Context) {
	}
	wg.Wait()

+	// Calculate actual eviction results
	for _, worker := range i.workers {
		cacheSizeAfter += worker.GetCacheSize()
	}

-	// Treat any MaxAlloc overage as an error so we know it's happening
+	// Log the eviction
	i.Logger.Warn().
-		WithField("alloc", mem.Alloc).
+		WithField("alloc", currentAlloc).
		WithField("old_trace_count", cacheSizeBefore).
		WithField("new_trace_count", cacheSizeAfter).
		Logf("Making some trace decisions early due to memory overrun.")

-	// Manually GC here - without this we can easily end up evicting more than we
-	// need to, since total alloc won't be updated until after a GC pass.
+	// Manually trigger GC to reclaim memory immediately
	runtime.GC()
-	return
}

// monitor runs background maintenance tasks including:
@@ -347,6 +357,7 @@ func (i *InMemCollector) monitor() {

			i.Metrics.Histogram("collector_incoming_queue", float64(totalIncoming))
			i.Metrics.Histogram("collector_peer_queue", float64(totalPeer))
+			i.Metrics.Histogram("collect_cache_entries", float64(totalCacheSize))
			i.Metrics.Gauge("collector_incoming_queue_length", float64(totalIncoming))
			i.Metrics.Gauge("collector_peer_queue_length", float64(totalPeer))
			i.Metrics.Gauge("collector_cache_size", float64(totalCacheSize))
@@ -356,7 +367,7 @@ func (i *InMemCollector) monitor() {
			i.Metrics.Count("span_received", totalReceived)
			i.Metrics.Count("spans_waiting", totalWaiting)

-			// Send traces early if we're over memory budget
+			// Check memory and evict if needed (using runtime/metrics - no STW)
			i.checkAlloc(ctx)
		case <-i.reload:
			i.reloadConfigs()
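The heart of the checkAlloc change is replacing runtime.ReadMemStats, which stops the world, with a targeted runtime/metrics read of the single /memory/classes/heap/objects:bytes counter. A minimal standalone sketch of that read, outside the collector, with an illustrative memory budget:

package main

import (
	"fmt"
	"runtime/metrics"
)

const heapObjectsMetric = "/memory/classes/heap/objects:bytes"

func main() {
	// Prepare the sample slice once; it can be reused on every read.
	samples := []metrics.Sample{{Name: heapObjectsMetric}}

	// Read only the requested metric; this does not pay the
	// stop-the-world cost of runtime.ReadMemStats.
	metrics.Read(samples)

	if samples[0].Value.Kind() != metrics.KindUint64 {
		fmt.Println("metric unavailable on this Go version")
		return
	}
	heapBytes := samples[0].Value.Uint64()

	const maxAlloc = uint64(512 << 20) // illustrative 512 MiB budget
	fmt.Printf("heap objects: %d bytes (over budget: %v)\n", heapBytes, heapBytes > maxAlloc)
}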
