|
| 1 | +// Copyright (c) The Thanos Authors. |
| 2 | +// Licensed under the Apache License 2.0. |
| 3 | + |
| 4 | +package main |
| 5 | + |
| 6 | +import ( |
| 7 | + "context" |
| 8 | + "crypto/tls" |
| 9 | + "fmt" |
| 10 | + "io" |
| 11 | + "os" |
| 12 | + "time" |
| 13 | + |
| 14 | + "github.com/go-kit/log" |
| 15 | + "github.com/oklog/run" |
| 16 | + "google.golang.org/grpc" |
| 17 | + "google.golang.org/grpc/credentials" |
| 18 | + "google.golang.org/grpc/credentials/insecure" |
| 19 | + |
| 20 | + "github.com/opentracing/opentracing-go" |
| 21 | + "github.com/prometheus/client_golang/prometheus" |
| 22 | + |
| 23 | + apiv1 "github.com/thanos-io/thanos/pkg/api/query" |
| 24 | + "github.com/thanos-io/thanos/pkg/api/query/querypb" |
| 25 | + "github.com/thanos-io/thanos/pkg/extkingpin" |
| 26 | + "github.com/thanos-io/thanos/pkg/model" |
| 27 | + "github.com/thanos-io/thanos/pkg/query" |
| 28 | +) |
| 29 | + |
| 30 | +type toolsQueryConfig struct { |
| 31 | + // grpc URI for server; see https://github.com/grpc/grpc/blob/master/doc/naming.md |
| 32 | + server string |
| 33 | + timeout time.Duration |
| 34 | + insecure bool |
| 35 | + // TODO flags for TLS config |
| 36 | + tlsconfig tls.Config |
| 37 | + maxResolution time.Duration |
| 38 | + promqlOpts query.Opts |
| 39 | + skipChunks bool |
| 40 | + maxLookbackDelta time.Duration |
| 41 | + defaultPromqlEngine string |
| 42 | +} |
| 43 | + |
| 44 | +type toolsQueryRangeConfig struct { |
| 45 | + toolsQueryConfig |
| 46 | + query string |
| 47 | + startTime model.TimeOrDurationValue |
| 48 | + endTime model.TimeOrDurationValue |
| 49 | + step time.Duration |
| 50 | +} |
| 51 | + |
| 52 | +type toolsQueryInstantConfig struct { |
| 53 | + toolsQueryConfig |
| 54 | + query string |
| 55 | + time model.TimeOrDurationValue |
| 56 | +} |
| 57 | + |
| 58 | +func (tqc *toolsQueryConfig) registerToolsQueryFlag(cmd extkingpin.FlagClause) { |
| 59 | + cmd.Flag("server", "gRPC URI of the Thanos endpoint to query, usually a Thanos Query instance. E.g. dns://thanos-query.thanos.svc.cluster.local:10901 . See https://github.com/grpc/grpc/blob/master/doc/naming.md"). |
| 60 | + Short('u').Required().StringVar(&tqc.server) |
| 61 | + cmd.Flag("timeout", "Timeout befsore abandoning query execution").Default("5m").DurationVar(&tqc.timeout) |
| 62 | + cmd.Flag("insecure", "Do not use TLS when connecting to the gRPC server").Default("false").BoolVar(&tqc.insecure) |
| 63 | + |
| 64 | + cmd.Flag("max-resolution", "Maximum resolution (minimum step) to query from the server. This is useful to limit the resolution when querying downsampled data."). |
| 65 | + Default("0s").DurationVar(&tqc.maxResolution) |
| 66 | + cmd.Flag("partial-response", "Whether to accept partial responses from the server when some of the data sources are down"). |
| 67 | + Default("false").BoolVar(&tqc.promqlOpts.PartialResponse) |
| 68 | + cmd.Flag("query.replica-label", "A comma-separated or repeated-argument list of replica labels for deduplication. If omitted, no deduplication is performed"). |
| 69 | + StringsVar(&tqc.promqlOpts.ReplicaLabels) // implies deduplicate if set |
| 70 | + cmd.Flag("query.promql-engine", "Default PromQL engine to use.").Default(string(apiv1.PromqlEnginePrometheus)). |
| 71 | + EnumVar(&tqc.defaultPromqlEngine, string(apiv1.PromqlEnginePrometheus), string(apiv1.PromqlEngineThanos)) |
| 72 | + cmd.Flag("query.lookback-delta", "The maximum lookback duration for retrieving metrics during expression evaluations. PromQL always evaluates the query for the certain timestamp (query range timestamps are deduced by step). Since scrape intervals might be different, PromQL looks back for given amount of time to get latest sample. If it exceeds the maximum lookback delta it assumes series is stale and returns none (a gap). This is why lookback delta should be set to at least 2 times of the slowest scrape interval. If unset it will use the promql default of 5m.").DurationVar(&tqc.maxLookbackDelta) |
| 73 | +} |
| 74 | + |
| 75 | +func (tqrc *toolsQueryRangeConfig) registerToolsQueryRangeFlag(cmd extkingpin.FlagClause) { |
| 76 | + tqrc.registerToolsQueryFlag(cmd) |
| 77 | + cmd.Flag("query", "Query to execute against the Thanos endpoint."). |
| 78 | + Short('q').Required().StringVar(&tqrc.query) |
| 79 | + cmd.Flag("start", "Range-query evaluation start time. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y."). |
| 80 | + Required().SetValue(&tqrc.startTime) |
| 81 | + cmd.Flag("end", "Range-query evaluation end time. Default current time. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y."). |
| 82 | + Default("0s").SetValue(&tqrc.endTime) |
| 83 | + cmd.Flag("step", "Range-query step interval").Required().DurationVar(&tqrc.step) |
| 84 | +} |
| 85 | + |
| 86 | +func (tqic *toolsQueryInstantConfig) registerToolsQueryInstantFlag(cmd extkingpin.FlagClause) { |
| 87 | + tqic.registerToolsQueryFlag(cmd) |
| 88 | + cmd.Flag("query", "Query to execute against the Thanos endpoint."). |
| 89 | + Short('q').Required().StringVar(&tqic.query) |
| 90 | + cmd.Flag("time", "Instant-query evaluation start time. Default now. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y."). |
| 91 | + Default("0s").SetValue(&tqic.time) |
| 92 | +} |
| 93 | + |
| 94 | +func registerToolsQuery(app extkingpin.AppClause) { |
| 95 | + cmd := app.Command("query", "Utility commands for querying Thanos endpoints") |
| 96 | + registerToolsQueryRange(cmd) |
| 97 | + registerToolsQueryInstant(cmd) |
| 98 | +} |
| 99 | + |
| 100 | +func (tqc *toolsQueryConfig) engineType() querypb.EngineType { |
| 101 | + switch tqc.defaultPromqlEngine { |
| 102 | + case string(apiv1.PromqlEnginePrometheus): |
| 103 | + return querypb.EngineType_prometheus |
| 104 | + case string(apiv1.PromqlEngineThanos): |
| 105 | + return querypb.EngineType_thanos |
| 106 | + default: |
| 107 | + fmt.Fprintf(os.Stderr, "warning: unknown promql engine type %q, defaulting to prometheus\n", tqc.defaultPromqlEngine) |
| 108 | + return querypb.EngineType_prometheus |
| 109 | + } |
| 110 | +} |
| 111 | +func (tqc *toolsQueryConfig) createClient() (querypb.QueryClient, *grpc.ClientConn, error) { |
| 112 | + opts := []grpc.DialOption{} |
| 113 | + if tqc.insecure { |
| 114 | + opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) |
| 115 | + } else { |
| 116 | + opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(&tqc.tlsconfig))) |
| 117 | + fmt.Fprintf(os.Stderr, "warning: TLS configuration not implemented") |
| 118 | + } |
| 119 | + conn, err := grpc.NewClient(tqc.server, opts...) |
| 120 | + if err != nil { |
| 121 | + return nil, nil, fmt.Errorf("failed to create gRPC client: %w", err) |
| 122 | + } |
| 123 | + return querypb.NewQueryClient(conn), conn, nil |
| 124 | +} |
| 125 | + |
| 126 | +// Run a range query via Thanos gRPC API; like 'promtool query range' |
| 127 | +func registerToolsQueryRange(app extkingpin.AppClause) { |
| 128 | + cmd := app.Command("range", "Run an range query on a Thanos gRPC endpoint.") |
| 129 | + |
| 130 | + tqr := &toolsQueryRangeConfig{} |
| 131 | + tqr.registerToolsQueryRangeFlag(cmd) |
| 132 | + |
| 133 | + cmd.Setup(func(g *run.Group, logger log.Logger, _ *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error { |
| 134 | + // Ensure that we exit by providing a placeholder actor to the run group. |
| 135 | + g.Add(func() error { return nil }, func(error) {}) |
| 136 | + |
| 137 | + // Ensure we can cancel the query in response to SIGINT or timeout |
| 138 | + // TODO should use a different timeout for the query than the app |
| 139 | + ctx, cancel := context.WithTimeout(context.Background(), tqr.timeout) |
| 140 | + defer cancel() |
| 141 | + |
| 142 | + warnOnFutureTimestamp(tqr.startTime, "--start") |
| 143 | + warnOnFutureTimestamp(tqr.endTime, "--end") |
| 144 | + if tqr.startTime.PrometheusTimestamp() >= tqr.endTime.PrometheusTimestamp() { |
| 145 | + if tqr.startTime.Dur != nil && tqr.endTime.Dur != nil { |
| 146 | + return fmt.Errorf("start duration must be before end duration, did you mean --start=-%s --end=-%s?", tqr.endTime.Dur.String(), tqr.startTime.Dur.String()) |
| 147 | + } |
| 148 | + return fmt.Errorf("start time must be before end time") |
| 149 | + } |
| 150 | + |
| 151 | + qc, _, err := tqr.createClient() |
| 152 | + if err != nil { |
| 153 | + return err |
| 154 | + } |
| 155 | + req := &querypb.QueryRangeRequest{ |
| 156 | + Query: tqr.query, |
| 157 | + StartTimeSeconds: tqr.startTime.PrometheusTimestamp() / 1000, |
| 158 | + EndTimeSeconds: tqr.endTime.PrometheusTimestamp() / 1000, |
| 159 | + IntervalSeconds: int64(tqr.step.Seconds()), |
| 160 | + TimeoutSeconds: int64(tqr.timeout.Seconds()), |
| 161 | + MaxResolutionSeconds: int64(tqr.maxResolution.Seconds()), |
| 162 | + ReplicaLabels: tqr.promqlOpts.ReplicaLabels, |
| 163 | + EnableDedup: len(tqr.promqlOpts.ReplicaLabels) > 0, |
| 164 | + EnablePartialResponse: tqr.promqlOpts.PartialResponse, |
| 165 | + SkipChunks: tqr.skipChunks, |
| 166 | + LookbackDeltaSeconds: int64(tqr.maxLookbackDelta.Seconds()), |
| 167 | + Engine: tqr.engineType(), |
| 168 | + } |
| 169 | + fmt.Fprintf(os.Stderr, "QueryRangeRequest: %+v\n", req) |
| 170 | + fmt.Fprintf(os.Stderr, "Issuing gRPC QueryRangeRequest: %+v\n", req) |
| 171 | + resp, err := qc.QueryRange(ctx, req) |
| 172 | + if err != nil { |
| 173 | + return fmt.Errorf("QueryRange gRPC error: %v\n", err) |
| 174 | + } |
| 175 | + defer resp.CloseSend() |
| 176 | + |
| 177 | + for { |
| 178 | + msg, err := resp.Recv() |
| 179 | + switch err { |
| 180 | + case nil: |
| 181 | + fmt.Fprintf(os.Stdout, "QueryRange response message: %+v\n", msg) |
| 182 | + case io.EOF: |
| 183 | + fmt.Fprintf(os.Stderr, "QueryRange: EOF\n") |
| 184 | + return nil |
| 185 | + default: |
| 186 | + return fmt.Errorf("QueryRange RecvMsg error: %v\n", err) |
| 187 | + } |
| 188 | + } |
| 189 | + }) |
| 190 | +} |
| 191 | + |
| 192 | +// Run an instant query via Thanos gRPC API; like 'promtool query instant' |
| 193 | +func registerToolsQueryInstant(app extkingpin.AppClause) { |
| 194 | + cmd := app.Command("instant", "Run an instant query on a Thanos gRPC endpoint.") |
| 195 | + |
| 196 | + tqi := &toolsQueryInstantConfig{} |
| 197 | + tqi.registerToolsQueryInstantFlag(cmd) |
| 198 | + |
| 199 | + cmd.Setup(func(g *run.Group, logger log.Logger, _ *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error { |
| 200 | + // Ensure that we exit by providing a placeholder actor to the run group. |
| 201 | + g.Add(func() error { return nil }, func(error) {}) |
| 202 | + |
| 203 | + // Ensure we can cancel the query in response to SIGINT or timeout |
| 204 | + // TODO should use a different timeout for the query than the app |
| 205 | + ctx, cancel := context.WithTimeout(context.Background(), tqi.timeout) |
| 206 | + defer cancel() |
| 207 | + |
| 208 | + warnOnFutureTimestamp(tqi.time, "--time") |
| 209 | + |
| 210 | + qc, _, err := tqi.createClient() |
| 211 | + if err != nil { |
| 212 | + return err |
| 213 | + } |
| 214 | + fmt.Fprintf(os.Stderr, "t=%+v\n", tqi.time) |
| 215 | + req := &querypb.QueryRequest{ |
| 216 | + Query: tqi.query, |
| 217 | + TimeSeconds: tqi.time.PrometheusTimestamp() / 1000, |
| 218 | + TimeoutSeconds: int64(tqi.timeout.Seconds()), |
| 219 | + MaxResolutionSeconds: int64(tqi.maxResolution.Seconds()), |
| 220 | + ReplicaLabels: tqi.promqlOpts.ReplicaLabels, |
| 221 | + EnableDedup: len(tqi.promqlOpts.ReplicaLabels) > 0, |
| 222 | + EnablePartialResponse: tqi.promqlOpts.PartialResponse, |
| 223 | + SkipChunks: tqi.skipChunks, |
| 224 | + LookbackDeltaSeconds: int64(tqi.maxLookbackDelta.Seconds()), |
| 225 | + Engine: tqi.engineType(), |
| 226 | + } |
| 227 | + fmt.Fprintf(os.Stderr, "QueryRequest: %+v\n", req) |
| 228 | + fmt.Fprintf(os.Stderr, "Issuing gRPC QueryRequest: %+v\n", req) |
| 229 | + resp, err := qc.Query(ctx, req) |
| 230 | + if err != nil { |
| 231 | + return fmt.Errorf("Query gRPC error: %v\n", err) |
| 232 | + } |
| 233 | + defer resp.CloseSend() |
| 234 | + |
| 235 | + for { |
| 236 | + msg, err := resp.Recv() |
| 237 | + switch err { |
| 238 | + case nil: |
| 239 | + fmt.Fprintf(os.Stdout, "QueryRange response message: %+v\n", msg) |
| 240 | + case io.EOF: |
| 241 | + fmt.Fprintf(os.Stderr, "QueryRange: EOF\n") |
| 242 | + return nil |
| 243 | + default: |
| 244 | + return fmt.Errorf("QueryRange RecvMsg error: %v\n", err) |
| 245 | + } |
| 246 | + } |
| 247 | + }) |
| 248 | +} |
| 249 | + |
| 250 | +func warnOnFutureTimestamp(t model.TimeOrDurationValue, desc string) { |
| 251 | + if t.PrometheusTimestamp()/1000 > time.Now().Unix() { |
| 252 | + if t.Dur != nil { |
| 253 | + fmt.Fprintf(os.Stderr, "warning: %s is in the future, did you mean --time=-%s?\n", desc, t.Dur.String()) |
| 254 | + } else { |
| 255 | + fmt.Fprintf(os.Stderr, "warning: %s is in the future\n", desc) |
| 256 | + } |
| 257 | + } |
| 258 | +} |
0 commit comments