From c5d752b3991895166ac5bbbeb135ae02ac1ffa2f Mon Sep 17 00:00:00 2001 From: Liuqing Yue Date: Thu, 26 Dec 2024 19:12:39 +0800 Subject: [PATCH 1/2] refactor: integrate segment pruning into pipeline --- .../fuse/src/operations/read_partitions.rs | 29 +++++-- .../src/pruning_pipeline/lazy_segment_meta.rs | 43 ++++++++++ ...rce.rs => lazy_segment_receiver_source.rs} | 21 ++--- .../storages/fuse/src/pruning_pipeline/mod.rs | 8 +- .../segment_prune_transform.rs | 86 +++++++++++++++++++ 5 files changed, 163 insertions(+), 24 deletions(-) create mode 100644 src/query/storages/fuse/src/pruning_pipeline/lazy_segment_meta.rs rename src/query/storages/fuse/src/pruning_pipeline/{pruned_segment_receiver_source.rs => lazy_segment_receiver_source.rs} (70%) create mode 100644 src/query/storages/fuse/src/pruning_pipeline/segment_prune_transform.rs diff --git a/src/query/storages/fuse/src/operations/read_partitions.rs b/src/query/storages/fuse/src/operations/read_partitions.rs index b116798722aaa..bc3730e9a7b03 100644 --- a/src/query/storages/fuse/src/operations/read_partitions.rs +++ b/src/query/storages/fuse/src/operations/read_partitions.rs @@ -48,7 +48,6 @@ use databend_storages_common_pruner::BlockMetaIndex; use databend_storages_common_pruner::TopNPrunner; use databend_storages_common_table_meta::meta::BlockMeta; use databend_storages_common_table_meta::meta::ColumnStatistics; -use databend_storages_common_table_meta::meta::CompactSegmentInfo; use databend_storages_common_table_meta::table::ChangeType; use log::info; use opendal::Operator; @@ -62,10 +61,12 @@ use crate::pruning::table_sample; use crate::pruning::BlockPruner; use crate::pruning::FusePruner; use crate::pruning::SegmentLocation; +use crate::pruning::SegmentPruner; use crate::pruning_pipeline::AsyncBlockPruneTransform; use crate::pruning_pipeline::ExtractSegmentTransform; -use crate::pruning_pipeline::PrunedSegmentReceiverSource; +use crate::pruning_pipeline::LazySegmentReceiverSource; use crate::pruning_pipeline::SampleBlockMetasTransform; +use crate::pruning_pipeline::SegmentPruneTransform; use crate::pruning_pipeline::SendPartInfoSink; use crate::pruning_pipeline::SendPartState; use crate::pruning_pipeline::SyncBlockPruneTransform; @@ -241,13 +242,11 @@ impl FuseTable { // We cannot use the runtime associated with the query to avoid increasing its lifetime. GlobalIORuntime::instance().spawn(async move { // avoid block global io runtime - let runtime = Runtime::with_worker_threads(2, Some("prune-seg".to_string()))?; + let runtime = Runtime::with_worker_threads(2, Some("prune-pipeline".to_string()))?; let join_handler = runtime.spawn(async move { - let segment_pruned_result = - pruner.clone().segment_pruning(lazy_init_segments).await?; - for segment in segment_pruned_result { + for segment in lazy_init_segments { // the sql may be killed or early stop, ignore the error - if let Err(_e) = segment_tx.send(Ok(segment)).await { + if let Err(_e) = segment_tx.send(segment).await { break; } } @@ -341,15 +340,27 @@ impl FuseTable { pruner: Arc, prune_pipeline: &mut Pipeline, ctx: Arc, - segment_rx: Receiver)>>, + segment_rx: Receiver, part_info_tx: Sender>, derterministic_cache_key: Option, ) -> Result<()> { let max_threads = ctx.get_settings().get_max_threads()? as usize; prune_pipeline.add_source( - |output| PrunedSegmentReceiverSource::create(ctx.clone(), segment_rx.clone(), output), + |output| LazySegmentReceiverSource::create(ctx.clone(), segment_rx.clone(), output), max_threads, )?; + let segment_pruner = + SegmentPruner::create(pruner.pruning_ctx.clone(), pruner.table_schema.clone())?; + + prune_pipeline.add_transform(|input, output| { + SegmentPruneTransform::create( + input, + output, + segment_pruner.clone(), + pruner.pruning_ctx.clone(), + ) + })?; + prune_pipeline .add_transform(|input, output| ExtractSegmentTransform::create(input, output, true))?; let sample_probability = table_sample(&pruner.push_down)?; diff --git a/src/query/storages/fuse/src/pruning_pipeline/lazy_segment_meta.rs b/src/query/storages/fuse/src/pruning_pipeline/lazy_segment_meta.rs new file mode 100644 index 0000000000000..5d40cdd5d9b70 --- /dev/null +++ b/src/query/storages/fuse/src/pruning_pipeline/lazy_segment_meta.rs @@ -0,0 +1,43 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt::Debug; +use std::fmt::Formatter; + +use databend_common_expression::local_block_meta_serde; +use databend_common_expression::BlockMetaInfo; +use databend_common_expression::BlockMetaInfoPtr; + +use crate::SegmentLocation; + +pub struct LazySegmentMeta { + pub segment_location: SegmentLocation, +} + +impl LazySegmentMeta { + pub fn create(segment_location: SegmentLocation) -> BlockMetaInfoPtr { + Box::new(LazySegmentMeta { segment_location }) + } +} + +impl Debug for LazySegmentMeta { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("LazySegmentMeta").finish() + } +} + +local_block_meta_serde!(LazySegmentMeta); + +#[typetag::serde(name = "lazy_segment_meta")] +impl BlockMetaInfo for LazySegmentMeta {} diff --git a/src/query/storages/fuse/src/pruning_pipeline/pruned_segment_receiver_source.rs b/src/query/storages/fuse/src/pruning_pipeline/lazy_segment_receiver_source.rs similarity index 70% rename from src/query/storages/fuse/src/pruning_pipeline/pruned_segment_receiver_source.rs rename to src/query/storages/fuse/src/pruning_pipeline/lazy_segment_receiver_source.rs index 115274703b9be..9b8a4a1fd12a2 100644 --- a/src/query/storages/fuse/src/pruning_pipeline/pruned_segment_receiver_source.rs +++ b/src/query/storages/fuse/src/pruning_pipeline/lazy_segment_receiver_source.rs @@ -22,19 +22,18 @@ use databend_common_pipeline_core::processors::OutputPort; use databend_common_pipeline_core::processors::ProcessorPtr; use databend_common_pipeline_sources::AsyncSource; use databend_common_pipeline_sources::AsyncSourcer; -use databend_storages_common_table_meta::meta::CompactSegmentInfo; -use crate::pruning_pipeline::pruned_segment_meta::PrunedSegmentMeta; +use crate::pruning_pipeline::LazySegmentMeta; use crate::SegmentLocation; -pub struct PrunedSegmentReceiverSource { - pub meta_receiver: Receiver)>>, +pub struct LazySegmentReceiverSource { + pub meta_receiver: Receiver, } -impl PrunedSegmentReceiverSource { +impl LazySegmentReceiverSource { pub fn create( ctx: Arc, - receiver: Receiver)>>, + receiver: Receiver, output_port: Arc, ) -> Result { AsyncSourcer::create(ctx, output_port, Self { @@ -44,20 +43,16 @@ impl PrunedSegmentReceiverSource { } #[async_trait::async_trait] -impl AsyncSource for PrunedSegmentReceiverSource { - const NAME: &'static str = "PrunedSegmentReceiverSource"; +impl AsyncSource for LazySegmentReceiverSource { + const NAME: &'static str = "LazySegmentReceiverSource"; const SKIP_EMPTY_DATA_BLOCK: bool = false; #[async_backtrace::framed] async fn generate(&mut self) -> Result> { match self.meta_receiver.recv().await { - Ok(Ok(segments)) => Ok(Some(DataBlock::empty_with_meta(PrunedSegmentMeta::create( + Ok(segments) => Ok(Some(DataBlock::empty_with_meta(LazySegmentMeta::create( segments, )))), - Ok(Err(e)) => Err( - // The error is occurred in pruning process - e, - ), Err(_) => { // The channel is closed, we should return None to stop generating Ok(None) diff --git a/src/query/storages/fuse/src/pruning_pipeline/mod.rs b/src/query/storages/fuse/src/pruning_pipeline/mod.rs index 858a4d17f400e..a2909df071f5d 100644 --- a/src/query/storages/fuse/src/pruning_pipeline/mod.rs +++ b/src/query/storages/fuse/src/pruning_pipeline/mod.rs @@ -16,17 +16,21 @@ mod async_block_prune_transform; mod block_metas_meta; mod block_prune_result_meta; mod extract_segment_transform; +mod lazy_segment_meta; +mod lazy_segment_receiver_source; mod pruned_segment_meta; -mod pruned_segment_receiver_source; mod sample_block_metas_transform; +mod segment_prune_transform; mod send_part_info_sink; mod sync_block_prune_transform; mod topn_prune_transform; pub use async_block_prune_transform::AsyncBlockPruneTransform; pub use extract_segment_transform::ExtractSegmentTransform; -pub use pruned_segment_receiver_source::PrunedSegmentReceiverSource; +pub use lazy_segment_meta::LazySegmentMeta; +pub use lazy_segment_receiver_source::LazySegmentReceiverSource; pub use sample_block_metas_transform::SampleBlockMetasTransform; +pub use segment_prune_transform::SegmentPruneTransform; pub use send_part_info_sink::SendPartInfoSink; pub use send_part_info_sink::SendPartState; pub use sync_block_prune_transform::SyncBlockPruneTransform; diff --git a/src/query/storages/fuse/src/pruning_pipeline/segment_prune_transform.rs b/src/query/storages/fuse/src/pruning_pipeline/segment_prune_transform.rs new file mode 100644 index 0000000000000..f4f0360e87ef6 --- /dev/null +++ b/src/query/storages/fuse/src/pruning_pipeline/segment_prune_transform.rs @@ -0,0 +1,86 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use databend_common_exception::ErrorCode; +use databend_common_exception::Result; +use databend_common_expression::BlockMetaInfoDowncast; +use databend_common_expression::DataBlock; +use databend_common_expression::SEGMENT_NAME_COL_NAME; +use databend_common_pipeline_core::processors::InputPort; +use databend_common_pipeline_core::processors::OutputPort; +use databend_common_pipeline_core::processors::ProcessorPtr; +use databend_common_pipeline_transforms::AsyncAccumulatingTransform; +use databend_common_pipeline_transforms::AsyncAccumulatingTransformer; + +use crate::pruning::PruningContext; +use crate::pruning::SegmentPruner; +use crate::pruning_pipeline::pruned_segment_meta::PrunedSegmentMeta; +use crate::pruning_pipeline::LazySegmentMeta; + +pub struct SegmentPruneTransform { + pub segment_pruner: Arc, + pub pruning_ctx: Arc, +} + +impl SegmentPruneTransform { + pub fn create( + input: Arc, + output: Arc, + segment_pruner: Arc, + pruning_context: Arc, + ) -> Result { + Ok(ProcessorPtr::create(AsyncAccumulatingTransformer::create( + input, + output, + SegmentPruneTransform { + segment_pruner, + pruning_ctx: pruning_context, + }, + ))) + } +} + +#[async_trait::async_trait] +impl AsyncAccumulatingTransform for SegmentPruneTransform { + const NAME: &'static str = "SegmentPruneTransform"; + + async fn transform(&mut self, mut data: DataBlock) -> Result> { + if let Some(ptr) = data.take_meta() { + if let Some(meta) = LazySegmentMeta::downcast_from(ptr) { + let location = meta.segment_location; + if let Some(pruner) = &self.pruning_ctx.internal_column_pruner { + if !pruner.should_keep(SEGMENT_NAME_COL_NAME, &location.location.0) { + return Ok(None); + } + } + let mut pruned_segments = self.segment_pruner.pruning(vec![location]).await?; + + if pruned_segments.is_empty() { + return Ok(None); + } + + debug_assert!(pruned_segments.len() == 1); + + return Ok(Some(DataBlock::empty_with_meta(PrunedSegmentMeta::create( + pruned_segments.pop().unwrap(), + )))); + } + } + Err(ErrorCode::Internal( + "Cannot downcast meta to LazySegmentMeta", + )) + } +} From 3672851d89cb1339a3d99f204624ebbd9eb7cef4 Mon Sep 17 00:00:00 2001 From: Liuqing Yue Date: Fri, 27 Dec 2024 11:26:08 +0800 Subject: [PATCH 2/2] chore: fix unit test and clean code --- .../it/storages/fuse/pruning_pipeline.rs | 13 ++--- .../storages/fuse/src/pruning/fuse_pruner.rs | 47 ------------------- 2 files changed, 5 insertions(+), 55 deletions(-) diff --git a/src/query/service/tests/it/storages/fuse/pruning_pipeline.rs b/src/query/service/tests/it/storages/fuse/pruning_pipeline.rs index e7a5a946e39a2..ee9e670ca56a9 100644 --- a/src/query/service/tests/it/storages/fuse/pruning_pipeline.rs +++ b/src/query/service/tests/it/storages/fuse/pruning_pipeline.rs @@ -62,7 +62,7 @@ use databend_storages_common_table_meta::table::OPT_KEY_DATABASE_ID; use databend_storages_common_table_meta::table::OPT_KEY_SNAPSHOT_LOCATION; use opendal::Operator; -async fn apply_block_pruning( +async fn apply_snapshot_pruning( table_snapshot: Arc, schema: TableSchemaRef, push_down: &Option, @@ -102,12 +102,9 @@ async fn apply_block_pruning( GlobalIORuntime::instance().spawn(async move { // avoid block global io runtime let runtime = Runtime::with_worker_threads(2, None)?; - let join_handler = runtime.spawn(async move { - let segment_pruned_result = - fuse_pruner.clone().segment_pruning(segment_locs).await?; - for segment in segment_pruned_result { - let _ = segment_tx.send(Ok(segment)).await; + for segment in segment_locs { + let _ = segment_tx.send(segment).await; } Ok::<_, ErrorCode>(()) }); @@ -140,7 +137,7 @@ async fn apply_block_pruning( } #[tokio::test(flavor = "multi_thread")] -async fn test_block_pruner() -> Result<()> { +async fn test_snapshot_pruner() -> Result<()> { let fixture = TestFixture::setup().await?; let ctx = fixture.new_query_ctx().await?; @@ -320,7 +317,7 @@ async fn test_block_pruner() -> Result<()> { for (id, (extra, expected_blocks, expected_rows)) in extras.into_iter().enumerate() { let cache_key = Some(format!("test_block_pruner_{}", id)); - let parts = apply_block_pruning( + let parts = apply_snapshot_pruning( snapshot.clone(), table.get_table_info().schema(), &extra, diff --git a/src/query/storages/fuse/src/pruning/fuse_pruner.rs b/src/query/storages/fuse/src/pruning/fuse_pruner.rs index 3e736d606e74e..1e52bc3578c47 100644 --- a/src/query/storages/fuse/src/pruning/fuse_pruner.rs +++ b/src/query/storages/fuse/src/pruning/fuse_pruner.rs @@ -443,53 +443,6 @@ impl FusePruner { } } - // Temporarily using, will remove after finish pruning refactor. - pub async fn segment_pruning( - &self, - mut segment_locs: Vec, - ) -> Result)>> { - // Segment pruner. - let segment_pruner = - SegmentPruner::create(self.pruning_ctx.clone(), self.table_schema.clone())?; - - let mut remain = segment_locs.len() % self.max_concurrency; - let batch_size = segment_locs.len() / self.max_concurrency; - let mut works = Vec::with_capacity(self.max_concurrency); - while !segment_locs.is_empty() { - let gap_size = std::cmp::min(1, remain); - let batch_size = batch_size + gap_size; - remain -= gap_size; - - let mut batch = segment_locs.drain(0..batch_size).collect::>(); - works.push(self.pruning_ctx.pruning_runtime.spawn({ - let segment_pruner = segment_pruner.clone(); - let pruning_ctx = self.pruning_ctx.clone(); - async move { - // Build pruning tasks. - if let Some(internal_column_pruner) = &pruning_ctx.internal_column_pruner { - batch = batch - .into_iter() - .filter(|segment| { - internal_column_pruner - .should_keep(SEGMENT_NAME_COL_NAME, &segment.location.0) - }) - .collect::>(); - } - let pruned_segments = segment_pruner.pruning(batch).await?; - Result::<_>::Ok(pruned_segments) - } - })); - } - - let workers = futures::future::try_join_all(works).await?; - let mut pruned_segments = vec![]; - for worker in workers { - let res = worker?; - pruned_segments.extend(res); - } - Ok(pruned_segments) - } - fn extract_block_metas( segment_path: &str, segment: &CompactSegmentInfo,