Skip to content

Commit d560f54

Browse files
committed
fix
1 parent a4efe5d commit d560f54

File tree

3 files changed

+44
-62
lines changed

3 files changed

+44
-62
lines changed

src/query/service/src/interpreters/interpreter_table_recluster.rs

Lines changed: 25 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use std::collections::HashMap;
15+
use std::collections::VecDeque;
1616
use std::sync::Arc;
1717
use std::time::Duration;
1818
use std::time::SystemTime;
@@ -25,7 +25,6 @@ use databend_common_catalog::plan::ReclusterParts;
2525
use databend_common_catalog::table::TableExt;
2626
use databend_common_exception::ErrorCode;
2727
use databend_common_exception::Result;
28-
use databend_common_expression::types::DataType;
2928
use databend_common_expression::DataBlock;
3029
use databend_common_license::license::Feature;
3130
use databend_common_license::license_manager::LicenseManagerSwitch;
@@ -48,7 +47,6 @@ use databend_common_sql::plans::Plan;
4847
use databend_common_sql::plans::ReclusterPlan;
4948
use databend_common_sql::query_executor::QueryExecutor;
5049
use databend_common_sql::IdentifierNormalizer;
51-
use databend_common_sql::IndexType;
5250
use databend_common_sql::MetadataRef;
5351
use databend_common_sql::NameResolutionContext;
5452
use databend_common_sql::ScalarExpr;
@@ -260,6 +258,9 @@ impl ReclusterTableInterpreter {
260258
let rows_per_block = block_thresholds.calc_rows_per_block(total_bytes, total_rows);
261259
let total_partitions = total_rows / rows_per_block;
262260

261+
let subquery_executor = Arc::new(ServiceQueryExecutor::new(
262+
QueryContext::create_from(self.ctx.as_ref()),
263+
));
263264
if hilbert_info.is_none() {
264265
let partitions = settings.get_hilbert_num_range_ids()? as usize;
265266
let sample_size = settings.get_hilbert_sample_size_per_block()?;
@@ -279,7 +280,6 @@ impl ReclusterTableInterpreter {
279280
let cluster_keys_len = ast_exprs.len();
280281
let mut cluster_key_strs = Vec::with_capacity(cluster_keys_len);
281282
let mut cluster_key_types = Vec::with_capacity(cluster_keys_len);
282-
let mut variable_map = Vec::with_capacity(cluster_keys_len + 1);
283283
for mut ast in ast_exprs {
284284
let mut normalizer = IdentifierNormalizer {
285285
ctx: &name_resolution_ctx,
@@ -291,25 +291,16 @@ impl ReclusterTableInterpreter {
291291
cluster_key_types.push(scalar.data_type()?);
292292
}
293293

294-
let metadata = MetadataRef::default();
295294
let mut keys_bounds = Vec::with_capacity(cluster_key_strs.len());
296295
let mut hilbert_keys = Vec::with_capacity(cluster_key_strs.len());
297-
for (index, cluster_key_str) in cluster_key_strs.into_iter().enumerate() {
298-
let bound = format!("_bound_{index}");
296+
for cluster_key_str in cluster_key_strs.into_iter() {
299297
keys_bounds.push(format!(
300-
"range_bound({partitions}, {sample_size})({cluster_key_str}) AS {bound}"
298+
"range_bound({partitions}, {sample_size})({cluster_key_str})"
301299
));
302300

303301
hilbert_keys.push(format!(
304-
"hilbert_key(cast(range_partition_id({table}.{cluster_key_str}, {bound}) as uint16))"
302+
"hilbert_key(cast(range_partition_id({table}.{cluster_key_str}, []) as uint16))"
305303
));
306-
307-
let idx = metadata.write().add_derived_column(
308-
bound,
309-
DataType::Array(Box::new(cluster_key_types[index].clone())),
310-
None,
311-
);
312-
variable_map.push(idx);
313304
}
314305
let hilbert_keys_str = hilbert_keys.join(", ");
315306

@@ -321,22 +312,18 @@ impl ReclusterTableInterpreter {
321312
&keys_bounds_query,
322313
)
323314
.await?;
324-
println!("keys_bound: {keys_bound:?}");
325315

326316
let index_bound_query = format!(
327-
"WITH _source_data AS ( \
328-
SELECT \
329-
hilbert_index([{hilbert_keys_str}], 2) AS index \
330-
FROM {database}.{table} \
331-
) \
332-
SELECT range_bound({total_partitions}, {sample_size})(index) AS bound \
333-
FROM _source_data"
317+
"SELECT \
318+
range_bound({total_partitions}, {sample_size})(hilbert_index([{hilbert_keys_str}], 2)) \
319+
FROM {database}.{table}"
334320
);
335-
println!("metadata: {:?}", metadata);
336-
let index_bound =
337-
plan_hilbert_sql(self.ctx.clone(), metadata.clone(), &index_bound_query)
338-
.await?;
339-
println!("index_bound: {index_bound:?}");
321+
let index_bound = plan_hilbert_sql(
322+
self.ctx.clone(),
323+
MetadataRef::default(),
324+
&index_bound_query,
325+
)
326+
.await?;
340327

341328
let quote = settings.get_sql_dialect()?.default_ident_quote();
342329
let schema = tbl.schema_with_stream();
@@ -351,38 +338,26 @@ impl ReclusterTableInterpreter {
351338
let query = format!(
352339
"SELECT \
353340
{output_with_table_str}, \
354-
range_partition_id(hilbert_index([{hilbert_keys_str}], 2), _index_bound)AS _predicate \
341+
range_partition_id(hilbert_index([{hilbert_keys_str}], 2), [])AS _predicate \
355342
FROM {database}.{table}"
356343
);
357-
let idx = metadata.write().add_derived_column(
358-
"_index_bound".to_string(),
359-
DataType::Binary,
360-
None,
361-
);
362-
variable_map.push(idx);
363-
364344
let query =
365-
plan_hilbert_sql(self.ctx.clone(), metadata.clone(), &query).await?;
345+
plan_hilbert_sql(self.ctx.clone(), MetadataRef::default(), &query).await?;
366346

367347
*hilbert_info = Some(HilbertBuildInfo {
368348
keys_bound,
369349
index_bound,
370350
query,
371-
variable_map,
372351
});
373352
}
374353

375354
let HilbertBuildInfo {
376355
keys_bound,
377356
index_bound,
378357
query,
379-
variable_map,
380358
} = hilbert_info.as_ref().unwrap();
381359

382-
let mut variables = HashMap::with_capacity(variable_map.len());
383-
let subquery_executor = Arc::new(ServiceQueryExecutor::new(
384-
QueryContext::create_from(self.ctx.as_ref()),
385-
));
360+
let mut variables = VecDeque::new();
386361

387362
let Plan::Query {
388363
s_expr,
@@ -393,16 +368,17 @@ impl ReclusterTableInterpreter {
393368
else {
394369
unreachable!()
395370
};
371+
metadata.write().replace_all_tables(tbl.clone());
396372
let mut builder =
397373
PhysicalPlanBuilder::new(metadata.clone(), self.ctx.clone(), false);
398374
let plan = Box::new(builder.build(s_expr, bind_context.column_set()).await?);
399375
let data_blocks = subquery_executor
400376
.execute_query_with_physical_plan(&plan)
401377
.await?;
402378
let keys_bounds = DataBlock::concat(&data_blocks)?;
403-
for (index, entry) in keys_bounds.columns().iter().enumerate() {
379+
for entry in keys_bounds.columns().iter() {
404380
let v = entry.value.index(0).unwrap().to_owned();
405-
variables.insert(variable_map[index], v);
381+
variables.push_back(v);
406382
}
407383

408384
let Plan::Query {
@@ -415,6 +391,7 @@ impl ReclusterTableInterpreter {
415391
unreachable!()
416392
};
417393
let s_expr = replace_with_constant(s_expr, &variables, total_partitions as u16);
394+
metadata.write().replace_all_tables(tbl.clone());
418395
let mut builder =
419396
PhysicalPlanBuilder::new(metadata.clone(), self.ctx.clone(), false);
420397
let plan = Box::new(builder.build(&s_expr, bind_context.column_set()).await?);
@@ -423,7 +400,7 @@ impl ReclusterTableInterpreter {
423400
.await?;
424401
debug_assert!(data_blocks.len() == 1);
425402
let val = data_blocks[0].value_at(0, 0).unwrap().to_owned();
426-
variables.insert(*variable_map.last().unwrap(), val);
403+
variables.push_front(val);
427404

428405
let Plan::Query {
429406
s_expr,
@@ -439,6 +416,7 @@ impl ReclusterTableInterpreter {
439416
s_expr = set_update_stream_columns(&s_expr)?;
440417
}
441418

419+
metadata.write().replace_all_tables(tbl.clone());
442420
let mut builder =
443421
PhysicalPlanBuilder::new(metadata.clone(), self.ctx.clone(), false);
444422
let mut plan = Box::new(builder.build(&s_expr, bind_context.column_set()).await?);
@@ -627,5 +605,4 @@ struct HilbertBuildInfo {
627605
keys_bound: Plan,
628606
index_bound: Plan,
629607
query: Plan,
630-
variable_map: Vec<IndexType>,
631608
}

src/query/sql/src/planner/metadata.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -498,6 +498,12 @@ impl Metadata {
498498
}
499499
table_name
500500
}
501+
502+
pub fn replace_all_tables(&mut self, table: Arc<dyn Table>) {
503+
for entry in self.tables.iter_mut() {
504+
entry.table = table.clone();
505+
}
506+
}
501507
}
502508

503509
#[derive(Clone)]

src/query/sql/src/planner/plans/recluster.rs

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use std::collections::HashMap;
15+
use std::collections::VecDeque;
1616
use std::sync::Arc;
1717

1818
use databend_common_ast::ast::Expr;
@@ -31,7 +31,6 @@ use crate::plans::ConstantExpr;
3131
use crate::plans::Plan;
3232
use crate::plans::RelOperator;
3333
use crate::Binder;
34-
use crate::IndexType;
3534
use crate::MetadataRef;
3635
use crate::NameResolutionContext;
3736
use crate::ScalarExpr;
@@ -96,24 +95,23 @@ pub async fn plan_hilbert_sql(
9695

9796
pub fn replace_with_constant(
9897
expr: &SExpr,
99-
variables: &HashMap<IndexType, Scalar>,
98+
variables: &VecDeque<Scalar>,
10099
total_partitions: u16,
101100
) -> SExpr {
102101
#[recursive::recursive]
103-
fn visit_expr_column(expr: &mut ScalarExpr, variables: &HashMap<IndexType, Scalar>) {
102+
fn visit_expr_column(expr: &mut ScalarExpr, variables: &mut VecDeque<Scalar>) {
104103
match expr {
105-
ScalarExpr::BoundColumnRef(col) => {
106-
if let Some(value) = variables.get(&col.column.index).cloned() {
107-
*expr = ScalarExpr::ConstantExpr(ConstantExpr {
108-
span: col.span,
109-
value,
110-
});
111-
}
112-
}
113104
ScalarExpr::CastExpr(cast) => {
114105
visit_expr_column(&mut cast.argument, variables);
115106
}
116107
ScalarExpr::FunctionCall(call) => {
108+
if call.func_name == "range_partition_id" {
109+
debug_assert_eq!(call.arguments.len(), 2);
110+
let last = call.arguments.last_mut().unwrap();
111+
let value = variables.pop_front().unwrap();
112+
*last = ScalarExpr::ConstantExpr(ConstantExpr { span: None, value });
113+
}
114+
117115
for arg in &mut call.arguments {
118116
visit_expr_column(arg, variables);
119117
}
@@ -125,7 +123,7 @@ pub fn replace_with_constant(
125123
#[recursive::recursive]
126124
fn replace_with_constant_into_child(
127125
s_expr: &SExpr,
128-
variables: &HashMap<IndexType, Scalar>,
126+
variables: &mut VecDeque<Scalar>,
129127
total_partitions: u16,
130128
) -> SExpr {
131129
let mut s_expr = s_expr.clone();
@@ -169,5 +167,6 @@ pub fn replace_with_constant(
169167
}
170168
}
171169

172-
replace_with_constant_into_child(expr, variables, total_partitions)
170+
let mut variables = variables.clone();
171+
replace_with_constant_into_child(expr, &mut variables, total_partitions)
173172
}

0 commit comments

Comments
 (0)