Skip to content

Commit 572d958

Browse files
authored
chore(binder): fix bind materialized cte (#17046)
* chore(binder): fix bind materialized cte * chore(code): remove unused code * chore(code): refine code * chore(binder): refine code * chore(binder): fix code * chore(query): replace m_cte table name * chore(test): update sqllogictest
1 parent f777d19 commit 572d958

File tree

15 files changed

+584
-59
lines changed

15 files changed

+584
-59
lines changed

src/query/ast/src/ast/expr.rs

Lines changed: 446 additions & 0 deletions
Large diffs are not rendered by default.

src/query/ast/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
#![feature(let_chains)]
16+
1517
// TODO(xuanwo): Add crate level documents here.
1618
pub mod ast;
1719
mod error;

src/query/sql/src/planner/binder/bind_query/bind.rs

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use databend_common_ast::ast::CreateOption;
1818
use databend_common_ast::ast::CreateTableStmt;
1919
use databend_common_ast::ast::Engine;
2020
use databend_common_ast::ast::Expr;
21+
use databend_common_ast::ast::ExprReplacer;
2122
use databend_common_ast::ast::Identifier;
2223
use databend_common_ast::ast::Query;
2324
use databend_common_ast::ast::SetExpr;
@@ -179,15 +180,21 @@ impl Binder {
179180
))
180181
}
181182

182-
// The return value is temp_table name`
183-
fn m_cte_to_temp_table(&self, cte: &CTE) -> Result<()> {
183+
fn m_cte_to_temp_table(&mut self, cte: &CTE) -> Result<()> {
184184
let engine = if self.ctx.get_settings().get_persist_materialized_cte()? {
185185
Engine::Fuse
186186
} else {
187187
Engine::Memory
188188
};
189+
let query_id = self.ctx.get_id();
189190
let database = self.ctx.get_current_database();
190-
let table_name = normalize_identifier(&cte.alias.name, &self.name_resolution_ctx).name;
191+
let mut table_identifier = cte.alias.name.clone();
192+
table_identifier.name = format!("{}_{}", table_identifier.name, query_id.replace("-", "_"));
193+
let table_name = normalize_identifier(&table_identifier, &self.name_resolution_ctx).name;
194+
self.m_cte_table_name.insert(
195+
normalize_identifier(&cte.alias.name, &self.name_resolution_ctx).name,
196+
table_name.clone(),
197+
);
191198
if self
192199
.ctx
193200
.is_temp_table(CATALOG_DEFAULT, &database, &table_name)
@@ -197,17 +204,22 @@ impl Binder {
197204
table_name
198205
)));
199206
}
207+
208+
let expr_replacer = ExprReplacer::new(database.clone(), self.m_cte_table_name.clone());
209+
let mut as_query = cte.query.clone();
210+
expr_replacer.replace_query(&mut as_query);
211+
200212
let create_table_stmt = CreateTableStmt {
201213
create_option: CreateOption::Create,
202214
catalog: Some(Identifier::from_name(Span::None, CATALOG_DEFAULT)),
203215
database: Some(Identifier::from_name(Span::None, database.clone())),
204-
table: cte.alias.name.clone(),
216+
table: table_identifier,
205217
source: None,
206218
engine: Some(engine),
207219
uri_location: None,
208220
cluster_by: None,
209221
table_options: Default::default(),
210-
as_query: Some(cte.query.clone()),
222+
as_query: Some(as_query),
211223
table_type: TableType::Temporary,
212224
};
213225

src/query/sql/src/planner/binder/bind_table_reference/bind_table.rs

Lines changed: 34 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -70,37 +70,45 @@ impl Binder {
7070
};
7171

