Skip to content

Commit 5aa8bd2

Browse files
fix(release-3.5.x): backport fix to correctly calculate entry metadata size (#19163)
Signed-off-by: Jordan Rushing <[email protected]> Co-authored-by: JordanRushing <[email protected]>
1 parent 58c0ca1 commit 5aa8bd2

File tree

2 files changed

+173
-4
lines changed

2 files changed

+173
-4
lines changed

pkg/loghttp/push/otlp.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,10 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, otl
300300
entryLbs = lbs
301301
}
302302

303+
// Calculate the entry's own metadata size BEFORE adding resource and scope attributes
304+
// This preserves the intent of tracking entry-specific metadata separately without requiring subtraction
305+
entryOwnMetadataSize := int64(loki_util.StructuredMetadataSize(entry.StructuredMetadata))
306+
303307
// if entry.StructuredMetadata doesn't have capacity to add resource and scope attributes, make a new slice with enough capacity
304308
attributesAsStructuredMetadataLen := len(resourceAttributesAsStructuredMetadata) + len(scopeAttributesAsStructuredMetadata)
305309
if cap(entry.StructuredMetadata) < len(entry.StructuredMetadata)+attributesAsStructuredMetadataLen {
@@ -317,19 +321,19 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, otl
317321
entryRetentionPeriod := streamResolver.RetentionPeriodFor(entryLbs)
318322
entryPolicy := streamResolver.PolicyFor(entryLbs)
319323

320-
metadataSize := int64(loki_util.StructuredMetadataSize(entry.StructuredMetadata) - resourceAttributesAsStructuredMetadataSize - scopeAttributesAsStructuredMetadataSize)
321-
322324
if _, ok := stats.StructuredMetadataBytes[entryPolicy]; !ok {
323325
stats.StructuredMetadataBytes[entryPolicy] = make(map[time.Duration]int64)
324326
}
325-
stats.StructuredMetadataBytes[entryPolicy][entryRetentionPeriod] += metadataSize
327+
// Use the entry's own metadata size (calculated before adding resource/scope attributes)
328+
// This keeps the same accounting intention without risk of negative values
329+
stats.StructuredMetadataBytes[entryPolicy][entryRetentionPeriod] += entryOwnMetadataSize
326330

327331
if _, ok := stats.LogLinesBytes[entryPolicy]; !ok {
328332
stats.LogLinesBytes[entryPolicy] = make(map[time.Duration]int64)
329333
}
330334
stats.LogLinesBytes[entryPolicy][entryRetentionPeriod] += int64(len(entry.Line))
331335

332-
totalBytesReceived += metadataSize
336+
totalBytesReceived += entryOwnMetadataSize
333337
totalBytesReceived += int64(len(entry.Line))
334338

335339
stats.PolicyNumLines[entryPolicy]++

pkg/loghttp/push/otlp_test.go

Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -972,6 +972,171 @@ func TestOTLPLogAttributesAsIndexLabels(t *testing.T) {
972972
require.Equal(t, int64(3), stats.PolicyNumLines["test-policy"], "Should have counted 3 log lines")
973973
}
974974

975+
func TestOTLPStructuredMetadataCalculation(t *testing.T) {
976+
now := time.Unix(0, time.Now().UnixNano())
977+
978+
generateLogs := func() plog.Logs {
979+
ld := plog.NewLogs()
980+
981+
// Create resource with attributes
982+
rl := ld.ResourceLogs().AppendEmpty()
983+
rl.Resource().Attributes().PutStr("service.name", "test-service")
984+
rl.Resource().Attributes().PutStr("resource.key", "resource.value")
985+
986+
// Create scope with attributes
987+
sl := rl.ScopeLogs().AppendEmpty()
988+
sl.Scope().SetName("test-scope")
989+
sl.Scope().Attributes().PutStr("scope.key", "scope.value")
990+
991+
// Add a log record with minimal metadata
992+
logRecord := sl.LogRecords().AppendEmpty()
993+
logRecord.Body().SetStr("Test entry with minimal metadata")
994+
logRecord.SetTimestamp(pcommon.Timestamp(now.UnixNano()))
995+
logRecord.Attributes().PutStr("entry.key", "entry.value")
996+
997+
return ld
998+
}
999+
1000+
// Run the test
1001+
stats := NewPushStats()
1002+
tracker := NewMockTracker()
1003+
streamResolver := newMockStreamResolver("fake", &fakeLimits{})
1004+
1005+
streamResolver.policyForOverride = func(_ labels.Labels) string {
1006+
return "test-policy"
1007+
}
1008+
1009+
// Convert OTLP logs to Loki push request
1010+
pushReq := otlpToLokiPushRequest(
1011+
context.Background(),
1012+
generateLogs(),
1013+
"test-user",
1014+
DefaultOTLPConfig(defaultGlobalOTLPConfig),
1015+
[]string{}, // discoverServiceName
1016+
tracker,
1017+
stats,
1018+
false,
1019+
log.NewNopLogger(),
1020+
streamResolver,
1021+
)
1022+
1023+
// Verify there is exactly one stream
1024+
require.Equal(t, 1, len(pushReq.Streams))
1025+
1026+
// Verify we have a single entry with all the expected metadata
1027+
stream := pushReq.Streams[0]
1028+
require.Equal(t, 1, len(stream.Entries))
1029+
1030+
// Verify the structured metadata bytes are positive
1031+
require.Greater(t, stats.StructuredMetadataBytes["test-policy"][time.Hour], int64(0),
1032+
"Structured metadata bytes should be positive")
1033+
1034+
// Verify we can find the resource, scope, and entry metadata in the entry
1035+
entry := stream.Entries[0]
1036+
1037+
resourceMetadataFound := false
1038+
scopeMetadataFound := false
1039+
entryMetadataFound := false
1040+
1041+
for _, metadata := range entry.StructuredMetadata {
1042+
if metadata.Name == "resource_key" && metadata.Value == "resource.value" {
1043+
resourceMetadataFound = true
1044+
}
1045+
if metadata.Name == "scope_key" && metadata.Value == "scope.value" {
1046+
scopeMetadataFound = true
1047+
}
1048+
if metadata.Name == "entry_key" && metadata.Value == "entry.value" {
1049+
entryMetadataFound = true
1050+
}
1051+
}
1052+
1053+
require.True(t, resourceMetadataFound, "Resource metadata should be present in the entry")
1054+
require.True(t, scopeMetadataFound, "Scope metadata should be present in the entry")
1055+
require.True(t, entryMetadataFound, "Entry metadata should be present in the entry")
1056+
}
1057+
1058+
func TestNegativeMetadataScenarioExplicit(t *testing.T) {
1059+
// This test explicitly demonstrates how negative structured metadata size values
1060+
// could occur when subtracting resource/scope attributes from total structured metadata size
1061+
1062+
// Setup: Create metadata with a label that would be excluded from size calculation
1063+
resourceMeta := push.LabelsAdapter{
1064+
{Name: "resource_key", Value: "resource_value"}, // 27 bytes
1065+
{Name: "excluded_label", Value: "value"}, // This would be excluded from size calculation
1066+
}
1067+
1068+
scopeMeta := push.LabelsAdapter{
1069+
{Name: "scope_key", Value: "scope_value"}, // 20 bytes
1070+
}
1071+
1072+
entryMeta := push.LabelsAdapter{
1073+
{Name: "entry_key", Value: "entry_value"}, // 20 bytes
1074+
}
1075+
1076+
// ExcludedStructuredMetadataLabels would exclude certain labels
1077+
// from size calculations.
1078+
calculateSize := func(labels push.LabelsAdapter) int {
1079+
size := 0
1080+
for _, label := range labels {
1081+
// Simulate a label being excluded from size calc
1082+
if label.Name != "excluded_label" {
1083+
size += len(label.Name) + len(label.Value)
1084+
}
1085+
}
1086+
return size
1087+
}
1088+
1089+
// Calculate sizes with simulated exclusions
1090+
resourceSize := calculateSize(resourceMeta) // 27 bytes (excluded_label not counted)
1091+
scopeSize := calculateSize(scopeMeta) // 20 bytes
1092+
entrySize := calculateSize(entryMeta) // 20 bytes
1093+
1094+
// The original approach:
1095+
// 1. Add resource and scope attributes to entry metadata
1096+
combined := make(push.LabelsAdapter, 0)
1097+
combined = append(combined, entryMeta...)
1098+
combined = append(combined, resourceMeta...)
1099+
combined = append(combined, scopeMeta...)
1100+
1101+
// 2. Calculate combined size (with certain labels excluded)
1102+
combinedSize := calculateSize(combined) // Should be 27 + 20 + 20 = 67 bytes
1103+
1104+
// 3. Calculate entry-specific metadata by subtraction
1105+
// metadataSize := int64(combinedSize - resourceSize - scopeSize)
1106+
oldCalculation := combinedSize - resourceSize - scopeSize
1107+
1108+
// Should be: 67 - 27 - 20 = 20 bytes, which equals entrySize
1109+
1110+
t.Logf("Resource size: %d bytes", resourceSize)
1111+
t.Logf("Scope size: %d bytes", scopeSize)
1112+
t.Logf("Entry size: %d bytes", entrySize)
1113+
t.Logf("Combined size: %d bytes", combinedSize)
1114+
t.Logf("Old calculation (combined - resource - scope): %d bytes", oldCalculation)
1115+
1116+
// Now, to demonstrate how this could produce negative values:
1117+
// In reality, due to potential inconsistencies in how labels were excluded/combined/normalized,
1118+
// the combined size could be LESS than the sum of parts
1119+
simulatedRealCombinedSize := resourceSize + scopeSize - 5 // 5 bytes less than sum
1120+
1121+
// Using the original calculation method:
1122+
simulatedRealCalculation := simulatedRealCombinedSize - resourceSize - scopeSize
1123+
// This will be: (27 + 20 - 5) - 27 - 20 = 42 - 47 = -5 bytes
1124+
1125+
t.Logf("Simulated real combined size: %d bytes", simulatedRealCombinedSize)
1126+
t.Logf("Simulated real calculation (old method): %d bytes", simulatedRealCalculation)
1127+
1128+
// This would be a negative value!
1129+
require.Less(t, simulatedRealCalculation, 0,
1130+
"This demonstrates how the old calculation could produce negative values")
1131+
1132+
// Directly use entry's size before combining
1133+
t.Logf("New calculation (direct entry size): %d bytes", entrySize)
1134+
require.Equal(t, entrySize, 20,
1135+
"New calculation provides correct entry size")
1136+
require.Greater(t, entrySize, 0,
1137+
"New calculation always produces non-negative values")
1138+
}
1139+
9751140
func TestOTLPSeverityTextAsLabel(t *testing.T) {
9761141
now := time.Unix(0, time.Now().UnixNano())
9771142

0 commit comments

Comments
 (0)