diff --git a/.dagger/versions_pinned.go b/.dagger/versions_pinned.go index bbb6953f11..2a1d1b5050 100644 --- a/.dagger/versions_pinned.go +++ b/.dagger/versions_pinned.go @@ -2,7 +2,7 @@ package main const ( kafkaVersion = "3.6" - clickhouseVersion = "24.5.5.78" + clickhouseVersion = "24.10" redisVersion = "7.0.12" postgresVersion = "14.9" svixVersion = "v1.44" diff --git a/app/common/streaming.go b/app/common/streaming.go index af591f00e3..c22571981d 100644 --- a/app/common/streaming.go +++ b/app/common/streaming.go @@ -35,6 +35,7 @@ func NewStreamingConnector( AsyncInsert: conf.AsyncInsert, AsyncInsertWait: conf.AsyncInsertWait, InsertQuerySettings: conf.InsertQuerySettings, + MeterQuerySettings: conf.MeterQuerySettings, ProgressManager: progressmanager, }) if err != nil { diff --git a/app/config/aggregation.go b/app/config/aggregation.go index ac5f2111e9..6178feefc9 100644 --- a/app/config/aggregation.go +++ b/app/config/aggregation.go @@ -28,6 +28,10 @@ type AggregationConfiguration struct { // For example, you can set the `max_insert_threads` setting to control the number of threads // or the `parallel_view_processing` setting to enable pushing to attached views concurrently. InsertQuerySettings map[string]string + + // MeterQuerySettings is the settings for the meter query + // For example, you can set the `enable_parallel_replicas` and `max_parallel_replicas` settings. + MeterQuerySettings map[string]string } // Validate validates the configuration. diff --git a/deploy/charts/openmeter/templates/clickhouse.yaml b/deploy/charts/openmeter/templates/clickhouse.yaml index 819bdbc823..be7ede16cc 100644 --- a/deploy/charts/openmeter/templates/clickhouse.yaml +++ b/deploy/charts/openmeter/templates/clickhouse.yaml @@ -21,7 +21,7 @@ spec: spec: containers: - name: clickhouse - image: clickhouse/clickhouse-server:23.3 + image: clickhouse/clickhouse-server:24.10 volumeMounts: - name: data-storage-vc-template mountPath: /var/lib/clickhouse diff --git a/docker-compose.yaml b/docker-compose.yaml index 2bc68bf1bd..eaf2a447c4 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -27,7 +27,7 @@ services: retries: 30 clickhouse: - image: clickhouse/clickhouse-server:24.9-alpine + image: clickhouse/clickhouse-server:24.10-alpine ports: - "127.0.0.1:8123:8123" - "127.0.0.1:9000:9000" diff --git a/examples/collectors/database/docker-compose.yaml b/examples/collectors/database/docker-compose.yaml index 9d407dd8ff..0103200c86 100644 --- a/examples/collectors/database/docker-compose.yaml +++ b/examples/collectors/database/docker-compose.yaml @@ -48,7 +48,7 @@ services: clickhouse: profiles: - clickhouse - image: clickhouse/clickhouse-server:23.8.9.54-alpine + image: clickhouse/clickhouse-server:24.0-alpine ports: - 127.0.0.1:8123:8123 - 127.0.0.1:9000:9000 diff --git a/openmeter/streaming/clickhouse/connector.go b/openmeter/streaming/clickhouse/connector.go index 339a5007ea..36e3f104f6 100644 --- a/openmeter/streaming/clickhouse/connector.go +++ b/openmeter/streaming/clickhouse/connector.go @@ -34,6 +34,7 @@ type Config struct { AsyncInsert bool AsyncInsertWait bool InsertQuerySettings map[string]string + MeterQuerySettings map[string]string ProgressManager progressmanager.Service SkipCreateTables bool } @@ -169,6 +170,7 @@ func (c *Connector) QueryMeter(ctx context.Context, namespace string, meter mete GroupBy: groupBy, WindowSize: params.WindowSize, WindowTimeZone: params.WindowTimeZone, + QuerySettings: c.config.MeterQuerySettings, } // Load cached rows if any diff --git a/openmeter/streaming/clickhouse/meter_query.go b/openmeter/streaming/clickhouse/meter_query.go index f3c0acfea3..76c9958311 100644 --- a/openmeter/streaming/clickhouse/meter_query.go +++ b/openmeter/streaming/clickhouse/meter_query.go @@ -5,6 +5,7 @@ import ( "fmt" "math" "sort" + "strings" "time" "github.com/ClickHouse/clickhouse-go/v2/lib/driver" @@ -26,6 +27,7 @@ type queryMeter struct { GroupBy []string WindowSize *meterpkg.WindowSize WindowTimeZone *time.Location + QuerySettings map[string]string } // from returns the from time for the query. @@ -150,7 +152,7 @@ func (d *queryMeter) toSQL() (string, []interface{}, error) { // TODO: remove this when we don't round to the nearest minute anymore // We round them to the nearest minute to ensure the result is the same as with // streaming connector using materialized views with per minute windows - selectColumn := fmt.Sprintf("tumbleStart(min(%s), toIntervalMinute(1)) AS windowstart, tumbleEnd(max(%s), toIntervalMinute(1)) AS windowend", timeColumn, timeColumn) + selectColumn := fmt.Sprintf("toStartOfMinute(min(%s)) AS windowstart, toStartOfMinute(max(%s)) + INTERVAL 1 MINUTE AS windowend", timeColumn, timeColumn) selectColumns = append(selectColumns, selectColumn) } @@ -207,6 +209,9 @@ func (d *queryMeter) toSQL() (string, []interface{}, error) { query := sqlbuilder.ClickHouse.NewSelectBuilder() query.Select(selectColumns...) query.From(tableName) + + // Prewhere clauses + query.Where(query.Equal(getColumn("namespace"), d.Namespace)) query.Where(query.Equal(getColumn("type"), d.Meter.EventType)) @@ -218,15 +223,32 @@ func (d *queryMeter) toSQL() (string, []interface{}, error) { query.Where(query.Or(slicesx.Map(d.Subject, mapFunc)...)) } + // Apply the time where clause + from := d.from() + + if from != nil { + query.Where(query.GreaterEqualThan(timeColumn, from.Unix())) + } + + if d.To != nil { + query.Where(query.LessThan(timeColumn, d.To.Unix())) + } + + var sqlPreWhere string + if len(d.FilterGroupBy) > 0 { - // We sort the group by s to ensure the query is deterministic - groupByKeys := make([]string, 0, len(d.FilterGroupBy)) + sqlPreWhere, _ = query.Build() + dataColumn := getColumn("data") + + // We sort the group bys to ensure the query is deterministic + filterGroupByKeys := make([]string, 0, len(d.FilterGroupBy)) for k := range d.FilterGroupBy { - groupByKeys = append(groupByKeys, k) + filterGroupByKeys = append(filterGroupByKeys, k) } - sort.Strings(groupByKeys) + sort.Strings(filterGroupByKeys) - for _, groupByKey := range groupByKeys { + // Where clauses + for _, groupByKey := range filterGroupByKeys { if _, ok := d.Meter.GroupBy[groupByKey]; !ok { return "", nil, fmt.Errorf("meter does not have group by: %s", groupByKey) } @@ -238,31 +260,14 @@ func (d *queryMeter) toSQL() (string, []interface{}, error) { return "", nil, fmt.Errorf("empty filter for group by: %s", groupByKey) } mapFunc := func(value string) string { - column := fmt.Sprintf("JSON_VALUE(%s, '%s')", getColumn("data"), groupByJSONPath) - - // Subject is a special case - if groupByKey == "subject" { - column = "subject" - } - - return fmt.Sprintf("%s = '%s'", column, sqlbuilder.Escape((value))) + return fmt.Sprintf("JSON_VALUE(%s, '%s') = '%s'", dataColumn, groupByJSONPath, sqlbuilder.Escape((value))) } query.Where(query.Or(slicesx.Map(values, mapFunc)...)) } } - // Apply the time where clause - from := d.from() - - if from != nil { - query.Where(query.GreaterEqualThan(timeColumn, from.Unix())) - } - - if d.To != nil { - query.Where(query.LessThan(timeColumn, d.To.Unix())) - } - + // Group by query.GroupBy(groupByColumns...) if groupByWindowSize { @@ -270,6 +275,31 @@ func (d *queryMeter) toSQL() (string, []interface{}, error) { } sql, args := query.Build() + + // Only add prewhere if there are filters on JSON data + if sqlPreWhere != "" { + sqlParts := strings.Split(sql, sqlPreWhere) + sqlAfter := sqlParts[1] + + if strings.HasPrefix(sqlAfter, " AND") { + sqlAfter = strings.Replace(sqlAfter, "AND", "WHERE", 1) + } + + sqlPreWhere = strings.Replace(sqlPreWhere, "WHERE", "PREWHERE", 1) + sql = fmt.Sprintf("%s%s", sqlPreWhere, sqlAfter) + } + + // Add settings + settings := []string{ + "optimize_move_to_prewhere = 1", + "allow_reorder_prewhere_conditions = 1", + } + for key, value := range d.QuerySettings { + settings = append(settings, fmt.Sprintf("%s = %s", key, value)) + } + + sql = sql + fmt.Sprintf(" SETTINGS %s", strings.Join(settings, ", ")) + return sql, args, nil } diff --git a/openmeter/streaming/clickhouse/meter_query_test.go b/openmeter/streaming/clickhouse/meter_query_test.go index cdd1c92834..d2282ebbd2 100644 --- a/openmeter/streaming/clickhouse/meter_query_test.go +++ b/openmeter/streaming/clickhouse/meter_query_test.go @@ -43,7 +43,7 @@ func TestQueryMeter(t *testing.T) { GroupBy: []string{"subject", "group1", "group2"}, WindowSize: &windowSize, }, - wantSQL: "SELECT tumbleStart(om_events.time, toIntervalHour(1), 'UTC') AS windowstart, tumbleEnd(om_events.time, toIntervalHour(1), 'UTC') AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value, om_events.subject, JSON_VALUE(om_events.data, '$.group1') as group1, JSON_VALUE(om_events.data, '$.group2') as group2 FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND (om_events.subject = ?) AND om_events.time >= ? AND om_events.time < ? GROUP BY windowstart, windowend, subject, group1, group2 ORDER BY windowstart", + wantSQL: "SELECT tumbleStart(om_events.time, toIntervalHour(1), 'UTC') AS windowstart, tumbleEnd(om_events.time, toIntervalHour(1), 'UTC') AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value, om_events.subject, JSON_VALUE(om_events.data, '$.group1') as group1, JSON_VALUE(om_events.data, '$.group2') as group2 FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND (om_events.subject = ?) AND om_events.time >= ? AND om_events.time < ? GROUP BY windowstart, windowend, subject, group1, group2 ORDER BY windowstart SETTINGS optimize_move_to_prewhere = 1, allow_reorder_prewhere_conditions = 1", wantArgs: []interface{}{"my_namespace", "event1", "subject1", from.Unix(), to.Unix()}, }, { // Aggregate all available data @@ -62,7 +62,7 @@ func TestQueryMeter(t *testing.T) { }, }, }, - wantSQL: "SELECT tumbleStart(min(om_events.time), toIntervalMinute(1)) AS windowstart, tumbleEnd(max(om_events.time), toIntervalMinute(1)) AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ?", + wantSQL: "SELECT toStartOfMinute(min(om_events.time)) AS windowstart, toStartOfMinute(max(om_events.time)) + INTERVAL 1 MINUTE AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? SETTINGS optimize_move_to_prewhere = 1, allow_reorder_prewhere_conditions = 1", wantArgs: []interface{}{"my_namespace", "event1"}, }, { // Aggregate with count aggregation @@ -80,7 +80,7 @@ func TestQueryMeter(t *testing.T) { }, }, }, - wantSQL: "SELECT tumbleStart(min(om_events.time), toIntervalMinute(1)) AS windowstart, tumbleEnd(max(om_events.time), toIntervalMinute(1)) AS windowend, toFloat64(count(*)) AS value FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ?", + wantSQL: "SELECT toStartOfMinute(min(om_events.time)) AS windowstart, toStartOfMinute(max(om_events.time)) + INTERVAL 1 MINUTE AS windowend, toFloat64(count(*)) AS value FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? SETTINGS optimize_move_to_prewhere = 1, allow_reorder_prewhere_conditions = 1", wantArgs: []interface{}{"my_namespace", "event1"}, }, { // Aggregate with LATEST aggregation @@ -99,7 +99,7 @@ func TestQueryMeter(t *testing.T) { }, }, }, - wantSQL: "SELECT tumbleStart(min(om_events.time), toIntervalMinute(1)) AS windowstart, tumbleEnd(max(om_events.time), toIntervalMinute(1)) AS windowend, argMax(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null), om_events.time) AS value FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ?", + wantSQL: "SELECT toStartOfMinute(min(om_events.time)) AS windowstart, toStartOfMinute(max(om_events.time)) + INTERVAL 1 MINUTE AS windowend, argMax(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null), om_events.time) AS value FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? SETTINGS optimize_move_to_prewhere = 1, allow_reorder_prewhere_conditions = 1", wantArgs: []interface{}{"my_namespace", "event1"}, }, { // Aggregate data from start @@ -119,7 +119,7 @@ func TestQueryMeter(t *testing.T) { }, From: &from, }, - wantSQL: "SELECT tumbleStart(min(om_events.time), toIntervalMinute(1)) AS windowstart, tumbleEnd(max(om_events.time), toIntervalMinute(1)) AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND om_events.time >= ?", + wantSQL: "SELECT toStartOfMinute(min(om_events.time)) AS windowstart, toStartOfMinute(max(om_events.time)) + INTERVAL 1 MINUTE AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND om_events.time >= ? SETTINGS optimize_move_to_prewhere = 1, allow_reorder_prewhere_conditions = 1", wantArgs: []interface{}{"my_namespace", "event1", from.Unix()}, }, { // Aggregate data between period @@ -140,7 +140,7 @@ func TestQueryMeter(t *testing.T) { From: &from, To: &to, }, - wantSQL: "SELECT tumbleStart(min(om_events.time), toIntervalMinute(1)) AS windowstart, tumbleEnd(max(om_events.time), toIntervalMinute(1)) AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND om_events.time >= ? AND om_events.time < ?", + wantSQL: "SELECT toStartOfMinute(min(om_events.time)) AS windowstart, toStartOfMinute(max(om_events.time)) + INTERVAL 1 MINUTE AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND om_events.time >= ? AND om_events.time < ? SETTINGS optimize_move_to_prewhere = 1, allow_reorder_prewhere_conditions = 1", wantArgs: []interface{}{"my_namespace", "event1", from.Unix(), to.Unix()}, }, { // Aggregate data between period, groupped by window size @@ -162,7 +162,7 @@ func TestQueryMeter(t *testing.T) { To: &to, WindowSize: &windowSize, }, - wantSQL: "SELECT tumbleStart(om_events.time, toIntervalHour(1), 'UTC') AS windowstart, tumbleEnd(om_events.time, toIntervalHour(1), 'UTC') AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND om_events.time >= ? AND om_events.time < ? GROUP BY windowstart, windowend ORDER BY windowstart", + wantSQL: "SELECT tumbleStart(om_events.time, toIntervalHour(1), 'UTC') AS windowstart, tumbleEnd(om_events.time, toIntervalHour(1), 'UTC') AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND om_events.time >= ? AND om_events.time < ? GROUP BY windowstart, windowend ORDER BY windowstart SETTINGS optimize_move_to_prewhere = 1, allow_reorder_prewhere_conditions = 1", wantArgs: []interface{}{"my_namespace", "event1", from.Unix(), to.Unix()}, }, { // Aggregate data between period in a different timezone, groupped by window size @@ -185,7 +185,7 @@ func TestQueryMeter(t *testing.T) { WindowSize: &windowSize, WindowTimeZone: tz, }, - wantSQL: "SELECT tumbleStart(om_events.time, toIntervalHour(1), 'Asia/Shanghai') AS windowstart, tumbleEnd(om_events.time, toIntervalHour(1), 'Asia/Shanghai') AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND om_events.time >= ? AND om_events.time < ? GROUP BY windowstart, windowend ORDER BY windowstart", + wantSQL: "SELECT tumbleStart(om_events.time, toIntervalHour(1), 'Asia/Shanghai') AS windowstart, tumbleEnd(om_events.time, toIntervalHour(1), 'Asia/Shanghai') AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND om_events.time >= ? AND om_events.time < ? GROUP BY windowstart, windowend ORDER BY windowstart SETTINGS optimize_move_to_prewhere = 1, allow_reorder_prewhere_conditions = 1", wantArgs: []interface{}{"my_namespace", "event1", from.Unix(), to.Unix()}, }, { // Aggregate data for a single subject @@ -206,7 +206,7 @@ func TestQueryMeter(t *testing.T) { Subject: []string{subject}, GroupBy: []string{"subject"}, }, - wantSQL: "SELECT tumbleStart(min(om_events.time), toIntervalMinute(1)) AS windowstart, tumbleEnd(max(om_events.time), toIntervalMinute(1)) AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value, om_events.subject FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND (om_events.subject = ?) GROUP BY subject", + wantSQL: "SELECT toStartOfMinute(min(om_events.time)) AS windowstart, toStartOfMinute(max(om_events.time)) + INTERVAL 1 MINUTE AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value, om_events.subject FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND (om_events.subject = ?) GROUP BY subject SETTINGS optimize_move_to_prewhere = 1, allow_reorder_prewhere_conditions = 1", wantArgs: []interface{}{"my_namespace", "event1", "subject1"}, }, { // Aggregate data for a single subject and group by additional fields @@ -227,7 +227,7 @@ func TestQueryMeter(t *testing.T) { Subject: []string{subject}, GroupBy: []string{"subject", "group1", "group2"}, }, - wantSQL: "SELECT tumbleStart(min(om_events.time), toIntervalMinute(1)) AS windowstart, tumbleEnd(max(om_events.time), toIntervalMinute(1)) AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value, om_events.subject, JSON_VALUE(om_events.data, '$.group1') as group1, JSON_VALUE(om_events.data, '$.group2') as group2 FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND (om_events.subject = ?) GROUP BY subject, group1, group2", + wantSQL: "SELECT toStartOfMinute(min(om_events.time)) AS windowstart, toStartOfMinute(max(om_events.time)) + INTERVAL 1 MINUTE AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value, om_events.subject, JSON_VALUE(om_events.data, '$.group1') as group1, JSON_VALUE(om_events.data, '$.group2') as group2 FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND (om_events.subject = ?) GROUP BY subject, group1, group2 SETTINGS optimize_move_to_prewhere = 1, allow_reorder_prewhere_conditions = 1", wantArgs: []interface{}{"my_namespace", "event1", "subject1"}, }, { // Aggregate data for a multiple subjects @@ -248,7 +248,7 @@ func TestQueryMeter(t *testing.T) { Subject: []string{subject, "subject2"}, GroupBy: []string{"subject"}, }, - wantSQL: "SELECT tumbleStart(min(om_events.time), toIntervalMinute(1)) AS windowstart, tumbleEnd(max(om_events.time), toIntervalMinute(1)) AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value, om_events.subject FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND (om_events.subject = ? OR om_events.subject = ?) GROUP BY subject", + wantSQL: "SELECT toStartOfMinute(min(om_events.time)) AS windowstart, toStartOfMinute(max(om_events.time)) + INTERVAL 1 MINUTE AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value, om_events.subject FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND (om_events.subject = ? OR om_events.subject = ?) GROUP BY subject SETTINGS optimize_move_to_prewhere = 1, allow_reorder_prewhere_conditions = 1", wantArgs: []interface{}{"my_namespace", "event1", "subject1", "subject2"}, }, { // Aggregate data with filtering for a single group and single value @@ -268,7 +268,7 @@ func TestQueryMeter(t *testing.T) { }, FilterGroupBy: map[string][]string{"g1": {"g1v1"}}, }, - wantSQL: "SELECT tumbleStart(min(om_events.time), toIntervalMinute(1)) AS windowstart, tumbleEnd(max(om_events.time), toIntervalMinute(1)) AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND (JSON_VALUE(om_events.data, '$.group1') = 'g1v1')", + wantSQL: "SELECT toStartOfMinute(min(om_events.time)) AS windowstart, toStartOfMinute(max(om_events.time)) + INTERVAL 1 MINUTE AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value FROM openmeter.om_events PREWHERE om_events.namespace = ? AND om_events.type = ? WHERE (JSON_VALUE(om_events.data, '$.group1') = 'g1v1') SETTINGS optimize_move_to_prewhere = 1, allow_reorder_prewhere_conditions = 1", wantArgs: []interface{}{"my_namespace", "event1"}, }, { // Aggregate data with filtering for a single group and multiple values @@ -288,7 +288,7 @@ func TestQueryMeter(t *testing.T) { }, FilterGroupBy: map[string][]string{"g1": {"g1v1", "g1v2"}}, }, - wantSQL: "SELECT tumbleStart(min(om_events.time), toIntervalMinute(1)) AS windowstart, tumbleEnd(max(om_events.time), toIntervalMinute(1)) AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND (JSON_VALUE(om_events.data, '$.group1') = 'g1v1' OR JSON_VALUE(om_events.data, '$.group1') = 'g1v2')", + wantSQL: "SELECT toStartOfMinute(min(om_events.time)) AS windowstart, toStartOfMinute(max(om_events.time)) + INTERVAL 1 MINUTE AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value FROM openmeter.om_events PREWHERE om_events.namespace = ? AND om_events.type = ? WHERE (JSON_VALUE(om_events.data, '$.group1') = 'g1v1' OR JSON_VALUE(om_events.data, '$.group1') = 'g1v2') SETTINGS optimize_move_to_prewhere = 1, allow_reorder_prewhere_conditions = 1", wantArgs: []interface{}{"my_namespace", "event1"}, }, { // Aggregate data with filtering for multiple groups and multiple values @@ -308,7 +308,7 @@ func TestQueryMeter(t *testing.T) { }, FilterGroupBy: map[string][]string{"g1": {"g1v1", "g1v2"}, "g2": {"g2v1", "g2v2"}}, }, - wantSQL: "SELECT tumbleStart(min(om_events.time), toIntervalMinute(1)) AS windowstart, tumbleEnd(max(om_events.time), toIntervalMinute(1)) AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND (JSON_VALUE(om_events.data, '$.group1') = 'g1v1' OR JSON_VALUE(om_events.data, '$.group1') = 'g1v2') AND (JSON_VALUE(om_events.data, '$.group2') = 'g2v1' OR JSON_VALUE(om_events.data, '$.group2') = 'g2v2')", + wantSQL: "SELECT toStartOfMinute(min(om_events.time)) AS windowstart, toStartOfMinute(max(om_events.time)) + INTERVAL 1 MINUTE AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value FROM openmeter.om_events PREWHERE om_events.namespace = ? AND om_events.type = ? WHERE (JSON_VALUE(om_events.data, '$.group1') = 'g1v1' OR JSON_VALUE(om_events.data, '$.group1') = 'g1v2') AND (JSON_VALUE(om_events.data, '$.group2') = 'g2v1' OR JSON_VALUE(om_events.data, '$.group2') = 'g2v2') SETTINGS optimize_move_to_prewhere = 1, allow_reorder_prewhere_conditions = 1", wantArgs: []interface{}{"my_namespace", "event1"}, }, }