Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion internal/aggregator/aggregator_insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ func makeMetricCache(journal *metajournal.MetricsStorage) *metricIndexCache {
journal: journal,
ingestionStatusData: lastMetricData{lastMetricPrekey: -1},
lastMetric: lastMetricData{lastMetricPrekey: -1}, // so if somehow 0 metricID is inserted first, will have no prekey

}
bm := format.BuiltinMetricMetaIngestionStatus
result.ingestionStatusData.lastMetricPrekeyOnly = bm.PreKeyOnly
Expand Down
9 changes: 4 additions & 5 deletions internal/aggregator/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,11 @@ func getBuiltinShardingMetrics() map[int32]int {

for metricID, metric := range format.BuiltinMetrics {
if metric.ShardStrategy == format.ShardBuiltin {
// For metrics with MetricTagID > 0, use the specified tag
// For metrics with MetricTagID = 0 (like contributors_log), they appear in all shards
if metric.MetricTagID > 0 {
shardingMetrics[metricID] = int(metric.MetricTagID)
// For metrics with MetricTagIndex > 0, use the specified tag
// For metrics with MetricTagIndex = 0 (like contributors_log), they appear in all shards
if metric.MetricTagIndex > 0 {
shardingMetrics[metricID] = int(metric.MetricTagIndex)
} else {
// Special case: metrics with MetricTagID = 0 appear in all shards
shardingMetrics[metricID] = -1 // Use -1 to indicate "appears in all shards"
}
}
Expand Down
6 changes: 3 additions & 3 deletions internal/data_model/sampling.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,13 +166,13 @@ func (h *sampler) Run(budget int64) {
h.items[i].metric = h.getMetricMeta(h.items[i].MetricID)
}
}
if h.SampleKeys && len(h.items[i].metric.FairKey) != 0 {
n := len(h.items[i].metric.FairKey)
if h.SampleKeys && len(h.items[i].metric.FairKeyIndex) != 0 {
n := len(h.items[i].metric.FairKeyIndex)
if n > maxFairKeyLen {
n = maxFairKeyLen
}
for j := 0; j < n; j++ {
if x := h.items[i].metric.FairKey[j]; 0 <= x && x < len(h.items[i].Item.Key.Tags) {
if x := h.items[i].metric.FairKeyIndex[j]; 0 <= x && x < len(h.items[i].Item.Key.Tags) {
h.items[i].fairKey[j] = h.items[i].Item.Key.Tags[x]
}
}
Expand Down
2 changes: 1 addition & 1 deletion internal/data_model/sampling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ func TestFairKeySampling(t *testing.T) {
Meta: metaStorageMock{
getMetaMetric: func(metricID int32) *format.MetricMetaValue {
return &format.MetricMetaValue{
FairKey: []int{0},
FairKeyIndex: []int{0},
EffectiveWeight: 1,
}
},
Expand Down
30 changes: 15 additions & 15 deletions internal/format/builtin_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ Set only if greater than 1.`,
Description: "agent_shard",
RawKind: "int",
}},
PreKeyTagID: "1",
ShardStrategy: ShardBuiltin, // sharded the same way as metric it describes
MetricTagID: 1,
PreKeyTagID: "1",
ShardStrategy: ShardBuiltin, // sharded the same way as metric it describes
MetricTagIndex: 1,
}

const BuiltinMetricIDAggBucketReceiveDelaySec = -2 // Also approximates insert delay, interesting for historic buckets
Expand Down Expand Up @@ -153,9 +153,9 @@ So avg() of this metric shows estimated full cardinality with or without groupin
Description: "metric",
BuiltinKind: BuiltinKindMetric,
}},
PreKeyTagID: "4",
ShardStrategy: ShardBuiltin, // sharded the same way as metric it describes
MetricTagID: 4,
PreKeyTagID: "4",
ShardStrategy: ShardBuiltin, // sharded the same way as metric it describes
MetricTagIndex: 4,
}

const BuiltinMetricIDAggSamplingFactor = -10
Expand All @@ -179,9 +179,9 @@ Set only if greater than 1.`,
TagValueIDAggSamplingFactorReasonInsertSize: "insert_size",
}),
}},
PreKeyTagID: "4",
ShardStrategy: ShardBuiltin, // sharded the same way as metrtic it describes
MetricTagID: 4,
PreKeyTagID: "4",
ShardStrategy: ShardBuiltin, // sharded the same way as metrtic it describes
MetricTagIndex: 4,
}

const BuiltinMetricIDIngestionStatus = -11
Expand Down Expand Up @@ -242,9 +242,9 @@ This metric uses sampling budgets of metric it refers to, so flooding by errors
}, {
Description: "tag_id",
}},
PreKeyTagID: "1",
ShardStrategy: ShardBuiltin, // sharded the same way as metrtic it describes
MetricTagID: 1,
PreKeyTagID: "1",
ShardStrategy: ShardBuiltin, // sharded the same way as metrtic it describes
MetricTagIndex: 1,
}

