
Commit 336ce2b

add worker_cache_entries
1 parent 779cf5a

2 files changed: +19 -5 lines

collect/cache/cache.go

Lines changed: 18 additions & 4 deletions
@@ -44,29 +44,39 @@ var _ Cache = (*DefaultInMemCache)(nil)
 // order) so it is important to have a cache larger than trace throughput *
 // longest trace.
 type DefaultInMemCache struct {
-	Logger logger.Logger
+	Metrics metrics.Metrics
+	Logger  logger.Logger

 	pq    *kpq.KeyedPriorityQueue[string, time.Time]
 	cache map[string]*types.Trace
 }

 const DefaultInMemCacheCapacity = 10000

+var collectCacheMetrics = []metrics.Metadata{
+	{Name: "worker_cache_entries", Type: metrics.Histogram, Unit: metrics.Dimensionless, Description: "The number of traces currently stored in the trace cache per worker"},
+}
+
 func NewInMemCache(
 	met metrics.Metrics,
 	logger logger.Logger,
 ) *DefaultInMemCache {
 	logger.Debug().Logf("Starting DefaultInMemCache")
 	defer func() { logger.Debug().Logf("Finished starting DefaultInMemCache") }()

+	for _, metadata := range collectCacheMetrics {
+		met.Register(metadata)
+	}
+
 	cmp := func(v1, v2 time.Time) bool {
 		return v1.Before(v2)
 	}

 	return &DefaultInMemCache{
-		Logger: logger,
-		pq:     kpq.NewKeyedPriorityQueue[string](cmp),
-		cache:  make(map[string]*types.Trace),
+		Metrics: met,
+		Logger:  logger,
+		pq:      kpq.NewKeyedPriorityQueue[string](cmp),
+		cache:   make(map[string]*types.Trace),
 	}
 }

@@ -104,6 +114,8 @@ func (d *DefaultInMemCache) GetCacheCapacity() int {
 // It removes and returns them.
 // If a filter is provided, it will be called with each trace to determine if it should be skipped.
 func (d *DefaultInMemCache) TakeExpiredTraces(now time.Time, max int, filter func(*types.Trace) bool) []*types.Trace {
+	d.Metrics.Histogram("worker_cache_entries", float64(len(d.cache)))
+
 	var expired, skipped []*types.Trace
 	for !d.pq.IsEmpty() && (max <= 0 || len(expired) < max) {
 		// pop the next trace from the queue

@@ -146,6 +158,8 @@ func (d *DefaultInMemCache) TakeExpiredTraces(now time.Time, max int, filter func(*types.Trace) bool) []*types.Trace {
 // RemoveTraces accepts a set of trace IDs and removes any matching ones from
 // the insertion list. This is used in the case of a cache overrun.
 func (d *DefaultInMemCache) RemoveTraces(toDelete generics.Set[string]) {
+	d.Metrics.Histogram("worker_cache_entries", float64(len(d.cache)))
+
 	for _, traceID := range toDelete.Members() {
 		delete(d.cache, traceID)
 		d.pq.Remove(traceID)
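
The pattern this commit adds is: register the worker_cache_entries histogram once in NewInMemCache, then record the current cache size each time the cache is swept (TakeExpiredTraces) or trimmed (RemoveTraces). Below is a minimal, self-contained Go sketch of that pattern; recorderStub and workerCache are hypothetical stand-ins for the repository's metrics.Metrics and DefaultInMemCache, not the actual interfaces.

package main

import "fmt"

// recorderStub is a hypothetical metrics backend: it only needs a way to
// register a metric name and a way to record histogram samples for it.
type recorderStub struct {
	samples map[string][]float64
}

func (r *recorderStub) Register(name string) { r.samples[name] = nil }

func (r *recorderStub) Histogram(name string, v float64) {
	r.samples[name] = append(r.samples[name], v)
}

// workerCache mirrors the shape of the change: a per-worker trace cache that
// reports its current size whenever it is swept.
type workerCache struct {
	metrics *recorderStub
	cache   map[string]struct{}
}

func newWorkerCache(m *recorderStub) *workerCache {
	m.Register("worker_cache_entries") // register once, at construction
	return &workerCache{metrics: m, cache: make(map[string]struct{})}
}

func (w *workerCache) takeExpired() {
	// record the per-worker cache size at the start of every sweep
	w.metrics.Histogram("worker_cache_entries", float64(len(w.cache)))
	// ... expiry logic would go here ...
}

func main() {
	m := &recorderStub{samples: map[string][]float64{}}
	c := newWorkerCache(m)
	c.cache["trace-1"] = struct{}{}
	c.cache["trace-2"] = struct{}{}
	c.takeExpired()
	fmt.Println(m.samples["worker_cache_entries"]) // prints [2]
}

Because each worker reports its own sample, a histogram (rather than a gauge) presumably lets the backend show the distribution of cache sizes across workers, alongside the aggregate collect_cache_entries metric whose description is updated in collect.go below.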

collect/collect.go

Lines changed: 1 addition & 1 deletion
@@ -130,7 +130,7 @@ var inMemCollectorMetrics = []metrics.Metadata{
 	{Name: "collector_incoming_queue_length", Type: metrics.Gauge, Unit: metrics.Dimensionless, Description: "number of spans in the incoming queue"},
 	{Name: "collector_peer_queue", Type: metrics.Histogram, Unit: metrics.Dimensionless, Description: "number of spans currently in the peer queue"},
 	{Name: "collector_cache_size", Type: metrics.Gauge, Unit: metrics.Dimensionless, Description: "number of traces currently stored in the trace cache"},
-	{Name: "collect_cache_entries", Type: metrics.Histogram, Unit: metrics.Dimensionless, Description: "The number of traces currently stored in the cache"},
+	{Name: "collect_cache_entries", Type: metrics.Histogram, Unit: metrics.Dimensionless, Description: "Total number of traces currently stored in the cache from all workers"},
 	{Name: "memory_heap_allocation", Type: metrics.Gauge, Unit: metrics.Bytes, Description: "current heap allocation"},
 	{Name: "span_received", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of spans received by the collector"},
 	{Name: "span_processed", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of spans processed by the collector"},
