Skip to content

Commit b2d90d9

Browse files
committed
fix
1 parent ed965a3 commit b2d90d9

File tree

5 files changed

+12
-12
lines changed

5 files changed

+12
-12
lines changed

src/query/service/src/interpreters/interpreter_table_recluster.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -267,11 +267,11 @@ impl ReclusterTableInterpreter {
267267
acc
268268
},
269269
);
270-
270+
271271
let query_str = self.ctx.get_query_str();
272272
let write_progress = self.ctx.get_write_progress();
273273
let write_progress_value = write_progress.as_ref().get_values();
274-
274+
275275
let subquery_executor = Arc::new(ServiceQueryExecutor::new(
276276
QueryContext::create_from(self.ctx.as_ref()),
277277
));
@@ -408,7 +408,6 @@ impl ReclusterTableInterpreter {
408408
plan_id: 0,
409409
input: plan,
410410
table_info: table_info.clone(),
411-
range_id: bind_context.columns.last().unwrap().index,
412411
num_partitions: total_partitions,
413412
rows_per_block,
414413
}));

src/query/service/src/pipelines/processors/transforms/transform_partition_collect.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,19 +60,24 @@ impl Exchange for HilbertPartitionExchange {
6060
.unwrap()
6161
.as_u_int64()
6262
.unwrap();
63+
data_block.pop_columns(1);
6364

6465
// Scatter the data block to different partitions.
6566
let indices = range_ids
6667
.iter()
6768
.map(|&id| (id % self.num_partitions as u64) as u16)
6869
.collect::<Vec<_>>();
69-
data_block.pop_columns(1);
70-
let scatter_blocks = DataBlock::scatter(&data_block, &indices, self.num_partitions)?;
71-
70+
let scatter_indices =
71+
DataBlock::divide_indices_by_scatter_size(&indices, self.num_partitions);
7272
// Partition the data blocks to different processors.
7373
let mut output_data_blocks = vec![vec![]; n];
74-
for (partition_id, data_block) in scatter_blocks.into_iter().enumerate() {
75-
output_data_blocks[partition_id % n].push((partition_id, data_block));
74+
for (partition_id, indices) in scatter_indices.iter().take(self.num_partitions).enumerate()
75+
{
76+
if indices.is_empty() {
77+
continue;
78+
}
79+
let block = data_block.take_with_optimize_size(indices)?;
80+
output_data_blocks[partition_id % n].push((partition_id, block));
7681
}
7782

7883
// Union data blocks for each processor.

src/query/service/src/servers/flight/v1/exchange/exchange_injector.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,6 @@ impl ExchangeInjector for DefaultExchangeInjector {
9393
.iter()
9494
.position(|x| x == local_id)
9595
.unwrap();
96-
println!("local_pos {}", local_pos);
9796
HashFlightScatter::try_create(
9897
ctx.get_function_context()?,
9998
exchange.shuffle_keys.clone(),

src/query/sql/src/executor/physical_plan_visitor.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,6 @@ pub trait PhysicalPlanReplacer {
133133
plan_id: plan.plan_id,
134134
input: Box::new(input),
135135
table_info: plan.table_info.clone(),
136-
range_id: plan.range_id,
137136
num_partitions: plan.num_partitions,
138137
rows_per_block: plan.rows_per_block,
139138
})))

src/query/sql/src/executor/physical_plans/physical_recluster.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ use databend_common_catalog::plan::ReclusterTask;
1616
use databend_common_meta_app::schema::TableInfo;
1717

1818
use crate::executor::PhysicalPlan;
19-
use crate::IndexType;
2019

2120
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
2221
pub struct Recluster {
@@ -30,7 +29,6 @@ pub struct HilbertPartition {
3029
pub plan_id: u32,
3130
pub input: Box<PhysicalPlan>,
3231
pub table_info: TableInfo,
33-
pub range_id: IndexType,
3432
pub num_partitions: usize,
3533
pub rows_per_block: usize,
3634
}

0 commit comments

Comments
 (0)