Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

### Added

- [#8488](https://github.com/thanos-io/thanos/pull/8488) Compact: Add ability to push metrics to a Pushgateway for one-shot runs, including metrics for failed runs.
- [#8366](https://github.com/thanos-io/thanos/pull/8366) Store: optionally ignore Parquet migrated blocks
- [#8359](https://github.com/thanos-io/thanos/pull/8359) Tools: add `--shipper.upload-compacted` flag for uploading compacted blocks to bucket upload-blocks

Expand Down
164 changes: 112 additions & 52 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/push"
"github.com/prometheus/common/model"
"github.com/prometheus/common/route"
"github.com/prometheus/prometheus/storage"
Expand Down Expand Up @@ -445,6 +446,58 @@ func runCompact(
return nil
}

ps := compact.NewCompactionProgressCalculator(reg, tsdbPlanner)
rs := compact.NewRetentionProgressCalculator(reg, retentionByResolution)
var ds *compact.DownsampleProgressCalculator
if !conf.disableDownsampling {
ds = compact.NewDownsampleProgressCalculator(reg)
}
progressCalcFn := func() error {
if err := sy.SyncMetas(ctx); err != nil {
// The RetryError signals that we hit an retriable error (transient error, no connection).
// You should alert on this being triggered too frequently.
if compact.IsRetryError(err) {
level.Error(logger).Log("msg", "retriable error", "err", err)
compactMetrics.retried.Inc()

return nil
}

return errors.Wrapf(err, "could not sync metas")
}

metas := sy.Metas()
groups, err := grouper.Groups(metas)
if err != nil {
return errors.Wrapf(err, "could not group metadata for compaction")
}

if err = ps.ProgressCalculate(ctx, groups); err != nil {
return errors.Wrapf(err, "could not calculate compaction progress")
}

retGroups, err := grouper.Groups(metas)
if err != nil {
return errors.Wrapf(err, "could not group metadata for retention")
}

if err = rs.ProgressCalculate(ctx, retGroups); err != nil {
return errors.Wrapf(err, "could not calculate retention progress")
}

if !conf.disableDownsampling {
groups, err = grouper.Groups(metas)
if err != nil {
return errors.Wrapf(err, "could not group metadata into downsample groups")
}
if err := ds.ProgressCalculate(ctx, groups); err != nil {
return errors.Wrapf(err, "could not calculate downsampling progress")
}
}

return nil
}

compactMainFn := func() error {
if err := compactor.Compact(ctx); err != nil {
return errors.Wrap(err, "compaction")
Expand Down Expand Up @@ -535,7 +588,57 @@ func runCompact(
defer runutil.CloseWithLogOnErr(logger, insBkt, "bucket client")

if !conf.wait {
return compactMainFn()
pushMetrics := func() error {
if conf.pushGatewayURL == "" {
return nil
}
level.Info(logger).Log("msg", "pushing metrics to Pushgateway", "url", conf.pushGatewayURL)

job := conf.pushGatewayJob
if job == "" {
job = component.String()
}

pusher := push.New(conf.pushGatewayURL, job).Gatherer(reg)

hostname, err := os.Hostname()
if err != nil {
level.Warn(logger).Log("msg", "failed to get hostname for pushgateway grouping key", "err", err)
} else {
pusher = pusher.Grouping("instance", hostname)
}

if err := pusher.Push(); err != nil {
return errors.Wrap(err, "failed to push metrics to Pushgateway")
}

level.Info(logger).Log("msg", "successfully pushed metrics to Pushgateway")
return nil
}

err := compactMainFn()
if err != nil {
if compact.IsHaltError(err) {
level.Error(logger).Log("msg", "critical error detected; halting", "err", err)
compactMetrics.halted.Set(1)
} else if compact.IsRetryError(err) {
level.Error(logger).Log("msg", "retriable error", "err", err)
compactMetrics.retried.Inc()
}

if pushErr := pushMetrics(); pushErr != nil {
level.Error(logger).Log("msg", "failed to push metrics on error", "push_err", pushErr)
}
return err // Always exit with an error code.
}

compactMetrics.iterations.Inc()

if err := progressCalcFn(); err != nil {
level.Warn(logger).Log("msg", "failed to calculate progress metrics", "err", err)
}

return pushMetrics()
}

// --wait=true is specified.
Expand Down Expand Up @@ -643,58 +746,8 @@ func runCompact(
// Periodically calculate the progress of compaction, downsampling and retention.
if conf.progressCalculateInterval > 0 {
g.Add(func() error {
ps := compact.NewCompactionProgressCalculator(reg, tsdbPlanner)
rs := compact.NewRetentionProgressCalculator(reg, retentionByResolution)
var ds *compact.DownsampleProgressCalculator
if !conf.disableDownsampling {
ds = compact.NewDownsampleProgressCalculator(reg)
}

return runutil.Repeat(conf.progressCalculateInterval, ctx.Done(), func() error {

if err := sy.SyncMetas(ctx); err != nil {
// The RetryError signals that we hit an retriable error (transient error, no connection).
// You should alert on this being triggered too frequently.
if compact.IsRetryError(err) {
level.Error(logger).Log("msg", "retriable error", "err", err)
compactMetrics.retried.Inc()

return nil
}

return errors.Wrapf(err, "could not sync metas")
}

metas := sy.Metas()
groups, err := grouper.Groups(metas)
if err != nil {
return errors.Wrapf(err, "could not group metadata for compaction")
}

if err = ps.ProgressCalculate(ctx, groups); err != nil {
return errors.Wrapf(err, "could not calculate compaction progress")
}

retGroups, err := grouper.Groups(metas)
if err != nil {
return errors.Wrapf(err, "could not group metadata for retention")
}

if err = rs.ProgressCalculate(ctx, retGroups); err != nil {
return errors.Wrapf(err, "could not calculate retention progress")
}

if !conf.disableDownsampling {
groups, err = grouper.Groups(metas)
if err != nil {
return errors.Wrapf(err, "could not group metadata into downsample groups")
}
if err := ds.ProgressCalculate(ctx, groups); err != nil {
return errors.Wrapf(err, "could not calculate downsampling progress")
}
}

return nil
return progressCalcFn()
})
}, func(err error) {
cancel()
Expand Down Expand Up @@ -742,6 +795,8 @@ type compactConfig struct {
progressCalculateInterval time.Duration
filterConf *store.FilterConfig
disableAdminOperations bool
pushGatewayURL string
pushGatewayJob string
}

func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
Expand Down Expand Up @@ -855,5 +910,10 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {

cmd.Flag("bucket-web-label", "External block label to use as group title in the bucket web UI").StringVar(&cc.label)

cmd.Flag("push-gateway.url", "Prometheus Pushgateway URL to push metrics to. If not empty, compactor will push metrics on exit when in one-shot mode (--wait=false).").
Default("").StringVar(&cc.pushGatewayURL)
cmd.Flag("push-gateway.job", "Job name to use when pushing metrics to Prometheus Pushgateway.").
Default("thanos-compact").StringVar(&cc.pushGatewayJob)

cmd.Flag("disable-admin-operations", "Disable UI/API admin operations like marking blocks for deletion and no compaction.").Default("false").BoolVar(&cc.disableAdminOperations)
}
Loading