Skip to content

Commit b4a4bf6

Browse files
authored
[api] add hardware semaphore (#1506)
1 parent ff66667 commit b4a4bf6

File tree

7 files changed

+132
-72
lines changed

7 files changed

+132
-72
lines changed

cmd/statshouse-api/statshouse-api.go

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,9 @@ type args struct {
6969
chV2MaxHeavyFastConns int
7070
chV2MaxHeavySlowConns int
7171
chV2MaxLightSlowConns int
72+
chV2MaxHardwareFastConns int
73+
chV2MaxHardwareSlowConns int
74+
7275
chV2Password string
7376
chV2User string
7477
defaultMetric string
@@ -122,6 +125,8 @@ func main() {
122125
pflag.IntVar(&argv.chV2MaxLightSlowConns, "clickhouse-v2-max-light-slow-conns", 12, "maximum number of ClickHouse-v2 connections (light slow)")
123126
pflag.IntVar(&argv.chV2MaxHeavyFastConns, "clickhouse-v2-max-heavy-conns", 5, "maximum number of ClickHouse-v2 connections (heavy fast)")
124127
pflag.IntVar(&argv.chV2MaxHeavySlowConns, "clickhouse-v2-max-heavy-slow-conns", 1, "maximum number of ClickHouse-v2 connections (heavy slow)")
128+
pflag.IntVar(&argv.chV2MaxHardwareFastConns, "clickhouse-v2-max-hardware-fast-conns", 8, "maximum number of ClickHouse-v2 connections (hardware fast)")
129+
pflag.IntVar(&argv.chV2MaxHardwareSlowConns, "clickhouse-v2-max-hardware-slow-conns", 4, "maximum number of ClickHouse-v2 connections (hardware slow)")
125130

126131
pflag.StringVar(&argv.chV2Password, "clickhouse-v2-password", "", "ClickHouse-v2 password")
127132
pflag.StringVar(&argv.chV2User, "clickhouse-v2-user", "", "ClickHouse-v2 user")
@@ -250,14 +255,16 @@ func run(argv args, cfg *api.Config, vkuthPublicKeys map[string][]byte) error {
250255
}
251256
// argv.chV2MaxLightFastConns, argv.chV2MaxHeavyConns, , , argv.chV2Password, argv.chV2Debug, chDialTimeout
252257
chV2, err := util.OpenClickHouse(util.ChConnOptions{
253-
Addrs: argv.chV2Addrs,
254-
User: argv.chV2User,
255-
Password: argv.chV2Password,
256-
DialTimeout: chDialTimeout,
257-
FastLightMaxConns: argv.chV2MaxLightFastConns,
258-
FastHeavyMaxConns: argv.chV2MaxHeavyFastConns,
259-
SlowLightMaxConns: argv.chV2MaxLightSlowConns,
260-
SlowHeavyMaxConns: argv.chV2MaxHeavySlowConns,
258+
Addrs: argv.chV2Addrs,
259+
User: argv.chV2User,
260+
Password: argv.chV2Password,
261+
DialTimeout: chDialTimeout,
262+
FastLightMaxConns: argv.chV2MaxLightFastConns,
263+
FastHeavyMaxConns: argv.chV2MaxHeavyFastConns,
264+
SlowLightMaxConns: argv.chV2MaxLightSlowConns,
265+
SlowHeavyMaxConns: argv.chV2MaxHeavySlowConns,
266+
FastHardwareMaxConns: argv.chV2MaxHardwareFastConns,
267+
SlowHardwareMaxConns: argv.chV2MaxHardwareSlowConns,
261268
})
262269
if err != nil {
263270
return fmt.Errorf("failed to open ClickHouse-v2: %w", err)

internal/api/context.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,12 +61,12 @@ func withEndpointStat(ctx context.Context, es *endpointStat) context.Context {
6161
return context.WithValue(ctx, endpointStatContextKey, es)
6262
}
6363

64-
func reportQueryKind(ctx context.Context, isFast, isLight bool) {
64+
func reportQueryKind(ctx context.Context, isFast, isLight, isHardware bool) {
6565
if s, ok := ctx.Value(endpointStatContextKey).(*endpointStat); ok {
6666
s.laneMutex.Lock()
6767
defer s.laneMutex.Unlock()
6868
if len(s.lane) == 0 {
69-
s.lane = strconv.Itoa(util.QueryKind(isFast, isLight))
69+
s.lane = strconv.Itoa(util.QueryKind(isFast, isLight, isHardware))
7070
}
7171
}
7272
}

internal/api/endpoint_stat.go

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -142,15 +142,15 @@ func CurrentChunksCount(brs *BigResponseStorage) func(*statshouse.Client) {
142142
}
143143
}
144144

145-
func ChSelectMetricDuration(duration time.Duration, metricID int32, user, table, kind string, isFast, isLight bool, err error) {
145+
func ChSelectMetricDuration(duration time.Duration, metricID int32, user, table, kind string, isFast, isLight, isHardware bool, err error) {
146146
ok := "ok"
147147
if err != nil {
148148
ok = "error"
149149
}
150150
statshouse.Value(
151151
format.BuiltinMetricNameAPISelectDuration,
152152
statshouse.Tags{
153-
1: modeStr(isFast, isLight),
153+
1: modeStr(isFast, isLight, isHardware),
154154
2: strconv.Itoa(int(metricID)),
155155
3: table,
156156
4: kind,
@@ -162,16 +162,19 @@ func ChSelectMetricDuration(duration time.Duration, metricID int32, user, table,
162162
duration.Seconds())
163163
}
164164

165-
func ChSelectProfile(isFast, isLight bool, info proto.Profile, err error) {
166-
chSelectPushMetric(format.BuiltinMetricNameAPISelectBytes, isFast, isLight, float64(info.Bytes), err)
167-
chSelectPushMetric(format.BuiltinMetricNameAPISelectRows, isFast, isLight, float64(info.Rows), err)
165+
func ChSelectProfile(isFast, isLight, isHardware bool, info proto.Profile, err error) {
166+
chSelectPushMetric(format.BuiltinMetricNameAPISelectBytes, isFast, isLight, isHardware, float64(info.Bytes), err)
167+
chSelectPushMetric(format.BuiltinMetricNameAPISelectRows, isFast, isLight, isHardware, float64(info.Rows), err)
168168
}
169169

170-
func modeStr(isFast, isLight bool) string {
170+
func modeStr(isFast, isLight, isHardware bool) string {
171171
mode := "slow"
172172
if isFast {
173173
mode = "fast"
174174
}
175+
if isHardware {
176+
return mode + "_hardware"
177+
}
175178
if isLight {
176179
mode += "light"
177180
} else {
@@ -180,11 +183,11 @@ func modeStr(isFast, isLight bool) string {
180183
return mode
181184
}
182185

183-
func chSelectPushMetric(metric string, isFast, isLight bool, data float64, err error) {
186+
func chSelectPushMetric(metric string, isFast, isLight, isHardware bool, data float64, err error) {
184187
m := statshouse.GetMetricRef(
185188
metric,
186189
statshouse.Tags{
187-
1: modeStr(isFast, isLight),
190+
1: modeStr(isFast, isLight, isHardware),
188191
},
189192
)
190193
m.Value(data)

internal/api/handler.go

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -654,6 +654,12 @@ func NewHandler(staticDir fs.FS, jsSettings JSSettings, showInvisible bool, chV1
654654

655655
slowHeavy := client.MetricRef(format.BuiltinMetricNameAPIActiveQueries, statshouse.Tags{2: versionTag, 3: strconv.Itoa(format.TagValueIDAPILaneSlowHeavy), 4: srvfunc.HostnameForStatshouse()})
656656
slowHeavy.Value(float64(ch.SemaphoreCountSlowHeavy()))
657+
658+
slowHardware := client.MetricRef(format.BuiltinMetricNameAPIActiveQueries, statshouse.Tags{2: versionTag, 3: strconv.Itoa(format.TagValueIDAPILaneFastHardware), 4: srvfunc.HostnameForStatshouse()})
659+
slowHardware.Value(float64(ch.SemaphoreCountFastHardware()))
660+
661+
fastHardware := client.MetricRef(format.BuiltinMetricNameAPIActiveQueries, statshouse.Tags{2: versionTag, 3: strconv.Itoa(format.TagValueIDAPILaneSlowHardware), 4: srvfunc.HostnameForStatshouse()})
662+
fastHardware.Value(float64(ch.SemaphoreCountSlowHardware()))
657663
}
658664
}
659665
writeActiveQuieries(chV1, "1")
@@ -788,16 +794,16 @@ func (h *Handler) doSelect(ctx context.Context, meta util.QueryMetaInto, version
788794
saveDebugQuery(ctx, query.Body)
789795

790796
start := time.Now()
791-
reportQueryKind(ctx, meta.IsFast, meta.IsLight)
797+
reportQueryKind(ctx, meta.IsFast, meta.IsLight, meta.IsHardware)
792798
info, err := h.ch[version].Select(ctx, meta, query)
793799
duration := time.Since(start)
794800
if h.verbose {
795801
log.Printf("[debug] SQL for %q done in %v, err: %v", meta.User, duration, err)
796802
}
797803

798804
reportTiming(ctx, "ch-select", duration)
799-
ChSelectMetricDuration(info.Duration, meta.Metric, meta.User, meta.Table, meta.Kind, meta.IsFast, meta.IsLight, err)
800-
ChSelectProfile(meta.IsFast, meta.IsLight, info.Profile, err)
805+
ChSelectMetricDuration(info.Duration, meta.Metric, meta.User, meta.Table, meta.Kind, meta.IsFast, meta.IsLight, meta.IsHardware, err)
806+
ChSelectProfile(meta.IsFast, meta.IsLight, meta.IsHardware, info.Profile, err)
801807

802808
return err
803809
}
@@ -2972,6 +2978,7 @@ func (h *Handler) loadPoints(ctx context.Context, pq *preparedPointsQuery, lod d
29722978
cols := newPointsSelectCols(args, true, pq.version)
29732979
isFast := lod.IsFast()
29742980
isLight := pq.isLight()
2981+
IsHardware := pq.IsHardware()
29752982
metric := pq.metricID
29762983
table := lod.Table
29772984
kind := pq.kind
@@ -2984,12 +2991,13 @@ func (h *Handler) loadPoints(ctx context.Context, pq *preparedPointsQuery, lod d
29842991
start := time.Now()
29852992
mappings := make(map[string]int32)
29862993
err = h.doSelect(ctx, util.QueryMetaInto{
2987-
IsFast: isFast,
2988-
IsLight: isLight,
2989-
User: pq.user,
2990-
Metric: metric,
2991-
Table: table,
2992-
Kind: kind.String(),
2994+
IsFast: isFast,
2995+
IsLight: isLight,
2996+
IsHardware: IsHardware,
2997+
User: pq.user,
2998+
Metric: metric,
2999+
Table: table,
3000+
Kind: kind.String(),
29933001
}, pq.version, ch.Query{
29943002
Body: query,
29953003
Result: cols.res,
@@ -3060,16 +3068,18 @@ func (h *Handler) loadPoint(ctx context.Context, pq *preparedPointsQuery, lod da
30603068
cols := newPointsSelectColsV2(args, false) // loadPoint doesn't yet have v3 query
30613069
isFast := lod.IsFast()
30623070
isLight := pq.isLight()
3071+
isHardware := pq.IsHardware()
30633072
metric := pq.metricID
30643073
table := lod.Table
30653074
kind := pq.kind
30663075
err = h.doSelect(ctx, util.QueryMetaInto{
3067-
IsFast: isFast,
3068-
IsLight: isLight,
3069-
User: pq.user,
3070-
Metric: metric,
3071-
Table: table,
3072-
Kind: kind.String(),
3076+
IsFast: isFast,
3077+
IsLight: isLight,
3078+
IsHardware: isHardware,
3079+
User: pq.user,
3080+
Metric: metric,
3081+
Table: table,
3082+
Kind: kind.String(),
30733083
}, pq.version, ch.Query{
30743084
Body: query,
30753085
Result: cols.res,

internal/api/sql.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@ func (pq *preparedPointsQuery) isLight() bool {
6868
return pq.kind != data_model.DigestKindUnique && pq.kind != data_model.DigestKindPercentiles
6969
}
7070

71+
func (pq *preparedPointsQuery) IsHardware() bool {
72+
return format.HardwareMetric(pq.metricID)
73+
}
74+
7175
func tagValuesQuery(pq *preparedTagValuesQuery, lod data_model.LOD) (string, tagValuesQueryMeta, error) {
7276
if pq.version == Version3 {
7377
return tagValuesQueryV3(pq, lod)

internal/format/builtin.go

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -228,15 +228,19 @@ const (
228228
TagValueIDStaging2 = 3
229229
TagValueIDStaging3 = 4
230230

231-
TagValueIDAPILaneFastLight = 1
232-
TagValueIDAPILaneFastHeavy = 2
233-
TagValueIDAPILaneSlowLight = 3
234-
TagValueIDAPILaneSlowHeavy = 4
235-
236-
TagValueIDAPILaneFastLightv2 = 0
237-
TagValueIDAPILaneFastHeavyv2 = 1
238-
TagValueIDAPILaneSlowLightv2 = 2
239-
TagValueIDAPILaneSlowHeavyv2 = 3
231+
TagValueIDAPILaneFastLight = 1
232+
TagValueIDAPILaneFastHeavy = 2
233+
TagValueIDAPILaneSlowLight = 3
234+
TagValueIDAPILaneSlowHeavy = 4
235+
TagValueIDAPILaneFastHardware = 5
236+
TagValueIDAPILaneSlowHardware = 6
237+
238+
TagValueIDAPILaneFastLightv2 = 0
239+
TagValueIDAPILaneFastHeavyv2 = 1
240+
TagValueIDAPILaneSlowLightv2 = 2
241+
TagValueIDAPILaneSlowHeavyv2 = 3
242+
TagValueIDAPILaneSlowHardwarev2 = 4
243+
TagValueIDAPILaneFastHardwarev2 = 5
240244

241245
TagValueIDConveyorRecent = 1
242246
TagValueIDConveyorHistoric = 2
@@ -1364,10 +1368,13 @@ Ingress proxies first proxy request (to record host and IP of agent), then repla
13641368
Description: "lane",
13651369
Raw: true,
13661370
ValueComments: convertToValueComments(map[int32]string{
1367-
TagValueIDAPILaneFastLightv2: "fastlight",
1368-
TagValueIDAPILaneFastHeavyv2: "fastheavy",
1369-
TagValueIDAPILaneSlowLightv2: "slowlight",
1370-
TagValueIDAPILaneSlowHeavyv2: "slowheavy"}),
1371+
TagValueIDAPILaneFastLightv2: "fast_light",
1372+
TagValueIDAPILaneFastHeavyv2: "fast_heavy",
1373+
TagValueIDAPILaneSlowLightv2: "slow_light",
1374+
TagValueIDAPILaneSlowHeavyv2: "slow_heavy",
1375+
TagValueIDAPILaneSlowHardwarev2: "slow_hardware",
1376+
TagValueIDAPILaneFastHardwarev2: "fast_hardware",
1377+
}),
13711378
}, {
13721379
Description: "host",
13731380
}, {
@@ -1612,10 +1619,13 @@ Ingress proxies first proxy request (to record host and IP of agent), then repla
16121619
}, {
16131620
Description: "lane",
16141621
ValueComments: convertToValueComments(map[int32]string{
1615-
TagValueIDAPILaneFastLight: "fastlight",
1616-
TagValueIDAPILaneFastHeavy: "fastheavy",
1617-
TagValueIDAPILaneSlowLight: "slowlight",
1618-
TagValueIDAPILaneSlowHeavy: "slowheavy"}),
1622+
TagValueIDAPILaneFastLight: "fast_light",
1623+
TagValueIDAPILaneFastHeavy: "fast_heavy",
1624+
TagValueIDAPILaneSlowLight: "slow_light",
1625+
TagValueIDAPILaneSlowHeavy: "slow_heavy",
1626+
TagValueIDAPILaneFastHardware: "fast_hardware",
1627+
TagValueIDAPILaneSlowHardware: "slow_hardware",
1628+
}),
16191629
}, {
16201630
Description: "host",
16211631
}},

internal/util/chutil.go

Lines changed: 48 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -37,16 +37,18 @@ type connPool struct {
3737
}
3838

3939
type ClickHouse struct {
40-
pools [4]*connPool
40+
pools [6]*connPool
4141
}
4242

4343
type QueryMetaInto struct {
44-
IsFast bool
45-
IsLight bool
46-
User string
47-
Metric int32
48-
Table string
49-
Kind string
44+
IsFast bool
45+
IsLight bool
46+
IsHardware bool
47+
48+
User string
49+
Metric int32
50+
Table string
51+
Kind string
5052
}
5153

5254
type QueryHandleInfo struct {
@@ -55,33 +57,41 @@ type QueryHandleInfo struct {
5557
}
5658

5759
type ChConnOptions struct {
58-
Addrs []string
59-
User string
60-
Password string
61-
DialTimeout time.Duration
60+
Addrs []string
61+
User string
62+
Password string
63+
DialTimeout time.Duration
64+
6265
FastLightMaxConns int
6366
FastHeavyMaxConns int
6467
SlowLightMaxConns int
6568
SlowHeavyMaxConns int
69+
70+
SlowHardwareMaxConns int
71+
FastHardwareMaxConns int
6672
}
6773

6874
const (
69-
fastLight = 0
70-
fastHeavy = 1
71-
slowLight = 2
72-
slowHeavy = 3 // fix Close after adding new modes
75+
fastLight = 0 // // Must be synced with format.TagValueIDAPILaneFastLightv2
76+
fastHeavy = 1
77+
slowLight = 2
78+
slowHeavy = 3
79+
slowHardware = 4
80+
fastHardware = 5
7381
)
7482

7583
func OpenClickHouse(opt ChConnOptions) (*ClickHouse, error) {
7684
if len(opt.Addrs) == 0 {
7785
return nil, fmt.Errorf("at least one ClickHouse address must be specified")
7886
}
7987

80-
result := &ClickHouse{[4]*connPool{
81-
{rand.New(), make([]*chpool.Pool, 0, len(opt.Addrs)), queue.NewQueue(int64(opt.FastLightMaxConns)), map[string]int{}, sync.Mutex{}, map[string]int{}, sync.Mutex{}}, // fastLight
82-
{rand.New(), make([]*chpool.Pool, 0, len(opt.Addrs)), queue.NewQueue(int64(opt.FastHeavyMaxConns)), map[string]int{}, sync.Mutex{}, map[string]int{}, sync.Mutex{}}, // fastHeavy
83-
{rand.New(), make([]*chpool.Pool, 0, len(opt.Addrs)), queue.NewQueue(int64(opt.SlowLightMaxConns)), map[string]int{}, sync.Mutex{}, map[string]int{}, sync.Mutex{}}, // slowLight
84-
{rand.New(), make([]*chpool.Pool, 0, len(opt.Addrs)), queue.NewQueue(int64(opt.SlowHeavyMaxConns)), map[string]int{}, sync.Mutex{}, map[string]int{}, sync.Mutex{}}, // slowHeavy
88+
result := &ClickHouse{[6]*connPool{
89+
{rand.New(), make([]*chpool.Pool, 0, len(opt.Addrs)), queue.NewQueue(int64(opt.FastLightMaxConns)), map[string]int{}, sync.Mutex{}, map[string]int{}, sync.Mutex{}}, // fastLight
90+
{rand.New(), make([]*chpool.Pool, 0, len(opt.Addrs)), queue.NewQueue(int64(opt.FastHeavyMaxConns)), map[string]int{}, sync.Mutex{}, map[string]int{}, sync.Mutex{}}, // fastHeavy
91+
{rand.New(), make([]*chpool.Pool, 0, len(opt.Addrs)), queue.NewQueue(int64(opt.SlowLightMaxConns)), map[string]int{}, sync.Mutex{}, map[string]int{}, sync.Mutex{}}, // slowLight
92+
{rand.New(), make([]*chpool.Pool, 0, len(opt.Addrs)), queue.NewQueue(int64(opt.SlowHeavyMaxConns)), map[string]int{}, sync.Mutex{}, map[string]int{}, sync.Mutex{}}, // slowHeavy
93+
{rand.New(), make([]*chpool.Pool, 0, len(opt.Addrs)), queue.NewQueue(int64(opt.SlowHardwareMaxConns)), map[string]int{}, sync.Mutex{}, map[string]int{}, sync.Mutex{}}, // slowHardware
94+
{rand.New(), make([]*chpool.Pool, 0, len(opt.Addrs)), queue.NewQueue(int64(opt.FastHardwareMaxConns)), map[string]int{}, sync.Mutex{}, map[string]int{}, sync.Mutex{}}, // fastHardware
8595
}}
8696
for _, addr := range opt.Addrs {
8797
for _, pool := range result.pools {
@@ -142,7 +152,23 @@ func (ch *ClickHouse) SemaphoreCountFastHeavy() int64 {
142152
return cur
143153
}
144154

145-
func QueryKind(isFast, isLight bool) int {
155+
func (ch *ClickHouse) SemaphoreCountFastHardware() int64 {
156+
cur, _ := ch.pools[fastHardware].sem.Observe()
157+
return cur
158+
}
159+
160+
func (ch *ClickHouse) SemaphoreCountSlowHardware() int64 {
161+
cur, _ := ch.pools[slowHardware].sem.Observe()
162+
return cur
163+
}
164+
165+
func QueryKind(isFast, isLight, isHardware bool) int {
166+
if isHardware {
167+
if isFast {
168+
return fastHardware
169+
}
170+
return slowHardware
171+
}
146172
if isFast {
147173
if isLight {
148174
return fastLight
@@ -160,7 +186,7 @@ func (ch *ClickHouse) Select(ctx context.Context, meta QueryMetaInto, query ch.Q
160186
info.Profile = p
161187
return nil
162188
}
163-
kind := QueryKind(meta.IsFast, meta.IsLight)
189+
kind := QueryKind(meta.IsFast, meta.IsLight, meta.IsHardware)
164190
pool := ch.pools[kind]
165191
servers := append(make([]*chpool.Pool, 0, len(pool.servers)), pool.servers...)
166192
for safetyCounter := 0; safetyCounter < len(pool.servers); safetyCounter++ {

0 commit comments

Comments
 (0)