@@ -82,31 +82,24 @@ impl Exchange for HilbertPartitionExchange {
8282
/// Phase of the `TransformHilbertPartitionCollect` state machine.
///
/// `event` reports `Event::Sync` while the state is `Concat` and
/// `Event::Async` while it is `Restore` or `Spill`. `async_process` swaps the
/// state back to `Consume` (via `std::mem::replace`) before handling the
/// pending phase. Lines marked `-` are variants this diff removes.
8383enum State {
// Initial state set by the constructor; also the state `async_process` resets to.
8484 Consume ,
// (removed) Was entered from `event` when `immediate_output_blocks` was non-empty.
85- Flush ,
// Handled in `async_process` by awaiting `self.buffer.spill()`.
8685 Spill ,
// Handled in `async_process` by restoring blocks from the buffer; moves to
// `Concat` when the restored list is non-empty.
8786 Restore ,
// Handled in `process`: concatenates `restored_data_blocks` into one block.
8887 Concat ,
// (removed) Was handled in `process` by concatenating `pending_small_blocks`.
89- Coalesce ,
9088}
9189
/// Processor that collects partitioned data blocks into a
/// `WindowPartitionBuffer`, spills and restores them through that buffer, and
/// concatenates each restored batch into a single output block.
///
/// Lines marked `-` are fields this diff removes together with the
/// `Flush` / `Coalesce` states and the `log::warn!` diagnostics.
9290pub struct TransformHilbertPartitionCollect {
// Upstream port; `event` calls `set_need_data()` on it to request more input.
9391 input : Arc < InputPort > ,
// Downstream port; `event` checks `is_finished()` and calls `finish()` on it.
9492 output : Arc < OutputPort > ,
9593
// (removed) `(new partition id, block)` pairs queued for the `Flush` state.
96- immediate_output_blocks : Vec < ( usize , DataBlock ) > ,
// Blocks returned by the buffer in `Restore`; taken and concatenated in `Concat`.
9794 restored_data_blocks : Vec < DataBlock > ,
// (removed) Concatenated blocks with fewer than `max_block_size / 10` rows,
// held back for the `Coalesce` state.
98- pending_small_blocks : Vec < DataBlock > ,
// Concatenated blocks produced by `process`.
// NOTE(review): where these are pushed to `output` is outside the visible hunks.
9995 output_data_blocks : Vec < DataBlock > ,
10096
// Stored at construction. The removed code used it as the row-count threshold
// for immediate output; its remaining uses are not visible here — TODO confirm.
10197 max_block_size : usize ,
10298 // Lookup table: indexed by the incoming partition id, yields the new partition id.
10399 partition_id : Vec < usize > ,
// (removed) Running row count per new partition id, reset on immediate output.
104- partition_sizes : Vec < usize > ,
105100 // The buffer is used to control the memory usage of the window operator.
// Blocks are added with `add_data_block`, and `async_process` calls its
// `spill()` / `restore()` methods.
106101 buffer : WindowPartitionBuffer ,
// Current phase; see `State`.
107102 state : State ,
// (removed) Total rows seen; only read by the removed `log::warn!` calls.
108- process_size : usize ,
// (removed) Processor identifier; only read by the removed `log::warn!` calls.
109- processor_id : usize ,
110103}
111104
112105impl TransformHilbertPartitionCollect {
@@ -134,12 +127,6 @@ impl TransformHilbertPartitionCollect {
134127 partition_id[ * partition] = new_partition_id;
135128 }
136129
137- log:: warn!(
138- "new processor id: {}, partitions: {:?}, partition_id: {:?}" ,
139- processor_id,
140- partitions,
141- partition_id
142- ) ;
143130 let location_prefix = ctx. query_id_spill_prefix ( ) ;
144131 let spill_config = SpillerConfig {
145132 spiller_type : SpillerType :: Window ,
@@ -161,15 +148,10 @@ impl TransformHilbertPartitionCollect {
161148 output,
162149 partition_id,
163150 buffer,
164- immediate_output_blocks : vec ! [ ] ,
165- pending_small_blocks : vec ! [ ] ,
166151 output_data_blocks : vec ! [ ] ,
167152 restored_data_blocks : Vec :: new ( ) ,
168153 max_block_size,
169- partition_sizes : vec ! [ 0 ; num_partitions] ,
170154 state : State :: Consume ,
171- process_size : 0 ,
172- processor_id,
173155 } )
174156 }
175157
@@ -180,21 +162,8 @@ impl TransformHilbertPartitionCollect {
180162 . and_then ( WindowPartitionMeta :: downcast_from)
181163 {
182164 for ( partition_id, data_block) in meta. partitioned_data . into_iter ( ) {
183- self . process_size += data_block. num_rows ( ) ;
184- let new_id = self . partition_id [ partition_id] ;
185- self . partition_sizes [ new_id] += data_block. num_rows ( ) ;
186- if self . partition_sizes [ new_id] >= self . max_block_size {
187- log:: warn!(
188- "immediate processor id: {}, partition_id: {}, new_id : {}" ,
189- self . processor_id,
190- partition_id,
191- new_id
192- ) ;
193- self . immediate_output_blocks . push ( ( new_id, data_block) ) ;
194- self . partition_sizes [ new_id] = 0 ;
195- continue ;
196- }
197- self . buffer . add_data_block ( new_id, data_block) ;
165+ let partition_id = self . partition_id [ partition_id] ;
166+ self . buffer . add_data_block ( partition_id, data_block) ;
198167 }
199168 }
200169 Ok ( ( ) )
@@ -216,20 +185,15 @@ impl Processor for TransformHilbertPartitionCollect {
216185 }
217186
218187 fn event ( & mut self ) -> Result < Event > {
219- if matches ! ( self . state, State :: Concat | State :: Coalesce ) {
188+ if matches ! ( self . state, State :: Concat ) {
220189 return Ok ( Event :: Sync ) ;
221190 }
222191
223- if matches ! ( self . state, State :: Flush | State :: Restore | State :: Spill ) {
192+ if matches ! ( self . state, State :: Restore | State :: Spill ) {
224193 return Ok ( Event :: Async ) ;
225194 }
226195
227196 if self . output . is_finished ( ) {
228- log:: warn!(
229- "process id: {}, process rows: {}" ,
230- self . processor_id,
231- self . process_size
232- ) ;
233197 return Ok ( Event :: Finished ) ;
234198 }
235199
@@ -242,11 +206,6 @@ impl Processor for TransformHilbertPartitionCollect {
242206 return Ok ( Event :: NeedConsume ) ;
243207 }
244208
245- if !self . immediate_output_blocks . is_empty ( ) {
246- self . state = State :: Flush ;
247- return Ok ( Event :: Async ) ;
248- }
249-
250209 if self . need_spill ( ) {
251210 self . state = State :: Spill ;
252211 return Ok ( Event :: Async ) ;
@@ -258,16 +217,7 @@ impl Processor for TransformHilbertPartitionCollect {
258217 return Ok ( Event :: Async ) ;
259218 }
260219
261- if !self . pending_small_blocks . is_empty ( ) {
262- self . state = State :: Coalesce ;
263- return Ok ( Event :: Sync ) ;
264- }
265220 self . output . finish ( ) ;
266- log:: warn!(
267- "process id: {}, process rows: {}" ,
268- self . processor_id,
269- self . process_size
270- ) ;
271221 return Ok ( Event :: Finished ) ;
272222 }
273223
@@ -278,11 +228,6 @@ impl Processor for TransformHilbertPartitionCollect {
278228 self . state = State :: Spill ;
279229 return Ok ( Event :: Async ) ;
280230 }
281-
282- if !self . immediate_output_blocks . is_empty ( ) {
283- self . state = State :: Flush ;
284- return Ok ( Event :: Async ) ;
285- }
286231 }
287232
288233 self . input . set_need_data ( ) ;
@@ -294,15 +239,6 @@ impl Processor for TransformHilbertPartitionCollect {
294239 State :: Concat => {
295240 let restored_data_blocks = std:: mem:: take ( & mut self . restored_data_blocks ) ;
296241 let block = DataBlock :: concat ( & restored_data_blocks) ?;
297- if block. num_rows ( ) < self . max_block_size / 10 {
298- self . pending_small_blocks . push ( block) ;
299- } else {
300- self . output_data_blocks . push ( block) ;
301- }
302- }
303- State :: Coalesce => {
304- let pending_small_blocks = std:: mem:: take ( & mut self . pending_small_blocks ) ;
305- let block = DataBlock :: concat ( & pending_small_blocks) ?;
306242 self . output_data_blocks . push ( block) ;
307243 }
308244 _ => unreachable ! ( ) ,
@@ -313,26 +249,9 @@ impl Processor for TransformHilbertPartitionCollect {
313249 #[ async_backtrace:: framed]
314250 async fn async_process ( & mut self ) -> Result < ( ) > {
315251 match std:: mem:: replace ( & mut self . state , State :: Consume ) {
316- State :: Flush => {
317- if let Some ( ( partition_id, data_block) ) = self . immediate_output_blocks . pop ( ) {
318- self . restored_data_blocks = self . buffer . restore_by_id ( partition_id) . await ?;
319- self . restored_data_blocks . push ( data_block) ;
320- log:: warn!(
321- "flush processor id: {}, partition id: {}" ,
322- self . processor_id,
323- partition_id
324- ) ;
325- self . state = State :: Concat ;
326- }
327- }
328252 State :: Spill => self . buffer . spill ( ) . await ?,
329253 State :: Restore => {
330- let ( id, blocks) = self . buffer . restore_with_id ( ) . await ?;
331- log:: warn!(
332- "restore processor id: {}, partition id: {}" ,
333- self . processor_id,
334- id
335- ) ;
254+ let blocks = self . buffer . restore ( ) . await ?;
336255 if !blocks. is_empty ( ) {
337256 self . restored_data_blocks = blocks;
338257 self . state = State :: Concat ;
0 commit comments