Skip to content

Commit 1e1e64b

Browse files
committed
cli: Add 'thanos tools' 'query range' and 'query instant' subcommands
Add commands 'thanos tools query range' and 'thanos tools query instant' that use the Thanos gRPC API to execute PromQL on a Thanos Query endpoint. This works like 'promtool query' for the Thanos gRPC API instead of the Prometheus HTTP API. Using the Thanos gRPC API improves performance and lowers memory use by using protobuf instead of text json serialization. It also streams large result sets to the client, so the whole serialised result does not have to be accumulated in memory before any of it can be sent; this lowers latency and peak memory use. Results cannot be fully streamed, because the Thanos executor still accumulates them into a promql.Result before sending any. But it helps reduce the memory overhead and latency of building the whole serialised response in memory while still holding the result data in memory. This lowers peak Query memory, and latency to first response. Sending queries to Thanos Query Frontend is not currently supported, because Frontend only exposes the Prometheus HTTP API (and only uses the Prometheus HTTP API to talk to its downstreams). This is a WIP, with the following improvements still required: * Needs docs updates * Needs test cases * Needs (m)TLS configuration support * Needs http_proxy support * Checks for gRPC endpoint format needed (support dns:, ipv4:, etc) * Needs formatting options like promtool * --query should be positional parameter Example use: ./thanos tools query range --start='-2h' --step '30s' --insecure \ --server 'thanos-query.monitoring.svc.local:10901' \ --timeout=5m --query.promql-engine=thanos \ --query 'last_over_time({job="kubelet"}[1m])'
1 parent d997eed commit 1e1e64b

File tree

3 files changed

+260
-0
lines changed

3 files changed

+260
-0
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
2020

