@@ -20,11 +20,14 @@ use databend_common_catalog::plan::ReclusterInfoSideCar;
2020use databend_common_catalog:: table:: Table ;
2121use databend_common_catalog:: table_context:: TableContext ;
2222use databend_common_exception:: Result ;
23- use databend_common_expression:: BlockThresholds ;
23+ use databend_common_io:: constants:: DEFAULT_BLOCK_BUFFER_SIZE ;
24+ use databend_common_io:: constants:: DEFAULT_BLOCK_PER_SEGMENT ;
2425use databend_common_storages_fuse:: pruning:: create_segment_location_vector;
2526use databend_common_storages_fuse:: statistics:: reducers:: merge_statistics_mut;
2627use databend_common_storages_fuse:: FuseTable ;
2728use databend_common_storages_fuse:: SegmentLocation ;
29+ use databend_common_storages_fuse:: FUSE_OPT_KEY_BLOCK_IN_MEM_SIZE_THRESHOLD ;
30+ use databend_common_storages_fuse:: FUSE_OPT_KEY_BLOCK_PER_SEGMENT ;
2831use databend_enterprise_hilbert_clustering:: HilbertClusteringHandler ;
2932use databend_enterprise_hilbert_clustering:: HilbertClusteringHandlerWrapper ;
3033use databend_storages_common_table_meta:: meta:: ClusterStatistics ;
@@ -53,26 +56,26 @@ impl HilbertClusteringHandler for RealHilbertClusteringHandler {
5356 return Ok ( None ) ;
5457 } ;
5558
56- let block_thresholds = fuse_table . get_block_thresholds ( ) ;
57- let thresholds = BlockThresholds {
58- max_rows_per_block : block_thresholds . block_per_segment
59- * block_thresholds . max_rows_per_block ,
60- min_rows_per_block : block_thresholds . block_per_segment
61- * block_thresholds . min_rows_per_block ,
62- max_bytes_per_block : block_thresholds . block_per_segment
63- * block_thresholds . max_bytes_per_block ,
64- max_bytes_per_file : block_thresholds . block_per_segment
65- * block_thresholds . max_bytes_per_file ,
66- block_per_segment : block_thresholds . block_per_segment ,
67- } ;
59+ let block_per_seg =
60+ fuse_table . get_option ( FUSE_OPT_KEY_BLOCK_PER_SEGMENT , DEFAULT_BLOCK_PER_SEGMENT ) ;
61+ let hilbert_clustering_min_bytes =
62+ ctx . get_settings ( ) . get_hilbert_clustering_min_bytes ( ) ? as usize ;
63+ let max_bytes_per_block = fuse_table . get_option (
64+ FUSE_OPT_KEY_BLOCK_IN_MEM_SIZE_THRESHOLD ,
65+ DEFAULT_BLOCK_BUFFER_SIZE ,
66+ ) ;
67+ let hilbert_min_bytes = std :: cmp :: max (
68+ hilbert_clustering_min_bytes ,
69+ max_bytes_per_block * block_per_seg ,
70+ ) ;
6871 let segment_locations = snapshot. segments . clone ( ) ;
6972 let segment_locations = create_segment_location_vector ( segment_locations, None ) ;
7073
7174 let max_threads = ctx. get_settings ( ) . get_max_threads ( ) ? as usize ;
7275 let chunk_size = max_threads * 4 ;
7376 let mut checker = ReclusterChecker :: new (
7477 cluster_key_id,
75- thresholds ,
78+ hilbert_min_bytes ,
7679 push_downs. as_ref ( ) . is_none_or ( |v| v. filters . is_none ( ) ) ,
7780 ) ;
7881 ' FOR : for chunk in segment_locations. chunks ( chunk_size) {
@@ -99,12 +102,6 @@ impl HilbertClusteringHandler for RealHilbertClusteringHandler {
99102 return Ok ( None ) ;
100103 }
101104
102- let rows_per_block =
103- block_thresholds. calc_rows_per_block ( checker. total_size , checker. total_rows , 0 ) as u64 ;
104- let block_size = ctx. get_settings ( ) . get_max_block_size ( ) ?;
105- ctx. get_settings ( )
106- . set_max_block_size ( rows_per_block. min ( block_size) ) ?;
107-
108105 let mut removed_statistics = Statistics :: default ( ) ;
109106 let mut removed_segment_indexes = Vec :: with_capacity ( target_segments. len ( ) ) ;
110107 for ( segment_loc, segment) in target_segments {
@@ -137,82 +134,60 @@ impl RealHilbertClusteringHandler {
137134
138135struct ReclusterChecker {
139136 segments : Vec < ( SegmentLocation , Arc < CompactSegmentInfo > ) > ,
137+ last_segment : Option < ( SegmentLocation , Arc < CompactSegmentInfo > ) > ,
140138 default_cluster_id : u32 ,
141- thresholds : BlockThresholds ,
142139
143- total_rows : usize ,
144- total_size : usize ,
140+ hilbert_min_bytes : usize ,
141+ total_bytes : usize ,
145142
146143 finished : bool ,
147144 // Whether the target segments is at the head of snapshot.
148145 head_of_snapshot : bool ,
149146}
150147
151148impl ReclusterChecker {
152- fn new ( default_cluster_id : u32 , thresholds : BlockThresholds , head_of_snapshot : bool ) -> Self {
149+ fn new ( default_cluster_id : u32 , hilbert_min_bytes : usize , head_of_snapshot : bool ) -> Self {
153150 Self {
154151 segments : vec ! [ ] ,
152+ last_segment : None ,
155153 default_cluster_id,
156- thresholds,
157- total_rows : 0 ,
158- total_size : 0 ,
154+ hilbert_min_bytes,
155+ total_bytes : 0 ,
159156 finished : false ,
160157 head_of_snapshot,
161158 }
162159 }
163160
164161 fn add ( & mut self , location : SegmentLocation , segment : Arc < CompactSegmentInfo > ) -> bool {
165- let row_count = segment. summary . row_count as usize ;
166- let byte_size = segment. summary . uncompressed_byte_size as usize ;
167- self . total_rows += row_count;
168- self . total_size += byte_size;
169- if !self
170- . thresholds
171- . check_large_enough ( self . total_rows , self . total_size )
172- {
173- // totals < N
174- self . segments . push ( ( location, segment) ) ;
175- return false ;
176- }
177-
178162 let segment_should_recluster = self . should_recluster ( & segment, |v| {
179163 v. cluster_key_id != self . default_cluster_id || v. level != -1
180164 } ) ;
181- let mut retained = false ;
182- if !self . head_of_snapshot || segment_should_recluster {
183- if self
184- . thresholds
185- . check_for_compact ( self . total_rows , self . total_size )
186- {
187- // N <= totals < 2N
188- self . segments . push ( ( location, segment) ) ;
189- retained = true ;
190- } else if segment_should_recluster {
191- // totals >= 2N
192- self . segments = vec ! [ ( location, segment) ] ;
193- self . total_rows = row_count;
194- self . total_size = byte_size;
195- self . finished = true ;
196- return true ;
197- }
165+
166+ if segment_should_recluster || !self . head_of_snapshot {
167+ self . total_bytes += segment. summary . uncompressed_byte_size as usize ;
168+ self . segments . push ( ( location. clone ( ) , segment. clone ( ) ) ) ;
198169 }
199170
200- if self . check_for_recluster ( ) {
201- if !retained {
202- self . total_rows -= row_count ;
203- self . total_size -= byte_size ;
171+ if !segment_should_recluster || self . total_bytes >= self . hilbert_min_bytes {
172+ if self . check_for_recluster ( ) {
173+ self . finished = true ;
174+ return true ;
204175 }
205- self . finished = true ;
206- return true ;
176+ self . last_segment = Some ( ( location , segment ) ) ;
177+ self . reset ( ) ;
207178 }
208179
209- self . reset ( ) ;
210180 false
211181 }
212182
213183 fn finalize ( & mut self ) -> Vec < ( SegmentLocation , Arc < CompactSegmentInfo > ) > {
214- if !self . finished && !self . check_for_recluster ( ) {
215- return vec ! [ ] ;
184+ if !self . finished {
185+ if let Some ( ( location, segment) ) = self . last_segment . take ( ) {
186+ self . segments . push ( ( location, segment) ) ;
187+ }
188+ if !self . check_for_recluster ( ) {
189+ return vec ! [ ] ;
190+ }
216191 }
217192 std:: mem:: take ( & mut self . segments )
218193 }
@@ -233,8 +208,7 @@ impl ReclusterChecker {
233208 }
234209
235210 fn reset ( & mut self ) {
236- self . total_rows = 0 ;
237- self . total_size = 0 ;
211+ self . total_bytes = 0 ;
238212 self . head_of_snapshot = false ;
239213 self . segments . clear ( ) ;
240214 }
0 commit comments