@@ -138,6 +138,11 @@ impl TransformHilbertPartitionCollect {
138138 let partitions: Vec < usize > = ( 0 ..num_partitions)
139139 . filter ( |& partition| partition % num_processors == processor_id)
140140 . collect ( ) ;
141+ log:: warn!(
142+ "collect processor id: {}, partitions: {:?}" ,
143+ processor_id,
144+ partitions
145+ ) ;
141146
142147 // Map each partition id to new partition id.
143148 let mut partition_id = vec ! [ 0 ; num_partitions] ;
@@ -185,6 +190,11 @@ impl TransformHilbertPartitionCollect {
185190 . and_then ( WindowPartitionMeta :: downcast_from)
186191 {
187192 for ( partition_id, data_block) in meta. partitioned_data . into_iter ( ) {
193+ log:: warn!(
194+ "collect processor id: {}, partition_id: {}" ,
195+ self . processor_id,
196+ partition_id
197+ ) ;
188198 self . process_size += data_block. num_rows ( ) ;
189199 let partition_id = self . partition_id [ partition_id] ;
190200 self . partition_sizes [ partition_id] += data_block. num_rows ( ) ;
@@ -317,18 +327,12 @@ impl Processor for TransformHilbertPartitionCollect {
317327 if let Some ( ( partition_id, data_block) ) = self . immediate_output_blocks . pop ( ) {
318328 self . restored_data_blocks = self . buffer . restore_by_id ( partition_id) . await ?;
319329 self . restored_data_blocks . push ( data_block) ;
320- log:: warn!(
321- "processor id: {}, restore id: {}" ,
322- self . processor_id,
323- partition_id
324- ) ;
325330 self . state = State :: Concat ;
326331 }
327332 }
328333 State :: Spill => self . buffer . spill ( ) . await ?,
329334 State :: Restore => {
330335 let ( id, blocks) = self . buffer . restore_with_id ( ) . await ?;
331- log:: warn!( "processor id: {}, restore id: {}" , self . processor_id, id) ;
332336 if !blocks. is_empty ( ) {
333337 self . restored_data_blocks = blocks;
334338 self . state = State :: Concat ;
0 commit comments