2121
- [#8366](https://github.com/thanos-io/thanos/pull/8366) Store: optionally ignore Parquet migrated blocks
2222
- [#8359](https://github.com/thanos-io/thanos/pull/8359) Tools: add `--shipper.upload-compacted` flag for uploading compacted blocks to bucket upload-blocks
23+
- [#8501](https://github.com/thanos-io/thanos/pull/8501) Tools: Add 'thanos tools' 'query range' and 'query instant' subcommands for running PromQL queries over Thanos gRPC API
2324

2425
### Changed
2526

cmd/thanos/tools.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ func registerTools(app *extkingpin.App) {
2828

2929
registerBucket(cmd)
3030
registerCheckRules(cmd)
31+
registerToolsQuery(cmd)
3132
}
3233

3334
func (tc *checkRulesConfig) registerFlag(cmd extkingpin.FlagClause) *checkRulesConfig {

cmd/thanos/tools_query.go

Lines changed: 258 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,258 @@
1+
// Copyright (c) The Thanos Authors.
2+
// Licensed under the Apache License 2.0.
3+
4+
package main
5+
6+
import (
7+
"context"
8+
"crypto/tls"
9+
"fmt"
10+
"io"
11+
"os"
12+
"time"
13+
14+
"github.com/go-kit/log"
15+
"github.com/oklog/run"
16+
"google.golang.org/grpc"
17+
"google.golang.org/grpc/credentials"
18+
"google.golang.org/grpc/credentials/insecure"
19+
20+
"github.com/opentracing/opentracing-go"
21+
"github.com/prometheus/client_golang/prometheus"
22+
23+
apiv1 "github.com/thanos-io/thanos/pkg/api/query"
24+
"github.com/thanos-io/thanos/pkg/api/query/querypb"
25+
"github.com/thanos-io/thanos/pkg/extkingpin"
26+
"github.com/thanos-io/thanos/pkg/model"
27+
"github.com/thanos-io/thanos/pkg/query"
28+
)
29+
30+
type toolsQueryConfig struct {
31+
// grpc URI for server; see https://github.com/grpc/grpc/blob/master/doc/naming.md
32+
server string
33+
timeout time.Duration
34+
insecure bool
35+
// TODO flags for TLS config
36+
tlsconfig tls.Config
37+
maxResolution time.Duration
38+
promqlOpts query.Opts
39+
skipChunks bool
40+
maxLookbackDelta time.Duration
41+
defaultPromqlEngine string
42+
}
43+
44+
type toolsQueryRangeConfig struct {
45+
toolsQueryConfig
46+
query string
47+
startTime model.TimeOrDurationValue
48+
endTime model.TimeOrDurationValue
49+
step time.Duration
50+
}
51+
52+
type toolsQueryInstantConfig struct {
53+
toolsQueryConfig
54+
query string
55+
time model.TimeOrDurationValue
56+
}
57+
58+
func (tqc *toolsQueryConfig) registerToolsQueryFlag(cmd extkingpin.FlagClause) {
59+
cmd.Flag("server", "gRPC URI of the Thanos endpoint to query, usually a Thanos Query instance. E.g. dns://thanos-query.thanos.svc.cluster.local:10901 . See https://github.com/grpc/grpc/blob/master/doc/naming.md").
60+
Short('u').Required().StringVar(&tqc.server)
61+
cmd.Flag("timeout", "Timeout befsore abandoning query execution").Default("5m").DurationVar(&tqc.timeout)
62+
cmd.Flag("insecure", "Do not use TLS when connecting to the gRPC server").Default("false").BoolVar(&tqc.insecure)
63+
64+
cmd.Flag("max-resolution", "Maximum resolution (minimum step) to query from the server. This is useful to limit the resolution when querying downsampled data.").
65+
Default("0s").DurationVar(&tqc.maxResolution)
66+
cmd.Flag("partial-response", "Whether to accept partial responses from the server when some of the data sources are down").
67+
Default("false").BoolVar(&tqc.promqlOpts.PartialResponse)
68+
cmd.Flag("query.replica-label", "A comma-separated or repeated-argument list of replica labels for deduplication. If omitted, no deduplication is performed").
69+
StringsVar(&tqc.promqlOpts.ReplicaLabels) // implies deduplicate if set
70+
cmd.Flag("query.promql-engine", "Default PromQL engine to use.").Default(string(apiv1.PromqlEnginePrometheus)).
71+
EnumVar(&tqc.defaultPromqlEngine, string(apiv1.PromqlEnginePrometheus), string(apiv1.PromqlEngineThanos))
72+
cmd.Flag("query.lookback-delta", "The maximum lookback duration for retrieving metrics during expression evaluations. PromQL always evaluates the query for the certain timestamp (query range timestamps are deduced by step). Since scrape intervals might be different, PromQL looks back for given amount of time to get latest sample. If it exceeds the maximum lookback delta it assumes series is stale and returns none (a gap). This is why lookback delta should be set to at least 2 times of the slowest scrape interval. If unset it will use the promql default of 5m.").DurationVar(&tqc.maxLookbackDelta)
73+
}
74+
75+
func (tqrc *toolsQueryRangeConfig) registerToolsQueryRangeFlag(cmd extkingpin.FlagClause) {
76+
tqrc.registerToolsQueryFlag(cmd)
77+
cmd.Flag("query", "Query to execute against the Thanos endpoint.").
78+
Short('q').Required().StringVar(&tqrc.query)
79+
cmd.Flag("start", "Range-query evaluation start time. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
80+
Required().SetValue(&tqrc.startTime)
81+
cmd.Flag("end", "Range-query evaluation end time. Default current time. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
82+
Default("0s").SetValue(&tqrc.endTime)
83+
cmd.Flag("step", "Range-query step interval").Required().DurationVar(&tqrc.step)
84+
}
85+
86+
func (tqic *toolsQueryInstantConfig) registerToolsQueryInstantFlag(cmd extkingpin.FlagClause) {
87+
tqic.registerToolsQueryFlag(cmd)
88+
cmd.Flag("query", "Query to execute against the Thanos endpoint.").
89+
Short('q').Required().StringVar(&tqic.query)
90+
cmd.Flag("time", "Instant-query evaluation start time. Default now. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
91+
Default("0s").SetValue(&tqic.time)
92+
}
93+
94+
func registerToolsQuery(app extkingpin.AppClause) {
95+
cmd := app.Command("query", "Utility commands for querying Thanos endpoints")
96+
registerToolsQueryRange(cmd)
97+
registerToolsQueryInstant(cmd)
98+
}
99+
100+
func (tqc *toolsQueryConfig) engineType() querypb.EngineType {
101+
switch tqc.defaultPromqlEngine {
102+
case string(apiv1.PromqlEnginePrometheus):
103+
return querypb.EngineType_prometheus
104+
case string(apiv1.PromqlEngineThanos):
105+
return querypb.EngineType_thanos
106+
default:
107+
fmt.Fprintf(os.Stderr, "warning: unknown promql engine type %q, defaulting to prometheus\n", tqc.defaultPromqlEngine)
108+
return querypb.EngineType_prometheus
109+
}
110+
}
111+
func (tqc *toolsQueryConfig) createClient() (querypb.QueryClient, *grpc.ClientConn, error) {
112+
opts := []grpc.DialOption{}
113+
if tqc.insecure {
114+
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
115+
} else {
116+
opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(&tqc.tlsconfig)))
117+
fmt.Fprintf(os.Stderr, "warning: TLS configuration not implemented")
118+
}
119+
conn, err := grpc.NewClient(tqc.server, opts...)
120+
if err != nil {
121+
return nil, nil, fmt.Errorf("failed to create gRPC client: %w", err)
122+
}
123+
return querypb.NewQueryClient(conn), conn, nil
124+
}
125+
126+
// Run a range query via Thanos gRPC API; like 'promtool query range'
127+
func registerToolsQueryRange(app extkingpin.AppClause) {
128+
cmd := app.Command("range", "Run an range query on a Thanos gRPC endpoint.")
129+
130+
tqr := &toolsQueryRangeConfig{}
131+
tqr.registerToolsQueryRangeFlag(cmd)
132+
133+
cmd.Setup(func(g *run.Group, logger log.Logger, _ *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error {
134+
// Ensure that we exit by providing a placeholder actor to the run group.
135+
g.Add(func() error { return nil }, func(error) {})
136+
137+
// Ensure we can cancel the query in response to SIGINT or timeout
138+
// TODO should use a different timeout for the query than the app
139+
ctx, cancel := context.WithTimeout(context.Background(), tqr.timeout)
140+
defer cancel()
141+
142+
warnOnFutureTimestamp(tqr.startTime, "--start")
143+
warnOnFutureTimestamp(tqr.endTime, "--end")
144+
if tqr.startTime.PrometheusTimestamp() >= tqr.endTime.PrometheusTimestamp() {
145+
if tqr.startTime.Dur != nil && tqr.endTime.Dur != nil {
146+
return fmt.Errorf("start duration must be before end duration, did you mean --start=-%s --end=-%s?", tqr.endTime.Dur.String(), tqr.startTime.Dur.String())
147+
}
148+
return fmt.Errorf("start time must be before end time")
149+
}
150+
151+
qc, _, err := tqr.createClient()
152+
if err != nil {
153+
return err
154+
}
155+
req := &querypb.QueryRangeRequest{
156+
Query: tqr.query,
157+
StartTimeSeconds: tqr.startTime.PrometheusTimestamp() / 1000,
158+
EndTimeSeconds: tqr.endTime.PrometheusTimestamp() / 1000,
159+
IntervalSeconds: int64(tqr.step.Seconds()),
160+
TimeoutSeconds: int64(tqr.timeout.Seconds()),
161+
MaxResolutionSeconds: int64(tqr.maxResolution.Seconds()),
162+
ReplicaLabels: tqr.promqlOpts.ReplicaLabels,
163+
EnableDedup: len(tqr.promqlOpts.ReplicaLabels) > 0,
164+
EnablePartialResponse: tqr.promqlOpts.PartialResponse,
165+
SkipChunks: tqr.skipChunks,
166+
LookbackDeltaSeconds: int64(tqr.maxLookbackDelta.Seconds()),
167+
Engine: tqr.engineType(),
168+
}
169+
fmt.Fprintf(os.Stderr, "QueryRangeRequest: %+v\n", req)
170+
fmt.Fprintf(os.Stderr, "Issuing gRPC QueryRangeRequest: %+v\n", req)
171+
resp, err := qc.QueryRange(ctx, req)
172+
if err != nil {
173+
return fmt.Errorf("QueryRange gRPC error: %v\n", err)
174+
}
175+
defer resp.CloseSend()
176+
177+
for {
178+
msg, err := resp.Recv()
179+
switch err {
180+
case nil:
181+
fmt.Fprintf(os.Stdout, "QueryRange response message: %+v\n", msg)
182+
case io.EOF:
183+
fmt.Fprintf(os.Stderr, "QueryRange: EOF\n")
184+
return nil
185+
default:
186+
return fmt.Errorf("QueryRange RecvMsg error: %v\n", err)
187+
}
188+
}
189+
})
190+
}
191+
192+
// Run an instant query via Thanos gRPC API; like 'promtool query instant'
193+
func registerToolsQueryInstant(app extkingpin.AppClause) {
194+
cmd := app.Command("instant", "Run an instant query on a Thanos gRPC endpoint.")
195+
196+
tqi := &toolsQueryInstantConfig{}
197+
tqi.registerToolsQueryInstantFlag(cmd)
198+
199+
cmd.Setup(func(g *run.Group, logger log.Logger, _ *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error {
200+
// Ensure that we exit by providing a placeholder actor to the run group.
201+
g.Add(func() error { return nil }, func(error) {})
202+
203+
// Ensure we can cancel the query in response to SIGINT or timeout
204+
// TODO should use a different timeout for the query than the app
205+
ctx, cancel := context.WithTimeout(context.Background(), tqi.timeout)
206+
defer cancel()
207+
208+
warnOnFutureTimestamp(tqi.time, "--time")
209+
210+
qc, _, err := tqi.createClient()
211+
if err != nil {
212+
return err
213+
}
214+
fmt.Fprintf(os.Stderr, "t=%+v\n", tqi.time)
215+
req := &querypb.QueryRequest{
216+
Query: tqi.query,
217+
TimeSeconds: tqi.time.PrometheusTimestamp() / 1000,
218+
TimeoutSeconds: int64(tqi.timeout.Seconds()),
219+
MaxResolutionSeconds: int64(tqi.maxResolution.Seconds()),
220+
ReplicaLabels: tqi.promqlOpts.ReplicaLabels,
221+
EnableDedup: len(tqi.promqlOpts.ReplicaLabels) > 0,
222+
EnablePartialResponse: tqi.promqlOpts.PartialResponse,
223+
SkipChunks: tqi.skipChunks,
224+
LookbackDeltaSeconds: int64(tqi.maxLookbackDelta.Seconds()),
225+
Engine: tqi.engineType(),
226+
}
227+
fmt.Fprintf(os.Stderr, "QueryRequest: %+v\n", req)
228+
fmt.Fprintf(os.Stderr, "Issuing gRPC QueryRequest: %+v\n", req)
229+
resp, err := qc.Query(ctx, req)
230+
if err != nil {
231+
return fmt.Errorf("Query gRPC error: %v\n", err)
232+
}
233+
defer resp.CloseSend()
234+
235+
for {
236+
msg, err := resp.Recv()
237+
switch err {
238+
case nil:
239+
fmt.Fprintf(os.Stdout, "QueryRange response message: %+v\n", msg)
240+
case io.EOF:
241+
fmt.Fprintf(os.Stderr, "QueryRange: EOF\n")
242+
return nil
243+
default:
244+
return fmt.Errorf("QueryRange RecvMsg error: %v\n", err)
245+
}
246+
}
247+
})
248+
}
249+
250+
func warnOnFutureTimestamp(t model.TimeOrDurationValue, desc string) {
251+
if t.PrometheusTimestamp()/1000 > time.Now().Unix() {
252+
if t.Dur != nil {
253+
fmt.Fprintf(os.Stderr, "warning: %s is in the future, did you mean --time=-%s?\n", desc, t.Dur.String())
254+
} else {
255+
fmt.Fprintf(os.Stderr, "warning: %s is in the future\n", desc)
256+
}
257+
}
258+
}

0 commit comments

Comments
 (0)