@@ -116,6 +116,8 @@ pub struct TransformHilbertPartitionCollect {
116116 // The buffer is used to control the memory usage of the window operator.
117117 buffer : WindowPartitionBuffer ,
118118 state : State ,
119+ process_size : usize ,
120+ processor_id : usize ,
119121}
120122
121123impl TransformHilbertPartitionCollect {
@@ -171,6 +173,8 @@ impl TransformHilbertPartitionCollect {
171173 max_block_size,
172174 partition_sizes : vec ! [ 0 ; num_partitions] ,
173175 state : State :: Consume ,
176+ process_size : 0 ,
177+ processor_id,
174178 } )
175179 }
176180
@@ -181,6 +185,7 @@ impl TransformHilbertPartitionCollect {
181185 . and_then ( WindowPartitionMeta :: downcast_from)
182186 {
183187 for ( partition_id, data_block) in meta. partitioned_data . into_iter ( ) {
188+ self . process_size += data_block. num_rows ( ) ;
184189 let partition_id = self . partition_id [ partition_id] ;
185190 self . partition_sizes [ partition_id] += data_block. num_rows ( ) ;
186191 if self . partition_sizes [ partition_id] >= self . max_block_size {
@@ -220,18 +225,18 @@ impl Processor for TransformHilbertPartitionCollect {
220225 }
221226
222227 if self . output . is_finished ( ) {
228+ log:: warn!(
229+ "process id: {}, process rows: {}" ,
230+ self . processor_id,
231+ self . process_size
232+ ) ;
223233 return Ok ( Event :: Finished ) ;
224234 }
225235
226236 if !self . output . can_push ( ) {
227237 return Ok ( Event :: NeedConsume ) ;
228238 }
229239
230- if self . need_spill ( ) {
231- self . state = State :: Spill ;
232- return Ok ( Event :: Async ) ;
233- }
234-
235240 if let Some ( data_block) = self . output_data_blocks . pop ( ) {
236241 self . output . push_data ( Ok ( data_block) ) ;
237242 return Ok ( Event :: NeedConsume ) ;
@@ -242,6 +247,11 @@ impl Processor for TransformHilbertPartitionCollect {
242247 return Ok ( Event :: Async ) ;
243248 }
244249
250+ if self . need_spill ( ) {
251+ self . state = State :: Spill ;
252+ return Ok ( Event :: Async ) ;
253+ }
254+
245255 if self . input . is_finished ( ) {
246256 if !self . buffer . is_empty ( ) {
247257 self . state = State :: Restore ;
@@ -253,6 +263,11 @@ impl Processor for TransformHilbertPartitionCollect {
253263 return Ok ( Event :: Sync ) ;
254264 }
255265 self . output . finish ( ) ;
266+ log:: warn!(
267+ "process id: {}, process rows: {}" ,
268+ self . processor_id,
269+ self . process_size
270+ ) ;
256271 return Ok ( Event :: Finished ) ;
257272 }
258273
@@ -302,13 +317,20 @@ impl Processor for TransformHilbertPartitionCollect {
302317 if let Some ( ( partition_id, data_block) ) = self . immediate_output_blocks . pop ( ) {
303318 self . restored_data_blocks = self . buffer . restore_by_id ( partition_id) . await ?;
304319 self . restored_data_blocks . push ( data_block) ;
320+ log:: warn!(
321+ "processor id: {}, restore id: {}" ,
322+ self . processor_id,
323+ partition_id
324+ ) ;
305325 self . state = State :: Concat ;
306326 }
307327 }
308328 State :: Spill => self . buffer . spill ( ) . await ?,
309329 State :: Restore => {
310- self . restored_data_blocks = self . buffer . restore ( ) . await ?;
311- if !self . restored_data_blocks . is_empty ( ) {
330+ let ( id, blocks) = self . buffer . restore_with_id ( ) . await ?;
331+ log:: warn!( "processor id: {}, restore id: {}" , self . processor_id, id) ;
332+ if !blocks. is_empty ( ) {
333+ self . restored_data_blocks = blocks;
312334 self . state = State :: Concat ;
313335 }
314336 }
0 commit comments