Skip to content

Commit d46605e

Browse files
dqhl76zhang2014
andauthored
fix: drop channel if partition receiver finished (#17037)
* fix: partition receiver should implement on_finish * fix: partition receiver should implement on_finish --------- Co-authored-by: Winter Zhang <[email protected]>
1 parent b5365e3 commit d46605e

File tree

3 files changed

+45
-15
lines changed

3 files changed

+45
-15
lines changed

src/query/pipeline/sources/src/async_source.rs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,11 @@ pub trait AsyncSource: Send {
3838
fn un_reacted(&self) -> Result<()> {
3939
Ok(())
4040
}
41+
42+
#[async_backtrace::framed]
43+
async fn on_finish(&mut self) -> Result<()> {
44+
Ok(())
45+
}
4146
}
4247

4348
// TODO: This can be refactored using proc macros
@@ -50,6 +55,7 @@ pub struct AsyncSourcer<T: 'static + AsyncSource> {
5055
output: Arc<OutputPort>,
5156
scan_progress: Arc<Progress>,
5257
generated_data: Option<DataBlock>,
58+
called_on_finish: bool,
5359
}
5460

5561
impl<T: 'static + AsyncSource> AsyncSourcer<T> {
@@ -65,6 +71,7 @@ impl<T: 'static + AsyncSource> AsyncSourcer<T> {
6571
scan_progress,
6672
is_finish: false,
6773
generated_data: None,
74+
called_on_finish: false,
6875
})))
6976
}
7077
}
@@ -81,12 +88,16 @@ impl<T: 'static + AsyncSource> Processor for AsyncSourcer<T> {
8188

8289
fn event(&mut self) -> Result<Event> {
8390
if self.is_finish {
91+
if !self.called_on_finish {
92+
return Ok(Event::Async);
93+
}
8494
self.output.finish();
8595
return Ok(Event::Finished);
8696
}
8797

8898
if self.output.is_finished() {
89-
return Ok(Event::Finished);
99+
self.is_finish = true;
100+
return Ok(Event::Async);
90101
}
91102

92103
if !self.output.can_push() {
@@ -112,6 +123,13 @@ impl<T: 'static + AsyncSource> Processor for AsyncSourcer<T> {
112123

113124
#[async_backtrace::framed]
114125
async fn async_process(&mut self) -> Result<()> {
126+
if self.is_finish {
127+
if !self.called_on_finish {
128+
self.called_on_finish = true;
129+
self.inner.on_finish().await?;
130+
}
131+
return Ok(());
132+
}
115133
match self.inner.generate().await? {
116134
None => self.is_finish = true,
117135
Some(data_block) => {

src/query/storages/fuse/src/operations/read/block_partition_receiver_source.rs

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use databend_common_pipeline_sources::AsyncSourcer;
2727
use crate::operations::read::block_partition_meta::BlockPartitionMeta;
2828

2929
pub struct BlockPartitionReceiverSource {
30-
pub meta_receiver: Receiver<Result<PartInfoPtr>>,
30+
pub meta_receiver: Option<Receiver<Result<PartInfoPtr>>>,
3131
}
3232

3333
impl BlockPartitionReceiverSource {
@@ -37,7 +37,7 @@ impl BlockPartitionReceiverSource {
3737
output_port: Arc<OutputPort>,
3838
) -> Result<ProcessorPtr> {
3939
AsyncSourcer::create(ctx, output_port, Self {
40-
meta_receiver: receiver,
40+
meta_receiver: Some(receiver),
4141
})
4242
}
4343
}
@@ -49,18 +49,28 @@ impl AsyncSource for BlockPartitionReceiverSource {
4949

5050
#[async_backtrace::framed]
5151
async fn generate(&mut self) -> Result<Option<DataBlock>> {
52-
match self.meta_receiver.recv().await {
53-
Ok(Ok(part)) => Ok(Some(DataBlock::empty_with_meta(
54-
BlockPartitionMeta::create(vec![part]),
55-
))),
56-
Ok(Err(e)) => Err(
57-
// The error is occurred in pruning process
58-
e,
59-
),
60-
Err(_) => {
61-
// The channel is closed, we should return None to stop generating
62-
Ok(None)
52+
if let Some(rx) = &self.meta_receiver {
53+
match rx.recv().await {
54+
Ok(Ok(part)) => Ok(Some(DataBlock::empty_with_meta(
55+
BlockPartitionMeta::create(vec![part]),
56+
))),
57+
Ok(Err(e)) => Err(
58+
// The error is occurred in pruning process
59+
e,
60+
),
61+
Err(_) => {
62+
// The channel is closed, we should return None to stop generating
63+
Ok(None)
64+
}
6365
}
66+
} else {
67+
Ok(None)
6468
}
6569
}
70+
71+
#[async_backtrace::framed]
72+
async fn on_finish(&mut self) -> Result<()> {
73+
drop(self.meta_receiver.take());
74+
Ok(())
75+
}
6676
}

src/query/storages/fuse/src/pruning_pipeline/send_part_info_sink.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,9 @@ impl AsyncSink for SendPartInfoSink {
189189

190190
for info in info_ptr {
191191
if let Some(sender) = &self.sender {
192-
let _ = sender.send(Ok(info)).await;
192+
if let Err(_e) = sender.send(Ok(info)).await {
193+
break;
194+
}
193195
}
194196
}
195197

0 commit comments

Comments
 (0)