Skip to content

Commit 64c53b2

Browse files
authored
fix(query): record state_rows in aggregate payload (#17194)
* chore(query): use CommonHashSet to store AggregateUniqStringState
* fix(query): check state is allocated or not when oom
* fix(query): zero init pages
* fix(query): zero init pages
* update
1 parent a83eb8f commit 64c53b2

File tree

3 files changed

+31
-22
lines changed

3 files changed

+31
-22
lines changed

src/query/expression/src/aggregate/partitioned_payload.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ impl PartitionedPayload {
108108
&state.empty_vector,
109109
&state.group_hashes,
110110
&mut state.addresses,
111+
&mut state.page_index,
111112
new_group_rows,
112113
group_columns,
113114
);
@@ -134,6 +135,7 @@ impl PartitionedPayload {
134135
sel,
135136
&state.group_hashes,
136137
&mut state.addresses,
138+
&mut state.page_index,
137139
count,
138140
group_columns,
139141
);

src/query/expression/src/aggregate/payload.rs

Lines changed: 27 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ unsafe impl Sync for Payload {}
7878
pub struct Page {
7979
pub(crate) data: Vec<MaybeUninit<u8>>,
8080
pub(crate) rows: usize,
81+
pub(crate) state_rows: usize,
8182
pub(crate) capacity: usize,
8283
}
8384

@@ -167,21 +168,26 @@ impl Payload {
167168
}
168169

169170
#[inline]
170-
pub fn writable_page(&mut self) -> &mut Page {
171+
pub fn writable_page(&mut self) -> (&mut Page, usize) {
171172
if self.current_write_page == 0
172173
|| self.pages[self.current_write_page - 1].rows
173174
== self.pages[self.current_write_page - 1].capacity
174175
{
175176
self.current_write_page += 1;
176177
if self.current_write_page > self.pages.len() {
178+
let data = Vec::with_capacity(self.row_per_page * self.tuple_size);
177179
self.pages.push(Page {
178-
data: Vec::with_capacity(self.row_per_page * self.tuple_size),
180+
data,
179181
rows: 0,
182+
state_rows: 0,
180183
capacity: self.row_per_page,
181184
});
182185
}
183186
}
184-
&mut self.pages[self.current_write_page - 1]
187+
(
188+
&mut self.pages[self.current_write_page - 1],
189+
self.current_write_page - 1,
190+
)
185191
}
186192

187193
#[inline]
@@ -194,31 +200,27 @@ impl Payload {
194200
select_vector: &SelectVector,
195201
group_hashes: &[u64],
196202
address: &mut [*const u8],
203+
page_index: &mut [usize],
197204
new_group_rows: usize,
198205
group_columns: InputColumns,
199206
) {
200207
let tuple_size = self.tuple_size;
201-
let mut page = self.writable_page();
208+
let (mut page, mut page_index_value) = self.writable_page();
202209
for idx in select_vector.iter().take(new_group_rows).copied() {
203210
address[idx] = unsafe { page.data.as_ptr().add(page.rows * tuple_size) as *const u8 };
211+
page_index[idx] = page_index_value;
204212
page.rows += 1;
205213

206214
if page.rows == page.capacity {
207-
page = self.writable_page();
215+
(page, page_index_value) = self.writable_page();
208216
}
209217
}
210218

211-
self.total_rows += new_group_rows;
212-
213-
debug_assert_eq!(
214-
self.total_rows,
215-
self.pages.iter().map(|x| x.rows).sum::<usize>()
216-
);
217-
218219
self.append_rows(
219220
select_vector,
220221
group_hashes,
221222
address,
223+
page_index,
222224
new_group_rows,
223225
group_columns,
224226
)
@@ -229,6 +231,7 @@ impl Payload {
229231
select_vector: &SelectVector,
230232
group_hashes: &[u64],
231233
address: &mut [*const u8],
234+
page_index: &mut [usize],
232235
new_group_rows: usize,
233236
group_columns: InputColumns,
234237
) {
@@ -300,8 +303,16 @@ impl Payload {
300303
for (aggr, offset) in self.aggrs.iter().zip(self.state_addr_offsets.iter()) {
301304
aggr.init_state(place.next(*offset));
302305
}
306+
self.pages[page_index[idx]].state_rows += 1;
303307
}
304308
}
309+
310+
self.total_rows += new_group_rows;
311+
312+
debug_assert_eq!(
313+
self.total_rows,
314+
self.pages.iter().map(|x| x.rows).sum::<usize>()
315+
);
305316
}
306317

307318
pub fn combine(&mut self, mut other: Payload) {
@@ -327,7 +338,7 @@ impl Payload {
327338
address: &[*const u8],
328339
) {
329340
let tuple_size = self.tuple_size;
330-
let mut page = self.writable_page();
341+
let (mut page, _) = self.writable_page();
331342
for i in 0..row_count {
332343
let index = select_vector[i];
333344

@@ -341,7 +352,7 @@ impl Payload {
341352
page.rows += 1;
342353

343354
if page.rows == page.capacity {
344-
page = self.writable_page();
355+
(page, _) = self.writable_page();
345356
}
346357
}
347358

@@ -412,18 +423,12 @@ impl Drop for Payload {
412423
if !self.state_move_out {
413424
for (aggr, addr_offset) in self.aggrs.iter().zip(self.state_addr_offsets.iter()) {
414425
if aggr.need_manual_drop_state() {
415-
'PAGE_END: for page in self.pages.iter() {
416-
for row in 0..page.rows {
426+
for page in self.pages.iter() {
427+
for row in 0..page.state_rows {
417428
let ptr = self.data_ptr(page, row);
418429
unsafe {
419430
let state_addr =
420431
read::<u64>(ptr.add(self.state_offset) as _) as usize;
421-
422-
// row is reserved, but not written (maybe throw by oom error)
423-
if state_addr == 0 {
424-
break 'PAGE_END;
425-
}
426-
427432
let state_place = StateAddr::new(state_addr);
428433
aggr.drop_state(state_place.next(*addr_offset));
429434
}

src/query/expression/src/aggregate/probe_state.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use crate::BATCH_SIZE;
2222
pub struct ProbeState {
2323
pub group_hashes: [u64; BATCH_SIZE],
2424
pub addresses: [*const u8; BATCH_SIZE],
25+
pub page_index: [usize; BATCH_SIZE],
2526
pub state_places: [StateAddr; BATCH_SIZE],
2627
pub group_compare_vector: SelectVector,
2728
pub no_match_vector: SelectVector,
@@ -38,6 +39,7 @@ impl Default for ProbeState {
3839
Self {
3940
group_hashes: [0_u64; BATCH_SIZE],
4041
addresses: [std::ptr::null::<u8>(); BATCH_SIZE],
42+
page_index: [0; BATCH_SIZE],
4143
state_places: [StateAddr::new(0); BATCH_SIZE],
4244
group_compare_vector: new_sel(),
4345
no_match_vector: new_sel(),

0 commit comments

Comments
 (0)