Skip to content

Commit c5d752b

Browse files
committed
refactor: integrate segment pruning into pipeline
1 parent b8ee29b commit c5d752b

File tree

5 files changed

+163
-24
lines changed

5 files changed

+163
-24
lines changed

src/query/storages/fuse/src/operations/read_partitions.rs

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@ use databend_storages_common_pruner::BlockMetaIndex;
4848
use databend_storages_common_pruner::TopNPrunner;
4949
use databend_storages_common_table_meta::meta::BlockMeta;
5050
use databend_storages_common_table_meta::meta::ColumnStatistics;
51-
use databend_storages_common_table_meta::meta::CompactSegmentInfo;
5251
use databend_storages_common_table_meta::table::ChangeType;
5352
use log::info;
5453
use opendal::Operator;
@@ -62,10 +61,12 @@ use crate::pruning::table_sample;
6261
use crate::pruning::BlockPruner;
6362
use crate::pruning::FusePruner;
6463
use crate::pruning::SegmentLocation;
64+
use crate::pruning::SegmentPruner;
6565
use crate::pruning_pipeline::AsyncBlockPruneTransform;
6666
use crate::pruning_pipeline::ExtractSegmentTransform;
67-
use crate::pruning_pipeline::PrunedSegmentReceiverSource;
67+
use crate::pruning_pipeline::LazySegmentReceiverSource;
6868
use crate::pruning_pipeline::SampleBlockMetasTransform;
69+
use crate::pruning_pipeline::SegmentPruneTransform;
6970
use crate::pruning_pipeline::SendPartInfoSink;
7071
use crate::pruning_pipeline::SendPartState;
7172
use crate::pruning_pipeline::SyncBlockPruneTransform;
@@ -241,13 +242,11 @@ impl FuseTable {
241242
// We cannot use the runtime associated with the query to avoid increasing its lifetime.
242243
GlobalIORuntime::instance().spawn(async move {
243244
// avoid blocking the global io runtime
244-
let runtime = Runtime::with_worker_threads(2, Some("prune-seg".to_string()))?;
245+
let runtime = Runtime::with_worker_threads(2, Some("prune-pipeline".to_string()))?;
245246
let join_handler = runtime.spawn(async move {
246-
let segment_pruned_result =
247-
pruner.clone().segment_pruning(lazy_init_segments).await?;
248-
for segment in segment_pruned_result {
247+
for segment in lazy_init_segments {
249248
// the query may be killed or stopped early; ignore the send error
250-
if let Err(_e) = segment_tx.send(Ok(segment)).await {
249+
if let Err(_e) = segment_tx.send(segment).await {
251250
break;
252251
}
253252
}
@@ -341,15 +340,27 @@ impl FuseTable {
341340
pruner: Arc<FusePruner>,
342341
prune_pipeline: &mut Pipeline,
343342
ctx: Arc<dyn TableContext>,
344-
segment_rx: Receiver<Result<(SegmentLocation, Arc<CompactSegmentInfo>)>>,
343+
segment_rx: Receiver<SegmentLocation>,
345344
part_info_tx: Sender<Result<PartInfoPtr>>,
346345
derterministic_cache_key: Option<String>,
347346
) -> Result<()> {
348347
let max_threads = ctx.get_settings().get_max_threads()? as usize;
349348
prune_pipeline.add_source(
350-
|output| PrunedSegmentReceiverSource::create(ctx.clone(), segment_rx.clone(), output),
349+
|output| LazySegmentReceiverSource::create(ctx.clone(), segment_rx.clone(), output),
351350
max_threads,
352351
)?;
352+
let segment_pruner =
353+
SegmentPruner::create(pruner.pruning_ctx.clone(), pruner.table_schema.clone())?;
354+
355+
prune_pipeline.add_transform(|input, output| {
356+
SegmentPruneTransform::create(
357+
input,
358+
output,
359+
segment_pruner.clone(),
360+
pruner.pruning_ctx.clone(),
361+
)
362+
})?;
363+
353364
prune_pipeline
354365
.add_transform(|input, output| ExtractSegmentTransform::create(input, output, true))?;
355366
let sample_probability = table_sample(&pruner.push_down)?;
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::fmt::Debug;
16+
use std::fmt::Formatter;
17+
18+
use databend_common_expression::local_block_meta_serde;
19+
use databend_common_expression::BlockMetaInfo;
20+
use databend_common_expression::BlockMetaInfoPtr;
21+
22+
use crate::SegmentLocation;
23+
24+
pub struct LazySegmentMeta {
25+
pub segment_location: SegmentLocation,
26+
}
27+
28+
impl LazySegmentMeta {
29+
pub fn create(segment_location: SegmentLocation) -> BlockMetaInfoPtr {
30+
Box::new(LazySegmentMeta { segment_location })
31+
}
32+
}
33+
34+
impl Debug for LazySegmentMeta {
35+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
36+
f.debug_struct("LazySegmentMeta").finish()
37+
}
38+
}
39+
40+
local_block_meta_serde!(LazySegmentMeta);
41+
42+
#[typetag::serde(name = "lazy_segment_meta")]
43+
impl BlockMetaInfo for LazySegmentMeta {}

src/query/storages/fuse/src/pruning_pipeline/pruned_segment_receiver_source.rs renamed to src/query/storages/fuse/src/pruning_pipeline/lazy_segment_receiver_source.rs

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,19 +22,18 @@ use databend_common_pipeline_core::processors::OutputPort;
2222
use databend_common_pipeline_core::processors::ProcessorPtr;
2323
use databend_common_pipeline_sources::AsyncSource;
2424
use databend_common_pipeline_sources::AsyncSourcer;
25-
use databend_storages_common_table_meta::meta::CompactSegmentInfo;
2625

27-
use crate::pruning_pipeline::pruned_segment_meta::PrunedSegmentMeta;
26+
use crate::pruning_pipeline::LazySegmentMeta;
2827
use crate::SegmentLocation;
2928

30-
pub struct PrunedSegmentReceiverSource {
31-
pub meta_receiver: Receiver<Result<(SegmentLocation, Arc<CompactSegmentInfo>)>>,
29+
pub struct LazySegmentReceiverSource {
30+
pub meta_receiver: Receiver<SegmentLocation>,
3231
}
3332

34-
impl PrunedSegmentReceiverSource {
33+
impl LazySegmentReceiverSource {
3534
pub fn create(
3635
ctx: Arc<dyn TableContext>,
37-
receiver: Receiver<Result<(SegmentLocation, Arc<CompactSegmentInfo>)>>,
36+
receiver: Receiver<SegmentLocation>,
3837
output_port: Arc<OutputPort>,
3938
) -> Result<ProcessorPtr> {
4039
AsyncSourcer::create(ctx, output_port, Self {
@@ -44,20 +43,16 @@ impl PrunedSegmentReceiverSource {
4443
}
4544

4645
#[async_trait::async_trait]
47-
impl AsyncSource for PrunedSegmentReceiverSource {
48-
const NAME: &'static str = "PrunedSegmentReceiverSource";
46+
impl AsyncSource for LazySegmentReceiverSource {
47+
const NAME: &'static str = "LazySegmentReceiverSource";
4948
const SKIP_EMPTY_DATA_BLOCK: bool = false;
5049

5150
#[async_backtrace::framed]
5251
async fn generate(&mut self) -> Result<Option<DataBlock>> {
5352
match self.meta_receiver.recv().await {
54-
Ok(Ok(segments)) => Ok(Some(DataBlock::empty_with_meta(PrunedSegmentMeta::create(
53+
Ok(segments) => Ok(Some(DataBlock::empty_with_meta(LazySegmentMeta::create(
5554
segments,
5655
)))),
57-
Ok(Err(e)) => Err(
58-
// The error is occurred in pruning process
59-
e,
60-
),
6156
Err(_) => {
6257
// The channel is closed, we should return None to stop generating
6358
Ok(None)

src/query/storages/fuse/src/pruning_pipeline/mod.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,21 @@ mod async_block_prune_transform;
1616
mod block_metas_meta;
1717
mod block_prune_result_meta;
1818
mod extract_segment_transform;
19+
mod lazy_segment_meta;
20+
mod lazy_segment_receiver_source;
1921
mod pruned_segment_meta;
20-
mod pruned_segment_receiver_source;
2122
mod sample_block_metas_transform;
23+
mod segment_prune_transform;
2224
mod send_part_info_sink;
2325
mod sync_block_prune_transform;
2426
mod topn_prune_transform;
2527

2628
pub use async_block_prune_transform::AsyncBlockPruneTransform;
2729
pub use extract_segment_transform::ExtractSegmentTransform;
28-
pub use pruned_segment_receiver_source::PrunedSegmentReceiverSource;
30+
pub use lazy_segment_meta::LazySegmentMeta;
31+
pub use lazy_segment_receiver_source::LazySegmentReceiverSource;
2932
pub use sample_block_metas_transform::SampleBlockMetasTransform;
33+
pub use segment_prune_transform::SegmentPruneTransform;
3034
pub use send_part_info_sink::SendPartInfoSink;
3135
pub use send_part_info_sink::SendPartState;
3236
pub use sync_block_prune_transform::SyncBlockPruneTransform;
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::sync::Arc;
16+
17+
use databend_common_exception::ErrorCode;
18+
use databend_common_exception::Result;
19+
use databend_common_expression::BlockMetaInfoDowncast;
20+
use databend_common_expression::DataBlock;
21+
use databend_common_expression::SEGMENT_NAME_COL_NAME;
22+
use databend_common_pipeline_core::processors::InputPort;
23+
use databend_common_pipeline_core::processors::OutputPort;
24+
use databend_common_pipeline_core::processors::ProcessorPtr;
25+
use databend_common_pipeline_transforms::AsyncAccumulatingTransform;
26+
use databend_common_pipeline_transforms::AsyncAccumulatingTransformer;
27+
28+
use crate::pruning::PruningContext;
29+
use crate::pruning::SegmentPruner;
30+
use crate::pruning_pipeline::pruned_segment_meta::PrunedSegmentMeta;
31+
use crate::pruning_pipeline::LazySegmentMeta;
32+
33+
pub struct SegmentPruneTransform {
34+
pub segment_pruner: Arc<SegmentPruner>,
35+
pub pruning_ctx: Arc<PruningContext>,
36+
}
37+
38+
impl SegmentPruneTransform {
39+
pub fn create(
40+
input: Arc<InputPort>,
41+
output: Arc<OutputPort>,
42+
segment_pruner: Arc<SegmentPruner>,
43+
pruning_context: Arc<PruningContext>,
44+
) -> Result<ProcessorPtr> {
45+
Ok(ProcessorPtr::create(AsyncAccumulatingTransformer::create(
46+
input,
47+
output,
48+
SegmentPruneTransform {
49+
segment_pruner,
50+
pruning_ctx: pruning_context,
51+
},
52+
)))
53+
}
54+
}
55+
56+
#[async_trait::async_trait]
57+
impl AsyncAccumulatingTransform for SegmentPruneTransform {
58+
const NAME: &'static str = "SegmentPruneTransform";
59+
60+
async fn transform(&mut self, mut data: DataBlock) -> Result<Option<DataBlock>> {
61+
if let Some(ptr) = data.take_meta() {
62+
if let Some(meta) = LazySegmentMeta::downcast_from(ptr) {
63+
let location = meta.segment_location;
64+
if let Some(pruner) = &self.pruning_ctx.internal_column_pruner {
65+
if !pruner.should_keep(SEGMENT_NAME_COL_NAME, &location.location.0) {
66+
return Ok(None);
67+
}
68+
}
69+
let mut pruned_segments = self.segment_pruner.pruning(vec![location]).await?;
70+
71+
if pruned_segments.is_empty() {
72+
return Ok(None);
73+
}
74+
75+
debug_assert!(pruned_segments.len() == 1);
76+
77+
return Ok(Some(DataBlock::empty_with_meta(PrunedSegmentMeta::create(
78+
pruned_segments.pop().unwrap(),
79+
))));
80+
}
81+
}
82+
Err(ErrorCode::Internal(
83+
"Cannot downcast meta to LazySegmentMeta",
84+
))
85+
}
86+
}

0 commit comments

Comments
 (0)