7272
// Check and bind common table expression
73+
let mut cte_suffix_name = None;
7374
let cte_map = bind_context.cte_context.cte_map.clone();
74-
if let Some(cte_info) = cte_map.get(&table_name)
75-
&& !cte_info.materialized
76-
{
77-
if self
78-
.metadata
79-
.read()
80-
.get_table_index(Some(&database), &table_name)
81-
.is_some()
82-
{
83-
return Err(ErrorCode::SyntaxException(format!(
84-
"Table name `{}` is misleading, please distinguish it.",
85-
table_name
86-
))
87-
.set_span(*span));
88-
}
89-
return if cte_info.recursive {
90-
if self.bind_recursive_cte {
91-
self.bind_r_cte_scan(bind_context, cte_info, &table_name, alias)
92-
} else {
93-
self.bind_r_cte(*span, bind_context, cte_info, &table_name, alias)
94-
}
75+
if let Some(cte_info) = cte_map.get(&table_name) {
76+
if cte_info.materialized {
77+
cte_suffix_name = Some(self.ctx.get_id().replace("-", "_"));
9578
} else {
96-
self.bind_cte(*span, bind_context, &table_name, alias, cte_info)
97-
};
79+
if self
80+
.metadata
81+
.read()
82+
.get_table_index(Some(&database), &table_name)
83+
.is_some()
84+
{
85+
return Err(ErrorCode::SyntaxException(format!(
86+
"Table name `{}` is misleading, please distinguish it.",
87+
table_name
88+
))
89+
.set_span(*span));
90+
}
91+
return if cte_info.recursive {
92+
if self.bind_recursive_cte {
93+
self.bind_r_cte_scan(bind_context, cte_info, &table_name, alias)
94+
} else {
95+
self.bind_r_cte(*span, bind_context, cte_info, &table_name, alias)
96+
}
97+
} else {
98+
self.bind_cte(*span, bind_context, &table_name, alias, cte_info)
99+
};
100+
}
98101
}
99102

100103
let navigation = self.resolve_temporal_clause(bind_context, temporal)?;
101104

102105
// Resolve table with catalog
103106
let table_meta = {
107+
let table_name = if let Some(cte_suffix_name) = cte_suffix_name.as_ref() {
108+
format!("{}_{}", &table_name, cte_suffix_name)
109+
} else {
110+
table_name.clone()
111+
};
104112
match self.resolve_data_source(
105113
catalog.as_str(),
106114
database.as_str(),
@@ -154,6 +162,7 @@ impl Binder {
154162
bind_context.planning_agg_index,
155163
false,
156164
consume,
165+
None,
157166
);
158167
let (s_expr, mut bind_context) = self.bind_base_table(
159168
bind_context,
@@ -232,6 +241,7 @@ impl Binder {
232241
false,
233242
false,
234243
false,
244+
None,
235245
);
236246
let (s_expr, mut new_bind_context) =
237247
self.bind_query(&mut new_bind_context, query)?;
@@ -264,6 +274,7 @@ impl Binder {
264274
bind_context.planning_agg_index,
265275
false,
266276
false,
277+
cte_suffix_name,
267278
);
268279

269280
let (s_expr, mut bind_context) = self.bind_base_table(

src/query/sql/src/planner/binder/bind_table_reference/bind_table_function.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ impl Binder {
147147
false,
148148
false,
149149
false,
150+
None,
150151
);
151152

152153
let (s_expr, mut bind_context) =
@@ -209,6 +210,7 @@ impl Binder {
209210
false,
210211
false,
211212
false,
213+
None,
212214
);
213215

214216
let (s_expr, mut bind_context) =

src/query/sql/src/planner/binder/binder.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ pub struct Binder {
106106
/// For the recursive cte, the cte table name occurs in the recursive cte definition and main query
107107
/// if meet recursive cte table name in cte definition, set `bind_recursive_cte` true and treat it as `CteScan`.
108108
pub bind_recursive_cte: bool,
109+
pub m_cte_table_name: HashMap<String, String>,
109110

110111
pub enable_result_cache: bool,
111112

@@ -132,6 +133,7 @@ impl<'a> Binder {
132133
metadata,
133134
expression_scan_context: ExpressionScanContext::new(),
134135
bind_recursive_cte: false,
136+
m_cte_table_name: HashMap::new(),
135137
enable_result_cache,
136138
subquery_executor: None,
137139
}

src/query/sql/src/planner/binder/ddl/table.rs

Lines changed: 31 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ use databend_common_ast::ast::InvertedIndexDefinition;
3737
use databend_common_ast::ast::ModifyColumnAction;
3838
use databend_common_ast::ast::OptimizeTableAction as AstOptimizeTableAction;
3939
use databend_common_ast::ast::OptimizeTableStmt;
40+
use databend_common_ast::ast::Query;
4041
use databend_common_ast::ast::RenameTableStmt;
4142
use databend_common_ast::ast::ShowCreateTableStmt;
4243
use databend_common_ast::ast::ShowDropTablesStmt;
@@ -414,6 +415,12 @@ impl Binder {
414415
}
415416
}
416417

418+
async fn as_query_plan(&mut self, query: &Query) -> Result<Plan> {
419+
let stmt = Statement::Query(Box::new(query.clone()));
420+
let mut bind_context = BindContext::new();
421+
self.bind_statement(&mut bind_context, &stmt).await
422+
}
423+
417424
#[async_backtrace::framed]
418425
pub(in crate::planner::binder) async fn bind_create_table(
419426
&mut self,
@@ -513,15 +520,17 @@ impl Binder {
513520
}
514521

515522
// Build table schema
516-
let (schema, field_comments, inverted_indexes) = match (&source, &as_query) {
523+
let (schema, field_comments, inverted_indexes, as_query_plan) = match (&source, &as_query) {
517524
(Some(source), None) => {
518525
// `CREATE TABLE` without `AS SELECT ...`
519-
self.analyze_create_table_schema(source).await?
526+
let (schema, field_comments, inverted_indexes) =
527+
self.analyze_create_table_schema(source).await?;
528+
(schema, field_comments, inverted_indexes, None)
520529
}
521530
(None, Some(query)) => {
522531
// `CREATE TABLE AS SELECT ...` without column definitions
523-
let mut init_bind_context = BindContext::new();
524-
let (_, bind_context) = self.bind_query(&mut init_bind_context, query)?;
532+
let as_query_plan = self.as_query_plan(query).await?;
533+
let bind_context = as_query_plan.bind_context().unwrap();
525534
let fields = bind_context
526535
.columns
527536
.iter()
@@ -534,14 +543,14 @@ impl Binder {
534543
.collect::<Result<Vec<_>>>()?;
535544
let schema = TableSchemaRefExt::create(fields);
536545
Self::validate_create_table_schema(&schema)?;
537-
(schema, vec![], None)
546+
(schema, vec![], None, Some(Box::new(as_query_plan)))
538547
}
539548
(Some(source), Some(query)) => {
540549
// e.g. `CREATE TABLE t (i INT) AS SELECT * from old_t` with columns specified
541550
let (source_schema, source_comments, inverted_indexes) =
542551
self.analyze_create_table_schema(source).await?;
543-
let mut init_bind_context = BindContext::new();
544-
let (_, bind_context) = self.bind_query(&mut init_bind_context, query)?;
552+
let as_query_plan = self.as_query_plan(query).await?;
553+
let bind_context = as_query_plan.bind_context().unwrap();
545554
let query_fields: Vec<TableField> = bind_context
546555
.columns
547556
.iter()
@@ -556,9 +565,20 @@ impl Binder {
556565
return Err(ErrorCode::BadArguments("Number of columns does not match"));
557566
}
558567
Self::validate_create_table_schema(&source_schema)?;
559-
(source_schema, source_comments, inverted_indexes)
568+
(
569+
source_schema,
570+
source_comments,
571+
inverted_indexes,
572+
Some(Box::new(as_query_plan)),
573+
)
560574
}
561575
_ => {
576+
let as_query_plan = if let Some(query) = as_query {
577+
let as_query_plan = self.as_query_plan(query).await?;
578+
Some(Box::new(as_query_plan))
579+
} else {
580+
None
581+
};
562582
match engine {
563583
Engine::Iceberg => {
564584
let sp =
@@ -569,7 +589,7 @@ impl Binder {
569589
// since we get it from table options location and connection when load table each time.
570590
// we do this in case we change this idea.
571591
storage_params = Some(sp);
572-
(Arc::new(table_schema), vec![], None)
592+
(Arc::new(table_schema), vec![], None, as_query_plan)
573593
}
574594
Engine::Delta => {
575595
let sp =
@@ -581,7 +601,7 @@ impl Binder {
581601
// we do this in case we change this idea.
582602
storage_params = Some(sp);
583603
engine_options.insert(OPT_KEY_ENGINE_META.to_lowercase().to_string(), meta);
584-
(Arc::new(table_schema), vec![], None)
604+
(Arc::new(table_schema), vec![], None, as_query_plan)
585605
}
586606
_ => Err(ErrorCode::BadArguments(
587607
"Incorrect CREATE query: required list of column descriptions or AS section or SELECT or ICEBERG/DELTA table engine",
@@ -689,14 +709,7 @@ impl Binder {
689709
options,
690710
field_comments,
691711
cluster_key,
692-
as_select: if let Some(query) = as_query {
693-
let mut bind_context = BindContext::new();
694-
let stmt = Statement::Query(Box::new(*query.clone()));
695-
let select_plan = self.bind_statement(&mut bind_context, &stmt).await?;
696-
Some(Box::new(select_plan))
697-
} else {
698-
None
699-
},
712+
as_select: as_query_plan,
700713
inverted_indexes,
701714
};
702715
Ok(Plan::CreateTable(Box::new(plan)))

src/query/sql/src/planner/binder/table.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ impl Binder {
141141
false,
142142
true,
143143
false,
144+
None,
144145
);
145146

146147
let (s_expr, mut bind_context) =

src/query/sql/src/planner/dataframe.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ impl Dataframe {
102102
false,
103103
false,
104104
false,
105+
None,
105106
);
106107

107108
binder.bind_base_table(&bind_context, database, table_index, None, &None)

src/query/sql/src/planner/expression_parser.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ pub fn bind_table(table_meta: Arc<dyn Table>) -> Result<(BindContext, MetadataRe
7272
false,
7373
false,
7474
false,
75+
None,
7576
);
7677

7778
let columns = metadata.read().columns_by_table_index(table_index);

0 commit comments

Comments
 (0)