@@ -138,18 +138,19 @@ impl TransformHilbertPartitionCollect {
138138 let partitions: Vec < usize > = ( 0 ..num_partitions)
139139 . filter ( |& partition| partition % num_processors == processor_id)
140140 . collect ( ) ;
141- log:: warn!(
142- "collect processor id: {}, partitions: {:?}" ,
143- processor_id,
144- partitions
145- ) ;
146141
147142 // Map each partition id to new partition id.
148143 let mut partition_id = vec ! [ 0 ; num_partitions] ;
149144 for ( new_partition_id, partition) in partitions. iter ( ) . enumerate ( ) {
150145 partition_id[ * partition] = new_partition_id;
151146 }
152147
148+ log:: warn!(
149+ "new processor id: {}, partitions: {:?}, partition_id: {:?}" ,
150+ processor_id,
151+ partitions,
152+ partition_id
153+ ) ;
153154 let location_prefix = ctx. query_id_spill_prefix ( ) ;
154155 let spill_config = SpillerConfig {
155156 spiller_type : SpillerType :: Window ,
@@ -190,21 +191,21 @@ impl TransformHilbertPartitionCollect {
190191 . and_then ( WindowPartitionMeta :: downcast_from)
191192 {
192193 for ( partition_id, data_block) in meta. partitioned_data . into_iter ( ) {
193- log:: warn!(
194- "collect processor id: {}, partition_id: {}" ,
195- self . processor_id,
196- partition_id
197- ) ;
198194 self . process_size += data_block. num_rows ( ) ;
199- let partition_id = self . partition_id [ partition_id] ;
200- self . partition_sizes [ partition_id] += data_block. num_rows ( ) ;
201- if self . partition_sizes [ partition_id] >= self . max_block_size {
202- self . immediate_output_blocks
203- . push ( ( partition_id, data_block) ) ;
204- self . partition_sizes [ partition_id] = 0 ;
195+ let new_id = self . partition_id [ partition_id] ;
196+ self . partition_sizes [ new_id] += data_block. num_rows ( ) ;
197+ if self . partition_sizes [ new_id] >= self . max_block_size {
198+ log:: warn!(
199+ "immediate processor id: {}, partition_id: {}, new_id : {}" ,
200+ self . processor_id,
201+ partition_id,
202+ new_id
203+ ) ;
204+ self . immediate_output_blocks . push ( ( new_id, data_block) ) ;
205+ self . partition_sizes [ new_id] = 0 ;
205206 continue ;
206207 }
207- self . buffer . add_data_block ( partition_id , data_block) ;
208+ self . buffer . add_data_block ( new_id , data_block) ;
208209 }
209210 }
210211 Ok ( ( ) )
0 commit comments