Skip to content

Commit ba08bd9

Browse files
committed
planbuilder: add support for window functions
Signed-off-by: crangavajha1 <[email protected]>
1 parent c048e89 commit ba08bd9

File tree

13 files changed

+1503
-3
lines changed

13 files changed

+1503
-3
lines changed

go/vt/vtgate/planbuilder/operator_transformers.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ func recursiveTransform(ctx *plancontext.PlanningContext, op operators.Operator)
9191
return transformDMLWithInput(ctx, op)
9292
case *operators.RecurseCTE:
9393
return transformRecurseCTE(ctx, op)
94+
case *operators.Window:
95+
return transformWindow(ctx, op)
9496
case *operators.PercentBasedMirror:
9597
return transformPercentBasedMirror(ctx, op)
9698
}
@@ -1059,3 +1061,23 @@ func generateQuery(statement sqlparser.Statement) string {
10591061
statement.Format(buf)
10601062
return buf.String()
10611063
}
1064+
1065+
func isSingleShard(prim engine.Primitive) bool {
1066+
switch p := prim.(type) {
1067+
case *engine.Route:
1068+
return p.Opcode.IsSingleShard() || p.Opcode == engine.ByDestination
1069+
case *engine.PlanSwitcher:
1070+
return isSingleShard(p.Optimized)
1071+
case *engine.Filter:
1072+
return isSingleShard(p.Input)
1073+
case *engine.Limit:
1074+
return isSingleShard(p.Input)
1075+
case *engine.MemorySort:
1076+
return isSingleShard(p.Input)
1077+
case *engine.Projection:
1078+
return isSingleShard(p.Input)
1079+
case *engine.SimpleProjection:
1080+
return isSingleShard(p.Input)
1081+
}
1082+
return false
1083+
}

go/vt/vtgate/planbuilder/operators/horizon_expanding.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,14 @@ func expandSelectHorizon(ctx *plancontext.PlanningContext, horizon *Horizon, sel
9494
extracted = append(extracted, "Projection")
9595
}
9696

