Skip to content

Commit 6586eee

Browse files
committed
remove m_cte
1 parent ea64d05 commit 6586eee

File tree

2 files changed

+60
-31
lines changed

2 files changed

+60
-31
lines changed

src/query/service/src/interpreters/interpreter_table_recluster.rs

Lines changed: 54 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use databend_common_catalog::query_kind::QueryKind;
2727
use databend_common_catalog::table::TableExt;
2828
use databend_common_exception::ErrorCode;
2929
use databend_common_exception::Result;
30+
use databend_common_expression::DataBlock;
3031
use databend_common_license::license::Feature;
3132
use databend_common_license::license_manager::LicenseManagerSwitch;
3233
use databend_common_sql::bind_table;
@@ -44,6 +45,7 @@ use databend_common_sql::plans::set_update_stream_columns;
4445
use databend_common_sql::plans::BoundColumnRef;
4546
use databend_common_sql::plans::Plan;
4647
use databend_common_sql::plans::ReclusterPlan;
48+
use databend_common_sql::query_executor::QueryExecutor;
4749
use databend_common_sql::IdentifierNormalizer;
4850
use databend_common_sql::NameResolutionContext;
4951
use databend_common_sql::Planner;
@@ -52,6 +54,7 @@ use databend_common_sql::TypeChecker;
5254
use databend_enterprise_hilbert_clustering::get_hilbert_clustering_handler;
5355
use databend_storages_common_table_meta::table::ClusterType;
5456
use derive_visitor::DriveMut;
57+
use itertools::Itertools;
5558
use log::error;
5659
use log::warn;
5760

@@ -265,21 +268,62 @@ impl ReclusterTableInterpreter {
265268
},
266269
);
267270

271+
let subquery_executor = Arc::new(ServiceQueryExecutor::new(
272+
QueryContext::create_from(self.ctx.as_ref()),
273+
));
268274
let partitions = settings.get_hilbert_num_range_ids()?;
269275
let sample_size = settings.get_hilbert_sample_size_per_block()?;
270276
let mut keys_bounds = Vec::with_capacity(cluster_key_strs.len());
271-
let mut hilbert_keys = Vec::with_capacity(cluster_key_strs.len());
272-
for (index, cluster_key_str) in cluster_key_strs.into_iter().enumerate() {
277+
for (index, cluster_key_str) in cluster_key_strs.iter().enumerate() {
273278
keys_bounds.push(format!(
274279
"range_bound({partitions}, {sample_size})({cluster_key_str}) AS bound_{index}"
275280
));
281+
}
282+
let keys_bounds_query =
283+
format!("SELECT {} FROM {database}.{table}", keys_bounds.join(", "));
284+
let data_blocks = subquery_executor
285+
.execute_query_with_sql_string(&keys_bounds_query)
286+
.await?;
287+
let keys_bounds = DataBlock::concat(&data_blocks)?;
288+
289+
let mut hilbert_keys = Vec::with_capacity(keys_bounds.num_columns());
290+
for (entry, cluster_key_str) in keys_bounds
291+
.columns()
292+
.iter()
293+
.zip(cluster_key_strs.into_iter())
294+
{
295+
let v = entry.value.index(0).unwrap().to_string();
276296
hilbert_keys.push(format!(
277-
"hilbert_key(cast(ifnull(range_partition_id({table}.{cluster_key_str}, \
278-
_keys_bound.bound_{index}), {partitions}) as uint16))"
297+
"hilbert_key(cast(ifnull(range_partition_id({table}.{cluster_key_str}, {v}), {partitions}) as uint16))"
279298
));
280299
}
281-
let keys_bounds_str = keys_bounds.join(", ");
282300
let hilbert_keys_str = hilbert_keys.join(", ");
301+
let index_bound_query = format!(
302+
"WITH _source_data AS ( \
303+
SELECT \
304+
hilbert_index([{hilbert_keys_str}], 2) AS index \
305+
FROM {database}.{table} \
306+
) \
307+
SELECT range_bound({total_partitions}, {sample_size})(index) AS bound \
308+
FROM _source_data"
309+
);
310+
let data_blocks = subquery_executor
311+
.execute_query_with_sql_string(&index_bound_query)
312+
.await?;
313+
debug_assert!(data_blocks.len() == 1);
314+
let val = data_blocks[0].value_at(0, 0).unwrap();
315+
let col = val.as_array().unwrap().as_binary().unwrap();
316+
let index_bound_str = col
317+
.iter()
318+
.map(|s| {
319+
let binary = s
320+
.iter()
321+
.map(|byte| format!("{:02X}", byte))
322+
.collect::<Vec<String>>()
323+
.join("");
324+
format!("unhex('{}')", binary)
325+
})
326+
.join(", ");
283327

284328
let quote = settings.get_sql_dialect()?.default_ident_quote();
285329
let schema = tbl.schema_with_stream();
@@ -291,26 +335,11 @@ impl ReclusterTableInterpreter {
291335
));
292336
}
293337
let output_with_table_str = output_with_table.join(", ");
294-
295338
let query = format!(
296-
"WITH _keys_bound AS materialized ( \
297-
SELECT \
298-
{keys_bounds_str} \
299-
FROM {database}.{table} \
300-
), \
301-
_source_data AS ( \
302-
SELECT \
303-
hilbert_index([{hilbert_keys_str}], 2) AS index \
304-
FROM _keys_bound, {database}.{table} \
305-
), \
306-
_index_bound AS materialized ( \
307-
SELECT range_bound({total_partitions}, {sample_size})(index) AS bound \
308-
FROM _source_data \
309-
) \
310-
SELECT \
339+
"SELECT \
311340
{output_with_table_str}, \
312-
range_partition_id(hilbert_index([{hilbert_keys_str}], 2), _index_bound.bound)AS _predicate \
313-
FROM {database}.{table}, _index_bound, _keys_bound"
341+
range_partition_id(hilbert_index([{hilbert_keys_str}], 2), [{index_bound_str}])AS _predicate \
342+
FROM {database}.{table}"
314343
);
315344
let tokens = tokenize_sql(query.as_str())?;
316345
let sql_dialect = self
@@ -323,12 +352,8 @@ impl ReclusterTableInterpreter {
323352
let query_str = self.ctx.get_query_str();
324353
let write_progress = self.ctx.get_write_progress();
325354
let write_progress_value = write_progress.as_ref().get_values();
326-
let mut planner = Planner::new_with_query_executor(
327-
self.ctx.clone(),
328-
Arc::new(ServiceQueryExecutor::new(QueryContext::create_from(
329-
self.ctx.as_ref(),
330-
))),
331-
);
355+
let mut planner =
356+
Planner::new_with_query_executor(self.ctx.clone(), subquery_executor);
332357
let plan = planner.plan_stmt(&stmt, false).await?;
333358
let Plan::Query {
334359
mut s_expr,

src/query/service/src/pipelines/processors/transforms/transform_partition_collect.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,12 @@ impl Exchange for HilbertPartitionExchange {
5454
fn partition(&self, data_block: DataBlock, n: usize) -> Result<Vec<DataBlock>> {
5555
// Extract the columns used for hash computation.
5656
let mut data_block = data_block.consume_convert_to_full();
57-
let last = data_block.get_last_column().as_nullable().unwrap();
58-
let range_ids = last.column.as_number().unwrap().as_u_int64().unwrap();
57+
let range_ids = data_block
58+
.get_last_column()
59+
.as_number()
60+
.unwrap()
61+
.as_u_int64()
62+
.unwrap();
5963

6064
// Scatter the data block to different partitions.
6165
let indices = range_ids

0 commit comments

Comments (0)