Skip to content

Commit 3672851

Browse files
committed
chore: fix unit test and clean code
1 parent c5d752b commit 3672851

File tree

2 files changed

+5
-55
lines changed

2 files changed

+5
-55
lines changed

src/query/service/tests/it/storages/fuse/pruning_pipeline.rs

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ use databend_storages_common_table_meta::table::OPT_KEY_DATABASE_ID;
6262
use databend_storages_common_table_meta::table::OPT_KEY_SNAPSHOT_LOCATION;
6363
use opendal::Operator;
6464

65-
async fn apply_block_pruning(
65+
async fn apply_snapshot_pruning(
6666
table_snapshot: Arc<TableSnapshot>,
6767
schema: TableSchemaRef,
6868
push_down: &Option<PushDownInfo>,
@@ -102,12 +102,9 @@ async fn apply_block_pruning(
102102
GlobalIORuntime::instance().spawn(async move {
103103
// avoid block global io runtime
104104
let runtime = Runtime::with_worker_threads(2, None)?;
105-
106105
let join_handler = runtime.spawn(async move {
107-
let segment_pruned_result =
108-
fuse_pruner.clone().segment_pruning(segment_locs).await?;
109-
for segment in segment_pruned_result {
110-
let _ = segment_tx.send(Ok(segment)).await;
106+
for segment in segment_locs {
107+
let _ = segment_tx.send(segment).await;
111108
}
112109
Ok::<_, ErrorCode>(())
113110
});
@@ -140,7 +137,7 @@ async fn apply_block_pruning(
140137
}
141138

142139
#[tokio::test(flavor = "multi_thread")]
143-
async fn test_block_pruner() -> Result<()> {
140+
async fn test_snapshot_pruner() -> Result<()> {
144141
let fixture = TestFixture::setup().await?;
145142
let ctx = fixture.new_query_ctx().await?;
146143

@@ -320,7 +317,7 @@ async fn test_block_pruner() -> Result<()> {
320317

321318
for (id, (extra, expected_blocks, expected_rows)) in extras.into_iter().enumerate() {
322319
let cache_key = Some(format!("test_block_pruner_{}", id));
323-
let parts = apply_block_pruning(
320+
let parts = apply_snapshot_pruning(
324321
snapshot.clone(),
325322
table.get_table_info().schema(),
326323
&extra,

src/query/storages/fuse/src/pruning/fuse_pruner.rs

Lines changed: 0 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -443,53 +443,6 @@ impl FusePruner {
443443
}
444444
}
445445

446-
// Temporarily using, will remove after finish pruning refactor.
447-
pub async fn segment_pruning(
448-
&self,
449-
mut segment_locs: Vec<SegmentLocation>,
450-
) -> Result<Vec<(SegmentLocation, Arc<CompactSegmentInfo>)>> {
451-
// Segment pruner.
452-
let segment_pruner =
453-
SegmentPruner::create(self.pruning_ctx.clone(), self.table_schema.clone())?;
454-
455-
let mut remain = segment_locs.len() % self.max_concurrency;
456-
let batch_size = segment_locs.len() / self.max_concurrency;
457-
let mut works = Vec::with_capacity(self.max_concurrency);
458-
while !segment_locs.is_empty() {
459-
let gap_size = std::cmp::min(1, remain);
460-
let batch_size = batch_size + gap_size;
461-
remain -= gap_size;
462-
463-
let mut batch = segment_locs.drain(0..batch_size).collect::<Vec<_>>();
464-
works.push(self.pruning_ctx.pruning_runtime.spawn({
465-
let segment_pruner = segment_pruner.clone();
466-
let pruning_ctx = self.pruning_ctx.clone();
467-
async move {
468-
// Build pruning tasks.
469-
if let Some(internal_column_pruner) = &pruning_ctx.internal_column_pruner {
470-
batch = batch
471-
.into_iter()
472-
.filter(|segment| {
473-
internal_column_pruner
474-
.should_keep(SEGMENT_NAME_COL_NAME, &segment.location.0)
475-
})
476-
.collect::<Vec<_>>();
477-
}
478-
let pruned_segments = segment_pruner.pruning(batch).await?;
479-
Result::<_>::Ok(pruned_segments)
480-
}
481-
}));
482-
}
483-
484-
let workers = futures::future::try_join_all(works).await?;
485-
let mut pruned_segments = vec![];
486-
for worker in workers {
487-
let res = worker?;
488-
pruned_segments.extend(res);
489-
}
490-
Ok(pruned_segments)
491-
}
492-
493446
fn extract_block_metas(
494447
segment_path: &str,
495448
segment: &CompactSegmentInfo,

0 commit comments

Comments
 (0)