Skip to content

Commit 625e31a

Browse files
authored
refactor(query): Multiple states Aggregate function (#17148)
* refine * AggrState * ref AggrState * AggrStateLoc * fix * fix * fix * num_states * get_state_layout * fix * new ornull * fix * fix * AggregatePartial output_schema * revert mutil serialize * fix * refine * fix * update * update * fix * fix * refine * fix * fix * update * array alloc * test * comment * fix * fix
1 parent 4db02ec commit 625e31a

File tree

55 files changed

+1685
-1400
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

55 files changed

+1685
-1400
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/query/expression/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ arrow-ord = { workspace = true }
6969
criterion = { workspace = true }
7070
goldenfile = { workspace = true }
7171
pretty_assertions = { workspace = true }
72+
proptest = { workspace = true }
7273
rand = { workspace = true }
7374

7475
[[bench]]

src/query/expression/src/aggregate/aggregate_function.rs

Lines changed: 35 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,17 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use std::alloc::Layout;
1615
use std::fmt;
1716
use std::sync::Arc;
1817

1918
use databend_common_column::bitmap::Bitmap;
2019
use databend_common_exception::Result;
2120

21+
use super::AggrState;
22+
use super::AggrStateLoc;
23+
use super::AggrStateRegistry;
2224
use super::StateAddr;
23-
use crate::types::binary::BinaryColumnBuilder;
25+
use crate::types::BinaryColumn;
2426
use crate::types::DataType;
2527
use crate::Column;
2628
use crate::ColumnBuilder;
@@ -35,76 +37,61 @@ pub trait AggregateFunction: fmt::Display + Sync + Send {
3537
fn name(&self) -> &str;
3638
fn return_type(&self) -> Result<DataType>;
3739

38-
fn init_state(&self, place: StateAddr);
40+
fn init_state(&self, place: AggrState);
3941

40-
fn is_state(&self) -> bool {
41-
false
42-
}
43-
44-
fn state_layout(&self) -> Layout;
42+
fn register_state(&self, registry: &mut AggrStateRegistry);
4543

4644
// accumulate is to accumulate the arrays in batch mode
4745
// common used when there is no group by for aggregate function
4846
fn accumulate(
4947
&self,
50-
_place: StateAddr,
51-
_columns: InputColumns,
52-
_validity: Option<&Bitmap>,
53-
_input_rows: usize,
48+
place: AggrState,
49+
columns: InputColumns,
50+
validity: Option<&Bitmap>,
51+
input_rows: usize,
5452
) -> Result<()>;
5553

5654
// used when we need to calculate with group keys
5755
fn accumulate_keys(
5856
&self,
59-
places: &[StateAddr],
60-
offset: usize,
57+
addrs: &[StateAddr],
58+
loc: &[AggrStateLoc],
6159
columns: InputColumns,
6260
_input_rows: usize,
6361
) -> Result<()> {
64-
for (row, place) in places.iter().enumerate() {
65-
self.accumulate_row(place.next(offset), columns, row)?;
62+
for (row, addr) in addrs.iter().enumerate() {
63+
self.accumulate_row(AggrState::new(*addr, loc), columns, row)?;
6664
}
6765
Ok(())
6866
}
6967

7068
// Used in aggregate_null_adaptor
71-
fn accumulate_row(&self, _place: StateAddr, _columns: InputColumns, _row: usize) -> Result<()>;
69+
fn accumulate_row(&self, place: AggrState, columns: InputColumns, row: usize) -> Result<()>;
7270

73-
// serialize the state into binary array
74-
fn batch_serialize(
75-
&self,
76-
places: &[StateAddr],
77-
offset: usize,
78-
builder: &mut BinaryColumnBuilder,
79-
) -> Result<()> {
80-
for place in places {
81-
self.serialize(place.next(offset), &mut builder.data)?;
82-
builder.commit_row();
83-
}
84-
Ok(())
85-
}
86-
87-
fn serialize(&self, _place: StateAddr, _writer: &mut Vec<u8>) -> Result<()>;
71+
fn serialize(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()>;
8872

8973
fn serialize_size_per_row(&self) -> Option<usize> {
9074
None
9175
}
9276

93-
fn merge(&self, _place: StateAddr, _reader: &mut &[u8]) -> Result<()>;
77+
fn merge(&self, place: AggrState, reader: &mut &[u8]) -> Result<()>;
9478

9579
/// Batch merge and deserialize the state from binary array
96-
fn batch_merge(&self, places: &[StateAddr], offset: usize, column: &Column) -> Result<()> {
97-
let c = column.as_binary().unwrap();
98-
for (place, mut data) in places.iter().zip(c.iter()) {
99-
self.merge(place.next(offset), &mut data)?;
80+
fn batch_merge(
81+
&self,
82+
places: &[StateAddr],
83+
loc: &[AggrStateLoc],
84+
state: &BinaryColumn,
85+
) -> Result<()> {
86+
for (place, mut data) in places.iter().zip(state.iter()) {
87+
self.merge(AggrState::new(*place, loc), &mut data)?;
10088
}
10189

10290
Ok(())
10391
}
10492

105-
fn batch_merge_single(&self, place: StateAddr, column: &Column) -> Result<()> {
106-
let c = column.as_binary().unwrap();
107-
93+
fn batch_merge_single(&self, place: AggrState, state: &Column) -> Result<()> {
94+
let c = state.as_binary().unwrap();
10895
for mut data in c.iter() {
10996
self.merge(place, &mut data)?;
11097
}
@@ -115,29 +102,29 @@ pub trait AggregateFunction: fmt::Display + Sync + Send {
115102
&self,
116103
places: &[StateAddr],
117104
rhses: &[StateAddr],
118-
offset: usize,
105+
loc: &[AggrStateLoc],
119106
) -> Result<()> {
120107
for (place, rhs) in places.iter().zip(rhses.iter()) {
121-
self.merge_states(place.next(offset), rhs.next(offset))?;
108+
self.merge_states(AggrState::new(*place, loc), AggrState::new(*rhs, loc))?;
122109
}
123110
Ok(())
124111
}
125112

126-
fn merge_states(&self, _place: StateAddr, _rhs: StateAddr) -> Result<()>;
113+
fn merge_states(&self, place: AggrState, rhs: AggrState) -> Result<()>;
127114

128115
fn batch_merge_result(
129116
&self,
130117
places: &[StateAddr],
131-
offset: usize,
118+
loc: Box<[AggrStateLoc]>,
132119
builder: &mut ColumnBuilder,
133120
) -> Result<()> {
134121
for place in places {
135-
self.merge_result(place.next(offset), builder)?;
122+
self.merge_result(AggrState::new(*place, &loc), builder)?;
136123
}
137124
Ok(())
138125
}
139-
// TODO append the value into the column builder
140-
fn merge_result(&self, _place: StateAddr, _builder: &mut ColumnBuilder) -> Result<()>;
126+
127+
fn merge_result(&self, place: AggrState, builder: &mut ColumnBuilder) -> Result<()>;
141128

142129
// std::mem::needs_drop::<State>
143130
// if true will call drop_state
@@ -147,7 +134,7 @@ pub trait AggregateFunction: fmt::Display + Sync + Send {
147134

148135
/// # Safety
149136
/// The caller must ensure that the [`_place`] has defined memory.
150-
unsafe fn drop_state(&self, _place: StateAddr) {}
137+
unsafe fn drop_state(&self, _place: AggrState) {}
151138

152139
fn get_own_null_adaptor(
153140
&self,

0 commit comments

Comments
 (0)