Skip to content

Commit e217163

Browse files
authored
[clickhouse] Add handling for complex attributes to ClickHouse storage (#7627)
## Which problem is this PR solving? - Towards #7134 and #7135 ## Description of the changes - This PR adds support for map and slice attributes in ClickHouse storage. It does so by leveraging the new API added in open-telemetry/opentelemetry-collector#13945 to Marshal the attributes to JSON and then storing them as Base64 encoded strings ## How was this change tested? - CI / unit tests ## Checklist - [x] I have read https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md - [x] I have signed all commits - [x] I have added unit tests for the new functionality - [x] I have run lint and test steps successfully - for `jaeger`: `make lint test` - for `jaeger-ui`: `npm run lint` and `npm run test` --------- Signed-off-by: Mahad Zaryab <[email protected]>
1 parent 528476f commit e217163

File tree

8 files changed

+514
-235
lines changed

8 files changed

+514
-235
lines changed

internal/storage/v2/clickhouse/tracestore/assert_test.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/stretchr/testify/require"
1313
"go.opentelemetry.io/collector/pdata/pcommon"
1414
"go.opentelemetry.io/collector/pdata/ptrace"
15+
"go.opentelemetry.io/collector/pdata/xpdata"
1516

1617
"github.com/jaegertracing/jaeger/internal/jptrace"
1718
"github.com/jaegertracing/jaeger/internal/storage/v2/clickhouse/tracestore/dbmodel"
@@ -126,6 +127,24 @@ func requireComplexAttrs(t *testing.T, expectedKeys []string, expectedVals []str
126127
decoded, err := base64.StdEncoding.DecodeString(expectedVals[i])
127128
require.NoError(t, err)
128129
require.Equal(t, decoded, val.Bytes().AsRaw())
130+
case strings.HasPrefix(k, "@map@"):
131+
key := strings.TrimPrefix(expectedKeys[i], "@map@")
132+
val, ok := attrs.Get(key)
133+
require.True(t, ok)
134+
135+
m := &xpdata.JSONUnmarshaler{}
136+
expectedVal, err := m.UnmarshalValue([]byte(expectedVals[i]))
137+
require.NoError(t, err)
138+
require.True(t, expectedVal.Map().Equal(val.Map()))
139+
case strings.HasPrefix(k, "@slice@"):
140+
key := strings.TrimPrefix(expectedKeys[i], "@slice@")
141+
val, ok := attrs.Get(key)
142+
require.True(t, ok)
143+
144+
m := &xpdata.JSONUnmarshaler{}
145+
expectedVal, err := m.UnmarshalValue([]byte(expectedVals[i]))
146+
require.NoError(t, err)
147+
require.True(t, expectedVal.Slice().Equal(val.Slice()))
129148
default:
130149
t.Fatalf("unsupported complex attribute key: %s", k)
131150
}

internal/storage/v2/clickhouse/tracestore/dbmodel/dbmodel_test.go

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/stretchr/testify/require"
1212
"go.opentelemetry.io/collector/pdata/pcommon"
1313
"go.opentelemetry.io/collector/pdata/ptrace"
14+
"go.opentelemetry.io/collector/pdata/xpdata"
1415

1516
"github.com/jaegertracing/jaeger/internal/telemetry/otelsemconv"
1617
)
@@ -32,7 +33,7 @@ func TestRoundTrip(t *testing.T) {
3233
})
3334

