Skip to content

Commit 054d5df

Browse files
authored
fix(query): fix WindowPartitionTopNExchange painc with an empty block (#17453)
* fix * add top_n to format tree * fix test * skip empty data block
1 parent fa75e6e commit 054d5df

File tree

5 files changed

+15
-0
lines changed

5 files changed

+15
-0
lines changed

src/query/pipeline/core/src/processors/shuffle_processor.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ pub enum MultiwayStrategy {
3232

3333
pub trait Exchange: Send + Sync + 'static {
3434
const NAME: &'static str;
35+
const SKIP_EMPTY_DATA_BLOCK: bool = false;
3536
const STRATEGY: MultiwayStrategy = MultiwayStrategy::Random;
3637

3738
fn partition(&self, data_block: DataBlock, n: usize) -> Result<Vec<DataBlock>>;
@@ -247,6 +248,10 @@ impl<T: Exchange> Processor for PartitionProcessor<T> {
247248

248249
fn process(&mut self) -> Result<()> {
249250
if let Some(block) = self.input_data.take() {
251+
if T::SKIP_EMPTY_DATA_BLOCK && block.is_empty() {
252+
return Ok(());
253+
}
254+
250255
let partitioned = self.exchange.partition(block, self.outputs.len())?;
251256

252257
for (index, block) in partitioned.into_iter().enumerate() {

src/query/service/src/pipelines/processors/transforms/window/partition/window_partition_partial_top_n_exchange.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ impl WindowPartitionTopNExchange {
6767

6868
impl Exchange for WindowPartitionTopNExchange {
6969
const NAME: &'static str = "WindowTopN";
70+
const SKIP_EMPTY_DATA_BLOCK: bool = true;
71+
7072
fn partition(&self, block: DataBlock, n: usize) -> Result<Vec<DataBlock>> {
7173
let partition_permutation = self.partition_permutation(&block);
7274

src/query/sql/src/executor/format.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1262,6 +1262,10 @@ fn window_partition_to_format_tree(
12621262
FormatTreeNode::new(format!("hash keys: [{partition_by}]")),
12631263
];
12641264

1265+
if let Some(top_n) = &plan.top_n {
1266+
children.push(FormatTreeNode::new(format!("top: {}", top_n.top)));
1267+
}
1268+
12651269
if let Some(info) = &plan.stat_info {
12661270
let items = plan_stats_info_to_format_tree(info);
12671271
children.extend(items);

tests/sqllogictests/suites/mode/cluster/shuffle.test

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ Sort
5050
│ └── WindowPartition
5151
│ ├── output columns: [t2.number (#1)]
5252
│ ├── hash keys: [number]
53+
│ ├── top: 1
5354
│ ├── estimated rows: 14.00
5455
│ └── Exchange
5556
│ ├── output columns: [t2.number (#1)]

tests/sqllogictests/suites/mode/standalone/explain/window.test

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ Filter
109109
└── WindowPartition
110110
├── output columns: [k (#4), v (#5)]
111111
├── hash keys: [k]
112+
├── top: 1
112113
├── estimated rows: 0.00
113114
└── UnionAll
114115
├── output columns: [k (#4), v (#5)]
@@ -157,6 +158,7 @@ Filter
157158
└── WindowPartition
158159
├── output columns: [k (#4), v (#5)]
159160
├── hash keys: [v]
161+
├── top: 1
160162
├── estimated rows: 0.00
161163
└── UnionAll
162164
├── output columns: [k (#4), v (#5)]
@@ -793,6 +795,7 @@ AggregateFinal
793795
└── WindowPartition
794796
├── output columns: [numbers.number (#0), id (#1)]
795797
├── hash keys: [id]
798+
├── top: 1
796799
├── estimated rows: 1000.00
797800
└── EvalScalar
798801
├── output columns: [numbers.number (#0), id (#1)]

0 commit comments

Comments
 (0)