Skip to content

Commit 66ec553

Browse files
committed
fix
1 parent 0bde63e commit 66ec553

File tree

4 files changed

+47
-19
lines changed

4 files changed

+47
-19
lines changed

src/query/functions/src/scalars/hilbert.rs

Lines changed: 39 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,11 @@ use databend_common_expression::types::ALL_NUMERICS_TYPES;
2424
use databend_common_expression::vectorize_with_builder_1_arg;
2525
use databend_common_expression::vectorize_with_builder_2_arg;
2626
use databend_common_expression::with_number_mapped_type;
27+
use databend_common_expression::Column;
2728
use databend_common_expression::FixedLengthEncoding;
2829
use databend_common_expression::FunctionDomain;
2930
use databend_common_expression::FunctionRegistry;
31+
use databend_common_expression::ScalarRef;
3032

3133
pub fn register(registry: &mut FunctionRegistry) {
3234
registry.register_passthrough_nullable_1_arg::<StringType, BinaryType, _, _>(
@@ -92,22 +94,45 @@ pub fn register(registry: &mut FunctionRegistry) {
9294
// The column values are conceptually divided into multiple partitions defined by the range_bounds.
9395
// For example, given the column values (0, 1, 3, 6, 8) and a partition configuration with 3 partitions,
9496
// the range_bounds might be [1, 6]. The function would then return partition IDs as (0, 0, 1, 1, 2).
95-
registry.register_passthrough_nullable_2_arg::<GenericType<0>, ArrayType<GenericType<0>>, NumberType<u64>, _, _>(
97+
registry
98+
.register_2_arg_core::<GenericType<0>, ArrayType<GenericType<0>>, NumberType<u64>, _, _>(
99+
"range_partition_id",
100+
|_, _, _| FunctionDomain::Full,
101+
vectorize_with_builder_2_arg::<
102+
GenericType<0>,
103+
ArrayType<GenericType<0>>,
104+
NumberType<u64>,
105+
>(|val, arr, builder, _| {
106+
let id = calc_range_partition_id(val, arr);
107+
builder.push(id);
108+
}),
109+
);
110+
111+
registry.register_2_arg_core::<NullableType<GenericType<0>>, NullableType<ArrayType<GenericType<0>>>, NumberType<u64>, _, _>(
96112
"range_partition_id",
97113
|_, _, _| FunctionDomain::Full,
98-
vectorize_with_builder_2_arg::<GenericType<0>, ArrayType<GenericType<0>>, NumberType<u64>>(|val, arr, builder, _| {
99-
let mut low = 0;
100-
let mut high = arr.len();
101-
while low < high {
102-
let mid = low + ((high - low) / 2);
103-
let bound = unsafe {arr.index_unchecked(mid)};
104-
if val > bound {
105-
low = mid + 1;
106-
} else {
107-
high = mid;
108-
}
109-
}
110-
builder.push(low as u64);
114+
vectorize_with_builder_2_arg::<NullableType<GenericType<0>>, NullableType<ArrayType<GenericType<0>>>, NumberType<u64>>(|val, arr, builder, _| {
115+
let id = match (val, arr) {
116+
(Some(val), Some(arr)) => calc_range_partition_id(val, arr),
117+
(None, Some(arr)) => arr.len() as u64 + 1,
118+
_ => 0,
119+
};
120+
builder.push(id);
111121
}),
112122
);
113123
}
124+
125+
fn calc_range_partition_id(val: ScalarRef, arr: Column) -> u64 {
126+
let mut low = 0;
127+
let mut high = arr.len();
128+
while low < high {
129+
let mid = low + ((high - low) / 2);
130+
let bound = unsafe { arr.index_unchecked(mid) };
131+
if val > bound {
132+
low = mid + 1;
133+
} else {
134+
high = mid;
135+
}
136+
}
137+
low as u64
138+
}

src/query/functions/tests/it/scalars/testdata/function_list.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3211,7 +3211,7 @@ Functions overloads:
32113211
0 range(UInt64, UInt64) :: Array(UInt64)
32123212
1 range(UInt64 NULL, UInt64 NULL) :: Array(UInt64) NULL
32133213
0 range_partition_id(T0, Array(T0)) :: UInt64
3214-
1 range_partition_id(T0 NULL, Array(T0) NULL) :: UInt64 NULL
3214+
1 range_partition_id(T0 NULL, Array(T0) NULL) :: UInt64
32153215
0 regexp(String, String) :: Boolean
32163216
1 regexp(String NULL, String NULL) :: Boolean NULL
32173217
0 regexp_instr FACTORY

src/query/service/src/interpreters/interpreter_table_recluster.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -279,8 +279,7 @@ impl ReclusterTableInterpreter {
279279
"range_bound({partitions}, {sample_size})({cluster_key_str}) AS bound_{index}"
280280
));
281281
hilbert_keys.push(format!(
282-
"hilbert_key(cast(ifnull(range_partition_id({table}.{cluster_key_str}, \
283-
_keys_bound.bound_{index}), {partitions}) as uint16))"
282+
"hilbert_key(cast(range_partition_id({table}.{cluster_key_str}, _keys_bound.bound_{index}) as uint16))"
284283
));
285284
}
286285
let keys_bounds_str = keys_bounds.join(", ");

src/query/service/src/pipelines/processors/transforms/transform_partition_collect.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,12 @@ impl Exchange for HilbertPartitionExchange {
5454
fn partition(&self, data_block: DataBlock, n: usize) -> Result<Vec<DataBlock>> {
5555
// Extract the columns used for hash computation.
5656
let mut data_block = data_block.consume_convert_to_full();
57-
let last = data_block.get_last_column().as_nullable().unwrap();
58-
let range_ids = last.column.as_number().unwrap().as_u_int64().unwrap();
57+
let range_ids = data_block
58+
.get_last_column()
59+
.as_number()
60+
.unwrap()
61+
.as_u_int64()
62+
.unwrap();
5963

6064
// Scatter the data block to different partitions.
6165
let indices = range_ids

0 commit comments

Comments
 (0)