Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
446 changes: 446 additions & 0 deletions src/query/ast/src/ast/expr.rs

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions src/query/ast/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#![feature(let_chains)]

// TODO(xuanwo): Add crate level documents here.
pub mod ast;
mod error;
Expand Down
22 changes: 17 additions & 5 deletions src/query/sql/src/planner/binder/bind_query/bind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use databend_common_ast::ast::CreateOption;
use databend_common_ast::ast::CreateTableStmt;
use databend_common_ast::ast::Engine;
use databend_common_ast::ast::Expr;
use databend_common_ast::ast::ExprReplacer;
use databend_common_ast::ast::Identifier;
use databend_common_ast::ast::Query;
use databend_common_ast::ast::SetExpr;
Expand Down Expand Up @@ -179,15 +180,21 @@ impl Binder {
))
}

// The return value is temp_table name`
fn m_cte_to_temp_table(&self, cte: &CTE) -> Result<()> {
fn m_cte_to_temp_table(&mut self, cte: &CTE) -> Result<()> {
let engine = if self.ctx.get_settings().get_persist_materialized_cte()? {
Engine::Fuse
} else {
Engine::Memory
};
let query_id = self.ctx.get_id();
let database = self.ctx.get_current_database();
let table_name = normalize_identifier(&cte.alias.name, &self.name_resolution_ctx).name;
let mut table_identifier = cte.alias.name.clone();
table_identifier.name = format!("{}_{}", table_identifier.name, query_id.replace("-", "_"));
let table_name = normalize_identifier(&table_identifier, &self.name_resolution_ctx).name;
self.m_cte_table_name.insert(
normalize_identifier(&cte.alias.name, &self.name_resolution_ctx).name,
table_name.clone(),
);
if self
.ctx
.is_temp_table(CATALOG_DEFAULT, &database, &table_name)
Expand All @@ -197,17 +204,22 @@ impl Binder {
table_name
)));
}

let expr_replacer = ExprReplacer::new(database.clone(), self.m_cte_table_name.clone());
let mut as_query = cte.query.clone();
expr_replacer.replace_query(&mut as_query);

let create_table_stmt = CreateTableStmt {
create_option: CreateOption::Create,
catalog: Some(Identifier::from_name(Span::None, CATALOG_DEFAULT)),
database: Some(Identifier::from_name(Span::None, database.clone())),
table: cte.alias.name.clone(),
table: table_identifier,
source: None,
engine: Some(engine),
uri_location: None,
cluster_by: None,
table_options: Default::default(),
as_query: Some(cte.query.clone()),
as_query: Some(as_query),
table_type: TableType::Temporary,
};

Expand Down
57 changes: 34 additions & 23 deletions src/query/sql/src/planner/binder/bind_table_reference/bind_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,37 +70,45 @@ impl Binder {
};

