Skip to content

Commit 96394fe

Browse files
committed
Revert "fix(query): record state_rows in aggregate payload (databendlabs#17194)"
This reverts commit 64c53b2.
1 parent 71debec commit 96394fe

File tree

3 files changed

+22
-31
lines changed

3 files changed

+22
-31
lines changed

src/query/expression/src/aggregate/partitioned_payload.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,6 @@ impl PartitionedPayload {
108108
&state.empty_vector,
109109
&state.group_hashes,
110110
&mut state.addresses,
111-
&mut state.page_index,
112111
new_group_rows,
113112
group_columns,
114113
);
@@ -135,7 +134,6 @@ impl PartitionedPayload {
135134
sel,
136135
&state.group_hashes,
137136
&mut state.addresses,
138-
&mut state.page_index,
139137
count,
140138
group_columns,
141139
);

src/query/expression/src/aggregate/payload.rs

Lines changed: 22 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,6 @@ unsafe impl Sync for Payload {}
7878
pub struct Page {
7979
pub(crate) data: Vec<MaybeUninit<u8>>,
8080
pub(crate) rows: usize,
81-
pub(crate) state_rows: usize,
8281
pub(crate) capacity: usize,
8382
}
8483

@@ -168,26 +167,21 @@ impl Payload {
168167
}
169168

170169
#[inline]
171-
pub fn writable_page(&mut self) -> (&mut Page, usize) {
170+
pub fn writable_page(&mut self) -> &mut Page {
172171
if self.current_write_page == 0
173172
|| self.pages[self.current_write_page - 1].rows
174173
== self.pages[self.current_write_page - 1].capacity
175174
{
176175
self.current_write_page += 1;
177176
if self.current_write_page > self.pages.len() {
178-
let data = Vec::with_capacity(self.row_per_page * self.tuple_size);
179177
self.pages.push(Page {
180-
data,
178+
data: Vec::with_capacity(self.row_per_page * self.tuple_size),
181179
rows: 0,
182-
state_rows: 0,
183180
capacity: self.row_per_page,
184181
});
185182
}
186183
}
187-
(
188-
&mut self.pages[self.current_write_page - 1],
189-
self.current_write_page - 1,
190-
)
184+
&mut self.pages[self.current_write_page - 1]
191185
}
192186

193187
#[inline]
@@ -200,27 +194,31 @@ impl Payload {
200194
select_vector: &SelectVector,
201195
group_hashes: &[u64],
202196
address: &mut [*const u8],
203-
page_index: &mut [usize],
204197
new_group_rows: usize,
205198
group_columns: InputColumns,
206199
) {
207200
let tuple_size = self.tuple_size;
208-
let (mut page, mut page_index_value) = self.writable_page();
201+
let mut page = self.writable_page();
209202
for idx in select_vector.iter().take(new_group_rows).copied() {
210203
address[idx] = unsafe { page.data.as_ptr().add(page.rows * tuple_size) as *const u8 };
211-
page_index[idx] = page_index_value;
212204
page.rows += 1;
213205

214206
if page.rows == page.capacity {
215-
(page, page_index_value) = self.writable_page();
207+
page = self.writable_page();
216208
}
217209
}
218210

211+
self.total_rows += new_group_rows;
212+
213+
debug_assert_eq!(
214+
self.total_rows,
215+
self.pages.iter().map(|x| x.rows).sum::<usize>()
216+
);
217+
219218
self.append_rows(
220219
select_vector,
221220
group_hashes,
222221
address,
223-
page_index,
224222
new_group_rows,
225223
group_columns,
226224
)
@@ -231,7 +229,6 @@ impl Payload {
231229
select_vector: &SelectVector,
232230
group_hashes: &[u64],
233231
address: &mut [*const u8],
234-
page_index: &mut [usize],
235232
new_group_rows: usize,
236233
group_columns: InputColumns,
237234
) {
@@ -303,16 +300,8 @@ impl Payload {
303300
for (aggr, offset) in self.aggrs.iter().zip(self.state_addr_offsets.iter()) {
304301
aggr.init_state(place.next(*offset));
305302
}
306-
self.pages[page_index[idx]].state_rows += 1;
307303
}
308304
}
309-
310-
self.total_rows += new_group_rows;
311-
312-
debug_assert_eq!(
313-
self.total_rows,
314-
self.pages.iter().map(|x| x.rows).sum::<usize>()
315-
);
316305
}
317306

318307
pub fn combine(&mut self, mut other: Payload) {
@@ -338,7 +327,7 @@ impl Payload {
338327
address: &[*const u8],
339328
) {
340329
let tuple_size = self.tuple_size;
341-
let (mut page, _) = self.writable_page();
330+
let mut page = self.writable_page();
342331
for i in 0..row_count {
343332
let index = select_vector[i];
344333

@@ -352,7 +341,7 @@ impl Payload {
352341
page.rows += 1;
353342

354343
if page.rows == page.capacity {
355-
(page, _) = self.writable_page();
344+
page = self.writable_page();
356345
}
357346
}
358347

@@ -423,12 +412,18 @@ impl Drop for Payload {
423412
if !self.state_move_out {
424413
for (aggr, addr_offset) in self.aggrs.iter().zip(self.state_addr_offsets.iter()) {
425414
if aggr.need_manual_drop_state() {
426-
for page in self.pages.iter() {
427-
for row in 0..page.state_rows {
415+
'PAGE_END: for page in self.pages.iter() {
416+
for row in 0..page.rows {
428417
let ptr = self.data_ptr(page, row);
429418
unsafe {
430419
let state_addr =
431420
read::<u64>(ptr.add(self.state_offset) as _) as usize;
421+
422+
// row is reserved, but not written (maybe throw by oom error)
423+
if state_addr == 0 {
424+
break 'PAGE_END;
425+
}
426+
432427
let state_place = StateAddr::new(state_addr);
433428
aggr.drop_state(state_place.next(*addr_offset));
434429
}

src/query/expression/src/aggregate/probe_state.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ use crate::BATCH_SIZE;
2222
pub struct ProbeState {
2323
pub group_hashes: [u64; BATCH_SIZE],
2424
pub addresses: [*const u8; BATCH_SIZE],
25-
pub page_index: [usize; BATCH_SIZE],
2625
pub state_places: [StateAddr; BATCH_SIZE],
2726
pub group_compare_vector: SelectVector,
2827
pub no_match_vector: SelectVector,
@@ -39,7 +38,6 @@ impl Default for ProbeState {
3938
Self {
4039
group_hashes: [0_u64; BATCH_SIZE],
4140
addresses: [std::ptr::null::<u8>(); BATCH_SIZE],
42-
page_index: [0; BATCH_SIZE],
4341
state_places: [StateAddr::new(0); BATCH_SIZE],
4442
group_compare_vector: new_sel(),
4543
no_match_vector: new_sel(),

0 commit comments

Comments
 (0)