var BuiltinMetricMetaAggInsertTime = &MetricMetaValue{
Expand Down Expand Up @@ -701,9 +701,9 @@ var BuiltinMetricMetaBadges = &MetricMetaValue{
Description: "metric",
BuiltinKind: BuiltinKindMetric,
}},
PreKeyTagID: "2",
ShardStrategy: ShardBuiltin, // sharded the same way as metrtic it describes
MetricTagID: 2,
PreKeyTagID: "2",
ShardStrategy: ShardBuiltin, // sharded the same way as metrtic it describes
MetricTagIndex: 2,
}

var BuiltinMetricMetaAutoConfig = &MetricMetaValue{
Expand Down
10 changes: 5 additions & 5 deletions internal/format/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,11 +267,11 @@ type MetricMetaValue struct {
ShardNum uint32 `json:"shard_num,omitempty"` // warning: zero-based, contains clickhouse shard - 1 (clickhouse shard_num is 1-based)
PipelineVersion uint8 `json:"pipeline_version,omitempty"`

MetricTagID uint8 `json:"-"` // 0 means no metric tag, only for builtin metrics, can be used to determine shard
MetricTagIndex uint8 `json:"-"` // 0 means no metric tag, only for builtin metrics, can be used to determine shard
name2Tag map[string]*MetricMetaTag // Should be restored from Tags after reading
EffectiveResolution int `json:"-"` // Should be restored from Tags after reading
PreKeyIndex int `json:"-"` // index of tag which goes to 'prekey' column, or <0 if no tag goes
FairKey []int `json:"-"`
FairKeyIndex []int `json:"-"`
EffectiveWeight int64 `json:"-"`
HasPercentiles bool `json:"-"`
RoundSampleFactors bool `json:"-"` // Experimental, set if magic word in description is found
Expand Down Expand Up @@ -586,10 +586,10 @@ func (m *MetricMetaValue) RestoreCachedInfo() error {
}
// restore fair key index
if len(m.FairKeyTagIDs) != 0 {
m.FairKey = make([]int, 0, len(m.FairKeyTagIDs))
m.FairKeyIndex = make([]int, 0, len(m.FairKeyTagIDs))
for _, v := range m.FairKeyTagIDs {
if tag := m.Name2Tag(v); tag != nil {
m.FairKey = append(m.FairKey, int(tag.Index))
m.FairKeyIndex = append(m.FairKeyIndex, int(tag.Index))
}
}
}
Expand Down Expand Up @@ -1252,7 +1252,7 @@ func SameCompactMetric(a, b *MetricMetaValue) bool {
a.Disable != b.Disable ||
a.EffectiveWeight != b.EffectiveWeight ||
a.EffectiveResolution != b.EffectiveResolution ||
!slices.Equal(a.FairKey, b.FairKey) ||
!slices.Equal(a.FairKeyIndex, b.FairKeyIndex) ||
a.ShardStrategy != b.ShardStrategy ||
a.ShardNum != b.ShardNum ||
a.PipelineVersion != b.PipelineVersion ||
Expand Down
4 changes: 2 additions & 2 deletions internal/sharding/sharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ func Shard(key *data_model.Key, meta *format.MetricMetaValue, numShards int, sha
shard := uint32(key.Metric) % shardByMetricCount
return shard
case format.ShardBuiltin:
tagId := meta.MetricTagID
tagIndex := meta.MetricTagIndex
// for builtin metrics we always use row values
metric := key.Tags[tagId]
metric := key.Tags[tagIndex]
shard := uint32(metric) % shardByMetricCount
return shard
default: // including format.ShardByTagsHsh
Expand Down
Loading