From 6294fb97df3ade87c7f207d8d7921ad803fe74c2 Mon Sep 17 00:00:00 2001
From: sundyli <543950155@qq.com>
Date: Mon, 13 Jan 2025 20:17:22 +0800
Subject: [PATCH 1/3] fix(query): hash table scatter will always send agg meta
 (#17245)

* fix(query): hash table scatter will always send agg meta
* fix(query): hash table scatter will always send agg meta
* fix(query): z
* fix(query): z
* update
* update
* update
* z
* z
* z
* z
* z
* z
* z
---
 src/query/expression/src/aggregate/payload.rs | 11 +++---
 .../expression/src/aggregate/payload_flush.rs |  4 +-
 .../aggregator/aggregate_exchange_injector.rs | 26 +++++--------
 .../new_transform_partition_bucket.rs         | 37 ++++++++++++-------
 .../transforms/aggregator/serde/serde_meta.rs | 22 +++++------
 .../serde/transform_aggregate_serializer.rs   | 32 ++++++++++++++--
 .../serde/transform_deserializer.rs           | 32 ++++++++++------
 ...transform_exchange_aggregate_serializer.rs | 21 +++++------
 .../aggregator/transform_aggregate_final.rs   |  1 +
 .../transforms/range_join/ie_join_state.rs    |  4 ++
 .../processors/transforms/transform_srf.rs    |  3 ++
 11 files changed, 115 insertions(+), 78 deletions(-)

diff --git a/src/query/expression/src/aggregate/payload.rs b/src/query/expression/src/aggregate/payload.rs
index d7f01dcf1e48e..504699715e466 100644
--- a/src/query/expression/src/aggregate/payload.rs
+++ b/src/query/expression/src/aggregate/payload.rs
@@ -390,15 +390,14 @@ impl Payload {
         true
     }
 
-    pub fn empty_block(&self) -> DataBlock {
-        let columns = self
-            .aggrs
-            .iter()
-            .map(|f| ColumnBuilder::with_capacity(&f.return_type().unwrap(), 0).build())
+    pub fn empty_block(&self, fake_rows: Option<usize>) -> DataBlock {
+        let fake_rows = fake_rows.unwrap_or(0);
+        let columns = (0..self.aggrs.len())
+            .map(|_| ColumnBuilder::repeat_default(&DataType::Binary, fake_rows).build())
             .chain(
                 self.group_types
                     .iter()
-                    .map(|t| ColumnBuilder::with_capacity(t, 0).build()),
+                    .map(|t| ColumnBuilder::repeat_default(t, fake_rows).build()),
             )
             .collect_vec();
         DataBlock::new_from_columns(columns)
diff --git a/src/query/expression/src/aggregate/payload_flush.rs b/src/query/expression/src/aggregate/payload_flush.rs
index 632ec78f69990..e4b4735394c17 100644
--- a/src/query/expression/src/aggregate/payload_flush.rs
+++ b/src/query/expression/src/aggregate/payload_flush.rs
@@ -121,7 +121,7 @@ impl Payload {
         }
 
         if blocks.is_empty() {
-            return Ok(self.empty_block());
+            return Ok(self.empty_block(None));
         }
 
         DataBlock::concat(&blocks)
@@ -173,7 +173,7 @@ impl Payload {
         }
 
         if blocks.is_empty() {
-            return Ok(self.empty_block());
+            return Ok(self.empty_block(None));
         }
 
         DataBlock::concat(&blocks)
     }
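Why fake_rows: a serialized "empty" block may need to carry one placeholder row so the metadata attached to it survives transports that drop zero-row frames. A minimal standalone sketch of that idea, using hypothetical stand-in types rather than Databend's ColumnBuilder API:

    // Sketch only: Column and DataBlock here are stand-ins, not Databend types.
    #[derive(Debug)]
    enum Column {
        Binary(Vec<Vec<u8>>), // serialized aggregate states
        UInt64(Vec<u64>),     // a group-by key column
    }

    #[derive(Debug)]
    struct DataBlock {
        columns: Vec<Column>,
    }

    // Mirrors empty_block(fake_rows): default-filled columns of fake_rows rows,
    // so the block keeps its schema (and any attached meta) even when "empty".
    fn empty_block(num_aggrs: usize, fake_rows: Option<usize>) -> DataBlock {
        let n = fake_rows.unwrap_or(0);
        let mut columns: Vec<Column> = (0..num_aggrs)
            .map(|_| Column::Binary(vec![Vec::new(); n]))
            .collect();
        columns.push(Column::UInt64(vec![0; n])); // one assumed group column
        DataBlock { columns }
    }

    fn main() {
        println!("{:?}", empty_block(2, None));    // zero rows, schema intact
        println!("{:?}", empty_block(2, Some(1))); // one placeholder row
    }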
diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_exchange_injector.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_exchange_injector.rs
index b19f952c78eca..cb594d770001a 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_exchange_injector.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_exchange_injector.rs
@@ -173,26 +173,20 @@ impl FlightScatter for HashTableHashScatter {
                 AggregateMeta::Partitioned { .. } => unreachable!(),
                 AggregateMeta::AggregateSpilling(payload) => {
                     for p in scatter_partitioned_payload(payload, self.buckets)? {
-                        blocks.push(match p.len() == 0 {
-                            true => DataBlock::empty(),
-                            false => DataBlock::empty_with_meta(
-                                AggregateMeta::create_agg_spilling(p),
-                            ),
-                        });
+                        blocks.push(DataBlock::empty_with_meta(
+                            AggregateMeta::create_agg_spilling(p),
+                        ));
                     }
                 }
                 AggregateMeta::AggregatePayload(p) => {
                     for payload in scatter_payload(p.payload, self.buckets)? {
-                        blocks.push(match payload.len() == 0 {
-                            true => DataBlock::empty(),
-                            false => {
-                                DataBlock::empty_with_meta(AggregateMeta::create_agg_payload(
-                                    p.bucket,
-                                    payload,
-                                    p.max_partition_count,
-                                ))
-                            }
-                        });
+                        blocks.push(DataBlock::empty_with_meta(
+                            AggregateMeta::create_agg_payload(
+                                p.bucket,
+                                payload,
+                                p.max_partition_count,
+                            ),
+                        ));
                     }
                 }
             };
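The point of the hunk above: scatter now emits one meta-bearing block per target bucket even when that bucket received no rows, so every downstream node learns its bucket and partition count instead of getting a bare empty block. A small sketch of that contract; the hash function and element type are hypothetical:

    // Sketch only: hash-scatter that keeps empty buckets.
    fn scatter(rows: &[u64], buckets: usize) -> Vec<Vec<u64>> {
        let mut out = vec![Vec::new(); buckets];
        for &row in rows {
            // hypothetical hash: modulo by bucket count
            out[(row % buckets as u64) as usize].push(row);
        }
        out // empty buckets are kept and still shipped with their metadata
    }

    fn main() {
        let out = scatter(&[8, 16, 24], 4);
        assert_eq!(out.len(), 4);
        assert!(out[1].is_empty()); // previously sent as a bare empty block; now carries agg meta
    }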
", )); - } - match self.buckets_blocks.entry(bucket) { - Entry::Vacant(v) => { - v.insert(vec![data_block]); - } - Entry::Occupied(mut v) => { - v.get_mut().push(data_block); } - }; - } else { - self.unpartitioned_blocks.push(data_block); + match self.buckets_blocks.entry(bucket) { + Entry::Vacant(v) => { + v.insert(vec![data_block]); + } + Entry::Occupied(mut v) => { + v.get_mut().push(data_block); + } + }; + } else { + self.unpartitioned_blocks.push(data_block); + } } Ok((bucket, partition_count)) diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/serde_meta.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/serde_meta.rs index 731ec4e1b1049..b83cf2c97c90e 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/serde_meta.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/serde_meta.rs @@ -31,21 +31,15 @@ pub struct AggregateSerdeMeta { pub columns_layout: Vec, // use for new agg hashtable pub max_partition_count: usize, + pub is_empty: bool, } impl AggregateSerdeMeta { - pub fn create(bucket: isize) -> BlockMetaInfoPtr { - Box::new(AggregateSerdeMeta { - typ: BUCKET_TYPE, - bucket, - location: None, - data_range: None, - columns_layout: vec![], - max_partition_count: 0, - }) - } - - pub fn create_agg_payload(bucket: isize, max_partition_count: usize) -> BlockMetaInfoPtr { + pub fn create_agg_payload( + bucket: isize, + max_partition_count: usize, + is_empty: bool, + ) -> BlockMetaInfoPtr { Box::new(AggregateSerdeMeta { typ: BUCKET_TYPE, bucket, @@ -53,6 +47,7 @@ impl AggregateSerdeMeta { data_range: None, columns_layout: vec![], max_partition_count, + is_empty, }) } @@ -61,6 +56,7 @@ impl AggregateSerdeMeta { location: String, data_range: Range, columns_layout: Vec, + is_empty: bool, ) -> BlockMetaInfoPtr { Box::new(AggregateSerdeMeta { typ: SPILLED_TYPE, @@ -69,6 +65,7 @@ impl AggregateSerdeMeta { location: Some(location), data_range: Some(data_range), max_partition_count: 0, + is_empty, }) } @@ -86,6 +83,7 @@ impl AggregateSerdeMeta { location: Some(location), data_range: Some(data_range), max_partition_count, + is_empty: false, }) } } diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_serializer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_serializer.rs index bd3bdb24d6339..096485fa98fcc 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_serializer.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_serializer.rs @@ -183,6 +183,7 @@ pub struct SerializeAggregateStream { pub payload: Pin>, flush_state: PayloadFlushState, end_iter: bool, + nums: usize, } unsafe impl Send for SerializeAggregateStream {} @@ -198,6 +199,7 @@ impl SerializeAggregateStream { flush_state: PayloadFlushState::default(), _params: params.clone(), end_iter: false, + nums: 0, } } } @@ -225,10 +227,32 @@ impl SerializeAggregateStream { } match block { - Some(block) => Ok(Some(block.add_meta(Some( - AggregateSerdeMeta::create_agg_payload(p.bucket, p.max_partition_count), - ))?)), - None => Ok(None), + Some(block) => { + self.nums += 1; + Ok(Some(block.add_meta(Some( + AggregateSerdeMeta::create_agg_payload( + p.bucket, + p.max_partition_count, + false, + ), + ))?)) + } + None => { + // always return at least one block + if self.nums == 0 { + self.nums += 1; + let block = p.payload.empty_block(Some(1)); 
diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_serializer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_serializer.rs
index bd3bdb24d6339..096485fa98fcc 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_serializer.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_serializer.rs
@@ -183,6 +183,7 @@ pub struct SerializeAggregateStream {
     pub payload: Pin<Box<SerializePayload>>,
     flush_state: PayloadFlushState,
     end_iter: bool,
+    nums: usize,
 }
 
 unsafe impl Send for SerializeAggregateStream {}
@@ -198,6 +199,7 @@ impl SerializeAggregateStream {
             flush_state: PayloadFlushState::default(),
             _params: params.clone(),
             end_iter: false,
+            nums: 0,
         }
     }
 }
@@ -225,10 +227,32 @@ impl SerializeAggregateStream {
         }
 
         match block {
-            Some(block) => Ok(Some(block.add_meta(Some(
-                AggregateSerdeMeta::create_agg_payload(p.bucket, p.max_partition_count),
-            ))?)),
-            None => Ok(None),
+            Some(block) => {
+                self.nums += 1;
+                Ok(Some(block.add_meta(Some(
+                    AggregateSerdeMeta::create_agg_payload(
+                        p.bucket,
+                        p.max_partition_count,
+                        false,
+                    ),
+                ))?))
+            }
+            None => {
+                // always return at least one block
+                if self.nums == 0 {
+                    self.nums += 1;
+                    let block = p.payload.empty_block(Some(1));
+                    Ok(Some(block.add_meta(Some(
+                        AggregateSerdeMeta::create_agg_payload(
+                            p.bucket,
+                            p.max_partition_count,
+                            true,
+                        ),
+                    ))?))
+                } else {
+                    Ok(None)
+                }
+            }
         }
     }
 }
diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_deserializer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_deserializer.rs
index c73799432d17d..ed82428bbc7d2 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_deserializer.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_deserializer.rs
@@ -89,18 +89,26 @@ impl TransformDeserializer {
             }
             Some(meta) => {
                 return match meta.typ == BUCKET_TYPE {
-                    true => Ok(DataBlock::empty_with_meta(
-                        AggregateMeta::create_serialized(
-                            meta.bucket,
-                            deserialize_block(
-                                dict,
-                                fragment_data,
-                                &self.schema,
-                                self.arrow_schema.clone(),
-                            )?,
-                            meta.max_partition_count,
-                        ),
-                    )),
+                    true => {
+                        let mut block = deserialize_block(
+                            dict,
+                            fragment_data,
+                            &self.schema,
+                            self.arrow_schema.clone(),
+                        )?;
+
+                        if meta.is_empty {
+                            block = block.slice(0..0);
+                        }
+
+                        Ok(DataBlock::empty_with_meta(
+                            AggregateMeta::create_serialized(
+                                meta.bucket,
+                                block,
+                                meta.max_partition_count,
+                            ),
+                        ))
+                    }
                     false => {
                         let data_schema = Arc::new(exchange_defines::spilled_schema());
                         let arrow_schema = Arc::new(exchange_defines::spilled_arrow_schema());
diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs
index 65e9e5cdc513a..a034b86038acb 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs
@@ -149,6 +149,8 @@ impl BlockMetaTransform<ExchangeShuffleMeta> for TransformExchangeAggregateSerializer {
                 }
 
                 Some(AggregateMeta::AggregatePayload(p)) => {
+                    let (bucket, max_partition_count) = (p.bucket, p.max_partition_count);
+
                     if index == self.local_pos {
                         serialized_blocks.push(FlightSerialized::DataBlock(
                             block.add_meta(Some(Box::new(AggregateMeta::AggregatePayload(p))))?,
@@ -156,24 +158,19 @@ impl BlockMetaTransform<ExchangeShuffleMeta> for TransformExchangeAggregateSerializer {
                         continue;
                     }
 
-                    let bucket = compute_block_number(p.bucket, p.max_partition_count)?;
+                    let block_number = compute_block_number(bucket, max_partition_count)?;
                     let stream = SerializeAggregateStream::create(
                         &self.params,
                         SerializePayload::AggregatePayload(p),
                     );
                     let mut stream_blocks = stream.into_iter().collect::<Result<Vec<_>>>()?;
-
-                    if stream_blocks.is_empty() {
-                        serialized_blocks.push(FlightSerialized::DataBlock(DataBlock::empty()));
-                    } else {
-                        let mut c = DataBlock::concat(&stream_blocks)?;
-                        if let Some(meta) = stream_blocks[0].take_meta() {
-                            c.replace_meta(meta);
-                        }
-
-                        let c = serialize_block(bucket, c, &self.options)?;
-                        serialized_blocks.push(FlightSerialized::DataBlock(c));
+                    debug_assert!(!stream_blocks.is_empty());
+                    let mut c = DataBlock::concat(&stream_blocks)?;
+                    if let Some(meta) = stream_blocks[0].take_meta() {
+                        c.replace_meta(meta);
                     }
+                    let c = serialize_block(block_number, c, &self.options)?;
+                    serialized_blocks.push(FlightSerialized::DataBlock(c));
                 }
             };
         }
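The debug_assert!(!stream_blocks.is_empty()) above leans on the new guarantee in SerializeAggregateStream that at least one block is always yielded (the nums == 0 branch). A generic sketch of that guarantee as an iterator adapter, not the Databend code:

    // Sketch: wraps an iterator so that a stream which would produce nothing
    // still yields one placeholder item, mirroring the nums == 0 branch.
    struct AtLeastOne<I: Iterator> {
        inner: I,
        yielded: bool,
        placeholder: Option<I::Item>,
    }

    impl<I: Iterator> Iterator for AtLeastOne<I> {
        type Item = I::Item;

        fn next(&mut self) -> Option<I::Item> {
            match self.inner.next() {
                Some(item) => {
                    self.yielded = true;
                    Some(item)
                }
                // inner produced nothing at all: emit the placeholder once
                None if !self.yielded => {
                    self.yielded = true;
                    self.placeholder.take()
                }
                None => None,
            }
        }
    }

    fn main() {
        let it = AtLeastOne {
            inner: std::iter::empty::<i32>(),
            yielded: false,
            placeholder: Some(-1),
        };
        assert_eq!(it.collect::<Vec<_>>(), vec![-1]); // never an empty stream
    }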
diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_final.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_final.rs
index 2a719a23c23d8..beded12043100 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_final.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_final.rs
@@ -59,6 +59,7 @@ impl TransformFinalAggregate {
                 AggregateMeta::Serialized(payload) => match agg_hashtable.as_mut() {
                     Some(ht) => {
                         debug_assert!(bucket == payload.bucket);
+
                         let payload = payload.convert_to_partitioned_payload(
                             self.params.group_data_types.clone(),
                             self.params.aggregate_functions.clone(),
diff --git a/src/query/service/src/pipelines/processors/transforms/range_join/ie_join_state.rs b/src/query/service/src/pipelines/processors/transforms/range_join/ie_join_state.rs
index 5c44b2be9cbf6..24991a001a112 100644
--- a/src/query/service/src/pipelines/processors/transforms/range_join/ie_join_state.rs
+++ b/src/query/service/src/pipelines/processors/transforms/range_join/ie_join_state.rs
@@ -125,6 +125,10 @@ impl IEJoinState {
     fn intersection(&self, left_block: &DataBlock, right_block: &DataBlock) -> bool {
         let left_len = left_block.num_rows();
         let right_len = right_block.num_rows();
+        if left_len == 0 || right_len == 0 {
+            return false;
+        }
+
         let left_l1_column = left_block.columns()[0]
             .value
             .convert_to_full_column(&self.l1_data_type, left_len);
diff --git a/src/query/service/src/pipelines/processors/transforms/transform_srf.rs b/src/query/service/src/pipelines/processors/transforms/transform_srf.rs
index 63e6b94b3bab5..ab71d492ed2cf 100644
--- a/src/query/service/src/pipelines/processors/transforms/transform_srf.rs
+++ b/src/query/service/src/pipelines/processors/transforms/transform_srf.rs
@@ -113,6 +113,9 @@ impl BlockingTransform for TransformSRF {
         }
 
         let input = self.input.take().unwrap();
+        if input.is_empty() {
+            return Ok(None);
+        }
 
         let mut result_size = 0;
         let mut used = 0;

From 7af80b114c0ef66a0ee118ec8dfcc27a39400747 Mon Sep 17 00:00:00 2001
From: sundyli <543950155@qq.com>
Date: Mon, 13 Jan 2025 13:00:40 +0800
Subject: [PATCH 2/3] fix(query): increase state_rows in copy agg state rows
 (#17252)

* fix(query): increase state_rows in copy agg state rows
* fix(query): increase state_rows in copy agg state rows
* fix(query): increase state_rows in copy agg state rows
---
 src/query/expression/src/aggregate/payload.rs |  1 +
 .../20+_others/20_0022_agg_memory.result      |  6 +++
 .../20+_others/20_0022_agg_memory.sh          | 46 +++++++++++++++++++
 3 files changed, 53 insertions(+)
 create mode 100755 tests/suites/0_stateless/20+_others/20_0022_agg_memory.result
 create mode 100755 tests/suites/0_stateless/20+_others/20_0022_agg_memory.sh

diff --git a/src/query/expression/src/aggregate/payload.rs b/src/query/expression/src/aggregate/payload.rs
index 504699715e466..c3cde64a989f4 100644
--- a/src/query/expression/src/aggregate/payload.rs
+++ b/src/query/expression/src/aggregate/payload.rs
@@ -339,6 +339,7 @@ impl Payload {
             )
         }
         page.rows += 1;
+        page.state_rows += 1;
 
         if page.rows == page.capacity {
             page = self.writable_page();
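Why the one-line state_rows bump matters: a page finalizes exactly state_rows aggregate states on teardown, so a copy path that appends a state without bumping the counter leaks that state and RSS creeps up across queries, which is what the test below measures. A simplified sketch with a hypothetical Page, not Databend's:

    struct Page {
        rows: usize,
        state_rows: usize,
        states: Vec<Box<[u8; 64]>>, // stand-in for per-row aggregate state
    }

    impl Page {
        fn append_copied_state(&mut self, state: Box<[u8; 64]>, bump: bool) {
            self.states.push(state);
            self.rows += 1;
            if bump {
                self.state_rows += 1; // the fix: keep both counters in sync
            }
        }

        // Teardown frees only the states the counter knows about.
        fn finalize(&mut self) -> usize {
            let freed = self.state_rows.min(self.states.len());
            self.states.drain(..freed);
            freed
        }
    }

    fn main() {
        let mut page = Page { rows: 0, state_rows: 0, states: Vec::new() };
        page.append_copied_state(Box::new([0; 64]), false); // pre-fix behaviour
        assert_eq!(page.rows, 1);
        assert_eq!(page.finalize(), 0); // copied state never freed: memory grows
    }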
diff --git a/tests/suites/0_stateless/20+_others/20_0022_agg_memory.result b/tests/suites/0_stateless/20+_others/20_0022_agg_memory.result
new file mode 100755
index 0000000000000..f32baf89a114b
--- /dev/null
+++ b/tests/suites/0_stateless/20+_others/20_0022_agg_memory.result
@@ -0,0 +1,6 @@
+executing 1
+executing 2
+executing 3
+executing 4
+executing 5
+Memory usage difference is less than 5%
diff --git a/tests/suites/0_stateless/20+_others/20_0022_agg_memory.sh b/tests/suites/0_stateless/20+_others/20_0022_agg_memory.sh
new file mode 100755
index 0000000000000..e8602a4ae66a0
--- /dev/null
+++ b/tests/suites/0_stateless/20+_others/20_0022_agg_memory.sh
@@ -0,0 +1,46 @@
+#!/usr/bin/env bash
+CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+. "$CURDIR"/../../../shell_env.sh
+
+
+## warmup
+for i in `seq 1 2`;do
+    $BENDSQL_CLIENT_CONNECT --query="""
+    select number::string , max(number::string), min(number::string), count(distinct number) from numbers(10000000) group by 1 ignore_result;
+    """
+done
+
+
+PIDS=($(pgrep databend-query))
+BEFORE_MEM=0
+for PID in "${PIDS[@]}"; do
+    MEM=$(ps -o rss= -p $PID | tail -n 1)
+    BEFORE_MEM=$((BEFORE_MEM + MEM))
+done
+
+
+for i in `seq 1 5`;do
+    echo "executing $i"
+    $BENDSQL_CLIENT_CONNECT --query="""
+    select number::string , max(number::string), min(number::string), count(distinct number) from numbers(10000000) group by 1 ignore_result;
+    """
+done
+
+sleep 15
+
+
+AFTER_MEM=0
+for PID in "${PIDS[@]}"; do
+    MEM=$(ps -o rss= -p $PID | tail -n 1)
+    AFTER_MEM=$((AFTER_MEM + MEM))
+done
+
+# Calculate the difference in percentage
+DIFF=$(awk -v before=$BEFORE_MEM -v after=$AFTER_MEM 'BEGIN {print (after-before)/before * 100}')
+
+# Check if the difference is less than 5%
+if (( $(awk -v diff=$DIFF 'BEGIN {print (diff < 5)}') )); then
+    echo "Memory usage difference is less than 5%"
+else
+    echo "Memory usage difference is greater than 5%, before ${BEFORE_MEM} ${AFTER_MEM}"
+fi
\ No newline at end of file
"$CURDIR"/../../../shell_env.sh - - -## warmup -for i in `seq 1 2`;do - $BENDSQL_CLIENT_CONNECT --query=""" - select number::string , max(number::string), min(number::string), count(distinct number) from numbers(10000000) group by 1 ignore_result; - """ -done - - -PIDS=($(pgrep databend-query)) -BEFORE_MEM=0 -for PID in "${PIDS[@]}"; do - MEM=$(ps -o rss= -p $PID | tail -n 1) - BEFORE_MEM=$((BEFORE_MEM + MEM)) -done - - -for i in `seq 1 5`;do - echo "executing $i" - $BENDSQL_CLIENT_CONNECT --query=""" - select number::string , max(number::string), min(number::string), count(distinct number) from numbers(10000000) group by 1 ignore_result; - """ -done - -sleep 15 - - -AFTER_MEM=0 -for PID in "${PIDS[@]}"; do - MEM=$(ps -o rss= -p $PID | tail -n 1) - AFTER_MEM=$((AFTER_MEM + MEM)) -done - -# Calculate the difference in percentage -DIFF=$(awk -v before=$BEFORE_MEM -v after=$AFTER_MEM 'BEGIN {print (after-before)/before * 100}') - -# Check if the difference is less than 5% -if (( $(awk -v diff=$DIFF 'BEGIN {print (diff < 5)}') )); then - echo "Memory usage difference is less than 5%" -else - echo "Memory usage difference is greater than 5%, before ${BEFORE_MEM} ${AFTER_MEM}" -fi \ No newline at end of file