Skip to content

Commit 761bcaa

Browse files
Rustamchukru.nazarov
andauthored
SH-470 api rate limiter per host (#2054)
* SH-470 api rate limiter per host * fix test * fix test * disable option * builtin * builtin * default fix --------- Co-authored-by: ru.nazarov <[email protected]>
1 parent eb77f15 commit 761bcaa

File tree

13 files changed

+1471
-47
lines changed

13 files changed

+1471
-47
lines changed

cmd/statshouse-api/statshouse-api.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,8 @@ func run() int {
175175
ShardByMetricShards: argv.ShardByMetricShards,
176176
MaxShardConnsRatio: argv.CHMaxShardConnsRatio,
177177
DialTimeout: chDialTimeout,
178+
SelectTimeout: argv.QuerySelectTimeout,
179+
RateLimitConfig: argv.RateLimitConfig,
178180
ConnLimits: chutil.ConnLimits{
179181
FastLightMaxConns: argv.chV1MaxConns,
180182
FastHeavyMaxConns: argv.chV1MaxConns,
@@ -196,6 +198,8 @@ func run() int {
196198
ShardByMetricShards: argv.ShardByMetricShards,
197199
MaxShardConnsRatio: argv.CHMaxShardConnsRatio,
198200
DialTimeout: chDialTimeout,
201+
SelectTimeout: argv.QuerySelectTimeout,
202+
RateLimitConfig: argv.RateLimitConfig,
199203
ConnLimits: chutil.ConnLimits{
200204
FastLightMaxConns: argv.chV2MaxLightFastConns,
201205
FastHeavyMaxConns: argv.chV2MaxHeavyFastConns,

internal/api/config.go

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ type Config struct {
3838
AvailableShardsStr string
3939
AvailableShards []uint32
4040
CHMaxShardConnsRatio int
41+
chutil.RateLimitConfig
4142
}
4243

4344
func (argv *Config) ValidateConfig() error {
@@ -116,13 +117,37 @@ func (argv *Config) Bind(f *flag.FlagSet, defaultI config.Config) {
116117
f.StringVar(&argv.BlockedUsersS, "blocked-users", "", "comma-separated list of users that are blocked")
117118
f.StringVar(&argv.AvailableShardsStr, "available-shards", default_.AvailableShardsStr, "comma-separated list of default shards for metrics when namespace doesn't specify shards")
118119
f.IntVar(&argv.CHMaxShardConnsRatio, "clickhouse-max-shard-conns-ratio", default_.CHMaxShardConnsRatio, "maximum number of ClickHouse connections per shard (%)")
120+
121+
f.BoolVar(&argv.RateLimitDisable, "rate-limit-disable", false, "disable rate limiting")
122+
f.DurationVar(&argv.WindowDuration, "rate-limit-window-duration", default_.WindowDuration, "time window for analyzing ClickHouse requests")
123+
f.Uint64Var(&argv.MaxErrorRate, "rate-limit-max-error-rate", default_.MaxErrorRate, "error rate threshold (%) to trigger sleep stage")
124+
f.Uint64Var(&argv.MaxInflightWeight, "rate-limit-max-inflight-weight", default_.MaxInflightWeight, "maximum weight per inflight request (%)")
125+
f.Uint64Var(&argv.RecoverWeightStep, "rate-limit-recover-weight-step", default_.RecoverWeightStep, "recover weight step (%) per recovery cycle")
126+
f.DurationVar(&argv.RecoverGapDuration, "rate-limit-recover-gap-duration", default_.RecoverGapDuration, "recover time gap per recovery cycle")
127+
f.Float64Var(&argv.SleepMultiplier, "rate-limit-sleep-multiplier", default_.SleepMultiplier, "sleep duration multiplier after failed check")
128+
f.DurationVar(&argv.MaxSleepDuration, "rate-limit-max-sleep-duration", default_.MaxSleepDuration, "maximum sleep duration limit")
129+
f.DurationVar(&argv.MinSleepDuration, "rate-limit-min-sleep-duration", default_.MinSleepDuration, "minimum sleep duration when entering sleep")
130+
f.Uint64Var(&argv.CheckCount, "rate-limit-check-count", default_.CheckCount, "successful checks needed to exit check stage")
131+
f.DurationVar(&argv.RecalcInterval, "recalc-interval-sleep-duration", default_.RecalcInterval, "rate limit state recalculation interval")
119132
}
120133

121134
func DefaultConfig() *Config {
122135
return &Config{
123136
ApproxCacheMaxSize: 1_000_000,
124137
AvailableShardsStr: "1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16",
125138
CHMaxShardConnsRatio: 20,
139+
RateLimitConfig: chutil.RateLimitConfig{
140+
WindowDuration: 2 * time.Minute,
141+
MaxErrorRate: 20,
142+
MaxInflightWeight: 10,
143+
RecoverWeightStep: 1,
144+
RecoverGapDuration: 6 * time.Second,
145+
SleepMultiplier: 1.5,
146+
MaxSleepDuration: 2 * time.Minute,
147+
MinSleepDuration: 30 * time.Second,
148+
CheckCount: 9,
149+
RecalcInterval: 1 * time.Second,
150+
},
126151
}
127152
}
128153

@@ -154,7 +179,7 @@ type HandlerOptions struct {
154179
timezone string
155180
protectedMetricPrefixesS string
156181
protectedMetricPrefixes []string
157-
querySelectTimeout time.Duration
182+
QuerySelectTimeout time.Duration
158183
weekStartAt int
159184
location *time.Location
160185
utcOffset int64
@@ -167,7 +192,7 @@ func (argv *HandlerOptions) Bind(f *flag.FlagSet) {
167192
f.BoolVar(&argv.querySequential, "query-sequential", false, "disables query parallel execution")
168193
f.BoolVar(&argv.readOnly, "readonly", false, "read only mode")
169194
f.BoolVar(&argv.verbose, "verbose", false, "verbose logging")
170-
f.DurationVar(&argv.querySelectTimeout, "query-select-timeout", QuerySelectTimeoutDefault, "query select timeout")
195+
f.DurationVar(&argv.QuerySelectTimeout, "query-select-timeout", QuerySelectTimeoutDefault, "query select timeout")
171196
f.StringVar(&argv.protectedMetricPrefixesS, "protected-metric-prefixes", "", "comma-separated list of metric prefixes that require access bits set")
172197
f.StringVar(&argv.timezone, "timezone", "Europe/Moscow", "location of the desired timezone")
173198
f.IntVar(&argv.weekStartAt, "week-start", int(time.Monday), "week day of beginning of the week (from sunday=0 to saturday=6)")

internal/api/endpoint_stat.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,18 @@ func ChSelectActiveQueries(client *statshouse.Client, versionTag string, tagLane
186186
}
187187
}
188188

189+
func ChRateLimit(client *statshouse.Client, versionTag string, stats []chutil.RateLimitMetric) {
190+
for _, stat := range stats {
191+
metric := client.MetricRef(format.BuiltinMetricMetaAPIRateLimit.Name, statshouse.Tags{
192+
2: versionTag,
193+
3: stat.Stage,
194+
4: strconv.FormatUint(stat.InflightWeight, 10),
195+
5: strconv.Itoa(stat.ShardKey),
196+
6: strconv.Itoa(stat.ReplicaKey)})
197+
metric.Value(float64(stat.InflightCnt))
198+
}
199+
}
200+
189201
func ChRequestsMetric(shard int, aggHost string, table string, ok bool) {
190202
status := "1"
191203
if !ok {

internal/api/handler.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -686,7 +686,7 @@ func NewHandler(staticDir fs.FS, jsSettings JSSettings, showInvisible bool, chV1
686686
h.Version3Start.Store(cfg.Version3Start)
687687
h.Version3Prob.Store(cfg.Version3Prob)
688688
h.Version3StrcmpOff.Store(cfg.Version3StrcmpOff)
689-
chV2.SetLimits(cfg.UserLimits, cfg.CHMaxShardConnsRatio)
689+
chV2.SetLimits(cfg.UserLimits, cfg.CHMaxShardConnsRatio, cfg.RateLimitConfig)
690690
h.NewShardingStart.Store(cfg.NewShardingStart)
691691
h.ConfigMu.Lock()
692692
h.DisableCHAddr = cfg.DisableCHAddr
@@ -739,6 +739,7 @@ func NewHandler(staticDir fs.FS, jsSettings JSSettings, showInvisible bool, chV1
739739
ChSelectActiveQueries(client, versionTag, format.TagValueIDAPILaneSlowHeavy, ch.SemaphoreCountSlowHeavy(), ch.ShardSemaphoreCountSlowHeavy())
740740
ChSelectActiveQueries(client, versionTag, format.TagValueIDAPILaneFastHardware, ch.SemaphoreCountFastHardware(), ch.ShardSemaphoreCountFastHardware())
741741
ChSelectActiveQueries(client, versionTag, format.TagValueIDAPILaneSlowHardware, ch.SemaphoreCountSlowHardware(), ch.ShardSemaphoreCountSlowHardware())
742+
ChRateLimit(client, versionTag, ch.RateLimitStatistics())
742743
}
743744
}
744745
writeActiveQuieries(chV1, "1")
@@ -1855,7 +1856,7 @@ func (h *Handler) handlePostMetric(ctx context.Context, ai accessInfo, _ string,
18551856
}
18561857

18571858
func HandleGetMetricTagValues(r *httpRequestHandler) {
1858-
ctx, cancel := context.WithTimeout(r.Context(), r.querySelectTimeout)
1859+
ctx, cancel := context.WithTimeout(r.Context(), r.QuerySelectTimeout)
18591860
defer cancel()
18601861

18611862
_ = r.ParseForm() // (*http.Request).FormValue ignores parse errors, too
@@ -2113,7 +2114,7 @@ func HandleGetTable(r *httpRequestHandler) {
21132114
return
21142115
}
21152116

2116-
ctx, cancel := context.WithTimeout(r.Context(), r.querySelectTimeout)
2117+
ctx, cancel := context.WithTimeout(r.Context(), r.QuerySelectTimeout)
21172118
defer cancel()
21182119

21192120
if req.numResults <= 0 || maxTableRowsPage < req.numResults {
@@ -2163,7 +2164,7 @@ func HandleBadgesQuery(r *httpRequestHandler) {
21632164
respondJSON(r, nil, 0, 0, err)
21642165
return
21652166
}
2166-
ctx, cancel := context.WithTimeout(r.Context(), r.querySelectTimeout)
2167+
ctx, cancel := context.WithTimeout(r.Context(), r.QuerySelectTimeout)
21672168
defer cancel()
21682169
query := promql.Query{
21692170
Start: req.from.Unix(),
@@ -2367,7 +2368,7 @@ func HandlePointQuery(r *httpRequestHandler) {
23672368
}
23682369

23692370
func HandleGetRender(r *httpRequestHandler) {
2370-
ctx, cancel := context.WithTimeout(r.Context(), r.querySelectTimeout)
2371+
ctx, cancel := context.WithTimeout(r.Context(), r.QuerySelectTimeout)
23712372
defer cancel()
23722373
s, err := r.parseSeriesRequestS(12)
23732374
if err != nil {
@@ -2614,7 +2615,7 @@ func (h *requestHandler) handleSeriesRequestS(ctx context.Context, req seriesReq
26142615
var cancelT func()
26152616
var freeBadges func()
26162617
var freeRes func()
2617-
ctx, cancelT = context.WithTimeout(ctx, h.querySelectTimeout)
2618+
ctx, cancelT = context.WithTimeout(ctx, h.QuerySelectTimeout)
26182619
cancel := func() {
26192620
cancelT()
26202621
if freeBadges != nil {

internal/api/promql.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ func HandleInstantQuery(r *httpRequestHandler) {
6262
}
6363
q.End = q.Start + 1 // handler expects half open interval [start, end)
6464
// execute query
65-
ctx, cancel := context.WithTimeout(r.Context(), r.querySelectTimeout)
65+
ctx, cancel := context.WithTimeout(r.Context(), r.QuerySelectTimeout)
6666
defer cancel()
6767
res, dispose, err := r.promEngine.Exec(ctx, r, q)
6868
if err != nil {
@@ -103,7 +103,7 @@ func HandleRangeQuery(r *httpRequestHandler) {
103103
return
104104
}
105105
// execute query
106-
ctx, cancel := context.WithTimeout(r.Context(), r.querySelectTimeout)
106+
ctx, cancel := context.WithTimeout(r.Context(), r.QuerySelectTimeout)
107107
defer cancel()
108108
res, dispose, err := r.promEngine.Exec(ctx, r, q)
109109
if err != nil {

0 commit comments

Comments
 (0)