From 57412be70467e9df3ff368644e380677a77c2a84 Mon Sep 17 00:00:00 2001
From: Peter Marton
Date: Mon, 9 Jun 2025 11:55:06 -0700
Subject: [PATCH 1/6] feat(clickhouse): query optimize

---
 openmeter/streaming/clickhouse/meter_query.go | 84 +++++++++++++++----
 .../streaming/clickhouse/meter_query_test.go  | 28 +++----
 2 files changed, 81 insertions(+), 31 deletions(-)

diff --git a/openmeter/streaming/clickhouse/meter_query.go b/openmeter/streaming/clickhouse/meter_query.go
index f3c0acfea3..3bf3edc086 100644
--- a/openmeter/streaming/clickhouse/meter_query.go
+++ b/openmeter/streaming/clickhouse/meter_query.go
@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"math"
 	"sort"
+	"strings"
 	"time"
 
 	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
@@ -26,6 +27,7 @@ type queryMeter struct {
 	GroupBy        []string
 	WindowSize     *meterpkg.WindowSize
 	WindowTimeZone *time.Location
+	QuerySettings  map[string]string
 }
 
 // from returns the from time for the query.
@@ -150,7 +152,7 @@ func (d *queryMeter) toSQL() (string, []interface{}, error) {
 		// TODO: remove this when we don't round to the nearest minute anymore
 		// We round them to the nearest minute to ensure the result is the same as with
 		// streaming connector using materialized views with per minute windows
-		selectColumn := fmt.Sprintf("tumbleStart(min(%s), toIntervalMinute(1)) AS windowstart, tumbleEnd(max(%s), toIntervalMinute(1)) AS windowend", timeColumn, timeColumn)
+		selectColumn := fmt.Sprintf("toStartOfMinute(min(%s)) AS windowstart, toStartOfMinute(max(%s)) + INTERVAL 1 MINUTE AS windowend", timeColumn, timeColumn)
 		selectColumns = append(selectColumns, selectColumn)
 	}
 
@@ -207,6 +209,9 @@ func (d *queryMeter) toSQL() (string, []interface{}, error) {
 	query := sqlbuilder.ClickHouse.NewSelectBuilder()
 	query.Select(selectColumns...)
 	query.From(tableName)
+
+	// Prewhere clauses
+
 	query.Where(query.Equal(getColumn("namespace"), d.Namespace))
 	query.Where(query.Equal(getColumn("type"), d.Meter.EventType))
 
@@ -218,15 +223,45 @@ func (d *queryMeter) toSQL() (string, []interface{}, error) {
 		query.Where(query.Or(slicesx.Map(d.Subject, mapFunc)...))
 	}
 
+	// Apply the time where clause
+	from := d.from()
+
+	if from != nil {
+		query.Where(query.GreaterEqualThan(timeColumn, from.Unix()))
+	}
+
+	if d.To != nil {
+		query.Where(query.LessThan(timeColumn, d.To.Unix()))
+	}
+
+	var sqlPreWhere string
+
 	if len(d.FilterGroupBy) > 0 {
-		// We sort the group by s to ensure the query is deterministic
-		groupByKeys := make([]string, 0, len(d.FilterGroupBy))
-		for k := range d.FilterGroupBy {
-			groupByKeys = append(groupByKeys, k)
+		// We sort the group bys to ensure the query is deterministic
+		filterGroupByKeys := make([]string, 0, len(d.FilterGroupBy))
+		for k := range d.FilterGroupBy {
+			filterGroupByKeys = append(filterGroupByKeys, k)
+		}
+		sort.Strings(filterGroupByKeys)
+
+		// Add to prewhere
+		for _, groupByKey := range filterGroupByKeys {
+			mapFunc := func(value string) string {
+				// Subject is a special case
+				if groupByKey == "subject" {
+					return fmt.Sprintf("subject = '%s'", sqlbuilder.Escape(value))
+				}
+
+				return fmt.Sprintf("JSONHas('%s')", sqlbuilder.Escape(value))
+			}
+
+			query.Where(query.Or(slicesx.Map(d.FilterGroupBy[groupByKey], mapFunc)...))
 		}
-		sort.Strings(groupByKeys)
-
-		for _, groupByKey := range groupByKeys {
+
+		sqlPreWhere, _ = query.Build()
+
+		// Where clauses
+		for _, groupByKey := range filterGroupByKeys {
 			if _, ok := d.Meter.GroupBy[groupByKey]; !ok {
 				return "", nil, fmt.Errorf("meter does not have group by: %s", groupByKey)
 			}
@@ -252,17 +287,7 @@ func (d *queryMeter) toSQL() (string, []interface{}, error) {
 		}
 	}
 
-	// Apply the time where clause
-	from := d.from()
-
-	if from != nil {
-		query.Where(query.GreaterEqualThan(timeColumn, from.Unix()))
-	}
-
-	if d.To != nil {
-		query.Where(query.LessThan(timeColumn, d.To.Unix()))
-	}
-
+	// Group by
 	query.GroupBy(groupByColumns...)
 
 	if groupByWindowSize {
@@ -270,6 +295,31 @@ func (d *queryMeter) toSQL() (string, []interface{}, error) {
 	}
 
 	sql, args := query.Build()
+
+	// Only add prewhere if there are filters on JSON data
+	if sqlPreWhere != "" {
+		sqlParts := strings.Split(sql, sqlPreWhere)
+		sqlAfter := sqlParts[1]
+
+		if strings.HasPrefix(sqlAfter, " AND") {
+			sqlAfter = strings.Replace(sqlAfter, "AND", "WHERE", 1)
+		}
+
+		sqlPreWhere = strings.Replace(sqlPreWhere, "WHERE", "PREWHERE", 1)
+		sql = fmt.Sprintf("%s%s", sqlPreWhere, sqlAfter)
+	}
+
+	// Add settings
+	settings := []string{
+		"optimize_move_to_prewhere = 1",
+		"allow_reorder_prewhere_conditions = 1",
+	}
+	for key, value := range d.QuerySettings {
+		settings = append(settings, fmt.Sprintf("%s = %s", key, value))
+	}
+
+	sql = sql + fmt.Sprintf(" SETTINGS %s", strings.Join(settings, ", "))
+
 	return sql, args, nil
 }
 
diff --git a/openmeter/streaming/clickhouse/meter_query_test.go b/openmeter/streaming/clickhouse/meter_query_test.go
index cdd1c92834..169eeec219 100644
--- a/openmeter/streaming/clickhouse/meter_query_test.go
+++ b/openmeter/streaming/clickhouse/meter_query_test.go
@@ -43,7 +43,7 @@ func TestQueryMeter(t *testing.T) {
 				GroupBy:    []string{"subject", "group1", "group2"},
 				WindowSize: &windowSize,
 			},
-			wantSQL:  "SELECT tumbleStart(om_events.time, toIntervalHour(1), 'UTC') AS windowstart, tumbleEnd(om_events.time, toIntervalHour(1), 'UTC') AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value, om_events.subject, JSON_VALUE(om_events.data, '$.group1') as group1, JSON_VALUE(om_events.data, '$.group2') as group2 FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND (om_events.subject = ?) AND om_events.time >= ? AND om_events.time < ? GROUP BY windowstart, windowend, subject, group1, group2 ORDER BY windowstart",
+			wantSQL:  "SELECT tumbleStart(om_events.time, toIntervalHour(1), 'UTC') AS windowstart, tumbleEnd(om_events.time, toIntervalHour(1), 'UTC') AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value, om_events.subject, JSON_VALUE(om_events.data, '$.group1') as group1, JSON_VALUE(om_events.data, '$.group2') as group2 FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND (om_events.subject = ?) AND om_events.time >= ? AND om_events.time < ? GROUP BY windowstart, windowend, subject, group1, group2 ORDER BY windowstart SETTINGS optimize_move_to_prewhere = 1, allow_reorder_prewhere_conditions = 1",
 			wantArgs: []interface{}{"my_namespace", "event1", "subject1", from.Unix(), to.Unix()},
 		},
 		{ // Aggregate all available data
@@ -62,7 +62,7 @@ func TestQueryMeter(t *testing.T) {
 					ValueProperty: lo.ToPtr("$.value"),
 				},
 			},
-			wantSQL:  "SELECT tumbleStart(min(om_events.time), toIntervalMinute(1)) AS windowstart, tumbleEnd(max(om_events.time), toIntervalMinute(1)) AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ?",
+			wantSQL:  "SELECT toStartOfMinute(min(om_events.time)) AS windowstart, toStartOfMinute(max(om_events.time)) + INTERVAL 1 MINUTE AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? SETTINGS optimize_move_to_prewhere = 1, allow_reorder_prewhere_conditions = 1",
 			wantArgs: []interface{}{"my_namespace", "event1"},
 		},
 		{ // Aggregate with count aggregation
@@ -80,7 +80,7 @@ func TestQueryMeter(t *testing.T) {
 					EventType: "event1",
 				},
 			},
-			wantSQL:  "SELECT tumbleStart(min(om_events.time), toIntervalMinute(1)) AS windowstart, tumbleEnd(max(om_events.time), toIntervalMinute(1)) AS windowend, toFloat64(count(*)) AS value FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ?",
+			wantSQL:  "SELECT toStartOfMinute(min(om_events.time)) AS windowstart, toStartOfMinute(max(om_events.time)) + INTERVAL 1 MINUTE AS windowend, toFloat64(count(*)) AS value FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? SETTINGS optimize_move_to_prewhere = 1, allow_reorder_prewhere_conditions = 1",
 			wantArgs: []interface{}{"my_namespace", "event1"},
 		},
 		{ // Aggregate with LATEST aggregation
@@ -99,7 +99,7 @@ func TestQueryMeter(t *testing.T) {
 					ValueProperty: lo.ToPtr("$.value"),
 				},
 			},
-			wantSQL:  "SELECT tumbleStart(min(om_events.time), toIntervalMinute(1)) AS windowstart, tumbleEnd(max(om_events.time), toIntervalMinute(1)) AS windowend, argMax(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null), om_events.time) AS value FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ?",
+			wantSQL:  "SELECT toStartOfMinute(min(om_events.time)) AS windowstart, toStartOfMinute(max(om_events.time)) + INTERVAL 1 MINUTE AS windowend, argMax(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null), om_events.time) AS value FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? SETTINGS optimize_move_to_prewhere = 1, allow_reorder_prewhere_conditions = 1",
 			wantArgs: []interface{}{"my_namespace", "event1"},
 		},
 		{ // Aggregate data from start
@@ -119,7 +119,7 @@ func TestQueryMeter(t *testing.T) {
 				},
 				From: &from,
 			},
-			wantSQL:  "SELECT tumbleStart(min(om_events.time), toIntervalMinute(1)) AS windowstart, tumbleEnd(max(om_events.time), toIntervalMinute(1)) AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND om_events.time >= ?",
+			wantSQL:  "SELECT toStartOfMinute(min(om_events.time)) AS windowstart, toStartOfMinute(max(om_events.time)) + INTERVAL 1 MINUTE AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND om_events.time >= ? SETTINGS optimize_move_to_prewhere = 1, allow_reorder_prewhere_conditions = 1",
 			wantArgs: []interface{}{"my_namespace", "event1", from.Unix()},
 		},
 		{ // Aggregate data between period
@@ -140,7 +140,7 @@ func TestQueryMeter(t *testing.T) {
 				From: &from,
 				To:   &to,
 			},
-			wantSQL:  "SELECT tumbleStart(min(om_events.time), toIntervalMinute(1)) AS windowstart, tumbleEnd(max(om_events.time), toIntervalMinute(1)) AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND om_events.time >= ? AND om_events.time < ?",
+			wantSQL:  "SELECT toStartOfMinute(min(om_events.time)) AS windowstart, toStartOfMinute(max(om_events.time)) + INTERVAL 1 MINUTE AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND om_events.time >= ? AND om_events.time < ? SETTINGS optimize_move_to_prewhere = 1, allow_reorder_prewhere_conditions = 1",
 			wantArgs: []interface{}{"my_namespace", "event1", from.Unix(), to.Unix()},
 		},
 		{ // Aggregate data between period, groupped by window size
@@ -162,7 +162,7 @@ func TestQueryMeter(t *testing.T) {
 				To:         &to,
 				WindowSize: &windowSize,
 			},
-			wantSQL:  "SELECT tumbleStart(om_events.time, toIntervalHour(1), 'UTC') AS windowstart, tumbleEnd(om_events.time, toIntervalHour(1), 'UTC') AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND om_events.time >= ? AND om_events.time < ? GROUP BY windowstart, windowend ORDER BY windowstart",
+			wantSQL:  "SELECT tumbleStart(om_events.time, toIntervalHour(1), 'UTC') AS windowstart, tumbleEnd(om_events.time, toIntervalHour(1), 'UTC') AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND om_events.time >= ? AND om_events.time < ? GROUP BY windowstart, windowend ORDER BY windowstart SETTINGS optimize_move_to_prewhere = 1, allow_reorder_prewhere_conditions = 1",
 			wantArgs: []interface{}{"my_namespace", "event1", from.Unix(), to.Unix()},
 		},
 		{ // Aggregate data between period in a different timezone, groupped by window size
@@ -185,7 +185,7 @@ func TestQueryMeter(t *testing.T) {
 				WindowSize:     &windowSize,
 				WindowTimeZone: tz,
 			},
-			wantSQL:  "SELECT tumbleStart(om_events.time, toIntervalHour(1), 'Asia/Shanghai') AS windowstart, tumbleEnd(om_events.time, toIntervalHour(1), 'Asia/Shanghai') AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND om_events.time >= ? AND om_events.time < ? GROUP BY windowstart, windowend ORDER BY windowstart",
+			wantSQL:  "SELECT tumbleStart(om_events.time, toIntervalHour(1), 'Asia/Shanghai') AS windowstart, tumbleEnd(om_events.time, toIntervalHour(1), 'Asia/Shanghai') AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND om_events.time >= ? AND om_events.time < ? GROUP BY windowstart, windowend ORDER BY windowstart SETTINGS optimize_move_to_prewhere = 1, allow_reorder_prewhere_conditions = 1",
 			wantArgs: []interface{}{"my_namespace", "event1", from.Unix(), to.Unix()},
 		},
 		{ // Aggregate data for a single subject
@@ -206,7 +206,7 @@ func TestQueryMeter(t *testing.T) {
 				Subject: []string{subject},
 				GroupBy: []string{"subject"},
 			},
-			wantSQL:  "SELECT tumbleStart(min(om_events.time), toIntervalMinute(1)) AS windowstart, tumbleEnd(max(om_events.time), toIntervalMinute(1)) AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value, om_events.subject FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND (om_events.subject = ?) GROUP BY subject",
+			wantSQL:  "SELECT toStartOfMinute(min(om_events.time)) AS windowstart, toStartOfMinute(max(om_events.time)) + INTERVAL 1 MINUTE AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value, om_events.subject FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND (om_events.subject = ?) GROUP BY subject SETTINGS optimize_move_to_prewhere = 1, allow_reorder_prewhere_conditions = 1",
 			wantArgs: []interface{}{"my_namespace", "event1", "subject1"},
 		},
 		{ // Aggregate data for a single subject and group by additional fields
@@ -227,7 +227,7 @@ func TestQueryMeter(t *testing.T) {
 				Subject: []string{subject},
 				GroupBy: []string{"subject", "group1", "group2"},
 			},
-			wantSQL:  "SELECT tumbleStart(min(om_events.time), toIntervalMinute(1)) AS windowstart, tumbleEnd(max(om_events.time), toIntervalMinute(1)) AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value, om_events.subject, JSON_VALUE(om_events.data, '$.group1') as group1, JSON_VALUE(om_events.data, '$.group2') as group2 FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND (om_events.subject = ?) GROUP BY subject, group1, group2",
+			wantSQL:  "SELECT toStartOfMinute(min(om_events.time)) AS windowstart, toStartOfMinute(max(om_events.time)) + INTERVAL 1 MINUTE AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value, om_events.subject, JSON_VALUE(om_events.data, '$.group1') as group1, JSON_VALUE(om_events.data, '$.group2') as group2 FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND (om_events.subject = ?) GROUP BY subject, group1, group2 SETTINGS optimize_move_to_prewhere = 1, allow_reorder_prewhere_conditions = 1",
 			wantArgs: []interface{}{"my_namespace", "event1", "subject1"},
 		},
 		{ // Aggregate data for a multiple subjects
@@ -248,7 +248,7 @@ func TestQueryMeter(t *testing.T) {
 				Subject: []string{subject, "subject2"},
 				GroupBy: []string{"subject"},
 			},
-			wantSQL:  "SELECT tumbleStart(min(om_events.time), toIntervalMinute(1)) AS windowstart, tumbleEnd(max(om_events.time), toIntervalMinute(1)) AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value, om_events.subject FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND (om_events.subject = ? OR om_events.subject = ?) GROUP BY subject",
+			wantSQL:  "SELECT toStartOfMinute(min(om_events.time)) AS windowstart, toStartOfMinute(max(om_events.time)) + INTERVAL 1 MINUTE AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value, om_events.subject FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND (om_events.subject = ? OR om_events.subject = ?) GROUP BY subject SETTINGS optimize_move_to_prewhere = 1, allow_reorder_prewhere_conditions = 1",
 			wantArgs: []interface{}{"my_namespace", "event1", "subject1", "subject2"},
 		},
 		{ // Aggregate data with filtering for a single group and single value
@@ -268,7 +268,7 @@ func TestQueryMeter(t *testing.T) {
 				},
 				FilterGroupBy: map[string][]string{"g1": {"g1v1"}},
 			},
-			wantSQL:  "SELECT tumbleStart(min(om_events.time), toIntervalMinute(1)) AS windowstart, tumbleEnd(max(om_events.time), toIntervalMinute(1)) AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND (JSON_VALUE(om_events.data, '$.group1') = 'g1v1')",
+			wantSQL:  "SELECT toStartOfMinute(min(om_events.time)) AS windowstart, toStartOfMinute(max(om_events.time)) + INTERVAL 1 MINUTE AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value FROM openmeter.om_events PREWHERE om_events.namespace = ? AND om_events.type = ? AND (JSONHas('g1v1')) WHERE (JSON_VALUE(om_events.data, '$.group1') = 'g1v1') SETTINGS optimize_move_to_prewhere = 1, allow_reorder_prewhere_conditions = 1",
 			wantArgs: []interface{}{"my_namespace", "event1"},
 		},
 		{ // Aggregate data with filtering for a single group and multiple values
@@ -288,7 +288,7 @@ func TestQueryMeter(t *testing.T) {
 				},
 				FilterGroupBy: map[string][]string{"g1": {"g1v1", "g1v2"}},
 			},
-			wantSQL:  "SELECT tumbleStart(min(om_events.time), toIntervalMinute(1)) AS windowstart, tumbleEnd(max(om_events.time), toIntervalMinute(1)) AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND (JSON_VALUE(om_events.data, '$.group1') = 'g1v1' OR JSON_VALUE(om_events.data, '$.group1') = 'g1v2')",
+			wantSQL:  "SELECT toStartOfMinute(min(om_events.time)) AS windowstart, toStartOfMinute(max(om_events.time)) + INTERVAL 1 MINUTE AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value FROM openmeter.om_events PREWHERE om_events.namespace = ? AND om_events.type = ? AND (JSONHas('g1v1') OR JSONHas('g1v2')) WHERE (JSON_VALUE(om_events.data, '$.group1') = 'g1v1' OR JSON_VALUE(om_events.data, '$.group1') = 'g1v2') SETTINGS optimize_move_to_prewhere = 1, allow_reorder_prewhere_conditions = 1",
 			wantArgs: []interface{}{"my_namespace", "event1"},
 		},
 		{ // Aggregate data with filtering for multiple groups and multiple values
@@ -308,7 +308,7 @@ func TestQueryMeter(t *testing.T) {
 				},
 				FilterGroupBy: map[string][]string{"g1": {"g1v1", "g1v2"}, "g2": {"g2v1", "g2v2"}},
 			},
-			wantSQL:  "SELECT tumbleStart(min(om_events.time), toIntervalMinute(1)) AS windowstart, tumbleEnd(max(om_events.time), toIntervalMinute(1)) AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND (JSON_VALUE(om_events.data, '$.group1') = 'g1v1' OR JSON_VALUE(om_events.data, '$.group1') = 'g1v2') AND (JSON_VALUE(om_events.data, '$.group2') = 'g2v1' OR JSON_VALUE(om_events.data, '$.group2') = 'g2v2')",
+			wantSQL:  "SELECT toStartOfMinute(min(om_events.time)) AS windowstart, toStartOfMinute(max(om_events.time)) + INTERVAL 1 MINUTE AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value FROM openmeter.om_events PREWHERE om_events.namespace = ? AND om_events.type = ? AND (JSONHas('g1v1') OR JSONHas('g1v2')) AND (JSONHas('g2v1') OR JSONHas('g2v2')) WHERE (JSON_VALUE(om_events.data, '$.group1') = 'g1v1' OR JSON_VALUE(om_events.data, '$.group1') = 'g1v2') AND (JSON_VALUE(om_events.data, '$.group2') = 'g2v1' OR JSON_VALUE(om_events.data, '$.group2') = 'g2v2') SETTINGS optimize_move_to_prewhere = 1, allow_reorder_prewhere_conditions = 1",
 			wantArgs: []interface{}{"my_namespace", "event1"},
 		},
 	}

From a5085a54fedb94be1318294e5911f7c581cf95c0 Mon Sep 17 00:00:00 2001
From: Peter Marton
Date: Mon, 9 Jun 2025 12:13:41 -0700
Subject: [PATCH 2/6] feat(clickhouse): optimize

---
 .../openmeter/templates/clickhouse.yaml       |  2 +-
 docker-compose.yaml                           |  2 +-
 openmeter/streaming/clickhouse/meter_query.go | 23 +++++--------------
 .../streaming/clickhouse/meter_query_test.go  |  6 ++---
 4 files changed, 11 insertions(+), 22 deletions(-)

diff --git a/deploy/charts/openmeter/templates/clickhouse.yaml b/deploy/charts/openmeter/templates/clickhouse.yaml
index 819bdbc823..be7ede16cc 100644
--- a/deploy/charts/openmeter/templates/clickhouse.yaml
+++ b/deploy/charts/openmeter/templates/clickhouse.yaml
@@ -21,7 +21,7 @@ spec:
     spec:
       containers:
         - name: clickhouse
-          image: clickhouse/clickhouse-server:23.3
+          image: clickhouse/clickhouse-server:24.10
           volumeMounts:
             - name: data-storage-vc-template
               mountPath: /var/lib/clickhouse
diff --git a/docker-compose.yaml b/docker-compose.yaml
index 2bc68bf1bd..eaf2a447c4 100644
--- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -27,7 +27,7 @@ services:
       retries: 30
 
   clickhouse:
-    image: clickhouse/clickhouse-server:24.9-alpine
+    image: clickhouse/clickhouse-server:24.10-alpine
    ports:
      - "127.0.0.1:8123:8123"
      - "127.0.0.1:9000:9000"
diff --git a/openmeter/streaming/clickhouse/meter_query.go b/openmeter/streaming/clickhouse/meter_query.go
index 3bf3edc086..0344bf54a4 100644
--- a/openmeter/streaming/clickhouse/meter_query.go
+++ b/openmeter/streaming/clickhouse/meter_query.go
@@ -237,6 +237,9 @@ func (d *queryMeter) toSQL() (string, []interface{}, error) {
 	var sqlPreWhere string
 
 	if len(d.FilterGroupBy) > 0 {
+		sqlPreWhere, _ = query.Build()
+		dataColumn := getColumn("data")
+
 		// We sort the group bys to ensure the query is deterministic
 		filterGroupByKeys := make([]string, 0, len(d.FilterGroupBy))
 		for k := range d.FilterGroupBy {
 			filterGroupByKeys = append(filterGroupByKeys, k)
 		}
 		sort.Strings(filterGroupByKeys)
 
-		// Add to prewhere
-		for _, groupByKey := range filterGroupByKeys {
-			mapFunc := func(value string) string {
-				// Subject is a special case
-				if groupByKey == "subject" {
-					return fmt.Sprintf("subject = '%s'", sqlbuilder.Escape(value))
-				}
-
-				return fmt.Sprintf("JSONHas('%s')", sqlbuilder.Escape(value))
-			}
-
-			query.Where(query.Or(slicesx.Map(d.FilterGroupBy[groupByKey], mapFunc)...))
-		}
-
-		sqlPreWhere, _ = query.Build()
-
 		// Where clauses
 		for _, groupByKey := range filterGroupByKeys {
 			if _, ok := d.Meter.GroupBy[groupByKey]; !ok {
 				return "", nil, fmt.Errorf("meter does not have group by: %s", groupByKey)
 			}
@@ -273,7 +260,7 @@ func (d *queryMeter) toSQL() (string, []interface{}, error) {
 				return "", nil, fmt.Errorf("empty filter for group by: %s", groupByKey)
 			}
 			mapFunc := func(value string) string {
-				column := fmt.Sprintf("JSON_VALUE(%s, '%s')", getColumn("data"), groupByJSONPath)
+				column := fmt.Sprintf("JSON_VALUE(%s, '%s')", dataColumn, groupByJSONPath)
 
 				// Subject is a special case
 				if groupByKey == "subject" {
@@ -320,6 +307,8 @@ func (d *queryMeter) toSQL() (string, []interface{}, error) {
 
 	sql = sql + fmt.Sprintf(" SETTINGS %s", strings.Join(settings, ", "))
 
+	fmt.Println(sql, args)
+
 	return sql, args, nil
 }
 
diff --git a/openmeter/streaming/clickhouse/meter_query_test.go b/openmeter/streaming/clickhouse/meter_query_test.go
index 169eeec219..d2282ebbd2 100644
--- a/openmeter/streaming/clickhouse/meter_query_test.go
+++ b/openmeter/streaming/clickhouse/meter_query_test.go
@@ -268,7 +268,7 @@ func TestQueryMeter(t *testing.T) {
 				},
 				FilterGroupBy: map[string][]string{"g1": {"g1v1"}},
 			},
-			wantSQL:  "SELECT toStartOfMinute(min(om_events.time)) AS windowstart, toStartOfMinute(max(om_events.time)) + INTERVAL 1 MINUTE AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value FROM openmeter.om_events PREWHERE om_events.namespace = ? AND om_events.type = ? AND (JSONHas('g1v1')) WHERE (JSON_VALUE(om_events.data, '$.group1') = 'g1v1') SETTINGS optimize_move_to_prewhere = 1, allow_reorder_prewhere_conditions = 1",
+			wantSQL:  "SELECT toStartOfMinute(min(om_events.time)) AS windowstart, toStartOfMinute(max(om_events.time)) + INTERVAL 1 MINUTE AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value FROM openmeter.om_events PREWHERE om_events.namespace = ? AND om_events.type = ? WHERE (JSON_VALUE(om_events.data, '$.group1') = 'g1v1') SETTINGS optimize_move_to_prewhere = 1, allow_reorder_prewhere_conditions = 1",
 			wantArgs: []interface{}{"my_namespace", "event1"},
 		},
 		{ // Aggregate data with filtering for a single group and multiple values
@@ -288,7 +288,7 @@ func TestQueryMeter(t *testing.T) {
 				},
 				FilterGroupBy: map[string][]string{"g1": {"g1v1", "g1v2"}},
 			},
-			wantSQL:  "SELECT toStartOfMinute(min(om_events.time)) AS windowstart, toStartOfMinute(max(om_events.time)) + INTERVAL 1 MINUTE AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value FROM openmeter.om_events PREWHERE om_events.namespace = ? AND om_events.type = ? AND (JSONHas('g1v1') OR JSONHas('g1v2')) WHERE (JSON_VALUE(om_events.data, '$.group1') = 'g1v1' OR JSON_VALUE(om_events.data, '$.group1') = 'g1v2') SETTINGS optimize_move_to_prewhere = 1, allow_reorder_prewhere_conditions = 1",
+			wantSQL:  "SELECT toStartOfMinute(min(om_events.time)) AS windowstart, toStartOfMinute(max(om_events.time)) + INTERVAL 1 MINUTE AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value FROM openmeter.om_events PREWHERE om_events.namespace = ? AND om_events.type = ? WHERE (JSON_VALUE(om_events.data, '$.group1') = 'g1v1' OR JSON_VALUE(om_events.data, '$.group1') = 'g1v2') SETTINGS optimize_move_to_prewhere = 1, allow_reorder_prewhere_conditions = 1",
 			wantArgs: []interface{}{"my_namespace", "event1"},
 		},
 		{ // Aggregate data with filtering for multiple groups and multiple values
@@ -308,7 +308,7 @@ func TestQueryMeter(t *testing.T) {
 				},
 				FilterGroupBy: map[string][]string{"g1": {"g1v1", "g1v2"}, "g2": {"g2v1", "g2v2"}},
 			},
-			wantSQL:  "SELECT toStartOfMinute(min(om_events.time)) AS windowstart, toStartOfMinute(max(om_events.time)) + INTERVAL 1 MINUTE AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value FROM openmeter.om_events PREWHERE om_events.namespace = ? AND om_events.type = ? AND (JSONHas('g1v1') OR JSONHas('g1v2')) AND (JSONHas('g2v1') OR JSONHas('g2v2')) WHERE (JSON_VALUE(om_events.data, '$.group1') = 'g1v1' OR JSON_VALUE(om_events.data, '$.group1') = 'g1v2') AND (JSON_VALUE(om_events.data, '$.group2') = 'g2v1' OR JSON_VALUE(om_events.data, '$.group2') = 'g2v2') SETTINGS optimize_move_to_prewhere = 1, allow_reorder_prewhere_conditions = 1",
+			wantSQL:  "SELECT toStartOfMinute(min(om_events.time)) AS windowstart, toStartOfMinute(max(om_events.time)) + INTERVAL 1 MINUTE AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value FROM openmeter.om_events PREWHERE om_events.namespace = ? AND om_events.type = ? WHERE (JSON_VALUE(om_events.data, '$.group1') = 'g1v1' OR JSON_VALUE(om_events.data, '$.group1') = 'g1v2') AND (JSON_VALUE(om_events.data, '$.group2') = 'g2v1' OR JSON_VALUE(om_events.data, '$.group2') = 'g2v2') SETTINGS optimize_move_to_prewhere = 1, allow_reorder_prewhere_conditions = 1",
 			wantArgs: []interface{}{"my_namespace", "event1"},
 		},
 	}

From 55d8f2be5ad79cffde72381789bdad77967b4e42 Mon Sep 17 00:00:00 2001
From: Peter Marton
Date: Mon, 9 Jun 2025 12:18:29 -0700
Subject: [PATCH 3/6] feat(clickhouse): optimize

---
 openmeter/streaming/clickhouse/meter_query.go | 2 --
 1 file changed, 2 deletions(-)

diff --git a/openmeter/streaming/clickhouse/meter_query.go b/openmeter/streaming/clickhouse/meter_query.go
index 0344bf54a4..501dfe81f2 100644
--- a/openmeter/streaming/clickhouse/meter_query.go
+++ b/openmeter/streaming/clickhouse/meter_query.go
@@ -307,8 +307,6 @@ func (d *queryMeter) toSQL() (string, []interface{}, error) {
 
 	sql = sql + fmt.Sprintf(" SETTINGS %s", strings.Join(settings, ", "))
 
-	fmt.Println(sql, args)
-
 	return sql, args, nil
 }

From ad06cc5a767c5b4d01354c9f6a0fe36dda49dba4 Mon Sep 17 00:00:00 2001
From: Peter Marton
Date: Mon, 9 Jun 2025 12:31:36 -0700
Subject: [PATCH 4/6] feat(streaming): allow query settings from config

---
 app/config/aggregation.go                     | 4 ++++
 openmeter/streaming/clickhouse/connector.go   | 2 ++
 openmeter/streaming/clickhouse/meter_query.go | 9 +--------
 3 files changed, 7 insertions(+), 8 deletions(-)

diff --git a/app/config/aggregation.go b/app/config/aggregation.go
index ac5f2111e9..6178feefc9 100644
--- a/app/config/aggregation.go
+++ b/app/config/aggregation.go
@@ -28,6 +28,10 @@ type AggregationConfiguration struct {
 	// For example, you can set the `max_insert_threads` setting to control the number of threads
 	// or the `parallel_view_processing` setting to enable pushing to attached views concurrently.
 	InsertQuerySettings map[string]string
+
+	// MeterQuerySettings is the settings for the meter query
+	// For example, you can set the `enable_parallel_replicas` and `max_parallel_replicas` settings.
+	MeterQuerySettings map[string]string
 }
 
 // Validate validates the configuration.
diff --git a/openmeter/streaming/clickhouse/connector.go b/openmeter/streaming/clickhouse/connector.go
index 339a5007ea..36e3f104f6 100644
--- a/openmeter/streaming/clickhouse/connector.go
+++ b/openmeter/streaming/clickhouse/connector.go
@@ -34,6 +34,7 @@ type Config struct {
 	AsyncInsert         bool
 	AsyncInsertWait     bool
 	InsertQuerySettings map[string]string
+	MeterQuerySettings  map[string]string
 	ProgressManager     progressmanager.Service
 	SkipCreateTables    bool
 }
@@ -169,6 +170,7 @@ func (c *Connector) QueryMeter(ctx context.Context, namespace string, meter mete
 		GroupBy:        groupBy,
 		WindowSize:     params.WindowSize,
 		WindowTimeZone: params.WindowTimeZone,
+		QuerySettings:  c.config.MeterQuerySettings,
 	}
 
 	// Load cached rows if any
diff --git a/openmeter/streaming/clickhouse/meter_query.go b/openmeter/streaming/clickhouse/meter_query.go
index 501dfe81f2..76c9958311 100644
--- a/openmeter/streaming/clickhouse/meter_query.go
+++ b/openmeter/streaming/clickhouse/meter_query.go
@@ -260,14 +260,7 @@ func (d *queryMeter) toSQL() (string, []interface{}, error) {
 				return "", nil, fmt.Errorf("empty filter for group by: %s", groupByKey)
 			}
 			mapFunc := func(value string) string {
-				column := fmt.Sprintf("JSON_VALUE(%s, '%s')", dataColumn, groupByJSONPath)
-
-				// Subject is a special case
-				if groupByKey == "subject" {
-					column = "subject"
-				}
-
-				return fmt.Sprintf("%s = '%s'", column, sqlbuilder.Escape((value)))
+				return fmt.Sprintf("JSON_VALUE(%s, '%s') = '%s'", dataColumn, groupByJSONPath, sqlbuilder.Escape((value)))
 			}
 
 			query.Where(query.Or(slicesx.Map(values, mapFunc)...))

From 16fd3b52a6c9b70f2022470df3fd72f2a7cc543f Mon Sep 17 00:00:00 2001
From: Peter Marton
Date: Mon, 9 Jun 2025 12:49:31 -0700
Subject: [PATCH 5/6] chore(dagger): clickhouse version

---
 .dagger/versions_pinned.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.dagger/versions_pinned.go b/.dagger/versions_pinned.go
index bbb6953f11..2a1d1b5050 100644
--- a/.dagger/versions_pinned.go
+++ b/.dagger/versions_pinned.go
@@ -2,7 +2,7 @@ package main
 
 const (
 	kafkaVersion      = "3.6"
-	clickhouseVersion = "24.5.5.78"
+	clickhouseVersion = "24.10"
 	redisVersion      = "7.0.12"
 	postgresVersion   = "14.9"
 	svixVersion       = "v1.44"

From 7cddba562dd5d15cc126d0bf5432a79304d24b67 Mon Sep 17 00:00:00 2001
From: Peter Marton
Date: Mon, 9 Jun 2025 12:50:03 -0700
Subject: [PATCH 6/6] chore(dagger): clickhouse version

---
 app/common/streaming.go                          | 1 +
 examples/collectors/database/docker-compose.yaml | 2 +-
 2 files changed, 2 insertions(+), 1 deletion(-)

diff --git a/app/common/streaming.go b/app/common/streaming.go
index af591f00e3..c22571981d 100644
--- a/app/common/streaming.go
+++ b/app/common/streaming.go
@@ -35,6 +35,7 @@ func NewStreamingConnector(
 		AsyncInsert:         conf.AsyncInsert,
 		AsyncInsertWait:     conf.AsyncInsertWait,
 		InsertQuerySettings: conf.InsertQuerySettings,
+		MeterQuerySettings:  conf.MeterQuerySettings,
 		ProgressManager:     progressmanager,
 	})
 	if err != nil {
diff --git a/examples/collectors/database/docker-compose.yaml b/examples/collectors/database/docker-compose.yaml
index 9d407dd8ff..0103200c86 100644
--- a/examples/collectors/database/docker-compose.yaml
+++ b/examples/collectors/database/docker-compose.yaml
@@ -48,7 +48,7 @@ services:
   clickhouse:
     profiles:
       - clickhouse
-    image: clickhouse/clickhouse-server:23.8.9.54-alpine
+    image: clickhouse/clickhouse-server:24.0-alpine
    ports:
      - 127.0.0.1:8123:8123
      - 127.0.0.1:9000:9000
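For reference, below is a minimal sketch of how the new option introduced in this series might be wired into an OpenMeter configuration file. The YAML key names are assumed from the `AggregationConfiguration` fields above and are not taken from the patches themselves; the example setting names come from the struct comments.

```yaml
# Hypothetical config sketch — key names assumed from AggregationConfiguration,
# not confirmed by this patch series.
aggregation:
  # Existing option: ClickHouse settings applied to insert queries.
  insertQuerySettings:
    max_insert_threads: "4"
  # New option: settings appended to the SETTINGS clause of meter queries,
  # for example to enable parallel replica reads.
  meterQuerySettings:
    enable_parallel_replicas: "1"
    max_parallel_replicas: "3"
```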