Skip to content

Commit 58f15ea

Browse files
authored
receive: Fix missing exemplar labels receive capnproto (#8486)
* Fix memory error for exemplar label
  Signed-off-by: Daniel Buchanan <[email protected]>
* Test case for capnproto write
  Signed-off-by: Daniel Buchanan <[email protected]>
* Fix test to check content of labels
  Signed-off-by: Daniel Buchanan <[email protected]>
* Running validateLabels on exemplar labels capnproto
  Signed-off-by: Daniel Buchanan <[email protected]>
* Removing repro of capnproto exemplar labels error
  Signed-off-by: Daniel Buchanan <[email protected]>
---------
Signed-off-by: Daniel Buchanan <[email protected]>
1 parent 13fae5e commit 58f15ea

File tree

4 files changed

+223
-3
lines changed

4 files changed

+223
-3
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
1313
### Fixed
1414

1515
- [#8334](https://github.com/thanos-io/thanos/pull/8334) Query: wait for initial endpoint discovery before becoming ready
16+
- [#8486](https://github.com/thanos-io/thanos/pull/8486) Receive: fix exemplar label corruption from Cap'n Proto memory references
1617

1718
### Added
1819

pkg/receive/capnproto_writer.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,8 +114,23 @@ func (r *CapNProtoWriter) Write(ctx context.Context, tenantID string, wreq *writ
114114
for _, ex := range series.Exemplars {
115115
exLogger := log.With(tLogger, "exemplarLset", ex.Labels)
116116

117+
// Create completely new strings to detach from Cap'n Proto memory
118+
// Do not want to reference Cap'n Proto memory as it will be empty when stored in the TSDB
119+
builder := labels.NewBuilder(labels.EmptyLabels())
120+
ex.Labels.Range(func(l labels.Label) {
121+
builder.Set(string([]byte(l.Name)), string([]byte(l.Value)))
122+
})
123+
copiedLabels := builder.Labels()
124+
125+
// Validate exemplar labels after copying them out of Cap'n Proto memory
126+
// If moved before copying the labels, Cap'n Proto memory may be freed before validation is complete
127+
if err := validateLabels(copiedLabels); err != nil {
128+
exlset := &labelpb.ZLabelSet{Labels: labelpb.ZLabelsFromPromLabels(copiedLabels)}
129+
errorTracker.addLabelsError(err, exlset, exLogger)
130+
continue
131+
}
117132
if _, err = app.AppendExemplar(ref, lset, exemplar.Exemplar{
118-
Labels: ex.Labels,
133+
Labels: copiedLabels,
119134
Value: ex.Value,
120135
Ts: ex.Ts,
121136
HasTs: true,
Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
// Copyright (c) The Thanos Authors.
2+
// Licensed under the Apache License 2.0.
3+
4+
package receive
5+
6+
import (
7+
"context"
8+
"testing"
9+
10+
"github.com/efficientgo/core/testutil"
11+
"github.com/pkg/errors"
12+
"github.com/prometheus/prometheus/model/labels"
13+
"github.com/stretchr/testify/require"
14+
15+
"github.com/thanos-io/thanos/pkg/receive/writecapnp"
16+
"github.com/thanos-io/thanos/pkg/store/labelpb"
17+
"github.com/thanos-io/thanos/pkg/store/storepb/prompb"
18+
"github.com/thanos-io/thanos/pkg/tenancy"
19+
)
20+
21+
func TestCapNProtoWriter_Write(t *testing.T) {
22+
t.Parallel()
23+
24+
logger, m, app := setupMultitsdb(t, 1000)
25+
writer := NewCapNProtoWriter(logger, m, &CapNProtoWriterOptions{})
26+
27+
// Create test data with valid exemplars
28+
timeseries := []prompb.TimeSeries{
29+
{
30+
Labels: []labelpb.ZLabel{
31+
{Name: "__name__", Value: "test_metric"},
32+
{Name: "job", Value: "test"},
33+
},
34+
Samples: []prompb.Sample{{Value: 1, Timestamp: 10}},
35+
Exemplars: []prompb.Exemplar{
36+
{
37+
Labels: []labelpb.ZLabel{
38+
{Name: "trace_id", Value: "abc123"},
39+
{Name: "span_id", Value: "def456"},
40+
},
41+
Value: 10.5,
42+
Timestamp: 10,
43+
},
44+
{
45+
Labels: []labelpb.ZLabel{
46+
{Name: "trace_id", Value: "xyz789"},
47+
{Name: "span_id", Value: "uvw012"},
48+
},
49+
Value: 20.5,
50+
Timestamp: 11,
51+
},
52+
},
53+
},
54+
}
55+
56+
// Create capnproto request
57+
capnpReq, err := writecapnp.Build(tenancy.DefaultTenant, timeseries)
58+
require.NoError(t, err)
59+
60+
wr, err := writecapnp.NewRequest(capnpReq)
61+
require.NoError(t, err)
62+
63+
// Write the request
64+
err = writer.Write(context.Background(), tenancy.DefaultTenant, wr)
65+
require.NoError(t, err)
66+
67+
require.NotNil(t, app)
68+
69+
// Query exemplars back from TSDB to verify they were stored correctly
70+
exemplarClients := m.TSDBExemplars()
71+
require.Contains(t, exemplarClients, tenancy.DefaultTenant, "Should have exemplar client for default tenant")
72+
73+
exemplarClient := exemplarClients[tenancy.DefaultTenant]
74+
require.NotNil(t, exemplarClient, "Exemplar client should not be nil")
75+
76+
srv := &exemplarsServer{ctx: context.Background()}
77+
78+
// get matching exemplar
79+
err = exemplarClient.Exemplars(
80+
[][]*labels.Matcher{{labels.MustNewMatcher(labels.MatchEqual, "__name__", "test_metric")}},
81+
0, // start time
82+
20, // end time
83+
srv,
84+
)
85+
require.NoError(t, err, "Should be able to query exemplars")
86+
87+
// Verify we got exemplar data back
88+
require.Len(t, srv.Data, 1, "Should have one series with exemplars")
89+
90+
seriesData := srv.Data[0]
91+
require.Len(t, seriesData.Exemplars, 2, "Should have 2 exemplars")
92+
93+
// Verify exemplar labels
94+
firstExemplar := seriesData.Exemplars[0]
95+
require.Equal(t, 10.5, firstExemplar.Value, "First exemplar value should match")
96+
require.Equal(t, int64(10), firstExemplar.Ts, "First exemplar timestamp should match")
97+
98+
// Convert ZLabels to map for easier comparison
99+
firstLabels := make(map[string]string)
100+
for _, label := range firstExemplar.Labels.Labels {
101+
firstLabels[label.Name] = label.Value
102+
}
103+
104+
require.Equal(t, "abc123", firstLabels["trace_id"], "First exemplar trace_id should match")
105+
require.Equal(t, "def456", firstLabels["span_id"], "First exemplar span_id should match")
106+
107+
// Verify the second exemplar labels
108+
secondExemplar := seriesData.Exemplars[1]
109+
require.Equal(t, 20.5, secondExemplar.Value, "Second exemplar value should match")
110+
require.Equal(t, int64(11), secondExemplar.Ts, "Second exemplar timestamp should match")
111+
112+
secondLabels := make(map[string]string)
113+
for _, label := range secondExemplar.Labels.Labels {
114+
secondLabels[label.Name] = label.Value
115+
}
116+
117+
require.Equal(t, "xyz789", secondLabels["trace_id"], "Second exemplar trace_id should match")
118+
require.Equal(t, "uvw012", secondLabels["span_id"], "Second exemplar span_id should match")
119+
}
120+
121+
func TestCapNProtoWriter_ValidateExemplarLabels(t *testing.T) {
122+
t.Parallel()
123+
124+
lbls := []labelpb.ZLabel{{Name: "__name__", Value: "test"}}
125+
tests := map[string]struct {
126+
reqs []*prompb.WriteRequest
127+
expectedErr error
128+
expectedIngested []prompb.TimeSeries
129+
maxExemplars int64
130+
opts *WriterOptions
131+
}{
132+
"should succeed on valid series with exemplars": {
133+
reqs: []*prompb.WriteRequest{{
134+
Timeseries: []prompb.TimeSeries{
135+
{
136+
Labels: lbls,
137+
// Ingesting an exemplar requires a sample to create the series first.
138+
Samples: []prompb.Sample{{Value: 1, Timestamp: 10}},
139+
Exemplars: []prompb.Exemplar{
140+
{
141+
Labels: []labelpb.ZLabel{{Name: "trace_id", Value: "123"}},
142+
Value: 11,
143+
Timestamp: 12,
144+
},
145+
},
146+
},
147+
},
148+
}},
149+
expectedErr: nil,
150+
maxExemplars: 2,
151+
},
152+
"should fail on empty exemplar label name": {
153+
reqs: []*prompb.WriteRequest{{
154+
Timeseries: []prompb.TimeSeries{
155+
{
156+
Labels: lbls,
157+
Samples: []prompb.Sample{{Value: 1, Timestamp: 10}},
158+
Exemplars: []prompb.Exemplar{
159+
{
160+
Labels: []labelpb.ZLabel{{Name: "", Value: "123"}},
161+
Value: 11,
162+
Timestamp: 12,
163+
},
164+
},
165+
},
166+
},
167+
}},
168+
expectedErr: errors.Wrapf(labelpb.ErrEmptyLabels, "add 1 series"),
169+
maxExemplars: 2,
170+
},
171+
}
172+
173+
for testName, testData := range tests {
174+
t.Run(testName, func(t *testing.T) {
175+
t.Run("capnproto_writer", func(t *testing.T) {
176+
logger, m, app := setupMultitsdb(t, testData.maxExemplars)
177+
178+
opts := &CapNProtoWriterOptions{}
179+
if testData.opts != nil {
180+
opts.TooFarInFutureTimeWindow = testData.opts.TooFarInFutureTimeWindow
181+
}
182+
w := NewCapNProtoWriter(logger, m, opts)
183+
184+
for idx, req := range testData.reqs {
185+
capnpReq, err := writecapnp.Build(tenancy.DefaultTenant, req.Timeseries)
186+
testutil.Ok(t, err)
187+
188+
wr, err := writecapnp.NewRequest(capnpReq)
189+
testutil.Ok(t, err)
190+
err = w.Write(context.Background(), tenancy.DefaultTenant, wr)
191+
192+
if testData.expectedErr == nil || idx < len(testData.reqs)-1 {
193+
testutil.Ok(t, err)
194+
} else {
195+
testutil.NotOk(t, err)
196+
testutil.Equals(t, testData.expectedErr.Error(), err.Error())
197+
}
198+
}
199+
200+
assertWrittenData(t, app, testData.expectedIngested)
201+
})
202+
})
203+
}
204+
}

test/e2e/receive_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1275,10 +1275,10 @@ func TestReceiveCpnp(t *testing.T) {
12751275
},
12761276
}, v)
12771277

1278-
// TODO(GiedriusS): repro for https://github.com/thanos-io/thanos/issues/8224. Fix in following PRs.
1278+
// Verify fix for https://github.com/thanos-io/thanos/issues/8224
12791279
queryExemplars(
12801280
t, context.Background(), q.Endpoint("http"), "myself", timestamp.FromTime(ts), timestamp.FromTime(ts), func(data []*exemplarspb.ExemplarData) error {
1281-
require.Equal(t, "\000\000\000\000\000\000\000", data[0].Exemplars[0].Labels.Labels[0].Name)
1281+
require.Equal(t, "receive", data[0].Exemplars[0].Labels.Labels[0].Name)
12821282
return nil
12831283
},
12841284
)

0 commit comments

Comments
 (0)