Skip to content

Commit a4efe5d

Browse files
committed
fix
1 parent 0dbf9c6 commit a4efe5d

File tree

4 files changed

+310
-113
lines changed

4 files changed

+310
-113
lines changed

src/query/service/src/interpreters/interpreter.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,7 @@ async fn plan_sql(
250250
let extras = planner.parse_sql(sql)?;
251251
if !acquire_queue {
252252
// If queue guard is not required, plan the statement directly.
253-
let plan = planner.plan_stmt(&extras.statement, true).await?;
253+
let plan = planner.plan_stmt(&extras.statement).await?;
254254
return Ok((plan, extras, AcquireQueueGuard::create(None)));
255255
}
256256

@@ -261,11 +261,11 @@ async fn plan_sql(
261261
// See PR https://github.com/databendlabs/databend/pull/16632
262262
let query_entry = QueryEntry::create_entry(&ctx, &extras, true)?;
263263
let guard = QueriesQueueManager::instance().acquire(query_entry).await?;
264-
let plan = planner.plan_stmt(&extras.statement, true).await?;
264+
let plan = planner.plan_stmt(&extras.statement).await?;
265265
Ok((plan, extras, guard))
266266
} else {
267267
// No lock is needed, plan the statement first, then acquire the queue guard.
268-
let plan = planner.plan_stmt(&extras.statement, true).await?;
268+
let plan = planner.plan_stmt(&extras.statement).await?;
269269
let query_entry = QueryEntry::create(&ctx, &plan, &extras)?;
270270
let guard = QueriesQueueManager::instance().acquire(query_entry).await?;
271271
Ok((plan, extras, guard))

src/query/service/src/interpreters/interpreter_table_recluster.rs

Lines changed: 176 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -12,21 +12,20 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::collections::HashMap;
1516
use std::sync::Arc;
1617
use std::time::Duration;
1718
use std::time::SystemTime;
1819

19-
use databend_common_ast::parser::parse_sql;
20-
use databend_common_ast::parser::tokenize_sql;
2120
use databend_common_catalog::lock::LockTableOption;
2221
use databend_common_catalog::plan::PartInfoType;
2322
use databend_common_catalog::plan::PushDownInfo;
2423
use databend_common_catalog::plan::ReclusterInfoSideCar;
2524
use databend_common_catalog::plan::ReclusterParts;
26-
use databend_common_catalog::query_kind::QueryKind;
2725
use databend_common_catalog::table::TableExt;
2826
use databend_common_exception::ErrorCode;
2927
use databend_common_exception::Result;
28+
use databend_common_expression::types::DataType;
3029
use databend_common_expression::DataBlock;
3130
use databend_common_license::license::Feature;
3231
use databend_common_license::license_manager::LicenseManagerSwitch;
@@ -41,20 +40,22 @@ use databend_common_sql::executor::physical_plans::MutationKind;
4140
use databend_common_sql::executor::physical_plans::Recluster;
4241
use databend_common_sql::executor::PhysicalPlan;
4342
use databend_common_sql::executor::PhysicalPlanBuilder;
43+
use databend_common_sql::plans::plan_hilbert_sql;
44+
use databend_common_sql::plans::replace_with_constant;
4445
use databend_common_sql::plans::set_update_stream_columns;
4546
use databend_common_sql::plans::BoundColumnRef;
4647
use databend_common_sql::plans::Plan;
4748
use databend_common_sql::plans::ReclusterPlan;
4849
use databend_common_sql::query_executor::QueryExecutor;
4950
use databend_common_sql::IdentifierNormalizer;
51+
use databend_common_sql::IndexType;
52+
use databend_common_sql::MetadataRef;
5053
use databend_common_sql::NameResolutionContext;
51-
use databend_common_sql::Planner;
5254
use databend_common_sql::ScalarExpr;
5355
use databend_common_sql::TypeChecker;
5456
use databend_enterprise_hilbert_clustering::get_hilbert_clustering_handler;
5557
use databend_storages_common_table_meta::table::ClusterType;
5658
use derive_visitor::DriveMut;
57-
use itertools::Itertools;
5859
use log::error;
5960
use log::warn;
6061

@@ -109,6 +110,7 @@ impl Interpreter for ReclusterTableInterpreter {
109110

110111
let mut times = 0;
111112
let mut push_downs = None;
113+
let mut hilbert_info = None;
112114
let start = SystemTime::now();
113115
let timeout = Duration::from_secs(recluster_timeout_secs);
114116
let is_final = self.plan.is_final;
@@ -120,7 +122,9 @@ impl Interpreter for ReclusterTableInterpreter {
120122
return Err(err.with_context("failed to execute"));
121123
}
122124

123-
let res = self.execute_recluster(&mut push_downs).await;
125+
let res = self
126+
.execute_recluster(&mut push_downs, &mut hilbert_info)
127+
.await;
124128

125129
match res {
126130
Ok(is_break) => {
@@ -181,7 +185,11 @@ impl Interpreter for ReclusterTableInterpreter {
181185
}
182186

183187
impl ReclusterTableInterpreter {
184-
async fn execute_recluster(&self, push_downs: &mut Option<PushDownInfo>) -> Result<bool> {
188+
async fn execute_recluster(
189+
&self,
190+
push_downs: &mut Option<PushDownInfo>,
191+
hilbert_info: &mut Option<HilbertBuildInfo>,
192+
) -> Result<bool> {
185193
let start = SystemTime::now();
186194
let settings = self.ctx.get_settings();
187195

@@ -252,123 +260,187 @@ impl ReclusterTableInterpreter {
252260
let rows_per_block = block_thresholds.calc_rows_per_block(total_bytes, total_rows);
253261
let total_partitions = total_rows / rows_per_block;
254262

255-
let ast_exprs = tbl.resolve_cluster_keys(self.ctx.clone()).unwrap();
256-
let cluster_keys_len = ast_exprs.len();
257-
let name_resolution_ctx = NameResolutionContext::try_from(settings.as_ref())?;
258-
let cluster_key_strs = ast_exprs.into_iter().fold(
259-
Vec::with_capacity(cluster_keys_len),
260-
|mut acc, mut ast| {
263+
if hilbert_info.is_none() {
264+
let partitions = settings.get_hilbert_num_range_ids()? as usize;
265+
let sample_size = settings.get_hilbert_sample_size_per_block()?;
266+
267+
let name_resolution_ctx = NameResolutionContext::try_from(settings.as_ref())?;
268+
let (mut expr_bind_context, expr_metadata) = bind_table(tbl.clone())?;
269+
let mut type_checker = TypeChecker::try_create(
270+
&mut expr_bind_context,
271+
self.ctx.clone(),
272+
&name_resolution_ctx,
273+
expr_metadata,
274+
&[],
275+
true,
276+
)?;
277+
278+
let ast_exprs = tbl.resolve_cluster_keys(self.ctx.clone()).unwrap();
279+
let cluster_keys_len = ast_exprs.len();
280+
let mut cluster_key_strs = Vec::with_capacity(cluster_keys_len);
281+
let mut cluster_key_types = Vec::with_capacity(cluster_keys_len);
282+
let mut variable_map = Vec::with_capacity(cluster_keys_len + 1);
283+
for mut ast in ast_exprs {
261284
let mut normalizer = IdentifierNormalizer {
262285
ctx: &name_resolution_ctx,
263286
};
264287
ast.drive_mut(&mut normalizer);
265-
acc.push(format!("{:#}", &ast));
266-
acc
267-
},
268-
);
288+
cluster_key_strs.push(format!("{:#}", &ast));
289+
290+
let (scalar, _) = *type_checker.resolve(&ast)?;
291+
cluster_key_types.push(scalar.data_type()?);
292+
}
293+
294+
let metadata = MetadataRef::default();
295+
let mut keys_bounds = Vec::with_capacity(cluster_key_strs.len());
296+
let mut hilbert_keys = Vec::with_capacity(cluster_key_strs.len());
297+
for (index, cluster_key_str) in cluster_key_strs.into_iter().enumerate() {
298+
let bound = format!("_bound_{index}");
299+
keys_bounds.push(format!(
300+
"range_bound({partitions}, {sample_size})({cluster_key_str}) AS {bound}"
301+
));
302+
303+
hilbert_keys.push(format!(
304+
"hilbert_key(cast(range_partition_id({table}.{cluster_key_str}, {bound}) as uint16))"
305+
));
306+
307+
let idx = metadata.write().add_derived_column(
308+
bound,
309+
DataType::Array(Box::new(cluster_key_types[index].clone())),
310+
None,
311+
);
312+
variable_map.push(idx);
313+
}
314+
let hilbert_keys_str = hilbert_keys.join(", ");
315+
316+
let keys_bounds_query =
317+
format!("SELECT {} FROM {database}.{table}", keys_bounds.join(", "));
318+
let keys_bound = plan_hilbert_sql(
319+
self.ctx.clone(),
320+
MetadataRef::default(),
321+
&keys_bounds_query,
322+
)
323+
.await?;
324+
println!("keys_bound: {keys_bound:?}");
325+
326+
let index_bound_query = format!(
327+
"WITH _source_data AS ( \
328+
SELECT \
329+
hilbert_index([{hilbert_keys_str}], 2) AS index \
330+
FROM {database}.{table} \
331+
) \
332+
SELECT range_bound({total_partitions}, {sample_size})(index) AS bound \
333+
FROM _source_data"
334+
);
335+
println!("metadata: {:?}", metadata);
336+
let index_bound =
337+
plan_hilbert_sql(self.ctx.clone(), metadata.clone(), &index_bound_query)
338+
.await?;
339+
println!("index_bound: {index_bound:?}");
340+
341+
let quote = settings.get_sql_dialect()?.default_ident_quote();
342+
let schema = tbl.schema_with_stream();
343+
let mut output_with_table = Vec::with_capacity(schema.fields.len());
344+
for field in &schema.fields {
345+
output_with_table.push(format!(
346+
"{quote}{table}{quote}.{quote}{}{quote}",
347+
field.name
348+
));
349+
}
350+
let output_with_table_str = output_with_table.join(", ");
351+
let query = format!(
352+
"SELECT \
353+
{output_with_table_str}, \
354+
range_partition_id(hilbert_index([{hilbert_keys_str}], 2), _index_bound)AS _predicate \
355+
FROM {database}.{table}"
356+
);
357+
let idx = metadata.write().add_derived_column(
358+
"_index_bound".to_string(),
359+
DataType::Binary,
360+
None,
361+
);
362+
variable_map.push(idx);
363+
364+
let query =
365+
plan_hilbert_sql(self.ctx.clone(), metadata.clone(), &query).await?;
366+
367+
*hilbert_info = Some(HilbertBuildInfo {
368+
keys_bound,
369+
index_bound,
370+
query,
371+
variable_map,
372+
});
373+
}
269374

270-
let query_str = self.ctx.get_query_str();
271-
let write_progress = self.ctx.get_write_progress();
272-
let write_progress_value = write_progress.as_ref().get_values();
375+
let HilbertBuildInfo {
376+
keys_bound,
377+
index_bound,
378+
query,
379+
variable_map,
380+
} = hilbert_info.as_ref().unwrap();
273381

382+
let mut variables = HashMap::with_capacity(variable_map.len());
274383
let subquery_executor = Arc::new(ServiceQueryExecutor::new(
275384
QueryContext::create_from(self.ctx.as_ref()),
276385
));
277-
let partitions = std::cmp::max(
278-
total_partitions,
279-
settings.get_hilbert_num_range_ids()? as usize,
280-
);
281-
let sample_size = settings.get_hilbert_sample_size_per_block()?;
282-
let mut keys_bounds = Vec::with_capacity(cluster_key_strs.len());
283-
for (index, cluster_key_str) in cluster_key_strs.iter().enumerate() {
284-
keys_bounds.push(format!(
285-
"range_bound({partitions}, {sample_size})({cluster_key_str}) AS bound_{index}"
286-
));
287-
}
288-
let keys_bounds_query =
289-
format!("SELECT {} FROM {database}.{table}", keys_bounds.join(", "));
386+
387+
let Plan::Query {
388+
s_expr,
389+
metadata,
390+
bind_context,
391+
..
392+
} = keys_bound
393+
else {
394+
unreachable!()
395+
};
396+
let mut builder =
397+
PhysicalPlanBuilder::new(metadata.clone(), self.ctx.clone(), false);
398+
let plan = Box::new(builder.build(s_expr, bind_context.column_set()).await?);
290399
let data_blocks = subquery_executor
291-
.execute_query_with_sql_string(&keys_bounds_query)
400+
.execute_query_with_physical_plan(&plan)
292401
.await?;
293402
let keys_bounds = DataBlock::concat(&data_blocks)?;
294-
295-
let mut hilbert_keys = Vec::with_capacity(keys_bounds.num_columns());
296-
for (entry, cluster_key_str) in keys_bounds
297-
.columns()
298-
.iter()
299-
.zip(cluster_key_strs.into_iter())
300-
{
301-
let v = entry.value.index(0).unwrap().to_string();
302-
hilbert_keys.push(format!(
303-
"hilbert_key(cast(range_partition_id({table}.{cluster_key_str}, {v}) as uint16))"
304-
));
403+
for (index, entry) in keys_bounds.columns().iter().enumerate() {
404+
let v = entry.value.index(0).unwrap().to_owned();
405+
variables.insert(variable_map[index], v);
305406
}
306-
let hilbert_keys_str = hilbert_keys.join(", ");
307-
let index_bound_query = format!(
308-
"WITH _source_data AS ( \
309-
SELECT \
310-
hilbert_index([{hilbert_keys_str}], 2) AS index \
311-
FROM {database}.{table} \
312-
) \
313-
SELECT range_bound({total_partitions}, {sample_size})(index) AS bound \
314-
FROM _source_data"
315-
);
407+
408+
let Plan::Query {
409+
s_expr,
410+
metadata,
411+
bind_context,
412+
..
413+
} = index_bound
414+
else {
415+
unreachable!()
416+
};
417+
let s_expr = replace_with_constant(s_expr, &variables, total_partitions as u16);
418+
let mut builder =
419+
PhysicalPlanBuilder::new(metadata.clone(), self.ctx.clone(), false);
420+
let plan = Box::new(builder.build(&s_expr, bind_context.column_set()).await?);
316421
let data_blocks = subquery_executor
317-
.execute_query_with_sql_string(&index_bound_query)
422+
.execute_query_with_physical_plan(&plan)
318423
.await?;
319424
debug_assert!(data_blocks.len() == 1);
320-
let val = data_blocks[0].value_at(0, 0).unwrap();
321-
let col = val.as_array().unwrap().as_binary().unwrap();
322-
let index_bound_str = col
323-
.iter()
324-
.map(|s| {
325-
let binary = s
326-
.iter()
327-
.map(|byte| format!("{:02X}", byte))
328-
.collect::<Vec<String>>()
329-
.join("");
330-
format!("unhex('{}')", binary)
331-
})
332-
.join(", ");
333-
334-
let quote = settings.get_sql_dialect()?.default_ident_quote();
335-
let schema = tbl.schema_with_stream();
336-
let mut output_with_table = Vec::with_capacity(schema.fields.len());
337-
for field in &schema.fields {
338-
output_with_table.push(format!(
339-
"{quote}{table}{quote}.{quote}{}{quote}",
340-
field.name
341-
));
342-
}
343-
let output_with_table_str = output_with_table.join(", ");
344-
let query = format!(
345-
"SELECT \
346-
{output_with_table_str}, \
347-
range_partition_id(hilbert_index([{hilbert_keys_str}], 2), [{index_bound_str}])AS _predicate \
348-
FROM {database}.{table}"
349-
);
350-
let tokens = tokenize_sql(query.as_str())?;
351-
let sql_dialect = settings.get_sql_dialect().unwrap_or_default();
352-
let (stmt, _) = parse_sql(&tokens, sql_dialect)?;
425+
let val = data_blocks[0].value_at(0, 0).unwrap().to_owned();
426+
variables.insert(*variable_map.last().unwrap(), val);
353427

354-
let mut planner = Planner::new(self.ctx.clone());
355-
let plan = planner.plan_stmt(&stmt, false).await?;
356428
let Plan::Query {
357-
mut s_expr,
429+
s_expr,
358430
metadata,
359431
bind_context,
360432
..
361-
} = plan
433+
} = query
362434
else {
363435
unreachable!()
364436
};
437+
let mut s_expr = replace_with_constant(s_expr, &variables, total_partitions as u16);
365438
if tbl.change_tracking_enabled() {
366-
*s_expr = set_update_stream_columns(&s_expr)?;
439+
s_expr = set_update_stream_columns(&s_expr)?;
367440
}
368441

369-
write_progress.set(&write_progress_value);
370-
self.ctx.attach_query_str(QueryKind::Other, query_str);
371-
let mut builder = PhysicalPlanBuilder::new(metadata, self.ctx.clone(), false);
442+
let mut builder =
443+
PhysicalPlanBuilder::new(metadata.clone(), self.ctx.clone(), false);
372444
let mut plan = Box::new(builder.build(&s_expr, bind_context.column_set()).await?);
373445
let mut is_exchange = false;
374446
if let PhysicalPlan::Exchange(Exchange {
@@ -550,3 +622,10 @@ impl ReclusterTableInterpreter {
550622
Ok(false)
551623
}
552624
}
625+
626+
struct HilbertBuildInfo { // Plans built once in `execute_recluster` (while the caller's `hilbert_info` is `None`) and reused on later calls.
627+
keys_bound: Plan, // Plan of the `SELECT range_bound(..)(<cluster key>) AS _bound_<i> ..` query over the table.
628+
index_bound: Plan, // Plan of the query that computes `range_bound(..)` over `hilbert_index(..)`.
629+
query: Plan, // Plan of the final SELECT that outputs the table columns plus `_predicate`.
630+
variable_map: Vec<IndexType>, // Derived-column indexes: one per `_bound_<i>` in cluster-key order, then `_index_bound` last; used as keys of the `variables` map.
631+
}

0 commit comments

Comments
 (0)