3435
t.Run("FromRow->ToRow", func(t *testing.T) {
35-
spanRow := createTestSpanRow(now, duration)
36+
spanRow := createTestSpanRow(t, now, duration)
3637

3738
trace := FromRow(spanRow)
3839
rs := trace.ResourceSpans().At(0).Resource()
@@ -114,10 +115,27 @@ func addTestAttributes(attrs pcommon.Map) {
114115
attrs.PutInt("int_attr", 42)
115116
attrs.PutStr("string_attr", "string_value")
116117
attrs.PutEmptyBytes("bytes_attr").FromRaw([]byte("bytes_value"))
118+
attrs.PutEmptyMap("map_attr").FromRaw(map[string]any{"key": "value"})
119+
attrs.PutEmptySlice("slice_attr").FromRaw([]any{1, 2, 3})
117120
}
118121

119-
func createTestSpanRow(now time.Time, duration time.Duration) *SpanRow {
122+
func createTestSpanRow(t *testing.T, now time.Time, duration time.Duration) *SpanRow {
123+
t.Helper()
120124
encodedBytes := base64.StdEncoding.EncodeToString([]byte("bytes_value"))
125+
126+
vm := pcommon.NewValueMap()
127+
vm.Map().PutStr("key", "value")
128+
m := &xpdata.JSONMarshaler{}
129+
vmJSON, err := m.MarshalValue(vm)
130+
require.NoError(t, err)
131+
132+
vs := pcommon.NewValueSlice()
133+
vs.Slice().AppendEmpty().SetInt(1)
134+
vs.Slice().AppendEmpty().SetInt(2)
135+
vs.Slice().AppendEmpty().SetInt(3)
136+
vsJSON, err := m.MarshalValue(vs)
137+
require.NoError(t, err)
138+
121139
return &SpanRow{
122140
ID: "0000000000000001",
123141
TraceID: "00000000000000000000000000000001",
@@ -138,8 +156,8 @@ func createTestSpanRow(now time.Time, duration time.Duration) *SpanRow {
138156
IntValues: []int64{42},
139157
StrKeys: []string{"string_attr"},
140158
StrValues: []string{"string_value"},
141-
ComplexKeys: []string{"@bytes@bytes_attr"},
142-
ComplexValues: []string{encodedBytes},
159+
ComplexKeys: []string{"@bytes@bytes_attr", "@map@map_attr", "@slice@slice_attr"},
160+
ComplexValues: []string{encodedBytes, string(vmJSON), string(vsJSON)},
143161
},
144162
EventNames: []string{"test-event"},
145163
EventTimestamps: []time.Time{now},
@@ -152,8 +170,8 @@ func createTestSpanRow(now time.Time, duration time.Duration) *SpanRow {
152170
IntValues: [][]int64{{42}},
153171
StrKeys: [][]string{{"string_attr"}},
154172
StrValues: [][]string{{"string_value"}},
155-
ComplexKeys: [][]string{{"@bytes@bytes_attr"}},
156-
ComplexValues: [][]string{{encodedBytes}},
173+
ComplexKeys: [][]string{{"@bytes@bytes_attr", "@map@map_attr", "@slice@slice_attr"}},
174+
ComplexValues: [][]string{{encodedBytes, string(vmJSON), string(vsJSON)}},
157175
},
158176
LinkTraceIDs: []string{"00000000000000000000000000000003"},
159177
LinkSpanIDs: []string{"0000000000000004"},
@@ -167,8 +185,8 @@ func createTestSpanRow(now time.Time, duration time.Duration) *SpanRow {
167185
IntValues: [][]int64{{42}},
168186
StrKeys: [][]string{{"string_attr"}},
169187
StrValues: [][]string{{"string_value"}},
170-
ComplexKeys: [][]string{{"@bytes@bytes_attr"}},
171-
ComplexValues: [][]string{{encodedBytes}},
188+
ComplexKeys: [][]string{{"@bytes@bytes_attr", "@map@map_attr", "@slice@slice_attr"}},
189+
ComplexValues: [][]string{{encodedBytes, string(vmJSON), string(vsJSON)}},
172190
},
173191
ServiceName: "test-service",
174192
ResourceAttributes: Attributes{
@@ -180,8 +198,8 @@ func createTestSpanRow(now time.Time, duration time.Duration) *SpanRow {
180198
IntValues: []int64{42},
181199
StrKeys: []string{"service.name", "string_attr"},
182200
StrValues: []string{"test-service", "string_value"},
183-
ComplexKeys: []string{"@bytes@bytes_attr"},
184-
ComplexValues: []string{encodedBytes},
201+
ComplexKeys: []string{"@bytes@bytes_attr", "@map@map_attr", "@slice@slice_attr"},
202+
ComplexValues: []string{encodedBytes, string(vmJSON), string(vsJSON)},
185203
},
186204
ScopeName: "test-scope",
187205
ScopeVersion: "v1.0.0",
@@ -194,8 +212,8 @@ func createTestSpanRow(now time.Time, duration time.Duration) *SpanRow {
194212
IntValues: []int64{42},
195213
StrKeys: []string{"string_attr"},
196214
StrValues: []string{"string_value"},
197-
ComplexKeys: []string{"@bytes@bytes_attr"},
198-
ComplexValues: []string{encodedBytes},
215+
ComplexKeys: []string{"@bytes@bytes_attr", "@map@map_attr", "@slice@slice_attr"},
216+
ComplexValues: []string{encodedBytes, string(vmJSON), string(vsJSON)},
199217
},
200218
}
201219
}

internal/storage/v2/clickhouse/tracestore/dbmodel/from.go

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212

1313
"go.opentelemetry.io/collector/pdata/pcommon"
1414
"go.opentelemetry.io/collector/pdata/ptrace"
15+
"go.opentelemetry.io/collector/pdata/xpdata"
1516

1617
"github.com/jaegertracing/jaeger/internal/jptrace"
1718
"github.com/jaegertracing/jaeger/internal/telemetry/otelsemconv"
@@ -169,14 +170,51 @@ func putAttributes(
169170
attrs.PutStr(storedAttrs.StrKeys[i], storedAttrs.StrValues[i])
170171
}
171172
for i := 0; i < len(storedAttrs.ComplexKeys); i++ {
172-
if strings.HasPrefix(storedAttrs.ComplexKeys[i], "@bytes@") {
173+
switch {
174+
case strings.HasPrefix(storedAttrs.ComplexKeys[i], "@bytes@"):
173175
decoded, err := base64.StdEncoding.DecodeString(storedAttrs.ComplexValues[i])
174176
if err != nil {
175177
jptrace.AddWarnings(spanForWarnings, fmt.Sprintf("failed to decode bytes attribute %q: %s", storedAttrs.ComplexKeys[i], err.Error()))
176178
continue
177179
}
178180
k := strings.TrimPrefix(storedAttrs.ComplexKeys[i], "@bytes@")
179181
attrs.PutEmptyBytes(k).FromRaw(decoded)
182+
case strings.HasPrefix(storedAttrs.ComplexKeys[i], "@slice@"):
183+
k := strings.TrimPrefix(storedAttrs.ComplexKeys[i], "@slice@")
184+
m := &xpdata.JSONUnmarshaler{}
185+
val, err := m.UnmarshalValue([]byte(storedAttrs.ComplexValues[i]))
186+
if err != nil {
187+
jptrace.AddWarnings(
188+
spanForWarnings,
189+
fmt.Sprintf(
190+
"failed to unmarshal slice attribute %q: %s",
191+
storedAttrs.ComplexKeys[i],
192+
err.Error(),
193+
),
194+
)
195+
continue
196+
}
197+
attrs.PutEmptySlice(k).FromRaw(val.Slice().AsRaw())
198+
case strings.HasPrefix(storedAttrs.ComplexKeys[i], "@map@"):
199+
k := strings.TrimPrefix(storedAttrs.ComplexKeys[i], "@map@")
200+
m := &xpdata.JSONUnmarshaler{}
201+
val, err := m.UnmarshalValue([]byte(storedAttrs.ComplexValues[i]))
202+
if err != nil {
203+
jptrace.AddWarnings(
204+
spanForWarnings,
205+
fmt.Sprintf("failed to unmarshal map attribute %q: %s",
206+
storedAttrs.ComplexKeys[i],
207+
err.Error(),
208+
),
209+
)
210+
continue
211+
}
212+
attrs.PutEmptyMap(k).FromRaw(val.Map().AsRaw())
213+
default:
214+
jptrace.AddWarnings(
215+
spanForWarnings,
216+
fmt.Sprintf("unsupported complex attribute key: %q", storedAttrs.ComplexKeys[i]),
217+
)
180218
}
181219
}
182220
}

internal/storage/v2/clickhouse/tracestore/dbmodel/from_test.go

Lines changed: 50 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ func TestFromRow(t *testing.T) {
1818
now := time.Now().UTC()
1919
duration := 2 * time.Second
2020

21-
spanRow := createTestSpanRow(now, duration)
21+
spanRow := createTestSpanRow(t, now, duration)
2222

2323
expected := createTestTrace(now, duration)
2424

@@ -89,23 +89,55 @@ func TestFromRow_DecodeID(t *testing.T) {
8989
}
9090

9191
func TestPutAttributes_Warnings(t *testing.T) {
92-
t.Run("bytes attribute with invalid base64", func(t *testing.T) {
93-
span := ptrace.NewSpan()
94-
attributes := pcommon.NewMap()
92+
tests := []struct {
93+
name string
94+
complexKeys []string
95+
complexValues []string
96+
expectedWarnContains string
97+
}{
98+
{
99+
name: "bytes attribute with invalid base64",
100+
complexKeys: []string{"@bytes@bytes-key"},
101+
complexValues: []string{"invalid-base64"},
102+
expectedWarnContains: "failed to decode bytes attribute \"@bytes@bytes-key\"",
103+
},
104+
{
105+
name: "failed to unmarshal slice attribute",
106+
complexKeys: []string{"@slice@slice-key"},
107+
complexValues: []string{"notjson"},
108+
expectedWarnContains: "failed to unmarshal slice attribute \"@slice@slice-key\"",
109+
},
110+
{
111+
name: "failed to unmarshal map attribute",
112+
complexKeys: []string{"@map@map-key"},
113+
complexValues: []string{"notjson"},
114+
expectedWarnContains: "failed to unmarshal map attribute \"@map@map-key\"",
115+
},
116+
{
117+
name: "unsupported complex attribute key",
118+
complexKeys: []string{"unsupported"},
119+
complexValues: []string{"{\"kvlistValue\":{\"values\":[{\"key\":\"key\",\"value\":{\"stringValue\":\"value\"}}]}}"},
120+
expectedWarnContains: "unsupported complex attribute key: \"unsupported\"",
121+
},
122+
}
95123

96-
putAttributes(
97-
attributes,
98-
&Attributes{
99-
ComplexKeys: []string{"@bytes@bytes-key"},
100-
ComplexValues: []string{"invalid-base64"},
101-
},
102-
span,
103-
)
124+
for _, tt := range tests {
125+
t.Run(tt.name, func(t *testing.T) {
126+
span := ptrace.NewSpan()
127+
attributes := pcommon.NewMap()
128+
129+
putAttributes(
130+
attributes,
131+
&Attributes{
132+
ComplexKeys: tt.complexKeys,
133+
ComplexValues: tt.complexValues,
134+
},
135+
span,
136+
)
104137

105-
_, ok := attributes.Get("bytes-key")
106-
require.False(t, ok)
107-
warnings := jptrace.GetWarnings(span)
108-
require.Len(t, warnings, 1)
109-
require.Contains(t, warnings[0], "failed to decode bytes attribute \"@bytes@bytes-key\"")
110-
})
138+
warnings := jptrace.GetWarnings(span)
139+
require.Len(t, warnings, 1)
140+
require.Contains(t, warnings[0], tt.expectedWarnContains)
141+
})
142+
}
111143
}

internal/storage/v2/clickhouse/tracestore/dbmodel/spanrow.go

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,22 @@ import (
99
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
1010
)
1111

12-
// SpanRow represents a single row in the ClickHouse `spans` table.
12+
// SpanRow represents a single record in the ClickHouse `spans` table.
1313
//
14-
// Complex attributes are attributes that are not of a primitive type and hence need special handling.
15-
// The following OTLP types are stored in the complex attributes fields:
16-
// - AnyValue_BytesValue: This OTLP type is stored as a base64-encoded string. The key
17-
// for this type will begin with `@bytes@`.
18-
// - AnyValue_ArrayValue: This OTLP type is stored as a JSON-encoded string.
19-
// The key for this type will begin with `@array@`.
20-
// - AnyValue_KVListValue: This OTLP type is stored as a JSON-encoded string.
21-
// The key for this type will begin with `@kvlist@`.
14+
// Complex attributes are non-primitive OTLP types that require special serialization
15+
// before being stored. These types are encoded as follows:
16+
//
17+
// - pcommon.ValueTypeBytes:
18+
// Represents raw byte data. The value is Base64-encoded and stored as a string.
19+
// Keys for this type are prefixed with `@bytes@`.
20+
//
21+
// - pcommon.ValueTypeSlice:
22+
// Represents an OTLP slice (array). The value is serialized to JSON and stored
23+
// as a string. Keys for this type are prefixed with `@slice@`.
24+
//
25+
// - pcommon.ValueTypeMap:
26+
// Represents an OTLP map. The value is serialized to JSON and stored
27+
// as a string. Keys for this type are prefixed with `@map@`.
2228
type SpanRow struct {
2329
// --- Span ---
2430
ID string

0 commit comments

Comments
 (0)