Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 5 additions & 8 deletions src/query/service/tests/it/storages/fuse/pruning_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ use databend_storages_common_table_meta::table::OPT_KEY_DATABASE_ID;
use databend_storages_common_table_meta::table::OPT_KEY_SNAPSHOT_LOCATION;
use opendal::Operator;

async fn apply_block_pruning(
async fn apply_snapshot_pruning(
table_snapshot: Arc<TableSnapshot>,
schema: TableSchemaRef,
push_down: &Option<PushDownInfo>,
Expand Down Expand Up @@ -102,12 +102,9 @@ async fn apply_block_pruning(
GlobalIORuntime::instance().spawn(async move {
// avoid blocking the global io runtime
let runtime = Runtime::with_worker_threads(2, None)?;

let join_handler = runtime.spawn(async move {
let segment_pruned_result =
fuse_pruner.clone().segment_pruning(segment_locs).await?;
for segment in segment_pruned_result {
let _ = segment_tx.send(Ok(segment)).await;
for segment in segment_locs {
let _ = segment_tx.send(segment).await;
}
Ok::<_, ErrorCode>(())
});
Expand Down Expand Up @@ -140,7 +137,7 @@ async fn apply_block_pruning(
}

#[tokio::test(flavor = "multi_thread")]
async fn test_block_pruner() -> Result<()> {
async fn test_snapshot_pruner() -> Result<()> {
let fixture = TestFixture::setup().await?;
let ctx = fixture.new_query_ctx().await?;

Expand Down Expand Up @@ -320,7 +317,7 @@ async fn test_block_pruner() -> Result<()> {

for (id, (extra, expected_blocks, expected_rows)) in extras.into_iter().enumerate() {
let cache_key = Some(format!("test_block_pruner_{}", id));
let parts = apply_block_pruning(
let parts = apply_snapshot_pruning(
snapshot.clone(),
table.get_table_info().schema(),
&extra,
Expand Down
29 changes: 20 additions & 9 deletions src/query/storages/fuse/src/operations/read_partitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ use databend_storages_common_pruner::BlockMetaIndex;
use databend_storages_common_pruner::TopNPrunner;
use databend_storages_common_table_meta::meta::BlockMeta;
use databend_storages_common_table_meta::meta::ColumnStatistics;
use databend_storages_common_table_meta::meta::CompactSegmentInfo;
use databend_storages_common_table_meta::table::ChangeType;
use log::info;
use opendal::Operator;
Expand All @@ -62,10 +61,12 @@ use crate::pruning::table_sample;
use crate::pruning::BlockPruner;
use crate::pruning::FusePruner;
use crate::pruning::SegmentLocation;
use crate::pruning::SegmentPruner;
use crate::pruning_pipeline::AsyncBlockPruneTransform;
use crate::pruning_pipeline::ExtractSegmentTransform;
use crate::pruning_pipeline::PrunedSegmentReceiverSource;
use crate::pruning_pipeline::LazySegmentReceiverSource;
use crate::pruning_pipeline::SampleBlockMetasTransform;
use crate::pruning_pipeline::SegmentPruneTransform;
use crate::pruning_pipeline::SendPartInfoSink;
use crate::pruning_pipeline::SendPartState;
use crate::pruning_pipeline::SyncBlockPruneTransform;
Expand Down Expand Up @@ -241,13 +242,11 @@ impl FuseTable {
// We cannot use the runtime associated with the query to avoid increasing its lifetime.
GlobalIORuntime::instance().spawn(async move {
// avoid blocking the global io runtime
let runtime = Runtime::with_worker_threads(2, Some("prune-seg".to_string()))?;
let runtime = Runtime::with_worker_threads(2, Some("prune-pipeline".to_string()))?;
let join_handler = runtime.spawn(async move {
let segment_pruned_result =
pruner.clone().segment_pruning(lazy_init_segments).await?;
for segment in segment_pruned_result {
for segment in lazy_init_segments {
// the query may have been killed or stopped early; ignore the send error
if let Err(_e) = segment_tx.send(Ok(segment)).await {
if let Err(_e) = segment_tx.send(segment).await {
break;
}
}
Expand Down Expand Up @@ -341,15 +340,27 @@ impl FuseTable {
pruner: Arc<FusePruner>,
prune_pipeline: &mut Pipeline,
ctx: Arc<dyn TableContext>,
segment_rx: Receiver<Result<(SegmentLocation, Arc<CompactSegmentInfo>)>>,
segment_rx: Receiver<SegmentLocation>,
part_info_tx: Sender<Result<PartInfoPtr>>,
derterministic_cache_key: Option<String>,
) -> Result<()> {
let max_threads = ctx.get_settings().get_max_threads()? as usize;
prune_pipeline.add_source(
|output| PrunedSegmentReceiverSource::create(ctx.clone(), segment_rx.clone(), output),
|output| LazySegmentReceiverSource::create(ctx.clone(), segment_rx.clone(), output),
max_threads,
)?;
let segment_pruner =
SegmentPruner::create(pruner.pruning_ctx.clone(), pruner.table_schema.clone())?;

prune_pipeline.add_transform(|input, output| {
SegmentPruneTransform::create(
input,
output,
segment_pruner.clone(),
pruner.pruning_ctx.clone(),
)
})?;

prune_pipeline
.add_transform(|input, output| ExtractSegmentTransform::create(input, output, true))?;
let sample_probability = table_sample(&pruner.push_down)?;
Expand Down
47 changes: 0 additions & 47 deletions src/query/storages/fuse/src/pruning/fuse_pruner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,53 +443,6 @@ impl FusePruner {
}
}

/// Prunes `segment_locs` concurrently and returns the segments that survive.
///
/// Temporary helper; slated for removal once the pruning refactor is finished.
///
/// The input is split into batches whose sizes differ by at most one
/// (`len / max_concurrency`, with the first `len % max_concurrency` batches
/// taking one extra element). Each batch is spawned on the pruning runtime,
/// optionally filtered by the internal column pruner on the segment name,
/// and then handed to the `SegmentPruner`.
///
/// # Errors
///
/// Propagates failures from creating the segment pruner, from joining the
/// spawned tasks, and from pruning any individual batch.
pub async fn segment_pruning(
    &self,
    mut segment_locs: Vec<SegmentLocation>,
) -> Result<Vec<(SegmentLocation, Arc<CompactSegmentInfo>)>> {
    let segment_pruner =
        SegmentPruner::create(self.pruning_ctx.clone(), self.table_schema.clone())?;

    // The first `leftover` batches carry one extra segment each.
    let mut leftover = segment_locs.len() % self.max_concurrency;
    let base_len = segment_locs.len() / self.max_concurrency;
    let mut tasks = Vec::with_capacity(self.max_concurrency);

    while !segment_locs.is_empty() {
        let extra = std::cmp::min(1, leftover);
        leftover -= extra;
        let take = base_len + extra;

        let mut chunk: Vec<_> = segment_locs.drain(0..take).collect();
        let segment_pruner = segment_pruner.clone();
        let pruning_ctx = self.pruning_ctx.clone();
        let task = self.pruning_ctx.pruning_runtime.spawn(async move {
            // Drop segments whose name is rejected by the internal column pruner.
            if let Some(internal_column_pruner) = &pruning_ctx.internal_column_pruner {
                chunk.retain(|segment| {
                    internal_column_pruner.should_keep(SEGMENT_NAME_COL_NAME, &segment.location.0)
                });
            }
            let kept = segment_pruner.pruning(chunk).await?;
            Result::<_>::Ok(kept)
        });
        tasks.push(task);
    }

    // Join every task first, then surface per-batch errors while concatenating.
    let mut pruned_segments = vec![];
    for outcome in futures::future::try_join_all(tasks).await? {
        pruned_segments.extend(outcome?);
    }
    Ok(pruned_segments)
}

fn extract_block_metas(
segment_path: &str,
segment: &CompactSegmentInfo,
Expand Down
43 changes: 43 additions & 0 deletions src/query/storages/fuse/src/pruning_pipeline/lazy_segment_meta.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt::Debug;
use std::fmt::Formatter;

use databend_common_expression::local_block_meta_serde;
use databend_common_expression::BlockMetaInfo;
use databend_common_expression::BlockMetaInfoPtr;

use crate::SegmentLocation;

/// Block meta that wraps a single [`SegmentLocation`] so it can be attached to a
/// `DataBlock` and passed along as block meta.
pub struct LazySegmentMeta {
/// Location of the segment this meta refers to.
pub segment_location: SegmentLocation,
}

impl LazySegmentMeta {
pub fn create(segment_location: SegmentLocation) -> BlockMetaInfoPtr {
Box::new(LazySegmentMeta { segment_location })
}
}

impl Debug for LazySegmentMeta {
    /// Writes only the type name; the wrapped location is not printed.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_str("LazySegmentMeta")
    }
}

// NOTE(review): `local_block_meta_serde!` presumably supplies the serde impls required for a
// meta that is only exchanged inside the local process — confirm against the macro definition.
local_block_meta_serde!(LazySegmentMeta);

// Implements `BlockMetaInfo` for `LazySegmentMeta` under the typetag name
// "lazy_segment_meta"; no trait methods are overridden here.
#[typetag::serde(name = "lazy_segment_meta")]
impl BlockMetaInfo for LazySegmentMeta {}
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,18 @@ use databend_common_pipeline_core::processors::OutputPort;
use databend_common_pipeline_core::processors::ProcessorPtr;
use databend_common_pipeline_sources::AsyncSource;
use databend_common_pipeline_sources::AsyncSourcer;
use databend_storages_common_table_meta::meta::CompactSegmentInfo;

use crate::pruning_pipeline::pruned_segment_meta::PrunedSegmentMeta;
use crate::pruning_pipeline::LazySegmentMeta;
use crate::SegmentLocation;

pub struct PrunedSegmentReceiverSource {
pub meta_receiver: Receiver<Result<(SegmentLocation, Arc<CompactSegmentInfo>)>>,
pub struct LazySegmentReceiverSource {
pub meta_receiver: Receiver<SegmentLocation>,
}

impl PrunedSegmentReceiverSource {
impl LazySegmentReceiverSource {
pub fn create(
ctx: Arc<dyn TableContext>,
receiver: Receiver<Result<(SegmentLocation, Arc<CompactSegmentInfo>)>>,
receiver: Receiver<SegmentLocation>,
output_port: Arc<OutputPort>,
) -> Result<ProcessorPtr> {
AsyncSourcer::create(ctx, output_port, Self {
Expand All @@ -44,20 +43,16 @@ impl PrunedSegmentReceiverSource {
}

#[async_trait::async_trait]
impl AsyncSource for PrunedSegmentReceiverSource {
const NAME: &'static str = "PrunedSegmentReceiverSource";
impl AsyncSource for LazySegmentReceiverSource {
const NAME: &'static str = "LazySegmentReceiverSource";
const SKIP_EMPTY_DATA_BLOCK: bool = false;

#[async_backtrace::framed]
async fn generate(&mut self) -> Result<Option<DataBlock>> {
match self.meta_receiver.recv().await {
Ok(Ok(segments)) => Ok(Some(DataBlock::empty_with_meta(PrunedSegmentMeta::create(
Ok(segments) => Ok(Some(DataBlock::empty_with_meta(LazySegmentMeta::create(
segments,
)))),
Ok(Err(e)) => Err(
// The error occurred during the pruning process
e,
),
Err(_) => {
// The channel is closed, we should return None to stop generating
Ok(None)
Expand Down
8 changes: 6 additions & 2 deletions src/query/storages/fuse/src/pruning_pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,21 @@ mod async_block_prune_transform;
mod block_metas_meta;
mod block_prune_result_meta;
mod extract_segment_transform;
mod lazy_segment_meta;
mod lazy_segment_receiver_source;
mod pruned_segment_meta;
mod pruned_segment_receiver_source;
mod sample_block_metas_transform;
mod segment_prune_transform;
mod send_part_info_sink;
mod sync_block_prune_transform;
mod topn_prune_transform;

pub use async_block_prune_transform::AsyncBlockPruneTransform;
pub use extract_segment_transform::ExtractSegmentTransform;
pub use pruned_segment_receiver_source::PrunedSegmentReceiverSource;
pub use lazy_segment_meta::LazySegmentMeta;
pub use lazy_segment_receiver_source::LazySegmentReceiverSource;
pub use sample_block_metas_transform::SampleBlockMetasTransform;
pub use segment_prune_transform::SegmentPruneTransform;
pub use send_part_info_sink::SendPartInfoSink;
pub use send_part_info_sink::SendPartState;
pub use sync_block_prune_transform::SyncBlockPruneTransform;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::BlockMetaInfoDowncast;
use databend_common_expression::DataBlock;
use databend_common_expression::SEGMENT_NAME_COL_NAME;
use databend_common_pipeline_core::processors::InputPort;
use databend_common_pipeline_core::processors::OutputPort;
use databend_common_pipeline_core::processors::ProcessorPtr;
use databend_common_pipeline_transforms::AsyncAccumulatingTransform;
use databend_common_pipeline_transforms::AsyncAccumulatingTransformer;

use crate::pruning::PruningContext;
use crate::pruning::SegmentPruner;
use crate::pruning_pipeline::pruned_segment_meta::PrunedSegmentMeta;
use crate::pruning_pipeline::LazySegmentMeta;

/// Pipeline transform that takes blocks carrying `LazySegmentMeta` and prunes the
/// referenced segment, emitting `PrunedSegmentMeta` for segments that are kept.
pub struct SegmentPruneTransform {
/// Pruner used to prune each incoming segment location.
pub segment_pruner: Arc<SegmentPruner>,
/// Shared pruning context; its optional `internal_column_pruner` is consulted
/// before the segment pruner runs.
pub pruning_ctx: Arc<PruningContext>,
}

impl SegmentPruneTransform {
    /// Builds the transform and wraps it in an `AsyncAccumulatingTransformer`
    /// connected to `input` and `output`, returning it as a `ProcessorPtr`.
    pub fn create(
        input: Arc<InputPort>,
        output: Arc<OutputPort>,
        segment_pruner: Arc<SegmentPruner>,
        pruning_context: Arc<PruningContext>,
    ) -> Result<ProcessorPtr> {
        let transform = SegmentPruneTransform {
            segment_pruner,
            pruning_ctx: pruning_context,
        };
        let transformer = AsyncAccumulatingTransformer::create(input, output, transform);
        Ok(ProcessorPtr::create(transformer))
    }
}

#[async_trait::async_trait]
impl AsyncAccumulatingTransform for SegmentPruneTransform {
    const NAME: &'static str = "SegmentPruneTransform";

    /// Prunes the single segment referenced by the incoming block's `LazySegmentMeta`.
    ///
    /// Returns `Ok(None)` when the internal column pruner rejects the segment name
    /// or when the segment pruner keeps nothing; otherwise returns an empty block
    /// carrying the resulting `PrunedSegmentMeta`.
    ///
    /// # Errors
    ///
    /// Returns `ErrorCode::Internal` when the block has no meta or the meta is not a
    /// `LazySegmentMeta`, and propagates any error from the segment pruner.
    async fn transform(&mut self, mut data: DataBlock) -> Result<Option<DataBlock>> {
        let lazy_meta = data
            .take_meta()
            .and_then(LazySegmentMeta::downcast_from)
            .ok_or_else(|| ErrorCode::Internal("Cannot downcast meta to LazySegmentMeta"))?;
        let location = lazy_meta.segment_location;

        // Cheap name-based filter first, before running the segment pruner.
        let keep_by_name = match &self.pruning_ctx.internal_column_pruner {
            Some(pruner) => pruner.should_keep(SEGMENT_NAME_COL_NAME, &location.location.0),
            None => true,
        };
        if !keep_by_name {
            return Ok(None);
        }

        let mut survivors = self.segment_pruner.pruning(vec![location]).await?;
        match survivors.pop() {
            None => Ok(None),
            Some(segment) => {
                // Exactly one location went in, so at most one segment may come out.
                debug_assert!(survivors.is_empty());
                Ok(Some(DataBlock::empty_with_meta(PrunedSegmentMeta::create(
                    segment,
                ))))
            }
        }
    }
}
Loading