Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 77 additions & 70 deletions pkg/query/remote_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/thanos-io/thanos/pkg/api/query/querypb"
"github.com/thanos-io/thanos/pkg/info/infopb"
"github.com/thanos-io/thanos/pkg/server/http/middleware"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb/prompb"
grpc_tracing "github.com/thanos-io/thanos/pkg/tracing/tracing_middleware"
)
Expand Down Expand Up @@ -117,15 +116,14 @@ func (r remoteEndpoints) Engines() []api.RemoteEngine {
type remoteEngine struct {
opts Opts
logger log.Logger

client Client

mintOnce sync.Once
mint int64
maxtOnce sync.Once
maxt int64
labelSetsOnce sync.Once
labelSets []labels.Labels
initOnce sync.Once

mint int64
maxt int64
labelSets []labels.Labels
partitionLabelSets []labels.Labels
}

func NewRemoteEngine(logger log.Logger, queryClient Client, opts Opts) *remoteEngine {
Expand All @@ -136,100 +134,109 @@ func NewRemoteEngine(logger log.Logger, queryClient Client, opts Opts) *remoteEn
}
}

// MinT returns the minimum timestamp that is safe to query in the remote engine.
// In order to calculate it, we find the highest min time for each label set, and we return
// the lowest of those values.
// Calculating the MinT this way makes remote queries resilient to cases where one tsdb replica would delete
// a block due to retention before other replicas did the same.
// See https://github.com/thanos-io/promql-engine/issues/187.
func (r *remoteEngine) MinT() int64 {
func (r *remoteEngine) init() {
r.initOnce.Do(func() {
replicaLabelSet := make(map[string]struct{})
for _, lbl := range r.opts.ReplicaLabels {
replicaLabelSet[lbl] = struct{}{}
}
partitionLabelsSet := make(map[string]struct{})
for _, lbl := range r.opts.PartitionLabels {
partitionLabelsSet[lbl] = struct{}{}
}

r.mintOnce.Do(func() {
// strip out replica labels and scopes the remaining labels
// onto the partition labels if they are set.

// partitionLabelSets are used to compute how to push down, they are the minimum set of labels
// that form a partition of the remote engines.
// labelSets are all labelsets of the remote engine, they are used for fan-out pruning on labels
// that dont meaningfully contribute to the partitioning but are still useful.
var (
hashBuf = make([]byte, 0, 128)
highestMintByLabelSet = make(map[uint64]int64)

labelSetsBuilder labels.ScratchBuilder
partitionLabelSetsBuilder labels.ScratchBuilder

labelSets = make([]labels.Labels, 0, len(r.client.tsdbInfos))
partitionLabelSets = make([]labels.Labels, 0, len(r.client.tsdbInfos))
)
for _, lset := range r.adjustedInfos() {
key, _ := labelpb.ZLabelsToPromLabels(lset.Labels.Labels).HashWithoutLabels(hashBuf)
for _, info := range r.client.tsdbInfos {
labelSetsBuilder.Reset()
partitionLabelSetsBuilder.Reset()
for _, lbl := range info.Labels.Labels {
if _, ok := replicaLabelSet[lbl.Name]; ok {
continue
}
labelSetsBuilder.Add(lbl.Name, lbl.Value)
if _, ok := partitionLabelsSet[lbl.Name]; !ok && len(partitionLabelsSet) > 0 {
continue
}
partitionLabelSetsBuilder.Add(lbl.Name, lbl.Value)
}

partitionLabelSet := partitionLabelSetsBuilder.Labels()
labelSet := labelSetsBuilder.Labels()
labelSets = append(labelSets, labelSet)
partitionLabelSets = append(partitionLabelSets, partitionLabelSet)

key, _ := partitionLabelSet.HashWithoutLabels(hashBuf)
lsetMinT, ok := highestMintByLabelSet[key]
if !ok {
highestMintByLabelSet[key] = lset.MinTime
highestMintByLabelSet[key] = info.MinTime
continue
}
// If we are querying with overlapping intervals, we want to find the first available timestamp
// otherwise we want to find the last available timestamp.
if r.opts.QueryDistributedWithOverlappingInterval && lset.MinTime < lsetMinT {
highestMintByLabelSet[key] = lset.MinTime
} else if !r.opts.QueryDistributedWithOverlappingInterval && lset.MinTime > lsetMinT {
highestMintByLabelSet[key] = lset.MinTime
if r.opts.QueryDistributedWithOverlappingInterval && info.MinTime < lsetMinT {
highestMintByLabelSet[key] = info.MinTime
} else if !r.opts.QueryDistributedWithOverlappingInterval && info.MinTime > lsetMinT {
highestMintByLabelSet[key] = info.MinTime
}
}
var mint int64 = math.MaxInt64

// mint is the minimum timestamp that is safe to query in the remote engine.
// In order to calculate it, we find the highest min time for each label set, and we return
// the lowest of those values.
// Calculating the MinT this way makes remote queries resilient to cases where one tsdb replica would delete
// a block due to retention before other replicas did the same.
// See https://github.com/thanos-io/promql-engine/issues/187.
var (
mint = int64(math.MaxInt64)
maxt = r.client.tsdbInfos.MaxT()
)
for _, m := range highestMintByLabelSet {
if m < mint {
mint = m
}
}

r.mint = mint
r.maxt = maxt
r.labelSets = labelSets
r.partitionLabelSets = partitionLabelSets
})
}

func (r *remoteEngine) MinT() int64 {
r.init()
return r.mint
}

func (r *remoteEngine) MaxT() int64 {
r.maxtOnce.Do(func() {
r.maxt = r.client.tsdbInfos.MaxT()
})
r.init()
return r.maxt
}

func (r *remoteEngine) PartitionLabelSets() []labels.Labels {
r.labelSetsOnce.Do(func() {
r.labelSets = r.adjustedInfos().LabelSets()
})
return r.labelSets
}

func (r *remoteEngine) LabelSets() []labels.Labels {
r.labelSetsOnce.Do(func() {
r.labelSets = r.adjustedInfos().LabelSets()
})
r.init()
return r.labelSets
}

// adjustedInfos strips out replica labels and scopes the remaining labels
// onto the partition labels if they are set.
func (r *remoteEngine) adjustedInfos() infopb.TSDBInfos {
replicaLabelSet := make(map[string]struct{})
for _, lbl := range r.opts.ReplicaLabels {
replicaLabelSet[lbl] = struct{}{}
}
partitionLabelsSet := make(map[string]struct{})
for _, lbl := range r.opts.PartitionLabels {
partitionLabelsSet[lbl] = struct{}{}
}

// Strip replica labels from the result.
infos := make(infopb.TSDBInfos, 0, len(r.client.tsdbInfos))
var builder labels.ScratchBuilder
for _, info := range r.client.tsdbInfos {
builder.Reset()
for _, lbl := range info.Labels.Labels {
if _, ok := replicaLabelSet[lbl.Name]; ok {
continue
}
if _, ok := partitionLabelsSet[lbl.Name]; !ok && len(partitionLabelsSet) > 0 {
continue
}
builder.Add(lbl.Name, lbl.Value)
}
infos = append(infos, infopb.NewTSDBInfo(
info.MinTime,
info.MaxTime,
labelpb.ZLabelsFromPromLabels(builder.Labels())),
)
}
return infos
func (r *remoteEngine) PartitionLabelSets() []labels.Labels {
r.init()
return r.partitionLabelSets
}

func (r *remoteEngine) NewRangeQuery(_ context.Context, _ promql.QueryOpts, plan api.RemoteQuery, start, end time.Time, interval time.Duration) (promql.Query, error) {
Expand Down
48 changes: 28 additions & 20 deletions pkg/query/remote_engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,37 +105,42 @@ func TestRemoteEngine_LabelSets(t *testing.T) {
t.Parallel()

tests := []struct {
name string
tsdbInfos []infopb.TSDBInfo
replicaLabels []string
expected []labels.Labels
partitionLabels []string
name string
tsdbInfos []infopb.TSDBInfo
replicaLabels []string
partitionLabels []string
expectedLabelSets []labels.Labels
expectedPartitionLabelSets []labels.Labels
}{
{
name: "empty label sets",
tsdbInfos: []infopb.TSDBInfo{},
expected: []labels.Labels{},
name: "empty label sets",
tsdbInfos: []infopb.TSDBInfo{},
expectedLabelSets: []labels.Labels{},
expectedPartitionLabelSets: []labels.Labels{},
},
{
name: "empty label sets with replica labels",
tsdbInfos: []infopb.TSDBInfo{},
replicaLabels: []string{"replica"},
expected: []labels.Labels{},
name: "empty label sets with replica labels",
tsdbInfos: []infopb.TSDBInfo{},
replicaLabels: []string{"replica"},
expectedLabelSets: []labels.Labels{},
expectedPartitionLabelSets: []labels.Labels{},
},
{
name: "non-empty label sets",
tsdbInfos: []infopb.TSDBInfo{{
Labels: zLabelSetFromStrings("a", "1"),
}},
expected: []labels.Labels{labels.FromStrings("a", "1")},
expectedLabelSets: []labels.Labels{labels.FromStrings("a", "1")},
expectedPartitionLabelSets: []labels.Labels{labels.FromStrings("a", "1")},
},
{
name: "non-empty label sets with replica labels",
tsdbInfos: []infopb.TSDBInfo{{
Labels: zLabelSetFromStrings("a", "1", "b", "2"),
}},
replicaLabels: []string{"a"},
expected: []labels.Labels{labels.FromStrings("b", "2")},
replicaLabels: []string{"a"},
expectedLabelSets: []labels.Labels{labels.FromStrings("b", "2")},
expectedPartitionLabelSets: []labels.Labels{labels.FromStrings("b", "2")},
},
{
name: "replica labels not in label sets",
Expand All @@ -144,8 +149,9 @@ func TestRemoteEngine_LabelSets(t *testing.T) {
Labels: zLabelSetFromStrings("a", "1", "c", "2"),
},
},
replicaLabels: []string{"a", "b"},
expected: []labels.Labels{labels.FromStrings("c", "2")},
replicaLabels: []string{"a", "b"},
expectedLabelSets: []labels.Labels{labels.FromStrings("c", "2")},
expectedPartitionLabelSets: []labels.Labels{labels.FromStrings("c", "2")},
},
{
name: "non-empty label sets with partition labels",
Expand All @@ -154,8 +160,9 @@ func TestRemoteEngine_LabelSets(t *testing.T) {
Labels: zLabelSetFromStrings("a", "1", "c", "2"),
},
},
partitionLabels: []string{"a"},
expected: []labels.Labels{labels.FromStrings("a", "1")},
partitionLabels: []string{"a"},
expectedLabelSets: []labels.Labels{labels.FromStrings("a", "1", "c", "2")},
expectedPartitionLabelSets: []labels.Labels{labels.FromStrings("a", "1")},
},
}

Expand All @@ -167,7 +174,8 @@ func TestRemoteEngine_LabelSets(t *testing.T) {
PartitionLabels: testCase.partitionLabels,
})

testutil.Equals(t, testCase.expected, engine.LabelSets())
testutil.Equals(t, testCase.expectedPartitionLabelSets, engine.PartitionLabelSets())
testutil.Equals(t, testCase.expectedLabelSets, engine.LabelSets())
})
}
}
Expand Down
Loading