// Check and bind common table expression
let mut cte_suffix_name = None;
let cte_map = bind_context.cte_context.cte_map.clone();
if let Some(cte_info) = cte_map.get(&table_name)
&& !cte_info.materialized
{
if self
.metadata
.read()
.get_table_index(Some(&database), &table_name)
.is_some()
{
return Err(ErrorCode::SyntaxException(format!(
"Table name `{}` is misleading, please distinguish it.",
table_name
))
.set_span(*span));
}
return if cte_info.recursive {
if self.bind_recursive_cte {
self.bind_r_cte_scan(bind_context, cte_info, &table_name, alias)
} else {
self.bind_r_cte(*span, bind_context, cte_info, &table_name, alias)
}
if let Some(cte_info) = cte_map.get(&table_name) {
if cte_info.materialized {
cte_suffix_name = Some(self.ctx.get_id().replace("-", "_"));
} else {
self.bind_cte(*span, bind_context, &table_name, alias, cte_info)
};
if self
.metadata
.read()
.get_table_index(Some(&database), &table_name)
.is_some()
{
return Err(ErrorCode::SyntaxException(format!(
"Table name `{}` is misleading, please distinguish it.",
table_name
))
.set_span(*span));
}
return if cte_info.recursive {
if self.bind_recursive_cte {
self.bind_r_cte_scan(bind_context, cte_info, &table_name, alias)
} else {
self.bind_r_cte(*span, bind_context, cte_info, &table_name, alias)
}
} else {
self.bind_cte(*span, bind_context, &table_name, alias, cte_info)
};
}
}

let navigation = self.resolve_temporal_clause(bind_context, temporal)?;

// Resolve table with catalog
let table_meta = {
let table_name = if let Some(cte_suffix_name) = cte_suffix_name.as_ref() {
format!("{}_{}", &table_name, cte_suffix_name)
} else {
table_name.clone()
};
match self.resolve_data_source(
catalog.as_str(),
database.as_str(),
Expand Down Expand Up @@ -154,6 +162,7 @@ impl Binder {
bind_context.planning_agg_index,
false,
consume,
None,
);
let (s_expr, mut bind_context) = self.bind_base_table(
bind_context,
Expand Down Expand Up @@ -232,6 +241,7 @@ impl Binder {
false,
false,
false,
None,
);
let (s_expr, mut new_bind_context) =
self.bind_query(&mut new_bind_context, query)?;
Expand Down Expand Up @@ -264,6 +274,7 @@ impl Binder {
bind_context.planning_agg_index,
false,
false,
cte_suffix_name,
);

let (s_expr, mut bind_context) = self.bind_base_table(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ impl Binder {
false,
false,
false,
None,
);

let (s_expr, mut bind_context) =
Expand Down Expand Up @@ -209,6 +210,7 @@ impl Binder {
false,
false,
false,
None,
);

let (s_expr, mut bind_context) =
Expand Down
2 changes: 2 additions & 0 deletions src/query/sql/src/planner/binder/binder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ pub struct Binder {
/// For the recursive cte, the cte table name occurs in the recursive cte definition and main query
/// if meet recursive cte table name in cte definition, set `bind_recursive_cte` true and treat it as `CteScan`.
pub bind_recursive_cte: bool,
pub m_cte_table_name: HashMap<String, String>,

pub enable_result_cache: bool,

Expand All @@ -132,6 +133,7 @@ impl<'a> Binder {
metadata,
expression_scan_context: ExpressionScanContext::new(),
bind_recursive_cte: false,
m_cte_table_name: HashMap::new(),
enable_result_cache,
subquery_executor: None,
}
Expand Down
49 changes: 31 additions & 18 deletions src/query/sql/src/planner/binder/ddl/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use databend_common_ast::ast::InvertedIndexDefinition;
use databend_common_ast::ast::ModifyColumnAction;
use databend_common_ast::ast::OptimizeTableAction as AstOptimizeTableAction;
use databend_common_ast::ast::OptimizeTableStmt;
use databend_common_ast::ast::Query;
use databend_common_ast::ast::RenameTableStmt;
use databend_common_ast::ast::ShowCreateTableStmt;
use databend_common_ast::ast::ShowDropTablesStmt;
Expand Down Expand Up @@ -414,6 +415,12 @@ impl Binder {
}
}

async fn as_query_plan(&mut self, query: &Query) -> Result<Plan> {
let stmt = Statement::Query(Box::new(query.clone()));
let mut bind_context = BindContext::new();
self.bind_statement(&mut bind_context, &stmt).await
}

#[async_backtrace::framed]
pub(in crate::planner::binder) async fn bind_create_table(
&mut self,
Expand Down Expand Up @@ -513,15 +520,17 @@ impl Binder {
}

// Build table schema
let (schema, field_comments, inverted_indexes) = match (&source, &as_query) {
let (schema, field_comments, inverted_indexes, as_query_plan) = match (&source, &as_query) {
(Some(source), None) => {
// `CREATE TABLE` without `AS SELECT ...`
self.analyze_create_table_schema(source).await?
let (schema, field_comments, inverted_indexes) =
self.analyze_create_table_schema(source).await?;
(schema, field_comments, inverted_indexes, None)
}
(None, Some(query)) => {
// `CREATE TABLE AS SELECT ...` without column definitions
let mut init_bind_context = BindContext::new();
let (_, bind_context) = self.bind_query(&mut init_bind_context, query)?;
let as_query_plan = self.as_query_plan(query).await?;
let bind_context = as_query_plan.bind_context().unwrap();
let fields = bind_context
.columns
.iter()
Expand All @@ -534,14 +543,14 @@ impl Binder {
.collect::<Result<Vec<_>>>()?;
let schema = TableSchemaRefExt::create(fields);
Self::validate_create_table_schema(&schema)?;
(schema, vec![], None)
(schema, vec![], None, Some(Box::new(as_query_plan)))
}
(Some(source), Some(query)) => {
// e.g. `CREATE TABLE t (i INT) AS SELECT * from old_t` with columns specified
let (source_schema, source_comments, inverted_indexes) =
self.analyze_create_table_schema(source).await?;
let mut init_bind_context = BindContext::new();
let (_, bind_context) = self.bind_query(&mut init_bind_context, query)?;
let as_query_plan = self.as_query_plan(query).await?;
let bind_context = as_query_plan.bind_context().unwrap();
let query_fields: Vec<TableField> = bind_context
.columns
.iter()
Expand All @@ -556,9 +565,20 @@ impl Binder {
return Err(ErrorCode::BadArguments("Number of columns does not match"));
}
Self::validate_create_table_schema(&source_schema)?;
(source_schema, source_comments, inverted_indexes)
(
source_schema,
source_comments,
inverted_indexes,
Some(Box::new(as_query_plan)),
)
}
_ => {
let as_query_plan = if let Some(query) = as_query {
let as_query_plan = self.as_query_plan(query).await?;
Some(Box::new(as_query_plan))
} else {
None
};
match engine {
Engine::Iceberg => {
let sp =
Expand All @@ -569,7 +589,7 @@ impl Binder {
// since we get it from table options location and connection when load table each time.
// we do this in case we change this idea.
storage_params = Some(sp);
(Arc::new(table_schema), vec![], None)
(Arc::new(table_schema), vec![], None, as_query_plan)
}
Engine::Delta => {
let sp =
Expand All @@ -581,7 +601,7 @@ impl Binder {
// we do this in case we change this idea.
storage_params = Some(sp);
engine_options.insert(OPT_KEY_ENGINE_META.to_lowercase().to_string(), meta);
(Arc::new(table_schema), vec![], None)
(Arc::new(table_schema), vec![], None, as_query_plan)
}
_ => Err(ErrorCode::BadArguments(
"Incorrect CREATE query: required list of column descriptions or AS section or SELECT or ICEBERG/DELTA table engine",
Expand Down Expand Up @@ -689,14 +709,7 @@ impl Binder {
options,
field_comments,
cluster_key,
as_select: if let Some(query) = as_query {
let mut bind_context = BindContext::new();
let stmt = Statement::Query(Box::new(*query.clone()));
let select_plan = self.bind_statement(&mut bind_context, &stmt).await?;
Some(Box::new(select_plan))
} else {
None
},
as_select: as_query_plan,
inverted_indexes,
};
Ok(Plan::CreateTable(Box::new(plan)))
Expand Down
1 change: 1 addition & 0 deletions src/query/sql/src/planner/binder/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ impl Binder {
false,
true,
false,
None,
);

let (s_expr, mut bind_context) =
Expand Down
1 change: 1 addition & 0 deletions src/query/sql/src/planner/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ impl Dataframe {
false,
false,
false,
None,
);

binder.bind_base_table(&bind_context, database, table_index, None, &None)
Expand Down
1 change: 1 addition & 0 deletions src/query/sql/src/planner/expression_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ pub fn bind_table(table_meta: Arc<dyn Table>) -> Result<(BindContext, MetadataRe
false,
false,
false,
None,
);

let columns = metadata.read().columns_by_table_index(table_index);
Expand Down
11 changes: 11 additions & 0 deletions src/query/sql/src/planner/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,8 +354,10 @@ impl Metadata {
source_of_index: bool,
source_of_stage: bool,
consume: bool,
cte_suffix_name: Option<String>,
) -> IndexType {
let table_name = table_meta.name().to_string();
let table_name = Self::remove_cte_suffix(table_name, cte_suffix_name);

let table_index = self.tables.len();
// If exists table alias name, use it instead of origin name
Expand Down Expand Up @@ -485,6 +487,15 @@ impl Metadata {
pub fn base_column_scan_id(&self, column_index: usize) -> Option<usize> {
self.base_column_scan_id.get(&column_index).cloned()
}

fn remove_cte_suffix(mut table_name: String, cte_suffix_name: Option<String>) -> String {
if let Some(suffix) = cte_suffix_name {
if table_name.ends_with(&suffix) {
table_name.truncate(table_name.len() - suffix.len() - 1);
}
}
table_name
}
}

#[derive(Clone)]
Expand Down
8 changes: 8 additions & 0 deletions src/query/sql/src/planner/plans/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -528,4 +528,12 @@ impl Plan {
}
self.clone()
}

pub fn bind_context(&self) -> Option<BindContext> {
if let Plan::Query { bind_context, .. } = self {
Some(*bind_context.clone())
} else {
None
}
}
}
Loading
Loading