1414
1515use databend_common_io:: constants:: DEFAULT_BLOCK_BUFFER_SIZE ;
1616use databend_common_io:: constants:: DEFAULT_BLOCK_COMPRESSED_SIZE ;
17- use databend_common_io:: constants:: DEFAULT_BLOCK_MAX_ROWS ;
18- use databend_common_io:: constants:: DEFAULT_BLOCK_MIN_ROWS ;
1917use databend_common_io:: constants:: DEFAULT_BLOCK_PER_SEGMENT ;
18+ use databend_common_io:: constants:: DEFAULT_BLOCK_ROW_COUNT ;
2019
2120#[ derive( Clone , Copy , Debug , serde:: Serialize , serde:: Deserialize ) ]
2221pub struct BlockThresholds {
2322 pub max_rows_per_block : usize ,
2423 pub min_rows_per_block : usize ,
24+
2525 pub max_bytes_per_block : usize ,
26- pub max_bytes_per_file : usize ,
26+ pub min_bytes_per_block : usize ,
27+
28+ pub max_compressed_per_block : usize ,
29+ pub min_compressed_per_block : usize ,
30+
2731 pub block_per_segment : usize ,
2832}
2933
3034impl Default for BlockThresholds {
3135 fn default ( ) -> BlockThresholds {
3236 BlockThresholds {
33- max_rows_per_block : DEFAULT_BLOCK_MAX_ROWS ,
34- min_rows_per_block : DEFAULT_BLOCK_MIN_ROWS ,
37+ max_rows_per_block : DEFAULT_BLOCK_ROW_COUNT ,
38+ min_rows_per_block : ( DEFAULT_BLOCK_ROW_COUNT * 4 ) . div_ceil ( 5 ) ,
3539 max_bytes_per_block : DEFAULT_BLOCK_BUFFER_SIZE ,
36- max_bytes_per_file : DEFAULT_BLOCK_COMPRESSED_SIZE ,
40+ min_bytes_per_block : ( DEFAULT_BLOCK_BUFFER_SIZE * 4 ) . div_ceil ( 5 ) ,
41+ max_compressed_per_block : DEFAULT_BLOCK_COMPRESSED_SIZE ,
42+ min_compressed_per_block : ( DEFAULT_BLOCK_COMPRESSED_SIZE * 4 ) . div_ceil ( 5 ) ,
3743 block_per_segment : DEFAULT_BLOCK_PER_SEGMENT ,
3844 }
3945 }
@@ -42,16 +48,17 @@ impl Default for BlockThresholds {
4248impl BlockThresholds {
4349 pub fn new (
4450 max_rows_per_block : usize ,
45- min_rows_per_block : usize ,
4651 max_bytes_per_block : usize ,
47- max_bytes_per_file : usize ,
52+ max_compressed_per_block : usize ,
4853 block_per_segment : usize ,
4954 ) -> Self {
5055 BlockThresholds {
5156 max_rows_per_block,
52- min_rows_per_block,
57+ min_rows_per_block : ( max_rows_per_block * 4 ) . div_ceil ( 5 ) ,
5358 max_bytes_per_block,
54- max_bytes_per_file,
59+ min_bytes_per_block : ( max_bytes_per_block * 4 ) . div_ceil ( 5 ) ,
60+ max_compressed_per_block,
61+ min_compressed_per_block : ( max_compressed_per_block * 4 ) . div_ceil ( 5 ) ,
5562 block_per_segment,
5663 }
5764 }
@@ -64,8 +71,8 @@ impl BlockThresholds {
6471 file_size : usize ,
6572 ) -> bool {
6673 row_count >= self . min_rows_per_block
67- || block_size >= self . max_bytes_per_block
68- || file_size >= self . max_bytes_per_file
74+ || block_size >= self . min_bytes_per_block
75+ || file_size >= self . min_compressed_per_block
6976 }
7077
7178 #[ inline]
@@ -78,53 +85,85 @@ impl BlockThresholds {
7885 ) -> bool {
7986 total_blocks >= self . block_per_segment
8087 && ( total_rows >= self . min_rows_per_block * self . block_per_segment
81- || total_bytes >= self . max_bytes_per_block * self . block_per_segment
82- || total_compressed >= self . max_bytes_per_file * self . block_per_segment )
88+ || total_bytes >= self . min_bytes_per_block * self . block_per_segment
89+ || total_compressed >= self . min_compressed_per_block * self . block_per_segment )
8390 }
8491
8592 #[ inline]
8693 pub fn check_large_enough ( & self , row_count : usize , block_size : usize ) -> bool {
87- row_count >= self . min_rows_per_block || block_size >= self . max_bytes_per_block
94+ row_count >= self . min_rows_per_block || block_size >= self . min_bytes_per_block
8895 }
8996
9097 #[ inline]
9198 pub fn check_for_compact ( & self , row_count : usize , block_size : usize ) -> bool {
92- row_count < 2 * self . min_rows_per_block && block_size < 2 * self . max_bytes_per_block
99+ row_count < 2 * self . min_rows_per_block && block_size < 2 * self . min_bytes_per_block
93100 }
94101
95102 #[ inline]
96103 pub fn check_too_small ( & self , row_count : usize , block_size : usize , file_size : usize ) -> bool {
97104 row_count < self . min_rows_per_block / 2
98- && block_size < self . max_bytes_per_block / 2
99- && file_size <= self . max_bytes_per_file / 2
105+ && block_size < self . min_bytes_per_block / 2
106+ && file_size < self . min_compressed_per_block / 2
100107 }
101108
102109 #[ inline]
103- pub fn calc_rows_per_block (
110+ pub fn calc_rows_for_compact ( & self , total_bytes : usize , total_rows : usize ) -> usize {
111+ if self . check_for_compact ( total_rows, total_bytes) {
112+ return total_rows;
113+ }
114+
115+ let block_num_by_rows = std:: cmp:: max ( total_rows / self . min_rows_per_block , 1 ) ;
116+ let block_num_by_size = total_bytes / self . min_bytes_per_block ;
117+ if block_num_by_rows >= block_num_by_size {
118+ return self . max_rows_per_block ;
119+ }
120+ total_rows. div_ceil ( block_num_by_size)
121+ }
122+
123+ /// Calculates the optimal number of rows per block based on total data size and row count.
124+ ///
125+ /// # Parameters
126+ /// - `total_bytes`: The total size of the data in bytes.
127+ /// - `total_rows`: The total number of rows in the data.
128+ /// - `total_compressed`: The total compressed size of the data in bytes.
129+ ///
130+ /// # Returns
131+ /// - The calculated number of rows per block that satisfies the thresholds.
132+ #[ inline]
133+ pub fn calc_rows_for_recluster (
104134 & self ,
105- total_bytes : usize ,
106135 total_rows : usize ,
136+ total_bytes : usize ,
107137 total_compressed : usize ,
108138 ) -> usize {
109- if self . check_for_compact ( total_rows, total_bytes) {
139+ // Check if the data is compact enough to skip further calculations.
140+ if self . check_for_compact ( total_rows, total_bytes)
141+ && total_compressed < 2 * self . min_compressed_per_block
142+ {
110143 return total_rows;
111144 }
112145
113146 let block_num_by_rows = std:: cmp:: max ( total_rows / self . min_rows_per_block , 1 ) ;
114- let block_num_by_size = std:: cmp:: max (
115- total_bytes / self . max_bytes_per_block ,
116- total_compressed / self . max_bytes_per_file ,
117- ) ;
118- if block_num_by_rows >= block_num_by_size {
147+ let block_num_by_compressed = total_compressed. div_ceil ( self . max_compressed_per_block ) ;
148+ // If row-based block count exceeds compressed-based block count, use max rows per block.
149+ if block_num_by_rows >= block_num_by_compressed {
119150 return self . max_rows_per_block ;
120151 }
121152
122- let mut rows_per_block = total_rows. div_ceil ( block_num_by_size) ;
123- if rows_per_block < self . max_rows_per_block / 2 {
124- // If block rows < 500_000, max_bytes_per_block set to 125M
125- let block_num_by_size = ( 4 * block_num_by_size / 5 ) . max ( 1 ) ;
126- rows_per_block = total_rows. div_ceil ( block_num_by_size) ;
127- }
128- rows_per_block
153+ let bytes_per_block = total_bytes. div_ceil ( block_num_by_compressed) ;
154+ // Adjust the number of blocks based on block size thresholds.
155+ let max_bytes_per_block = ( 4 * self . min_bytes_per_block ) . min ( 400 * 1024 * 1024 ) ;
156+ let min_bytes_per_block = ( self . min_bytes_per_block / 2 ) . min ( 50 * 1024 * 1024 ) ;
157+ let block_nums = if bytes_per_block > max_bytes_per_block {
158+ // Case 1: If the block size is too bigger.
159+ total_bytes. div_ceil ( max_bytes_per_block)
160+ } else if bytes_per_block < min_bytes_per_block {
161+ // Case 2: If the block size is too smaller.
162+ total_bytes / min_bytes_per_block
163+ } else {
164+ // Case 3: Otherwise, use the compressed-based block count.
165+ block_num_by_compressed
166+ } ;
167+ total_rows. div_ceil ( block_nums. max ( 1 ) ) . max ( 1 )
129168 }
130169}
0 commit comments