Skip to content

Commit 964a086

Browse files
sundy-li authored and
everpcpc committed
chore(query): cherrypick to 636: Decimal128 state use align 8 (#16989)
* chore(query): add more logs on aggregation (#16552) * add consume_convert_blocks: * update * add tests * add more logs * add more logs * chore(query): improve decimal aggregate state * update * update * update * chore(ci): upgrade bendsql (#16965) * chore(ci): install bendsql from packages (#16970) --------- Co-authored-by: everpcpc <[email protected]>
1 parent 0ab6572 commit 964a086

File tree

20 files changed

+528
-51
lines changed

20 files changed

+528
-51
lines changed

.github/actions/setup_bendsql/action.yml

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,25 +3,34 @@ description: "Setup BendSQL for CI"
33
runs:
44
using: "composite"
55
steps:
6-
- name: Download and Install for Linux
6+
- name: Install for Linux
77
if: runner.os == 'Linux'
88
shell: bash
99
run: |
1010
if bendsql --version; then
1111
exit 0
1212
fi
13-
curl --retry 5 -Lo /tmp/bendsql.tar.gz https://github.com/datafuselabs/bendsql/releases/download/v0.18.3/bendsql-x86_64-unknown-linux-gnu.tar.gz
14-
tar -xzf /tmp/bendsql.tar.gz -C /tmp
15-
mv /tmp/bendsql /usr/local/bin/bendsql
13+
case $RUNNER_PROVIDER in
14+
aws)
15+
aws s3 cp s3://databend-ci/packages/bendsql_$(dpkg --print-architecture).deb /tmp/bendsql.deb
16+
sudo dpkg -i /tmp/bendsql.deb
17+
;;
18+
gcp)
19+
gsutil cp gs://databend-ci/packages/bendsql_$(dpkg --print-architecture).deb /tmp/bendsql.deb
20+
sudo dpkg -i /tmp/bendsql.deb
21+
;;
22+
*)
23+
curl -fsSL https://repo.databend.com/install/bendsql.sh | bash -s -- -y --prefix /usr/local
24+
;;
25+
esac
1626
bendsql --version
17-
- name: Download and Install for macOS
27+
28+
- name: Install for macOS
1829
if: runner.os == 'macOS'
1930
shell: bash
2031
run: |
2132
if bendsql --version; then
2233
exit 0
2334
fi
24-
curl --retry 5 -Lo /tmp/bendsql.tar.gz https://github.com/datafuselabs/bendsql/releases/download/v0.18.3/bendsql-x86_64-apple-darwin.tar.gz
25-
tar -xzf /tmp/bendsql.tar.gz -C /tmp
26-
mv /tmp/bendsql /usr/local/bin/bendsql
35+
brew install databendcloud/homebrew-tap/bendsql
2736
bendsql --version

Cargo.toml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -349,6 +349,17 @@ useless_format = "allow"
349349
mutable_key_type = "allow"
350350
result_large_err = "allow"
351351

352+
## DON'T DELETE THIS: For best performance we should use this profile, but it takes longer to compile.
353+
## Test SQL:
354+
## select sum(number) from numbers_mt(10000000000); ~ 3x performance
355+
## select max(number) from numbers_mt(10000000000); ~ 3x performance
356+
# [profile.release]
357+
# debug = 1
358+
# lto = "thin"
359+
# overflow-checks = false
360+
# incremental = false
361+
# codegen-units = 1
362+
352363
[profile.release]
353364
debug = 1
354365
lto = "thin"

src/query/expression/src/aggregate/aggregate_hashtable.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ pub struct AggregateHashTable {
4949
// used to append rows directly during deserialization
5050
pub direct_append: bool,
5151
pub config: HashTableConfig,
52+
5253
current_radix_bits: u64,
5354
entries: Vec<Entry>,
5455
count: usize,
@@ -585,6 +586,7 @@ impl AggregateHashTable {
585586
.iter()
586587
.map(|arena| arena.allocated_bytes())
587588
.sum::<usize>()
589+
+ self.entries.len() * std::mem::size_of::<Entry>()
588590
}
589591
}
590592

src/query/expression/src/block.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,18 @@ impl DataBlock {
239239
self.columns().iter().map(|entry| entry.memory_size()).sum()
240240
}
241241

