Skip to content

Commit 1eced25

Browse files
committed
fix
1 parent d560f54 commit 1eced25

File tree

2 files changed

+21
-24
lines changed

2 files changed

+21
-24
lines changed

src/query/service/src/interpreters/interpreter_table_recluster.rs

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -255,14 +255,17 @@ impl ReclusterTableInterpreter {
255255
let block_thresholds = tbl.get_block_thresholds();
256256
let total_bytes = recluster_info.removed_statistics.uncompressed_byte_size as usize;
257257
let total_rows = recluster_info.removed_statistics.row_count as usize;
258-
let rows_per_block = block_thresholds.calc_rows_per_block(total_bytes, total_rows);
259-
let total_partitions = total_rows / rows_per_block;
258+
let block_size = settings.get_max_block_size()?;
259+
let rows_per_block = block_thresholds
260+
.calc_rows_per_block(total_bytes, total_rows)
261+
.max(block_size as usize);
262+
let total_partitions = std::cmp::max(total_rows / rows_per_block, 1);
260263

261264
let subquery_executor = Arc::new(ServiceQueryExecutor::new(
262265
QueryContext::create_from(self.ctx.as_ref()),
263266
));
267+
let partitions = settings.get_hilbert_num_range_ids()? as usize;
264268
if hilbert_info.is_none() {
265-
let partitions = settings.get_hilbert_num_range_ids()? as usize;
266269
let sample_size = settings.get_hilbert_sample_size_per_block()?;
267270

268271
let name_resolution_ctx = NameResolutionContext::try_from(settings.as_ref())?;
@@ -360,18 +363,20 @@ impl ReclusterTableInterpreter {
360363
let mut variables = VecDeque::new();
361364

362365
let Plan::Query {
363-
s_expr,
366+
mut s_expr,
364367
metadata,
365368
bind_context,
366369
..
367-
} = keys_bound
370+
} = keys_bound.clone()
368371
else {
369372
unreachable!()
370373
};
374+
if total_partitions > partitions {
375+
*s_expr = replace_with_constant(&s_expr, &variables, total_partitions as u16)
376+
}
371377
metadata.write().replace_all_tables(tbl.clone());
372-
let mut builder =
373-
PhysicalPlanBuilder::new(metadata.clone(), self.ctx.clone(), false);
374-
let plan = Box::new(builder.build(s_expr, bind_context.column_set()).await?);
378+
let mut builder = PhysicalPlanBuilder::new(metadata, self.ctx.clone(), false);
379+
let plan = Box::new(builder.build(&s_expr, bind_context.column_set()).await?);
375380
let data_blocks = subquery_executor
376381
.execute_query_with_physical_plan(&plan)
377382
.await?;

src/query/sql/src/planner/plans/recluster.rs

Lines changed: 8 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -93,11 +93,7 @@ pub async fn plan_hilbert_sql(
9393
optimize(opt_ctx, plan).await
9494
}
9595

96-
pub fn replace_with_constant(
97-
expr: &SExpr,
98-
variables: &VecDeque<Scalar>,
99-
total_partitions: u16,
100-
) -> SExpr {
96+
pub fn replace_with_constant(expr: &SExpr, variables: &VecDeque<Scalar>, partitions: u16) -> SExpr {
10197
#[recursive::recursive]
10298
fn visit_expr_column(expr: &mut ScalarExpr, variables: &mut VecDeque<Scalar>) {
10399
match expr {
@@ -124,11 +120,11 @@ pub fn replace_with_constant(
124120
fn replace_with_constant_into_child(
125121
s_expr: &SExpr,
126122
variables: &mut VecDeque<Scalar>,
127-
total_partitions: u16,
123+
partitions: u16,
128124
) -> SExpr {
129125
let mut s_expr = s_expr.clone();
130126
s_expr.plan = match s_expr.plan.as_ref() {
131-
RelOperator::EvalScalar(expr) => {
127+
RelOperator::EvalScalar(expr) if !variables.is_empty() => {
132128
let mut expr = expr.clone();
133129
for item in &mut expr.items {
134130
visit_expr_column(&mut item.scalar, variables);
@@ -137,12 +133,10 @@ pub fn replace_with_constant(
137133
}
138134
RelOperator::Aggregate(aggr) => {
139135
let mut aggr = aggr.clone();
140-
if aggr.aggregate_functions.len() == 1 {
141-
let mut agg_func = aggr.aggregate_functions[0].clone();
142-
if let ScalarExpr::AggregateFunction(func) = &mut agg_func.scalar {
136+
for item in &mut aggr.aggregate_functions {
137+
if let ScalarExpr::AggregateFunction(func) = &mut item.scalar {
143138
if func.func_name == "range_bound" {
144-
func.params[0] = Scalar::Number(NumberScalar::UInt16(total_partitions));
145-
aggr.aggregate_functions = vec![agg_func];
139+
func.params[0] = Scalar::Number(NumberScalar::UInt16(partitions));
146140
}
147141
}
148142
}
@@ -157,9 +151,7 @@ pub fn replace_with_constant(
157151
let mut children = Vec::with_capacity(s_expr.children.len());
158152
for child in s_expr.children.iter() {
159153
children.push(Arc::new(replace_with_constant_into_child(
160-
child,
161-
variables,
162-
total_partitions,
154+
child, variables, partitions,
163155
)));
164156
}
165157
s_expr.children = children;
@@ -168,5 +160,5 @@ pub fn replace_with_constant(
168160
}
169161

170162
let mut variables = variables.clone();
171-
replace_with_constant_into_child(expr, &mut variables, total_partitions)
163+
replace_with_constant_into_child(expr, &mut variables, partitions)
172164
}

0 commit comments

Comments
 (0)