File tree Expand file tree Collapse file tree 5 files changed +41
-9
lines changed
service/src/pipelines/builders Expand file tree Collapse file tree 5 files changed +41
-9
lines changed Original file line number Diff line number Diff line change @@ -47,7 +47,7 @@ ignore = [
4747 " RUSTSEC-2024-0351" ,
4848 # gix-path: improperly resolves configuration path reported by Git
4949 " RUSTSEC-2024-0371" ,
50- # Remotely exploitable Denial of Service in Tonic, ignored temporarily
50+ # Remotely exploitable Denial of Service in Tonic, ignored temporarily
5151 " RUSTSEC-2024-0376" ,
5252 # rustls network-reachable panic in `Acceptor::accept`
5353 " RUSTSEC-2024-0399" ,
Original file line number Diff line number Diff line change @@ -89,24 +89,37 @@ impl HashTableConfig {
8989 self
9090 }
9191
92- pub fn with_partial ( mut self , partial_agg : bool , active_threads : usize ) -> Self {
92+ pub fn with_partial (
93+ mut self ,
94+ partial_agg : bool ,
95+ active_threads : usize ,
96+ min_skip_partial_aggregation_capacity : usize ,
97+ ) -> Self {
9398 self . partial_agg = partial_agg;
9499
95100 // init max_partial_capacity
96101 let total_shared_cache_size = active_threads * L3_CACHE_SIZE ;
97102 let cache_per_active_thread =
98103 L1_CACHE_SIZE + L2_CACHE_SIZE + total_shared_cache_size / active_threads;
99104 let size_per_entry = ( 8_f64 * LOAD_FACTOR ) as usize ;
100- let capacity = ( cache_per_active_thread / size_per_entry) . next_power_of_two ( ) ;
105+ let capacity = ( cache_per_active_thread / size_per_entry)
106+ . next_power_of_two ( )
107+ . max ( min_skip_partial_aggregation_capacity) ;
101108 self . max_partial_capacity = capacity;
102109
103110 self
104111 }
105112
106- pub fn cluster_with_partial ( mut self , partial_agg : bool , node_nums : usize ) -> Self {
113+ pub fn cluster_with_partial (
114+ mut self ,
115+ partial_agg : bool ,
116+ node_nums : usize ,
117+ min_skip_partial_aggregation_capacity : usize ,
118+ ) -> Self {
107119 self . partial_agg = partial_agg;
108120 self . repartition_radix_bits_incr = 4 ;
109- self . max_partial_capacity = 131072 * ( 2 << node_nums) ;
121+ self . max_partial_capacity =
122+ ( 131072 * ( 2 << node_nums) ) . max ( min_skip_partial_aggregation_capacity) ;
110123
111124 self
112125 }
Original file line number Diff line number Diff line change @@ -127,13 +127,22 @@ impl PipelineBuilder {
127127 let schema_before_group_by = params. input_schema . clone ( ) ;
128128 let sample_block = DataBlock :: empty_with_schema ( schema_before_group_by) ;
129129 let method = DataBlock :: choose_hash_method ( & sample_block, group_cols, efficiently_memory) ?;
130+ let min_skip_partial_aggregation_capacity =
131+ self . settings . get_min_skip_partial_aggregation_capacity ( ) ? as usize ;
130132
131133 // Need a global atomic to read the max current radix bits hint
132- let partial_agg_config = if self . ctx . get_cluster ( ) . is_empty ( ) {
133- HashTableConfig :: default ( ) . with_partial ( true , max_threads as usize )
134+ let partial_agg_config = if !self . is_exchange_neighbor {
135+ HashTableConfig :: default ( ) . with_partial (
136+ true ,
137+ max_threads as usize ,
138+ min_skip_partial_aggregation_capacity,
139+ )
134140 } else {
135- HashTableConfig :: default ( )
136- . cluster_with_partial ( true , self . ctx . get_cluster ( ) . nodes . len ( ) )
141+ HashTableConfig :: default ( ) . cluster_with_partial (
142+ true ,
143+ self . ctx . get_cluster ( ) . nodes . len ( ) ,
144+ min_skip_partial_aggregation_capacity,
145+ )
137146 } ;
138147
139148 self . main_pipeline . add_transform ( |input, output| {
Original file line number Diff line number Diff line change @@ -713,6 +713,12 @@ impl DefaultSettings {
713713 mode : SettingMode :: Both ,
714714 range : Some ( SettingRange :: Numeric ( 0 ..=1 ) ) ,
715715 } ) ,
716+ ( "min_skip_partial_aggregation_capacity" , DefaultSettingValue {
717+ value : UserSettingValue :: UInt64 ( 0 ) ,
718+ desc : "The min size of aggregate hashtable skip partial aggregation capacity" ,
719+ mode : SettingMode :: Both ,
720+ range : Some ( SettingRange :: Numeric ( 0 ..=u64:: MAX ) ) ,
721+ } ) ,
716722 ( "numeric_cast_option" , DefaultSettingValue {
717723 value : UserSettingValue :: String ( "rounding" . to_string ( ) ) ,
718724 desc : "Set numeric cast mode as \" rounding\" or \" truncating\" ." ,
Original file line number Diff line number Diff line change @@ -423,6 +423,10 @@ impl Settings {
423423 Ok ( self . try_get_u64 ( "enable_experimental_aggregate_hashtable" ) ? == 1 )
424424 }
425425
426+ pub fn get_min_skip_partial_aggregation_capacity ( & self ) -> Result < u64 > {
427+ self . try_get_u64 ( "min_skip_partial_aggregation_capacity" )
428+ }
429+
426430 pub fn get_lazy_read_threshold ( & self ) -> Result < u64 > {
427431 self . try_get_u64 ( "lazy_read_threshold" )
428432 }
You can’t perform that action at this time.
0 commit comments