diff --git a/src/query/expression/src/aggregate/partitioned_payload.rs b/src/query/expression/src/aggregate/partitioned_payload.rs index 2eca5510d948e..f813355c18a83 100644 --- a/src/query/expression/src/aggregate/partitioned_payload.rs +++ b/src/query/expression/src/aggregate/partitioned_payload.rs @@ -108,7 +108,6 @@ impl PartitionedPayload { &state.empty_vector, &state.group_hashes, &mut state.addresses, - &mut state.page_index, new_group_rows, group_columns, ); @@ -135,7 +134,6 @@ impl PartitionedPayload { sel, &state.group_hashes, &mut state.addresses, - &mut state.page_index, count, group_columns, ); diff --git a/src/query/expression/src/aggregate/payload.rs b/src/query/expression/src/aggregate/payload.rs index 562b2a899f9bf..f4001d0fe5c0a 100644 --- a/src/query/expression/src/aggregate/payload.rs +++ b/src/query/expression/src/aggregate/payload.rs @@ -78,7 +78,6 @@ unsafe impl Sync for Payload {} pub struct Page { pub(crate) data: Vec>, pub(crate) rows: usize, - pub(crate) state_rows: usize, pub(crate) capacity: usize, } @@ -168,26 +167,21 @@ impl Payload { } #[inline] - pub fn writable_page(&mut self) -> (&mut Page, usize) { + pub fn writable_page(&mut self) -> &mut Page { if self.current_write_page == 0 || self.pages[self.current_write_page - 1].rows == self.pages[self.current_write_page - 1].capacity { self.current_write_page += 1; if self.current_write_page > self.pages.len() { - let data = Vec::with_capacity(self.row_per_page * self.tuple_size); self.pages.push(Page { - data, + data: Vec::with_capacity(self.row_per_page * self.tuple_size), rows: 0, - state_rows: 0, capacity: self.row_per_page, }); } } - ( - &mut self.pages[self.current_write_page - 1], - self.current_write_page - 1, - ) + &mut self.pages[self.current_write_page - 1] } #[inline] @@ -200,27 +194,31 @@ impl Payload { select_vector: &SelectVector, group_hashes: &[u64], address: &mut [*const u8], - page_index: &mut [usize], new_group_rows: usize, group_columns: InputColumns, ) { let tuple_size = self.tuple_size; - let (mut page, mut page_index_value) = self.writable_page(); + let mut page = self.writable_page(); for idx in select_vector.iter().take(new_group_rows).copied() { address[idx] = unsafe { page.data.as_ptr().add(page.rows * tuple_size) as *const u8 }; - page_index[idx] = page_index_value; page.rows += 1; if page.rows == page.capacity { - (page, page_index_value) = self.writable_page(); + page = self.writable_page(); } } + self.total_rows += new_group_rows; + + debug_assert_eq!( + self.total_rows, + self.pages.iter().map(|x| x.rows).sum::() + ); + self.append_rows( select_vector, group_hashes, address, - page_index, new_group_rows, group_columns, ) @@ -231,7 +229,6 @@ impl Payload { select_vector: &SelectVector, group_hashes: &[u64], address: &mut [*const u8], - page_index: &mut [usize], new_group_rows: usize, group_columns: InputColumns, ) { @@ -303,16 +300,8 @@ impl Payload { for (aggr, offset) in self.aggrs.iter().zip(self.state_addr_offsets.iter()) { aggr.init_state(place.next(*offset)); } - self.pages[page_index[idx]].state_rows += 1; } } - - self.total_rows += new_group_rows; - - debug_assert_eq!( - self.total_rows, - self.pages.iter().map(|x| x.rows).sum::() - ); } pub fn combine(&mut self, mut other: Payload) { @@ -338,7 +327,7 @@ impl Payload { address: &[*const u8], ) { let tuple_size = self.tuple_size; - let (mut page, _) = self.writable_page(); + let mut page = self.writable_page(); for i in 0..row_count { let index = select_vector[i]; @@ -350,10 +339,9 @@ impl Payload { ) } page.rows += 1; - page.state_rows += 1; if page.rows == page.capacity { - (page, _) = self.writable_page(); + page = self.writable_page(); } } @@ -424,12 +412,18 @@ impl Drop for Payload { if !self.state_move_out { for (aggr, addr_offset) in self.aggrs.iter().zip(self.state_addr_offsets.iter()) { if aggr.need_manual_drop_state() { - for page in self.pages.iter() { - for row in 0..page.state_rows { + 'PAGE_END: for page in self.pages.iter() { + for row in 0..page.rows { let ptr = self.data_ptr(page, row); unsafe { let state_addr = read::(ptr.add(self.state_offset) as _) as usize; + + // row is reserved, but not written (maybe throw by oom error) + if state_addr == 0 { + break 'PAGE_END; + } + let state_place = StateAddr::new(state_addr); aggr.drop_state(state_place.next(*addr_offset)); } diff --git a/src/query/expression/src/aggregate/probe_state.rs b/src/query/expression/src/aggregate/probe_state.rs index 896c1ff46cca9..bdeb5f0854a9c 100644 --- a/src/query/expression/src/aggregate/probe_state.rs +++ b/src/query/expression/src/aggregate/probe_state.rs @@ -22,7 +22,6 @@ use crate::BATCH_SIZE; pub struct ProbeState { pub group_hashes: [u64; BATCH_SIZE], pub addresses: [*const u8; BATCH_SIZE], - pub page_index: [usize; BATCH_SIZE], pub state_places: [StateAddr; BATCH_SIZE], pub group_compare_vector: SelectVector, pub no_match_vector: SelectVector, @@ -39,7 +38,6 @@ impl Default for ProbeState { Self { group_hashes: [0_u64; BATCH_SIZE], addresses: [std::ptr::null::(); BATCH_SIZE], - page_index: [0; BATCH_SIZE], state_places: [StateAddr::new(0); BATCH_SIZE], group_compare_vector: new_sel(), no_match_vector: new_sel(), diff --git a/tests/suites/0_stateless/20+_others/20_0022_agg_memory.result b/tests/suites/0_stateless/20+_others/20_0022_agg_memory.result deleted file mode 100755 index f32baf89a114b..0000000000000 --- a/tests/suites/0_stateless/20+_others/20_0022_agg_memory.result +++ /dev/null @@ -1,6 +0,0 @@ -executing 1 -executing 2 -executing 3 -executing 4 -executing 5 -Memory usage difference is less than 5% diff --git a/tests/suites/0_stateless/20+_others/20_0022_agg_memory.sh b/tests/suites/0_stateless/20+_others/20_0022_agg_memory.sh deleted file mode 100755 index e8602a4ae66a0..0000000000000 --- a/tests/suites/0_stateless/20+_others/20_0022_agg_memory.sh +++ /dev/null @@ -1,46 +0,0 @@ -#!/usr/bin/env bash -CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) -. "$CURDIR"/../../../shell_env.sh - - -## warmup -for i in `seq 1 2`;do - $BENDSQL_CLIENT_CONNECT --query=""" - select number::string , max(number::string), min(number::string), count(distinct number) from numbers(10000000) group by 1 ignore_result; - """ -done - - -PIDS=($(pgrep databend-query)) -BEFORE_MEM=0 -for PID in "${PIDS[@]}"; do - MEM=$(ps -o rss= -p $PID | tail -n 1) - BEFORE_MEM=$((BEFORE_MEM + MEM)) -done - - -for i in `seq 1 5`;do - echo "executing $i" - $BENDSQL_CLIENT_CONNECT --query=""" - select number::string , max(number::string), min(number::string), count(distinct number) from numbers(10000000) group by 1 ignore_result; - """ -done - -sleep 15 - - -AFTER_MEM=0 -for PID in "${PIDS[@]}"; do - MEM=$(ps -o rss= -p $PID | tail -n 1) - AFTER_MEM=$((AFTER_MEM + MEM)) -done - -# Calculate the difference in percentage -DIFF=$(awk -v before=$BEFORE_MEM -v after=$AFTER_MEM 'BEGIN {print (after-before)/before * 100}') - -# Check if the difference is less than 5% -if (( $(awk -v diff=$DIFF 'BEGIN {print (diff < 5)}') )); then - echo "Memory usage difference is less than 5%" -else - echo "Memory usage difference is greater than 5%, before ${BEFORE_MEM} ${AFTER_MEM}" -fi \ No newline at end of file