97+
if qp.HasWindow {
98+
// Window functions are evaluated after aggregation but before ordering and limit.
99+
// We wrap the current operator (which is either a Projection or Aggregation)
100+
// with the Window operator to handle these calculations.
101+
op = newWindow(op, qp)
102+
extracted = append(extracted, "Window")
103+
}
104+
97105
if qp.NeedsDistinct() {
98106
op = newDistinct(op, qp, true)
99107
extracted = append(extracted, "Distinct")

go/vt/vtgate/planbuilder/operators/query_planning.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,7 @@ func pushOrExpandHorizon(ctx *plancontext.PlanningContext, in *Horizon) (Operato
292292
!hasHaving &&
293293
!needsOrdering &&
294294
!qp.NeedsAggregation() &&
295+
!qp.HasWindow &&
295296
!isDistinctAST(in.selectStatement()) &&
296297
in.selectStatement().GetLimit() == nil
297298

@@ -308,6 +309,8 @@ func pushOrExpandHorizon(ctx *plancontext.PlanningContext, in *Horizon) (Operato
308309
debugNoRewrite("horizon push blocked: query has ORDER BY")
309310
} else if qp.NeedsAggregation() {
310311
debugNoRewrite("horizon push blocked: query needs aggregation")
312+
} else if qp.HasWindow {
313+
debugNoRewrite("horizon push blocked: query has window functions")
311314
} else if isDistinctAST(in.selectStatement()) {
312315
debugNoRewrite("horizon push blocked: query has DISTINCT")
313316
} else if in.selectStatement().GetLimit() != nil {

go/vt/vtgate/planbuilder/operators/queryprojection.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ type (
4242
// If you change the contents here, please update the toString() method
4343
SelectExprs []SelectExpr
4444
HasAggr bool
45+
HasWindow bool
4546
Distinct bool
4647
WithRollup bool
4748
groupByExprs []GroupBy
@@ -169,6 +170,14 @@ func createQPFromSelect(ctx *plancontext.PlanningContext, sel *sqlparser.Select)
169170
if !qp.HasAggr && sel.Having != nil {
170171
qp.HasAggr = ctx.ContainsAggr(sel.Having.Expr)
171172
}
173+
if !qp.HasWindow {
174+
for _, order := range sel.OrderBy {
175+
if ctx.ContainsWindowFunc(order.Expr) {
176+
qp.HasWindow = true
177+
break
178+
}
179+
}
180+
}
172181
qp.calculateDistinct(ctx)
173182

174183
return qp
@@ -185,6 +194,9 @@ func (qp *QueryProjection) addSelectExpressions(ctx *plancontext.PlanningContext
185194
col.Aggr = true
186195
qp.HasAggr = true
187196
}
197+
if ctx.ContainsWindowFunc(selExp.Expr) {
198+
qp.HasWindow = true
199+
}
188200

189201
qp.SelectExprs = append(qp.SelectExprs, col)
190202
case *sqlparser.StarExpr:
@@ -414,6 +426,12 @@ func (qp *QueryProjection) AggregationExpressions(ctx *plancontext.PlanningConte
414426
if err != nil {
415427
panic(err)
416428
}
429+
430+
if ctx.ContainsWindowFunc(selectExpr.Col) {
431+
sqlparser.CopyOnRewrite(aliasedExpr.Expr, qp.extractAggr(ctx, aliasedExpr, addAggr, makeComplex), nil, nil)
432+
continue
433+
}
434+
417435
if !qp.isExprInGroupByExprs(ctx, getExpr) {
418436
aggr := createNonGroupingAggr(aliasedExpr)
419437
out = append(out, aggr)
@@ -441,6 +459,9 @@ func (qp *QueryProjection) extractAggr(
441459
return true
442460
}
443461
if aggr, isAggr := node.(sqlparser.AggrFunc); isAggr {
462+
if wf, ok := node.(sqlparser.WindowFunc); ok && wf.GetOverClause() != nil {
463+
return true
464+
}
444465
ae := aeWrap(aggr)
445466
if aggr == aliasedExpr.Expr {
446467
ae = aliasedExpr

go/vt/vtgate/planbuilder/operators/subquery_planning.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,12 @@ func isMergeable(ctx *plancontext.PlanningContext, query sqlparser.TableStatemen
4343

4444
switch node := query.(type) {
4545
case *sqlparser.Select:
46+
// Window functions cannot be merged into the outer scope because they operate on a result set
47+
// and require specific partitioning and ordering semantics that would be lost in a merge
48+
if ctx.ContainsWindowFunc(node) {
49+
return false
50+
}
51+
4652
if node.GroupBy != nil && len(node.GroupBy.Exprs) > 0 {
4753
// iff we are grouping, we need to check that we can perform the grouping inside a single shard, and we check that
4854
// by checking that one of the grouping expressions used is a unique single column vindex.
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
Copyright 2025 The Vitess Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package operators
18+
19+
import (
20+
"vitess.io/vitess/go/vt/sqlparser"
21+
"vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext"
22+
)
23+
24+
type Window struct {
25+
unaryOperator
26+
QP *QueryProjection
27+
}
28+
29+
func newWindow(source Operator, qp *QueryProjection) *Window {
30+
return &Window{
31+
unaryOperator: newUnaryOp(source),
32+
QP: qp,
33+
}
34+
}
35+
36+
func (w *Window) Clone(inputs []Operator) Operator {
37+
return newWindow(inputs[0], w.QP)
38+
}
39+
40+
func (w *Window) AddPredicate(ctx *plancontext.PlanningContext, expr sqlparser.Expr) Operator {
41+
w.Source = w.Source.AddPredicate(ctx, expr)
42+
return w
43+
}
44+
45+
func (w *Window) AddColumn(ctx *plancontext.PlanningContext, reuseExisting bool, addToGroupBy bool, expr *sqlparser.AliasedExpr) int {
46+
return w.Source.AddColumn(ctx, reuseExisting, addToGroupBy, expr)
47+
}
48+
49+
func (w *Window) AddWSColumn(ctx *plancontext.PlanningContext, offset int, underRoute bool) int {
50+
return w.Source.AddWSColumn(ctx, offset, underRoute)
51+
}
52+
53+
func (w *Window) FindCol(ctx *plancontext.PlanningContext, expr sqlparser.Expr, underRoute bool) int {
54+
return w.Source.FindCol(ctx, expr, underRoute)
55+
}
56+
57+
func (w *Window) GetColumns(ctx *plancontext.PlanningContext) []*sqlparser.AliasedExpr {
58+
return w.Source.GetColumns(ctx)
59+
}
60+
61+
func (w *Window) GetSelectExprs(ctx *plancontext.PlanningContext) []sqlparser.SelectExpr {
62+
return w.Source.GetSelectExprs(ctx)
63+
}
64+
65+
func (w *Window) ShortDescription() string {
66+
return "Window"
67+
}
68+
69+
func (w *Window) GetOrdering(ctx *plancontext.PlanningContext) []OrderBy {
70+
return w.Source.GetOrdering(ctx)
71+
}

go/vt/vtgate/planbuilder/plan_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ func (s *planTestSuite) TestPlan() {
9898
s.testFile("filter_cases.json", vw, false)
9999
s.testFile("postprocess_cases.json", vw, false)
100100
s.testFile("select_cases.json", vw, false)
101+
s.testFile("window_function_cases.json", vw, false)
101102
s.testFile("symtab_cases.json", vw, false)
102103
s.testFile("unsupported_cases.json", vw, false)
103104
s.testFile("unknown_schema_cases.json", vw, false)

go/vt/vtgate/planbuilder/plancontext/planning_context.go

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -277,12 +277,15 @@ func (ctx *PlanningContext) IsAggr(e sqlparser.SQLNode) bool {
277277

278278
func (ctx *PlanningContext) ContainsAggr(e sqlparser.SQLNode) (hasAggr bool) {
279279
_ = sqlparser.Walk(func(node sqlparser.SQLNode) (kontinue bool, err error) {
280-
switch node.(type) {
280+
switch node := node.(type) {
281281
case *sqlparser.Offset:
282282
// offsets here indicate that a possible aggregation has already been handled by an input,
283283
// so we don't need to worry about aggregation in the original
284284
return false, nil
285285
case sqlparser.AggrFunc:
286+
if wf, ok := node.(sqlparser.WindowFunc); ok && wf.GetOverClause() != nil {
287+
return true, nil
288+
}
286289
hasAggr = true
287290
return false, io.EOF
288291
case *sqlparser.Subquery:
@@ -299,6 +302,26 @@ func (ctx *PlanningContext) ContainsAggr(e sqlparser.SQLNode) (hasAggr bool) {
299302
return
300303
}
301304

305+
func (ctx *PlanningContext) ContainsWindowFunc(e sqlparser.SQLNode) (hasWindow bool) {
306+
_ = sqlparser.Walk(func(node sqlparser.SQLNode) (kontinue bool, err error) {
307+
switch node := node.(type) {
308+
case *sqlparser.Offset:
309+
// offsets here indicate that a possible window function has already been handled by an input,
310+
// so we don't need to worry about it in the original
311+
return false, nil
312+
case sqlparser.WindowFunc:
313+
if node.GetOverClause() != nil {
314+
hasWindow = true
315+
return false, io.EOF
316+
}
317+
case *sqlparser.Subquery:
318+
return false, nil
319+
}
320+
return true, nil
321+
}, e)
322+
return
323+
}
324+
302325
func (ctx *PlanningContext) IsMirrored() bool {
303326
return ctx.isMirrored
304327
}

0 commit comments

Comments
 (0)