@@ -43,6 +43,9 @@ use databend_common_metrics::storage::*;
4343use databend_common_sql:: evaluator:: BlockOperator ;
4444use databend_common_sql:: executor:: physical_plans:: OnConflictField ;
4545use databend_common_sql:: StreamContext ;
46+ use databend_storages_common_cache:: BlockMetaCache ;
47+ use databend_storages_common_cache:: CacheAccessor ;
48+ use databend_storages_common_cache:: CacheManager ;
4649use databend_storages_common_cache:: LoadParams ;
4750use databend_storages_common_index:: filters:: Filter ;
4851use databend_storages_common_index:: filters:: Xor8Filter ;
@@ -100,6 +103,8 @@ struct AggregationContext {
100103 io_request_semaphore : Arc < Semaphore > ,
101104 // generate stream columns if necessary
102105 stream_ctx : Option < StreamContext > ,
106+
107+ block_meta_cache : Option < BlockMetaCache > ,
103108}
104109
105110// Apply MergeIntoOperations to segments
@@ -209,6 +214,7 @@ impl ReplaceIntoOperationAggregator {
209214 block_builder,
210215 io_request_semaphore,
211216 stream_ctx,
217+ block_meta_cache : CacheManager :: instance ( ) . get_block_meta_cache ( ) ,
212218 } ) ,
213219 } )
214220 }
@@ -291,6 +297,8 @@ impl ReplaceIntoOperationAggregator {
291297impl ReplaceIntoOperationAggregator {
292298 #[ async_backtrace:: framed]
293299 pub async fn apply ( & mut self ) -> Result < Option < MutationLogs > > {
300+ let block_meta_cache = & self . aggregation_ctx . block_meta_cache ;
301+
294302 metrics_inc_replace_number_apply_deletion ( ) ;
295303
296304 // track number of segments and blocks after pruning (per merge action application)
@@ -317,7 +325,7 @@ impl ReplaceIntoOperationAggregator {
317325 let mut mutation_log_handlers = Vec :: new ( ) ;
318326 let mut num_rows_mutated = 0 ;
319327 for ( segment_idx, block_deletion) in self . deletion_accumulator . deletions . drain ( ) {
320- let ( path , ver) = self
328+ let ( segment_path , ver) = self
321329 . aggregation_ctx
322330 . segment_locations
323331 . get ( & segment_idx)
@@ -329,19 +337,41 @@ impl ReplaceIntoOperationAggregator {
329337 } ) ?;
330338
331339 let load_param = LoadParams {
332- location : path . clone ( ) ,
340+ location : segment_path . clone ( ) ,
333341 len_hint : None ,
334342 ver : * ver,
335343 put_cache : true ,
336344 } ;
337345
338- let compact_segment_info = aggregation_ctx . segment_reader . read ( & load_param ) . await ? ;
339- let segment_info : SegmentInfo = compact_segment_info . try_into ( ) ? ;
346+ // Retain SegmentInfo to avoid repeatedly extracting it from CompactSegmentInfo later.
347+ let mut opt_segment_info : Option < SegmentInfo > = None ;
340348
341349 for ( block_index, keys) in block_deletion {
350+ let block_cache_key = format ! ( "{segment_path}-{block_index}" ) ;
351+ let block_meta = match block_meta_cache. get ( & block_cache_key) {
352+ Some ( block_meta) => block_meta,
353+ None => {
354+ let block_meta = if let Some ( segment_info) = & opt_segment_info {
355+ segment_info. blocks [ block_index] . clone ( )
356+ } else {
357+ let compact_segment_info =
358+ aggregation_ctx. segment_reader . read ( & load_param) . await ?;
359+ let segment_info: SegmentInfo = compact_segment_info. try_into ( ) ?;
360+ let block_meta = segment_info. blocks [ block_index] . clone ( ) ;
361+ opt_segment_info = Some ( segment_info) ;
362+ block_meta
363+ } ;
364+ // A query node typically processes only a subset of the BlockMeta in a given segment.
365+ // Therefore, even though all BlockMeta of a segment are available here, not all are populated into the cache.
366+ block_meta_cache. insert ( block_cache_key, block_meta. as_ref ( ) . clone ( ) ) ;
367+ block_meta
368+ }
369+ } ;
370+
342371 let permit =
343372 acquire_task_permit ( aggregation_ctx. io_request_semaphore . clone ( ) ) . await ?;
344- let block_meta = segment_info. blocks [ block_index] . clone ( ) ;
373+
374+ // let block_meta = segment_info.blocks[block_index].clone();
345375 let aggregation_ctx = aggregation_ctx. clone ( ) ;
346376 num_rows_mutated += block_meta. row_count ;
347377 // self.aggregation_ctx.
@@ -604,7 +634,7 @@ impl AggregationContext {
604634 if let Some ( stats) = column_stats {
605635 let max = stats. max ( ) ;
606636 let min = stats. min ( ) ;
607- std:: cmp:: min ( key_max, max) >= std:: cmp:: max ( key_min, min)
637+ std:: cmp:: min ( key_max, max) >= std:: cmp:: max ( key_min, min)
608638 || // coincide overlap
609639 ( max == key_max && min == key_min)
610640 } else {
@@ -630,22 +660,22 @@ impl AggregationContext {
630660 let reader = reader. clone ( ) ;
631661 GlobalIORuntime :: instance ( )
632662 . spawn ( async move {
633- let column_chunks = merged_io_read_result. columns_chunks ( ) ?;
634- reader. deserialize_chunks (
635- block_meta_ptr. location . 0 . as_str ( ) ,
636- block_meta_ptr. row_count as usize ,
637- & block_meta_ptr. compression ,
638- & block_meta_ptr. col_metas ,
639- column_chunks,
640- & storage_format,
641- )
642- } )
663+ let column_chunks = merged_io_read_result. columns_chunks ( ) ?;
664+ reader. deserialize_chunks (
665+ block_meta_ptr. location . 0 . as_str ( ) ,
666+ block_meta_ptr. row_count as usize ,
667+ & block_meta_ptr. compression ,
668+ & block_meta_ptr. col_metas ,
669+ column_chunks,
670+ & storage_format,
671+ )
672+ } )
643673 . await
644674 . map_err ( |e| {
645675 ErrorCode :: Internal (
646676 "unexpected, failed to join aggregation context read block tasks for replace into." ,
647677 )
648- . add_message_back ( e. to_string ( ) )
678+ . add_message_back ( e. to_string ( ) )
649679 } ) ?
650680 }
651681
0 commit comments