242+
pub fn consume_convert_to_full(self) -> Self {
243+
if self
244+
.columns()
245+
.iter()
246+
.all(|entry| entry.value.as_column().is_some())
247+
{
248+
return self;
249+
}
250+
251+
self.convert_to_full()
252+
}
253+
242254
pub fn convert_to_full(&self) -> Self {
243255
let columns = self
244256
.columns()

src/query/expression/src/converts/arrow/to.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ impl DataBlock {
101101
let arrow_schema = table_schema_to_arrow_schema(table_schema);
102102
let mut arrays = Vec::with_capacity(self.columns().len());
103103
for (entry, arrow_field) in self
104-
.convert_to_full()
104+
.consume_convert_to_full()
105105
.columns()
106106
.iter()
107107
.zip(arrow_schema.fields())

src/query/expression/src/types/decimal.rs

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -340,21 +340,28 @@ pub trait Decimal:
340340
+ Clone
341341
+ PartialEq
342342
+ Eq
343+
+ std::ops::AddAssign
343344
+ PartialOrd
344345
+ Ord
345346
+ Sync
346347
+ Send
347348
+ 'static
348349
{
350+
// The layout alignment of i128 and i256 has changed:
351+
// https://blog.rust-lang.org/2024/03/30/i128-layout-update.html
352+
// Here we keep this type in the aggregate state, which minimizes the alignment of the struct.
353+
type U64Array: Send + Sync + Copy + Default + Debug;
349354
fn zero() -> Self;
350355
fn one() -> Self;
351356
fn minus_one() -> Self;
352357

353358
// 10**scale
354359
fn e(n: u32) -> Self;
355-
356360
fn mem_size() -> usize;
357361

362+
fn to_u64_array(self) -> Self::U64Array;
363+
fn from_u64_array(v: Self::U64Array) -> Self;
364+
358365
fn checked_add(self, rhs: Self) -> Option<Self>;
359366
fn checked_sub(self, rhs: Self) -> Option<Self>;
360367
fn checked_div(self, rhs: Self) -> Option<Self>;
@@ -422,6 +429,16 @@ pub trait Decimal:
422429
}
423430

424431
impl Decimal for i128 {
432+
type U64Array = [u64; 2];
433+
434+
fn to_u64_array(self) -> Self::U64Array {
435+
unsafe { std::mem::transmute(self) }
436+
}
437+
438+
fn from_u64_array(v: Self::U64Array) -> Self {
439+
unsafe { std::mem::transmute(v) }
440+
}
441+
425442
fn zero() -> Self {
426443
0_i128
427444
}
@@ -656,6 +673,16 @@ impl Decimal for i128 {
656673
}
657674

658675
impl Decimal for i256 {
676+
type U64Array = [u64; 4];
677+
678+
fn to_u64_array(self) -> Self::U64Array {
679+
unsafe { std::mem::transmute(self) }
680+
}
681+
682+
fn from_u64_array(v: Self::U64Array) -> Self {
683+
unsafe { std::mem::transmute(v) }
684+
}
685+
659686
fn zero() -> Self {
660687
i256::ZERO
661688
}

src/query/functions/src/aggregates/aggregate_min_max_any.rs

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use std::sync::Arc;
1717

1818
use borsh::BorshDeserialize;
1919
use borsh::BorshSerialize;
20+
use databend_common_arrow::arrow::bitmap::Bitmap;
2021
use databend_common_exception::ErrorCode;
2122
use databend_common_exception::Result;
2223
use databend_common_expression::types::decimal::*;
@@ -38,6 +39,7 @@ use super::aggregate_scalar_state::TYPE_MIN;
3839
use super::AggregateUnaryFunction;
3940
use super::FunctionData;
4041
use super::UnaryState;
42+
use crate::aggregates::aggregate_min_max_any_decimal::MinMaxAnyDecimalState;
4143
use crate::aggregates::assert_unary_arguments;
4244
use crate::aggregates::AggregateFunction;
4345
use crate::with_compare_mapped_type;
@@ -92,6 +94,36 @@ where
9294
Ok(())
9395
}
9496

97+
fn add_batch(
98+
&mut self,
99+
other: T::Column,
100+
validity: Option<&Bitmap>,
101+
function_data: Option<&dyn FunctionData>,
102+
) -> Result<()> {
103+
let column_len = T::column_len(&other);
104+
if column_len == 0 {
105+
return Ok(());
106+
}
107+
108+
let column_iter = T::iter_column(&other);
109+
if let Some(validity) = validity {
110+
if validity.unset_bits() == column_len {
111+
return Ok(());
112+
}
113+
for (data, valid) in column_iter.zip(validity.iter()) {
114+
if valid {
115+
let _ = self.add(data, function_data);
116+
}
117+
}
118+
} else {
119+
let v = column_iter.reduce(|l, r| if !C::change_if(&l, &r) { l } else { r });
120+
if let Some(v) = v {
121+
let _ = self.add(v, function_data);
122+
}
123+
}
124+
Ok(())
125+
}
126+
95127
fn merge(&mut self, rhs: &Self) -> Result<()> {
96128
if let Some(v) = &rhs.value {
97129
self.add(T::to_scalar_ref(v), None)?;
@@ -164,7 +196,7 @@ pub fn try_create_aggregate_min_max_any_function<const CMP_TYPE: u8>(
164196
};
165197
let return_type = DataType::Decimal(DecimalDataType::from_size(decimal_size)?);
166198
AggregateUnaryFunction::<
167-
MinMaxAnyState<DecimalType<i128>, CMP>,
199+
MinMaxAnyDecimalState<DecimalType<i128>, CMP>,
168200
DecimalType<i128>,
169201
DecimalType<i128>,
170202
>::try_create_unary(
@@ -178,7 +210,7 @@ pub fn try_create_aggregate_min_max_any_function<const CMP_TYPE: u8>(
178210
};
179211
let return_type = DataType::Decimal(DecimalDataType::from_size(decimal_size)?);
180212
AggregateUnaryFunction::<
181-
MinMaxAnyState<DecimalType<i256>, CMP>,
213+
MinMaxAnyDecimalState<DecimalType<i256>, CMP>,
182214
DecimalType<i256>,
183215
DecimalType<i256>,
184216
>::try_create_unary(
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::marker::PhantomData;
16+
17+
use borsh::BorshDeserialize;
18+
use borsh::BorshSerialize;
19+
use databend_common_arrow::arrow::bitmap::Bitmap;
20+
use databend_common_exception::Result;
21+
use databend_common_expression::types::decimal::*;
22+
use databend_common_expression::types::*;
23+
24+
use super::aggregate_scalar_state::ChangeIf;
25+
use super::FunctionData;
26+
use super::UnaryState;
27+
28+
#[derive(BorshSerialize, BorshDeserialize)]
29+
pub struct MinMaxAnyDecimalState<T, C>
30+
where
31+
T: ValueType,
32+
T::Scalar: Decimal,
33+
<T::Scalar as Decimal>::U64Array: BorshSerialize + BorshDeserialize,
34+
C: ChangeIf<T>,
35+
{
36+
pub value: Option<<T::Scalar as Decimal>::U64Array>,
37+
#[borsh(skip)]
38+
_c: PhantomData<C>,
39+
}
40+
41+
impl<T, C> Default for MinMaxAnyDecimalState<T, C>
42+
where
43+
T: ValueType,
44+
T::Scalar: Decimal,
45+
<T::Scalar as Decimal>::U64Array: BorshSerialize + BorshDeserialize,
46+
C: ChangeIf<T>,
47+
{
48+
fn default() -> Self {
49+
Self {
50+
value: None,
51+
_c: PhantomData,
52+
}
53+
}
54+
}
55+
56+
impl<T, C> UnaryState<T, T> for MinMaxAnyDecimalState<T, C>
57+
where
58+
T: ValueType,
59+
T::Scalar: Decimal,
60+
<T::Scalar as Decimal>::U64Array: BorshSerialize + BorshDeserialize,
61+
C: ChangeIf<T> + Default,
62+
{
63+
fn add(
64+
&mut self,
65+
other: T::ScalarRef<'_>,
66+
_function_data: Option<&dyn FunctionData>,
67+
) -> Result<()> {
68+
match self.value {
69+
Some(v) => {
70+
let v = T::Scalar::from_u64_array(v);
71+
if C::change_if(&T::to_scalar_ref(&v), &other) {
72+
self.value = Some(T::Scalar::to_u64_array(T::to_owned_scalar(other)));
73+
}
74+
}
75+
None => {
76+
self.value = Some(T::Scalar::to_u64_array(T::to_owned_scalar(other)));
77+
}
78+
}
79+
Ok(())
80+
}
81+
82+
fn add_batch(
83+
&mut self,
84+
other: T::Column,
85+
validity: Option<&Bitmap>,
86+
function_data: Option<&dyn FunctionData>,
87+
) -> Result<()> {
88+
let column_len = T::column_len(&other);
89+
if column_len == 0 {
90+
return Ok(());
91+
}
92+
93+
let column_iter = T::iter_column(&other);
94+
match validity {
95+
Some(validity)
96+
if validity.unset_bits() > 0 && validity.unset_bits() < validity.len() =>
97+
{
98+
let v = column_iter
99+
.zip(validity)
100+
.filter(|(_, valid)| *valid)
101+
.map(|(v, _)| v)
102+
.reduce(|l, r| if !C::change_if(&l, &r) { l } else { r });
103+
if let Some(v) = v {
104+
let _ = self.add(v, function_data);
105+
}
106+
}
107+
Some(validity) if validity.unset_bits() == validity.len() => {}
108+
_ => {
109+
let v = column_iter.reduce(|l, r| if !C::change_if(&l, &r) { l } else { r });
110+
if let Some(v) = v {
111+
let _ = self.add(v, function_data);
112+
}
113+
}
114+
}
115+
116+
Ok(())
117+
}
118+
119+
fn merge(&mut self, rhs: &Self) -> Result<()> {
120+
if let Some(v) = rhs.value {
121+
let v = T::Scalar::from_u64_array(v);
122+
self.add(T::to_scalar_ref(&v), None)?;
123+
}
124+
Ok(())
125+
}
126+
127+
fn merge_result(
128+
&mut self,
129+
builder: &mut T::ColumnBuilder,
130+
_function_data: Option<&dyn FunctionData>,
131+
) -> Result<()> {
132+
if let Some(v) = self.value {
133+
let v = T::Scalar::from_u64_array(v);
134+
T::push_item(builder, T::to_scalar_ref(&v));
135+
} else {
136+
T::push_default(builder);
137+
}
138+
Ok(())
139+
}
140+
}

0 commit comments

Comments
 (0)