Skip to content

Commit ac39d05

Browse files
committed
try to improve
1 parent 5272401 commit ac39d05

File tree

9 files changed

+157
-49
lines changed

9 files changed

+157
-49
lines changed

scripts/ci/deploy/databend-query-cluster-3-nodes.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ set -e
66

77
SCRIPT_PATH="$(cd "$(dirname "$0")" >/dev/null 2>&1 && pwd)"
88
cd "$SCRIPT_PATH/../../.." || exit
9-
BUILD_PROFILE=${BUILD_PROFILE:-debug}
9+
BUILD_PROFILE=${BUILD_PROFILE:-release}
1010

1111
# Caveat: has to kill query first.
1212
# `query` tries to remove its liveness record from meta before shutting down.

src/query/service/src/interpreters/interpreter_table_recluster.rs

Lines changed: 13 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -42,14 +42,12 @@ use databend_common_sql::executor::physical_plans::Recluster;
4242
use databend_common_sql::executor::PhysicalPlan;
4343
use databend_common_sql::executor::PhysicalPlanBuilder;
4444
use databend_common_sql::plans::set_update_stream_columns;
45-
use databend_common_sql::plans::BoundColumnRef;
4645
use databend_common_sql::plans::Plan;
4746
use databend_common_sql::plans::ReclusterPlan;
4847
use databend_common_sql::query_executor::QueryExecutor;
4948
use databend_common_sql::IdentifierNormalizer;
5049
use databend_common_sql::NameResolutionContext;
5150
use databend_common_sql::Planner;
52-
use databend_common_sql::ScalarExpr;
5351
use databend_common_sql::TypeChecker;
5452
use databend_enterprise_hilbert_clustering::get_hilbert_clustering_handler;
5553
use databend_storages_common_table_meta::table::ClusterType;
@@ -61,7 +59,6 @@ use log::warn;
6159
use crate::interpreters::hook::vacuum_hook::hook_clear_m_cte_temp_table;
6260
use crate::interpreters::hook::vacuum_hook::hook_disk_temp_dir;
6361
use crate::interpreters::hook::vacuum_hook::hook_vacuum_temp_files;
64-
use crate::interpreters::interpreter_insert_multi_table::scalar_expr_to_remote_expr;
6562
use crate::interpreters::Interpreter;
6663
use crate::interpreters::InterpreterClusteringHistory;
6764
use crate::pipelines::executor::ExecutorSettings;
@@ -267,11 +264,11 @@ impl ReclusterTableInterpreter {
267264
acc
268265
},
269266
);
270-
267+
271268
let query_str = self.ctx.get_query_str();
272269
let write_progress = self.ctx.get_write_progress();
273270
let write_progress_value = write_progress.as_ref().get_values();
274-
271+
275272
let subquery_executor = Arc::new(ServiceQueryExecutor::new(
276273
QueryContext::create_from(self.ctx.as_ref()),
277274
));
@@ -353,8 +350,7 @@ impl ReclusterTableInterpreter {
353350
.unwrap_or_default();
354351
let (stmt, _) = parse_sql(&tokens, sql_dialect)?;
355352

356-
let mut planner =
357-
Planner::new_with_query_executor(self.ctx.clone(), subquery_executor);
353+
let mut planner = Planner::new(self.ctx.clone());
358354
let plan = planner.plan_stmt(&stmt, false).await?;
359355
let Plan::Query {
360356
mut s_expr,
@@ -369,8 +365,6 @@ impl ReclusterTableInterpreter {
369365
*s_expr = set_update_stream_columns(&s_expr)?;
370366
}
371367

372-
write_progress.set(&write_progress_value);
373-
self.ctx.attach_query_str(QueryKind::Other, query_str);
374368
let mut builder = PhysicalPlanBuilder::new(metadata, self.ctx.clone(), false);
375369
let mut plan = Box::new(builder.build(&s_expr, bind_context.column_set()).await?);
376370
let mut is_exchange = false;
@@ -384,33 +378,22 @@ impl ReclusterTableInterpreter {
384378
plan = input.clone();
385379
}
386380

381+
let mut nodes_num = 1;
387382
let cluster = self.ctx.get_cluster();
388-
let is_distributed = is_exchange || !cluster.is_empty();
389-
if is_distributed {
390-
let expr = scalar_expr_to_remote_expr(
391-
&ScalarExpr::BoundColumnRef(BoundColumnRef {
392-
span: None,
393-
column: bind_context.columns.last().unwrap().clone(),
394-
}),
395-
plan.output_schema()?.as_ref(),
396-
)?;
397-
plan = Box::new(PhysicalPlan::Exchange(Exchange {
398-
plan_id: 0,
399-
input: plan,
400-
kind: FragmentKind::Normal,
401-
keys: vec![expr],
402-
allow_adjust_parallelism: true,
403-
ignore_exchange: false,
404-
}));
383+
if !cluster.is_empty() {
384+
nodes_num = cluster.nodes.len();
405385
}
406-
386+
let is_distributed = is_exchange || !cluster.is_empty();
407387
let mut plan = PhysicalPlan::HilbertPartition(Box::new(HilbertPartition {
408388
plan_id: 0,
409389
input: plan,
410390
table_info: table_info.clone(),
411391
range_id: bind_context.columns.last().unwrap().index,
412-
num_partitions: total_partitions,
392+
num_partitions: total_partitions.div_ceil(nodes_num),
413393
rows_per_block,
394+
node_id: None,
395+
projections: bind_context.column_set(),
396+
nodes_num,
414397
}));
415398

416399
if is_distributed {
@@ -424,6 +407,8 @@ impl ReclusterTableInterpreter {
424407
});
425408
}
426409

410+
write_progress.set(&write_progress_value);
411+
self.ctx.attach_query_str(QueryKind::Other, query_str);
427412
PhysicalPlan::CommitSink(Box::new(CommitSink {
428413
input: Box::new(plan),
429414
table_info,

src/query/service/src/pipelines/builders/builder_hilbert_partition.rs

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,20 @@ use std::sync::atomic::AtomicUsize;
1717

1818
use databend_common_catalog::table_context::TableContext;
1919
use databend_common_exception::Result;
20+
use databend_common_expression::types::DataType;
21+
use databend_common_expression::types::NumberDataType;
22+
use databend_common_expression::types::NumberScalar;
23+
use databend_common_expression::Scalar;
24+
use databend_common_expression::PREDICATE_COLUMN_NAME;
2025
use databend_common_pipeline_core::processors::ProcessorPtr;
2126
use databend_common_sql::executor::physical_plans::HilbertPartition;
2227
use databend_common_sql::executor::physical_plans::MutationKind;
28+
use databend_common_sql::plans::BoundColumnRef;
29+
use databend_common_sql::plans::ConstantExpr;
30+
use databend_common_sql::plans::FunctionCall;
31+
use databend_common_sql::ColumnBindingBuilder;
32+
use databend_common_sql::ScalarExpr;
33+
use databend_common_sql::Visibility;
2334
use databend_common_storages_fuse::operations::TransformSerializeBlock;
2435
use databend_common_storages_fuse::statistics::ClusterStatsGenerator;
2536
use databend_common_storages_fuse::FuseTable;
@@ -36,6 +47,50 @@ use crate::spillers::SpillerDiskConfig;
3647
impl PipelineBuilder {
3748
pub(crate) fn build_hilbert_partition(&mut self, partition: &HilbertPartition) -> Result<()> {
3849
self.build_pipeline(&partition.input)?;
50+
if let Some(node_id) = partition.node_id {
51+
let input_schema = partition.input.output_schema()?;
52+
let column = ColumnBindingBuilder::new(
53+
PREDICATE_COLUMN_NAME.to_string(),
54+
input_schema.num_fields() - 1,
55+
Box::new(DataType::Number(NumberDataType::UInt64)),
56+
Visibility::Visible,
57+
)
58+
.build();
59+
let expr = ScalarExpr::FunctionCall(FunctionCall {
60+
span: None,
61+
func_name: "eq".to_string(),
62+
params: vec![],
63+
arguments: vec![
64+
ScalarExpr::FunctionCall(FunctionCall {
65+
span: None,
66+
func_name: "mod".to_string(),
67+
params: vec![],
68+
arguments: vec![
69+
ScalarExpr::BoundColumnRef(BoundColumnRef { span: None, column }),
70+
ScalarExpr::ConstantExpr(ConstantExpr {
71+
span: None,
72+
value: Scalar::Number(NumberScalar::UInt8(
73+
partition.nodes_num as u8,
74+
)),
75+
}),
76+
],
77+
}),
78+
ScalarExpr::ConstantExpr(ConstantExpr {
79+
span: None,
80+
value: Scalar::Number(NumberScalar::UInt8(node_id)),
81+
}),
82+
],
83+
});
84+
85+
let remote_expr = expr
86+
.as_expr()?
87+
.project_column_ref(|col| col.index)
88+
.as_remote_expr();
89+
self.main_pipeline.add_transform(
90+
self.filter_transform_builder(&[remote_expr], partition.projections.clone())?,
91+
)?;
92+
}
93+
3994
let num_processors = self.main_pipeline.output_len();
4095
let table = self
4196
.ctx
@@ -44,7 +99,7 @@ impl PipelineBuilder {
4499

45100
self.main_pipeline.exchange(
46101
num_processors,
47-
HilbertPartitionExchange::create(partition.num_partitions),
102+
HilbertPartitionExchange::create(partition.num_partitions, partition.nodes_num),
48103
);
49104

50105
let settings = self.ctx.get_settings();

src/query/service/src/pipelines/processors/transforms/transform_partition_collect.rs

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,15 @@ use crate::spillers::SpillerType;
4141

4242
pub struct HilbertPartitionExchange {
4343
num_partitions: usize,
44+
nod_nums: usize,
4445
}
4546

4647
impl HilbertPartitionExchange {
47-
pub fn create(num_partitions: usize) -> Arc<HilbertPartitionExchange> {
48-
Arc::new(HilbertPartitionExchange { num_partitions })
48+
pub fn create(num_partitions: usize, nod_nums: usize) -> Arc<HilbertPartitionExchange> {
49+
Arc::new(HilbertPartitionExchange {
50+
num_partitions,
51+
nod_nums,
52+
})
4953
}
5054
}
5155

@@ -64,15 +68,20 @@ impl Exchange for HilbertPartitionExchange {
6468
// Scatter the data block to different partitions.
6569
let indices = range_ids
6670
.iter()
67-
.map(|&id| (id % self.num_partitions as u64) as u16)
71+
.map(|&id| ((id / self.nod_nums as u64) % self.num_partitions as u64) as u16)
6872
.collect::<Vec<_>>();
6973
data_block.pop_columns(1);
70-
let scatter_blocks = DataBlock::scatter(&data_block, &indices, self.num_partitions)?;
71-
74+
let scatter_indices =
75+
DataBlock::divide_indices_by_scatter_size(&indices, self.num_partitions);
7276
// Partition the data blocks to different processors.
7377
let mut output_data_blocks = vec![vec![]; n];
74-
for (partition_id, data_block) in scatter_blocks.into_iter().enumerate() {
75-
output_data_blocks[partition_id % n].push((partition_id, data_block));
78+
for (partition_id, indices) in scatter_indices.iter().take(self.num_partitions).enumerate()
79+
{
80+
if indices.is_empty() {
81+
continue;
82+
}
83+
let block = data_block.take_with_optimize_size(indices)?;
84+
output_data_blocks[partition_id % n].push((partition_id, block));
7685
}
7786

7887
// Union data blocks for each processor.

src/query/service/src/schedulers/fragments/fragmenter.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use databend_common_sql::executor::physical_plans::ExchangeSink;
2525
use databend_common_sql::executor::physical_plans::ExchangeSource;
2626
use databend_common_sql::executor::physical_plans::FragmentKind;
2727
use databend_common_sql::executor::physical_plans::HashJoin;
28+
use databend_common_sql::executor::physical_plans::HilbertPartition;
2829
use databend_common_sql::executor::physical_plans::MutationSource;
2930
use databend_common_sql::executor::physical_plans::Recluster;
3031
use databend_common_sql::executor::physical_plans::ReplaceInto;
@@ -65,6 +66,7 @@ enum State {
6566
ReplaceInto,
6667
Compact,
6768
Recluster,
69+
Hilbert,
6870
Other,
6971
}
7072

@@ -201,6 +203,11 @@ impl PhysicalPlanReplacer for Fragmenter {
201203
Ok(PhysicalPlan::Recluster(Box::new(plan.clone())))
202204
}
203205

206+
fn replace_hilbert_recluster(&mut self, plan: &HilbertPartition) -> Result<PhysicalPlan> {
207+
self.state = State::Hilbert;
208+
Ok(PhysicalPlan::HilbertPartition(Box::new(plan.clone())))
209+
}
210+
204211
fn replace_compact_source(&mut self, plan: &CompactSource) -> Result<PhysicalPlan> {
205212
self.state = State::Compact;
206213
Ok(PhysicalPlan::CompactSource(Box::new(plan.clone())))
@@ -301,6 +308,7 @@ impl PhysicalPlanReplacer for Fragmenter {
301308
State::Other => FragmentType::Intermediate,
302309
State::ReplaceInto => FragmentType::ReplaceInto,
303310
State::Compact => FragmentType::Compact,
311+
State::Hilbert => FragmentType::Hilbert,
304312
State::Recluster => FragmentType::Recluster,
305313
};
306314
self.state = State::Other;

src/query/service/src/schedulers/fragments/plan_fragment.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use databend_common_sql::executor::physical_plans::CompactSource;
2929
use databend_common_sql::executor::physical_plans::ConstantTableScan;
3030
use databend_common_sql::executor::physical_plans::CopyIntoTable;
3131
use databend_common_sql::executor::physical_plans::CopyIntoTableSource;
32+
use databend_common_sql::executor::physical_plans::HilbertPartition;
3233
use databend_common_sql::executor::physical_plans::MutationSource;
3334
use databend_common_sql::executor::physical_plans::Recluster;
3435
use databend_common_sql::executor::physical_plans::ReplaceDeduplicate;
@@ -64,6 +65,7 @@ pub enum FragmentType {
6465
ReplaceInto,
6566
Compact,
6667
Recluster,
68+
Hilbert,
6769
MutationSource,
6870
}
6971

@@ -137,6 +139,9 @@ impl PlanFragment {
137139
FragmentType::Recluster => {
138140
self.redistribute_recluster(ctx, &mut fragment_actions)?;
139141
}
142+
FragmentType::Hilbert => {
143+
self.redistribute_hilbert(ctx, &mut fragment_actions)?;
144+
}
140145
}
141146

142147
if let Some(ref exchange) = self.exchange {
@@ -346,6 +351,24 @@ impl PlanFragment {
346351
Ok(())
347352
}
348353

354+
fn redistribute_hilbert(
355+
&self,
356+
ctx: Arc<QueryContext>,
357+
fragment_actions: &mut QueryFragmentActions,
358+
) -> Result<()> {
359+
let executors = Fragmenter::get_executors(ctx.clone());
360+
for (executor_idx, executor) in executors.into_iter().enumerate() {
361+
let mut plan = self.plan.clone();
362+
let mut replace_hilbert = ReplaceHilbertRecluster {
363+
node: executor_idx as u8,
364+
};
365+
plan = replace_hilbert.replace(&plan)?;
366+
367+
fragment_actions.add_action(QueryFragmentAction::create(executor, plan));
368+
}
369+
Ok(())
370+
}
371+
349372
fn redistribute_recluster(
350373
&self,
351374
ctx: Arc<QueryContext>,
@@ -569,6 +592,19 @@ impl PhysicalPlanReplacer for ReplaceRecluster {
569592
}
570593
}
571594

595+
struct ReplaceHilbertRecluster {
596+
pub node: u8,
597+
}
598+
599+
impl PhysicalPlanReplacer for ReplaceHilbertRecluster {
600+
fn replace_hilbert_recluster(&mut self, plan: &HilbertPartition) -> Result<PhysicalPlan> {
601+
Ok(PhysicalPlan::HilbertPartition(Box::new(HilbertPartition {
602+
node_id: Some(self.node),
603+
..plan.clone()
604+
})))
605+
}
606+
}
607+
572608
struct ReplaceMutationSource {
573609
pub partitions: Partitions,
574610
}

src/query/sql/src/executor/physical_plan_visitor.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ pub trait PhysicalPlanReplacer {
108108
PhysicalPlan::ExpressionScan(plan) => self.replace_expression_scan(plan),
109109
PhysicalPlan::CacheScan(plan) => self.replace_cache_scan(plan),
110110
PhysicalPlan::Recluster(plan) => self.replace_recluster(plan),
111-
PhysicalPlan::HilbertPartition(plan) => self.replace_hilbert_serialize(plan),
111+
PhysicalPlan::HilbertPartition(plan) => self.replace_hilbert_recluster(plan),
112112
PhysicalPlan::Udf(plan) => self.replace_udf(plan),
113113
PhysicalPlan::AsyncFunction(plan) => self.replace_async_function(plan),
114114
PhysicalPlan::Duplicate(plan) => self.replace_duplicate(plan),
@@ -127,7 +127,7 @@ pub trait PhysicalPlanReplacer {
127127
Ok(PhysicalPlan::Recluster(Box::new(plan.clone())))
128128
}
129129

130-
fn replace_hilbert_serialize(&mut self, plan: &HilbertPartition) -> Result<PhysicalPlan> {
130+
fn replace_hilbert_recluster(&mut self, plan: &HilbertPartition) -> Result<PhysicalPlan> {
131131
let input = self.replace(&plan.input)?;
132132
Ok(PhysicalPlan::HilbertPartition(Box::new(HilbertPartition {
133133
plan_id: plan.plan_id,
@@ -136,6 +136,9 @@ pub trait PhysicalPlanReplacer {
136136
range_id: plan.range_id,
137137
num_partitions: plan.num_partitions,
138138
rows_per_block: plan.rows_per_block,
139+
node_id: plan.node_id,
140+
projections: plan.projections.clone(),
141+
nodes_num: plan.nodes_num,
139142
})))
140143
}
141144

src/query/sql/src/executor/physical_plans/physical_recluster.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use databend_common_catalog::plan::ReclusterTask;
1616
use databend_common_meta_app::schema::TableInfo;
1717

1818
use crate::executor::PhysicalPlan;
19+
use crate::ColumnSet;
1920
use crate::IndexType;
2021

2122
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
@@ -33,4 +34,8 @@ pub struct HilbertPartition {
3334
pub range_id: IndexType,
3435
pub num_partitions: usize,
3536
pub rows_per_block: usize,
37+
38+
pub node_id: Option<u8>,
39+
pub projections: ColumnSet,
40+
pub nodes_num: usize,
3641
}

0 commit comments

Comments
 (0)