diff --git a/src/query/expression/src/aggregate/payload.rs b/src/query/expression/src/aggregate/payload.rs index d7f01dcf1e48e..504699715e466 100644 --- a/src/query/expression/src/aggregate/payload.rs +++ b/src/query/expression/src/aggregate/payload.rs @@ -390,15 +390,14 @@ impl Payload { true } - pub fn empty_block(&self) -> DataBlock { - let columns = self - .aggrs - .iter() - .map(|f| ColumnBuilder::with_capacity(&f.return_type().unwrap(), 0).build()) + pub fn empty_block(&self, fake_rows: Option) -> DataBlock { + let fake_rows = fake_rows.unwrap_or(0); + let columns = (0..self.aggrs.len()) + .map(|_| ColumnBuilder::repeat_default(&DataType::Binary, fake_rows).build()) .chain( self.group_types .iter() - .map(|t| ColumnBuilder::with_capacity(t, 0).build()), + .map(|t| ColumnBuilder::repeat_default(t, fake_rows).build()), ) .collect_vec(); DataBlock::new_from_columns(columns) diff --git a/src/query/expression/src/aggregate/payload_flush.rs b/src/query/expression/src/aggregate/payload_flush.rs index 632ec78f69990..e4b4735394c17 100644 --- a/src/query/expression/src/aggregate/payload_flush.rs +++ b/src/query/expression/src/aggregate/payload_flush.rs @@ -121,7 +121,7 @@ impl Payload { } if blocks.is_empty() { - return Ok(self.empty_block()); + return Ok(self.empty_block(None)); } DataBlock::concat(&blocks) } @@ -173,7 +173,7 @@ impl Payload { } if blocks.is_empty() { - return Ok(self.empty_block()); + return Ok(self.empty_block(None)); } DataBlock::concat(&blocks) diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_exchange_injector.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_exchange_injector.rs index b19f952c78eca..cb594d770001a 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_exchange_injector.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_exchange_injector.rs @@ -173,26 +173,20 @@ impl FlightScatter for HashTableHashScatter { AggregateMeta::Partitioned { .. } => unreachable!(), AggregateMeta::AggregateSpilling(payload) => { for p in scatter_partitioned_payload(payload, self.buckets)? { - blocks.push(match p.len() == 0 { - true => DataBlock::empty(), - false => DataBlock::empty_with_meta( - AggregateMeta::create_agg_spilling(p), - ), - }); + blocks.push(DataBlock::empty_with_meta( + AggregateMeta::create_agg_spilling(p), + )); } } AggregateMeta::AggregatePayload(p) => { for payload in scatter_payload(p.payload, self.buckets)? { - blocks.push(match payload.len() == 0 { - true => DataBlock::empty(), - false => { - DataBlock::empty_with_meta(AggregateMeta::create_agg_payload( - p.bucket, - payload, - p.max_partition_count, - )) - } - }); + blocks.push(DataBlock::empty_with_meta( + AggregateMeta::create_agg_payload( + p.bucket, + payload, + p.max_partition_count, + ), + )); } } }; diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/new_transform_partition_bucket.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/new_transform_partition_bucket.rs index 12c01ecebaefe..0944a0c36c1c1 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/new_transform_partition_bucket.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/new_transform_partition_bucket.rs @@ -212,6 +212,7 @@ impl NewTransformPartitionBucket { #[allow(unused_assignments)] fn add_bucket(&mut self, mut data_block: DataBlock) -> Result<(isize, usize)> { let (mut bucket, mut partition_count) = (0, 0); + let mut is_empty_block = false; if let Some(block_meta) = data_block.get_meta() { if let Some(block_meta) = AggregateMeta::downcast_ref_from(block_meta) { (bucket, partition_count) = match block_meta { @@ -250,7 +251,11 @@ impl NewTransformPartitionBucket { if let Some(AggregateMeta::Spilled(buckets_payload)) = AggregateMeta::downcast_from(meta) { - let partition_count = buckets_payload[0].max_partition_count; + let partition_count = if !buckets_payload.is_empty() { + buckets_payload[0].max_partition_count + } else { + MAX_PARTITION_COUNT + }; self.max_partition_count = self.max_partition_count.max(partition_count); @@ -274,12 +279,14 @@ impl NewTransformPartitionBucket { unreachable!() } AggregateMeta::Serialized(payload) => { + is_empty_block = payload.data_block.is_empty(); self.max_partition_count = self.max_partition_count.max(payload.max_partition_count); (payload.bucket, payload.max_partition_count) } AggregateMeta::AggregatePayload(payload) => { + is_empty_block = payload.payload.len() == 0; self.max_partition_count = self.max_partition_count.max(payload.max_partition_count); @@ -298,23 +305,25 @@ impl NewTransformPartitionBucket { )); } - if self.all_inputs_init { - if partition_count != self.max_partition_count { - return Err(ErrorCode::Internal( + if !is_empty_block { + if self.all_inputs_init { + if partition_count != self.max_partition_count { + return Err(ErrorCode::Internal( "Internal, the partition count does not equal the max partition count on TransformPartitionBucket. ", )); - } - match self.buckets_blocks.entry(bucket) { - Entry::Vacant(v) => { - v.insert(vec![data_block]); - } - Entry::Occupied(mut v) => { - v.get_mut().push(data_block); } - }; - } else { - self.unpartitioned_blocks.push(data_block); + match self.buckets_blocks.entry(bucket) { + Entry::Vacant(v) => { + v.insert(vec![data_block]); + } + Entry::Occupied(mut v) => { + v.get_mut().push(data_block); + } + }; + } else { + self.unpartitioned_blocks.push(data_block); + } } Ok((bucket, partition_count)) diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/serde_meta.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/serde_meta.rs index 731ec4e1b1049..b83cf2c97c90e 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/serde_meta.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/serde_meta.rs @@ -31,21 +31,15 @@ pub struct AggregateSerdeMeta { pub columns_layout: Vec, // use for new agg hashtable pub max_partition_count: usize, + pub is_empty: bool, } impl AggregateSerdeMeta { - pub fn create(bucket: isize) -> BlockMetaInfoPtr { - Box::new(AggregateSerdeMeta { - typ: BUCKET_TYPE, - bucket, - location: None, - data_range: None, - columns_layout: vec![], - max_partition_count: 0, - }) - } - - pub fn create_agg_payload(bucket: isize, max_partition_count: usize) -> BlockMetaInfoPtr { + pub fn create_agg_payload( + bucket: isize, + max_partition_count: usize, + is_empty: bool, + ) -> BlockMetaInfoPtr { Box::new(AggregateSerdeMeta { typ: BUCKET_TYPE, bucket, @@ -53,6 +47,7 @@ impl AggregateSerdeMeta { data_range: None, columns_layout: vec![], max_partition_count, + is_empty, }) } @@ -61,6 +56,7 @@ impl AggregateSerdeMeta { location: String, data_range: Range, columns_layout: Vec, + is_empty: bool, ) -> BlockMetaInfoPtr { Box::new(AggregateSerdeMeta { typ: SPILLED_TYPE, @@ -69,6 +65,7 @@ impl AggregateSerdeMeta { location: Some(location), data_range: Some(data_range), max_partition_count: 0, + is_empty, }) } @@ -86,6 +83,7 @@ impl AggregateSerdeMeta { location: Some(location), data_range: Some(data_range), max_partition_count, + is_empty: false, }) } } diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_serializer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_serializer.rs index bd3bdb24d6339..096485fa98fcc 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_serializer.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_serializer.rs @@ -183,6 +183,7 @@ pub struct SerializeAggregateStream { pub payload: Pin>, flush_state: PayloadFlushState, end_iter: bool, + nums: usize, } unsafe impl Send for SerializeAggregateStream {} @@ -198,6 +199,7 @@ impl SerializeAggregateStream { flush_state: PayloadFlushState::default(), _params: params.clone(), end_iter: false, + nums: 0, } } } @@ -225,10 +227,32 @@ impl SerializeAggregateStream { } match block { - Some(block) => Ok(Some(block.add_meta(Some( - AggregateSerdeMeta::create_agg_payload(p.bucket, p.max_partition_count), - ))?)), - None => Ok(None), + Some(block) => { + self.nums += 1; + Ok(Some(block.add_meta(Some( + AggregateSerdeMeta::create_agg_payload( + p.bucket, + p.max_partition_count, + false, + ), + ))?)) + } + None => { + // always return at least one block + if self.nums == 0 { + self.nums += 1; + let block = p.payload.empty_block(Some(1)); + Ok(Some(block.add_meta(Some( + AggregateSerdeMeta::create_agg_payload( + p.bucket, + p.max_partition_count, + true, + ), + ))?)) + } else { + Ok(None) + } + } } } } diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_deserializer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_deserializer.rs index c73799432d17d..ed82428bbc7d2 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_deserializer.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_deserializer.rs @@ -89,18 +89,26 @@ impl TransformDeserializer { } Some(meta) => { return match meta.typ == BUCKET_TYPE { - true => Ok(DataBlock::empty_with_meta( - AggregateMeta::create_serialized( - meta.bucket, - deserialize_block( - dict, - fragment_data, - &self.schema, - self.arrow_schema.clone(), - )?, - meta.max_partition_count, - ), - )), + true => { + let mut block = deserialize_block( + dict, + fragment_data, + &self.schema, + self.arrow_schema.clone(), + )?; + + if meta.is_empty { + block = block.slice(0..0); + } + + Ok(DataBlock::empty_with_meta( + AggregateMeta::create_serialized( + meta.bucket, + block, + meta.max_partition_count, + ), + )) + } false => { let data_schema = Arc::new(exchange_defines::spilled_schema()); let arrow_schema = Arc::new(exchange_defines::spilled_arrow_schema()); diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs index 65e9e5cdc513a..a034b86038acb 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs @@ -149,6 +149,8 @@ impl BlockMetaTransform for TransformExchangeAggregateSeria } Some(AggregateMeta::AggregatePayload(p)) => { + let (bucket, max_partition_count) = (p.bucket, p.max_partition_count); + if index == self.local_pos { serialized_blocks.push(FlightSerialized::DataBlock( block.add_meta(Some(Box::new(AggregateMeta::AggregatePayload(p))))?, @@ -156,24 +158,19 @@ impl BlockMetaTransform for TransformExchangeAggregateSeria continue; } - let bucket = compute_block_number(p.bucket, p.max_partition_count)?; + let block_number = compute_block_number(bucket, max_partition_count)?; let stream = SerializeAggregateStream::create( &self.params, SerializePayload::AggregatePayload(p), ); let mut stream_blocks = stream.into_iter().collect::>>()?; - - if stream_blocks.is_empty() { - serialized_blocks.push(FlightSerialized::DataBlock(DataBlock::empty())); - } else { - let mut c = DataBlock::concat(&stream_blocks)?; - if let Some(meta) = stream_blocks[0].take_meta() { - c.replace_meta(meta); - } - - let c = serialize_block(bucket, c, &self.options)?; - serialized_blocks.push(FlightSerialized::DataBlock(c)); + debug_assert!(!stream_blocks.is_empty()); + let mut c = DataBlock::concat(&stream_blocks)?; + if let Some(meta) = stream_blocks[0].take_meta() { + c.replace_meta(meta); } + let c = serialize_block(block_number, c, &self.options)?; + serialized_blocks.push(FlightSerialized::DataBlock(c)); } }; } diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_final.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_final.rs index 2a719a23c23d8..beded12043100 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_final.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_final.rs @@ -59,6 +59,7 @@ impl TransformFinalAggregate { AggregateMeta::Serialized(payload) => match agg_hashtable.as_mut() { Some(ht) => { debug_assert!(bucket == payload.bucket); + let payload = payload.convert_to_partitioned_payload( self.params.group_data_types.clone(), self.params.aggregate_functions.clone(), diff --git a/src/query/service/src/pipelines/processors/transforms/range_join/ie_join_state.rs b/src/query/service/src/pipelines/processors/transforms/range_join/ie_join_state.rs index 5c44b2be9cbf6..24991a001a112 100644 --- a/src/query/service/src/pipelines/processors/transforms/range_join/ie_join_state.rs +++ b/src/query/service/src/pipelines/processors/transforms/range_join/ie_join_state.rs @@ -125,6 +125,10 @@ impl IEJoinState { fn intersection(&self, left_block: &DataBlock, right_block: &DataBlock) -> bool { let left_len = left_block.num_rows(); let right_len = right_block.num_rows(); + if left_len == 0 || right_len == 0 { + return false; + } + let left_l1_column = left_block.columns()[0] .value .convert_to_full_column(&self.l1_data_type, left_len); diff --git a/src/query/service/src/pipelines/processors/transforms/transform_srf.rs b/src/query/service/src/pipelines/processors/transforms/transform_srf.rs index 63e6b94b3bab5..ab71d492ed2cf 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_srf.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_srf.rs @@ -113,6 +113,9 @@ impl BlockingTransform for TransformSRF { } let input = self.input.take().unwrap(); + if input.is_empty() { + return Ok(None); + } let mut result_size = 0; let mut used = 0;