Skip to content

Commit 62dd811

Browse files
authored
Merge branch 'backport/v1.2.680' into backport-fix-agg-hang
2 parents 7af80b1 + 9987c34 commit 62dd811

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

62 files changed

+1082
-523
lines changed

Cargo.lock

Lines changed: 84 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,12 @@ members = [
3636
"src/query/expression",
3737
"src/query/formats",
3838
"src/query/functions",
39+
"src/query/functions",
40+
"src/query/functions/src/scalars/mathematics",
41+
"src/query/functions/src/scalars/geographic",
42+
"src/query/functions/src/scalars/timestamp",
43+
"src/query/functions/src/scalars/numeric_basic_arithmetic",
44+
"src/query/functions/src/scalars/arithmetic",
3945
"src/query/management",
4046
"src/query/pipeline/core",
4147
"src/query/pipeline/sinks",
@@ -184,6 +190,12 @@ databend-enterprise-storage-quota = { path = "src/query/ee_features/storage_quot
184190
databend-enterprise-stream-handler = { path = "src/query/ee_features/stream_handler" }
185191
databend-enterprise-vacuum-handler = { path = "src/query/ee_features/vacuum_handler" }
186192
databend-enterprise-virtual-column = { path = "src/query/ee_features/virtual_column" }
193+
databend-functions-scalar-arithmetic = { path = "src/query/functions/src/scalars/arithmetic" }
194+
databend-functions-scalar-datetime = { path = "src/query/functions/src/scalars/timestamp" }
195+
databend-functions-scalar-decimal = { path = "src/query/functions/src/scalars/decimal" }
196+
databend-functions-scalar-geo = { path = "src/query/functions/src/scalars/geographic" }
197+
databend-functions-scalar-math = { path = "src/query/functions/src/scalars/mathematics" }
198+
databend-functions-scalar-numeric-basic-arithmetic = { path = "src/query/functions/src/scalars/numeric_basic_arithmetic" }
187199
databend-meta = { path = "src/meta/service" }
188200
databend-query = { path = "src/query/service" }
189201
databend-sqllogictests = { path = "tests/sqllogictests" }

src/query/expression/src/register_vectorize.rs

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -283,8 +283,7 @@ pub fn passthrough_nullable_1_arg<I1: ArgType, O: ArgType>(
283283

284284
match out {
285285
Value::Column(out) => Value::Column(NullableColumn::new(out, args_validity)),
286-
Value::Scalar(out) if args_validity.get_bit(0) => Value::Scalar(Some(out)),
287-
_ => Value::Scalar(None),
286+
Value::Scalar(out) => Value::Scalar(Some(out)),
288287
}
289288
}
290289
_ => Value::Scalar(None),
@@ -308,15 +307,15 @@ pub fn passthrough_nullable_2_arg<I1: ArgType, I2: ArgType, O: ArgType>(
308307
if let Some(validity) = ctx.validity.as_ref() {
309308
args_validity = &args_validity & validity;
310309
}
310+
311311
ctx.validity = Some(args_validity.clone());
312312
match (arg1.value(), arg2.value()) {
313313
(Some(arg1), Some(arg2)) => {
314314
let out = func(arg1, arg2, ctx);
315315

316316
match out {
317317
Value::Column(out) => Value::Column(NullableColumn::new(out, args_validity)),
318-
Value::Scalar(out) if args_validity.get_bit(0) => Value::Scalar(Some(out)),
319-
_ => Value::Scalar(None),
318+
Value::Scalar(out) => Value::Scalar(Some(out)),
320319
}
321320
}
322321
_ => Value::Scalar(None),
@@ -352,8 +351,7 @@ pub fn passthrough_nullable_3_arg<I1: ArgType, I2: ArgType, I3: ArgType, O: ArgT
352351

353352
match out {
354353
Value::Column(out) => Value::Column(NullableColumn::new(out, args_validity)),
355-
Value::Scalar(out) if args_validity.get_bit(0) => Value::Scalar(Some(out)),
356-
_ => Value::Scalar(None),
354+
Value::Scalar(out) => Value::Scalar(Some(out)),
357355
}
358356
}
359357
_ => Value::Scalar(None),
@@ -397,8 +395,7 @@ pub fn passthrough_nullable_4_arg<
397395

398396
match out {
399397
Value::Column(out) => Value::Column(NullableColumn::new(out, args_validity)),
400-
Value::Scalar(out) if args_validity.get_bit(0) => Value::Scalar(Some(out)),
401-
_ => Value::Scalar(None),
398+
Value::Scalar(out) => Value::Scalar(Some(out)),
402399
}
403400
}
404401
_ => Value::Scalar(None),
@@ -427,8 +424,7 @@ pub fn combine_nullable_1_arg<I1: ArgType, O: ArgType>(
427424
out.column,
428425
&args_validity & &out.validity,
429426
)),
430-
Value::Scalar(out) if args_validity.get_bit(0) => Value::Scalar(out),
431-
_ => Value::Scalar(None),
427+
Value::Scalar(out) => Value::Scalar(out),
432428
}
433429
}
434430
_ => Value::Scalar(None),
@@ -465,8 +461,7 @@ pub fn combine_nullable_2_arg<I1: ArgType, I2: ArgType, O: ArgType>(
465461
out.column,
466462
&args_validity & &out.validity,
467463
)),
468-
Value::Scalar(out) if args_validity.get_bit(0) => Value::Scalar(out),
469-
_ => Value::Scalar(None),
464+
Value::Scalar(out) => Value::Scalar(out),
470465
}
471466
}
472467
_ => Value::Scalar(None),
@@ -505,8 +500,7 @@ pub fn combine_nullable_3_arg<I1: ArgType, I2: ArgType, I3: ArgType, O: ArgType>
505500
out.column,
506501
&args_validity & &out.validity,
507502
)),
508-
Value::Scalar(out) if args_validity.get_bit(0) => Value::Scalar(out),
509-
_ => Value::Scalar(None),
503+
Value::Scalar(out) => Value::Scalar(out),
510504
}
511505
}
512506
_ => Value::Scalar(None),
@@ -552,8 +546,7 @@ pub fn combine_nullable_4_arg<I1: ArgType, I2: ArgType, I3: ArgType, I4: ArgType
552546
out.column,
553547
&args_validity & &out.validity,
554548
)),
555-
Value::Scalar(out) if args_validity.get_bit(0) => Value::Scalar(out),
556-
_ => Value::Scalar(None),
549+
Value::Scalar(out) => Value::Scalar(out),
557550
}
558551
}
559552
_ => Value::Scalar(None),

