@@ -183,6 +183,7 @@ impl Interpreter for ReclusterTableInterpreter {
183183impl ReclusterTableInterpreter {
184184 async fn execute_recluster ( & self , push_downs : & mut Option < PushDownInfo > ) -> Result < bool > {
185185 let start = SystemTime :: now ( ) ;
186+ let settings = self . ctx . get_settings ( ) ;
186187
187188 let ReclusterPlan {
188189 catalog,
@@ -213,8 +214,7 @@ impl ReclusterTableInterpreter {
213214 if push_downs. is_none ( ) {
214215 if let Some ( expr) = & selection {
215216 let ( mut bind_context, metadata) = bind_table ( tbl. clone ( ) ) ?;
216- let name_resolution_ctx =
217- NameResolutionContext :: try_from ( self . ctx . get_settings ( ) . as_ref ( ) ) ?;
217+ let name_resolution_ctx = NameResolutionContext :: try_from ( settings. as_ref ( ) ) ?;
218218 let mut type_checker = TypeChecker :: try_create (
219219 & mut bind_context,
220220 self . ctx . clone ( ) ,
@@ -249,14 +249,13 @@ impl ReclusterTableInterpreter {
249249 let block_thresholds = tbl. get_block_thresholds ( ) ;
250250 let total_bytes = recluster_info. removed_statistics . uncompressed_byte_size as usize ;
251251 let total_rows = recluster_info. removed_statistics . row_count as usize ;
252- let rows_per_block = block_thresholds
253- . calc_rows_per_block ( total_bytes , total_rows )
254- . max ( 65536 ) ;
252+ let rows_per_block = block_thresholds. calc_rows_per_block ( total_bytes , total_rows ) ;
253+ let block_size = settings . get_max_block_size ( ) ? ;
254+ settings . set_max_block_size ( std :: cmp :: min ( rows_per_block as u64 , block_size ) ) ? ;
255255 let total_partitions = std:: cmp:: max ( total_rows / rows_per_block, 1 ) ;
256256
257257 let ast_exprs = tbl. resolve_cluster_keys ( self . ctx . clone ( ) ) . unwrap ( ) ;
258258 let cluster_keys_len = ast_exprs. len ( ) ;
259- let settings = self . ctx . get_settings ( ) ;
260259 let name_resolution_ctx = NameResolutionContext :: try_from ( settings. as_ref ( ) ) ?;
261260 let cluster_key_strs = ast_exprs. into_iter ( ) . fold (
262261 Vec :: with_capacity ( cluster_keys_len) ,
@@ -277,7 +276,10 @@ impl ReclusterTableInterpreter {
277276 let subquery_executor = Arc :: new ( ServiceQueryExecutor :: new (
278277 QueryContext :: create_from ( self . ctx . as_ref ( ) ) ,
279278 ) ) ;
280- let partitions = settings. get_hilbert_num_range_ids ( ) ?;
279+ let partitions = std:: cmp:: max (
280+ total_partitions,
281+ settings. get_hilbert_num_range_ids ( ) ? as usize ,
282+ ) ;
281283 let sample_size = settings. get_hilbert_sample_size_per_block ( ) ?;
282284 let mut keys_bounds = Vec :: with_capacity ( cluster_key_strs. len ( ) ) ;
283285 for ( index, cluster_key_str) in cluster_key_strs. iter ( ) . enumerate ( ) {
@@ -348,11 +350,7 @@ impl ReclusterTableInterpreter {
348350 FROM {database}.{table}"
349351 ) ;
350352 let tokens = tokenize_sql ( query. as_str ( ) ) ?;
351- let sql_dialect = self
352- . ctx
353- . get_settings ( )
354- . get_sql_dialect ( )
355- . unwrap_or_default ( ) ;
353+ let sql_dialect = settings. get_sql_dialect ( ) . unwrap_or_default ( ) ;
356354 let ( stmt, _) = parse_sql ( & tokens, sql_dialect) ?;
357355
358356 let mut planner = Planner :: new ( self . ctx . clone ( ) ) ;
@@ -410,7 +408,6 @@ impl ReclusterTableInterpreter {
410408 input : plan,
411409 table_info : table_info. clone ( ) ,
412410 num_partitions : total_partitions,
413- rows_per_block,
414411 } ) ) ;
415412
416413 if is_distributed {
@@ -527,7 +524,7 @@ impl ReclusterTableInterpreter {
527524 build_query_pipeline_without_render_result_set ( & self . ctx , & physical_plan) . await ?;
528525 debug_assert ! ( build_res. main_pipeline. is_complete_pipeline( ) ?) ;
529526
530- let max_threads = self . ctx . get_settings ( ) . get_max_threads ( ) ? as usize ;
527+ let max_threads = settings . get_max_threads ( ) ? as usize ;
531528 build_res. set_max_threads ( max_threads) ;
532529
533530 let executor_settings = ExecutorSettings :: try_create ( self . ctx . clone ( ) ) ?;
0 commit comments