Commit 9dac217

restore m cte
1 parent 810e119 · commit 9dac217

File tree

1 file changed: +30 -54 lines


src/query/service/src/interpreters/interpreter_table_recluster.rs

Lines changed: 30 additions & 54 deletions
@@ -27,7 +27,6 @@ use databend_common_catalog::query_kind::QueryKind;
 use databend_common_catalog::table::TableExt;
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
-use databend_common_expression::DataBlock;
 use databend_common_license::license::Feature;
 use databend_common_license::license_manager::LicenseManagerSwitch;
 use databend_common_sql::bind_table;
@@ -45,7 +44,6 @@ use databend_common_sql::plans::set_update_stream_columns;
 use databend_common_sql::plans::BoundColumnRef;
 use databend_common_sql::plans::Plan;
 use databend_common_sql::plans::ReclusterPlan;
-use databend_common_sql::query_executor::QueryExecutor;
 use databend_common_sql::IdentifierNormalizer;
 use databend_common_sql::NameResolutionContext;
 use databend_common_sql::Planner;
@@ -54,7 +52,6 @@ use databend_common_sql::TypeChecker;
 use databend_enterprise_hilbert_clustering::get_hilbert_clustering_handler;
 use databend_storages_common_table_meta::table::ClusterType;
 use derive_visitor::DriveMut;
-use itertools::Itertools;
 use log::error;
 use log::warn;

@@ -272,62 +269,22 @@ impl ReclusterTableInterpreter {
         let write_progress = self.ctx.get_write_progress();
         let write_progress_value = write_progress.as_ref().get_values();

-        let subquery_executor = Arc::new(ServiceQueryExecutor::new(
-            QueryContext::create_from(self.ctx.as_ref()),
-        ));
         let partitions = settings.get_hilbert_num_range_ids()?;
         let sample_size = settings.get_hilbert_sample_size_per_block()?;
+
         let mut keys_bounds = Vec::with_capacity(cluster_key_strs.len());
-        for (index, cluster_key_str) in cluster_key_strs.iter().enumerate() {
+        let mut hilbert_keys = Vec::with_capacity(cluster_key_strs.len());
+        for (index, cluster_key_str) in cluster_key_strs.into_iter().enumerate() {
             keys_bounds.push(format!(
                 "range_bound({partitions}, {sample_size})({cluster_key_str}) AS bound_{index}"
             ));
-        }
-        let keys_bounds_query =
-            format!("SELECT {} FROM {database}.{table}", keys_bounds.join(", "));
-        let data_blocks = subquery_executor
-            .execute_query_with_sql_string(&keys_bounds_query)
-            .await?;
-        let keys_bounds = DataBlock::concat(&data_blocks)?;
-
-        let mut hilbert_keys = Vec::with_capacity(keys_bounds.num_columns());
-        for (entry, cluster_key_str) in keys_bounds
-            .columns()
-            .iter()
-            .zip(cluster_key_strs.into_iter())
-        {
-            let v = entry.value.index(0).unwrap().to_string();
             hilbert_keys.push(format!(
-                "hilbert_key(cast(ifnull(range_partition_id({table}.{cluster_key_str}, {v}), {partitions}) as uint16))"
+                "hilbert_key(cast(ifnull(range_partition_id({table}.{cluster_key_str}, \
+                _keys_bound.bound_{index}), {partitions}) as uint16))"
             ));
         }
+        let keys_bounds_str = keys_bounds.join(", ");
         let hilbert_keys_str = hilbert_keys.join(", ");
-        let index_bound_query = format!(
-            "WITH _source_data AS ( \
-                SELECT \
-                    hilbert_index([{hilbert_keys_str}], 2) AS index \
-                FROM {database}.{table} \
-            ) \
-            SELECT range_bound({total_partitions}, {sample_size})(index) AS bound \
-            FROM _source_data"
-        );
-        let data_blocks = subquery_executor
-            .execute_query_with_sql_string(&index_bound_query)
-            .await?;
-        debug_assert!(data_blocks.len() == 1);
-        let val = data_blocks[0].value_at(0, 0).unwrap();
-        let col = val.as_array().unwrap().as_binary().unwrap();
-        let index_bound_str = col
-            .iter()
-            .map(|s| {
-                let binary = s
-                    .iter()
-                    .map(|byte| format!("{:02X}", byte))
-                    .collect::<Vec<String>>()
-                    .join("");
-                format!("unhex('{}')", binary)
-            })
-            .join(", ");

         let quote = settings.get_sql_dialect()?.default_ident_quote();
         let schema = tbl.schema_with_stream();
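With the in-Rust bound computation removed, the restored loop above emits two SQL fragments per cluster key: a range_bound(...) AS bound_{index} projection collected into keys_bounds, and a hilbert_key(...) expression that reads that bound back through the _keys_bound CTE. As a minimal sketch, assuming a hypothetical table t whose first cluster key is c1, with partitions = 1024 and sample_size = 1000 (all placeholder values, not taken from this commit), the first loop iteration would produce:

    -- hypothetical fragment pushed into keys_bounds (projected by _keys_bound):
    range_bound(1024, 1000)(c1) AS bound_0

    -- hypothetical fragment pushed into hilbert_keys (consumed by hilbert_index):
    hilbert_key(cast(ifnull(range_partition_id(t.c1, _keys_bound.bound_0), 1024) as uint16))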
@@ -339,11 +296,26 @@ impl ReclusterTableInterpreter {
             ));
         }
         let output_with_table_str = output_with_table.join(", ");
+
         let query = format!(
-            "SELECT \
+            "WITH _keys_bound AS materialized ( \
+                SELECT \
+                    {keys_bounds_str} \
+                FROM {database}.{table} \
+            ), \
+            _source_data AS ( \
+                SELECT \
+                    hilbert_index([{hilbert_keys_str}], 2) AS index \
+                FROM _keys_bound, {database}.{table} \
+            ), \
+            _index_bound AS materialized ( \
+                SELECT range_bound({total_partitions}, {sample_size})(index) AS bound \
+                FROM _source_data \
+            ) \
+            SELECT \
                 {output_with_table_str}, \
-                range_partition_id(hilbert_index([{hilbert_keys_str}], 2), [{index_bound_str}])AS _predicate \
-                FROM {database}.{table}"
+                range_partition_id(hilbert_index([{hilbert_keys_str}], 2), _index_bound.bound)AS _predicate \
+                FROM {database}.{table}, _index_bound, _keys_bound"
         );
         let tokens = tokenize_sql(query.as_str())?;
         let sql_dialect = self
@@ -353,8 +325,12 @@ impl ReclusterTableInterpreter {
             .unwrap_or_default();
         let (stmt, _) = parse_sql(&tokens, sql_dialect)?;

-        let mut planner =
-            Planner::new_with_query_executor(self.ctx.clone(), subquery_executor);
+        let mut planner = Planner::new_with_query_executor(
+            self.ctx.clone(),
+            Arc::new(ServiceQueryExecutor::new(QueryContext::create_from(
+                self.ctx.as_ref(),
+            ))),
+        );
         let plan = planner.plan_stmt(&stmt, false).await?;
         let Plan::Query {
             mut s_expr,
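To make the restored shape concrete, here is a sketch of the SQL the new format! template would expand to, for a hypothetical table db1.t with a single cluster key c1, one projected column t.c1, and placeholder settings partitions = 1024, total_partitions = 4096, sample_size = 1000 (every name and constant here is illustrative, not pinned down by the commit):

    WITH _keys_bound AS materialized (
        SELECT
            range_bound(1024, 1000)(c1) AS bound_0
        FROM db1.t
    ),
    _source_data AS (
        SELECT
            hilbert_index([hilbert_key(cast(ifnull(range_partition_id(t.c1, _keys_bound.bound_0), 1024) as uint16))], 2) AS index
        FROM _keys_bound, db1.t
    ),
    _index_bound AS materialized (
        SELECT range_bound(4096, 1000)(index) AS bound
        FROM _source_data
    )
    SELECT
        t.c1,
        range_partition_id(hilbert_index([hilbert_key(cast(ifnull(range_partition_id(t.c1, _keys_bound.bound_0), 1024) as uint16))], 2), _index_bound.bound) AS _predicate
    FROM db1.t, _index_bound, _keys_bound

The materialized hints are the point of the restore: they ask the planner to evaluate _keys_bound and _index_bound once and reuse the results rather than re-expanding each CTE at every reference, which is presumably what the commit title's "m cte" abbreviates.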