src/query/formats/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ databend-common-expression = { workspace = true }
1919
databend-common-io = { workspace = true }
2020
databend-common-meta-app = { workspace = true }
2121
databend-common-settings = { workspace = true }
22+
databend-functions-scalar-datetime = { workspace = true }
2223
databend-storages-common-blocks = { workspace = true }
2324
databend-storages-common-table-meta = { workspace = true }
2425

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::io::Cursor;
16+
17+
use bstr::ByteSlice;
18+
use databend_common_exception::ErrorCode;
19+
use databend_common_exception::Result;
20+
use databend_common_expression::types::timestamp::clamp_timestamp;
21+
use databend_common_io::cursor_ext::read_num_text_exact;
22+
use databend_common_io::cursor_ext::BufferReadDateTimeExt;
23+
use databend_common_io::cursor_ext::DateTimeResType;
24+
use databend_common_io::cursor_ext::ReadBytesExt;
25+
use databend_functions_scalar_datetime::datetime::int64_to_timestamp;
26+
27+
use crate::InputCommonSettings;
28+
29+
pub(crate) fn read_timestamp(
30+
column: &mut Vec<i64>,
31+
data: &[u8],
32+
settings: &InputCommonSettings,
33+
) -> Result<()> {
34+
let ts = if !data.contains(&b'-') {
35+
int64_to_timestamp(read_num_text_exact(data)?)
36+
} else {
37+
let mut buffer_readr = Cursor::new(&data);
38+
let t = buffer_readr.read_timestamp_text(&settings.jiff_timezone)?;
39+
match t {
40+
DateTimeResType::Datetime(t) => {
41+
if !buffer_readr.eof() {
42+
let data = data.to_str().unwrap_or("not utf8");
43+
let msg = format!(
44+
"fail to deserialize timestamp, unexpected end at pos {} of {}",
45+
buffer_readr.position(),
46+
data
47+
);
48+
return Err(ErrorCode::BadBytes(msg));
49+
}
50+
let mut ts = t.timestamp().as_microsecond();
51+
clamp_timestamp(&mut ts);
52+
ts
53+
}
54+
_ => unreachable!(),
55+
}
56+
};
57+
column.push(ts);
58+
Ok(())
59+
}

