Skip to content

Commit f82331b

Browse files
committed
fix
1 parent 49a1c52 commit f82331b

File tree

1 file changed

+4
-15
lines changed

1 file changed

+4
-15
lines changed

src/query/service/src/pipelines/processors/transforms/transform_partition_collect.rs

Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ impl HilbertPartitionExchange {
5050
}
5151

5252
impl Exchange for HilbertPartitionExchange {
53-
const NAME: &'static str = "Window";
53+
const NAME: &'static str = "Hilbert";
5454
fn partition(&self, data_block: DataBlock, n: usize) -> Result<Vec<DataBlock>> {
5555
// Extract the columns used for hash computation.
5656
let mut data_block = data_block.consume_convert_to_full();
@@ -62,24 +62,13 @@ impl Exchange for HilbertPartitionExchange {
6262
.iter()
6363
.map(|&id| (id % self.num_partitions as u64) as u16)
6464
.collect::<Vec<_>>();
65-
let scatter_indices =
66-
DataBlock::divide_indices_by_scatter_size(&indices, self.num_partitions);
67-
68-
let mut scatter_blocks = Vec::with_capacity(self.num_partitions);
6965
data_block.pop_columns(1);
70-
for (partition_id, indices) in scatter_indices.iter().take(self.num_partitions).enumerate()
71-
{
72-
if indices.is_empty() {
73-
continue;
74-
}
75-
let block = data_block.take_with_optimize_size(indices)?;
76-
scatter_blocks.push((partition_id, block));
77-
}
66+
let scatter_blocks = DataBlock::scatter(&data_block, &indices, self.num_partitions)?;
7867

7968
// Partition the data blocks to different processors.
8069
let mut output_data_blocks = vec![vec![]; n];
81-
for (idx, data_block) in scatter_blocks.into_iter().enumerate() {
82-
output_data_blocks[idx % n].push(data_block);
70+
for (partition_id, data_block) in scatter_blocks.into_iter().enumerate() {
71+
output_data_blocks[partition_id % n].push((partition_id, data_block));
8372
}
8473

8574
// Union data blocks for each processor.

0 commit comments

Comments
 (0)