src/query/formats/src/field_decoder/fast_values.rs

Lines changed: 2 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ use databend_common_expression::types::decimal::DecimalSize;
3737
use databend_common_expression::types::nullable::NullableColumnBuilder;
3838
use databend_common_expression::types::number::Number;
3939
use databend_common_expression::types::string::StringColumnBuilder;
40-
use databend_common_expression::types::timestamp::clamp_timestamp;
4140
use databend_common_expression::types::AnyType;
4241
use databend_common_expression::types::MutableBitmap;
4342
use databend_common_expression::types::NumberColumnBuilder;
@@ -51,7 +50,6 @@ use databend_common_io::constants::NULL_BYTES_UPPER;
5150
use databend_common_io::constants::TRUE_BYTES_LOWER;
5251
use databend_common_io::cursor_ext::BufferReadDateTimeExt;
5352
use databend_common_io::cursor_ext::BufferReadStringExt;
54-
use databend_common_io::cursor_ext::DateTimeResType;
5553
use databend_common_io::cursor_ext::ReadBytesExt;
5654
use databend_common_io::cursor_ext::ReadCheckPointExt;
5755
use databend_common_io::cursor_ext::ReadNumberExt;
@@ -62,9 +60,9 @@ use databend_common_io::prelude::FormatSettings;
6260
use databend_common_io::Interval;
6361
use jsonb::parse_value;
6462
use lexical_core::FromLexical;
65-
use num::cast::AsPrimitive;
6663
use num_traits::NumCast;
6764

65+
use crate::field_decoder::common::read_timestamp;
6866
use crate::FieldDecoder;
6967
use crate::InputCommonSettings;
7068

@@ -323,26 +321,7 @@ impl FastFieldDecoderValues {
323321
) -> Result<()> {
324322
let mut buf = Vec::new();
325323
self.read_string_inner(reader, &mut buf, positions)?;
326-
let mut buffer_readr = Cursor::new(&buf);
327-
let ts = buffer_readr.read_timestamp_text(&self.common_settings().jiff_timezone)?;
328-
match ts {
329-
DateTimeResType::Datetime(ts) => {
330-
if !buffer_readr.eof() {
331-
let data = buf.to_str().unwrap_or("not utf8");
332-
let msg = format!(
333-
"fail to deserialize timestamp, unexpected end at pos {} of {}",
334-
buffer_readr.position(),
335-
data
336-
);
337-
return Err(ErrorCode::BadBytes(msg));
338-
}
339-
let mut micros = ts.timestamp().as_microsecond();
340-
clamp_timestamp(&mut micros);
341-
column.push(micros.as_());
342-
}
343-
_ => unreachable!(),
344-
}
345-
Ok(())
324+
read_timestamp(column, &buf, self.common_settings())
346325
}
347326

348327
fn read_array<R: AsRef<[u8]>>(

src/query/formats/src/field_decoder/json_ast.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ use databend_common_io::cursor_ext::DateTimeResType;
4040
use databend_common_io::geography::geography_from_ewkt;
4141
use databend_common_io::geometry_from_ewkt;
4242
use databend_common_io::parse_bitmap;
43+
use databend_functions_scalar_datetime::datetime::int64_to_timestamp;
4344
use jiff::tz::TimeZone;
4445
use lexical_core::FromLexical;
4546
use num::cast::AsPrimitive;
@@ -296,8 +297,8 @@ impl FieldJsonAstDecoder {
296297
Ok(())
297298
}
298299
Value::Number(number) => match number.as_i64() {
299-
Some(mut n) => {
300-
clamp_timestamp(&mut n);
300+
Some(n) => {
301+
let n = int64_to_timestamp(n);
301302
column.push(n);
302303
Ok(())
303304
}

src/query/formats/src/field_decoder/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
mod common;
1516
mod fast_values;
1617
mod json_ast;
1718
mod nested;

0 commit comments

Comments
 (0)