Skip to content

Commit 0154424

Browse files
authored
fix(query): avoid building TransformSortMergeLimit with a big number limit (#17339)
* fix
* update
* update
* fix
* fix
1 parent 71e0c50 commit 0154424

File tree

14 files changed

+166
-120
lines changed

14 files changed

+166
-120
lines changed

src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ use crate::processors::sort::Merger;
4343
pub struct TransformSortMerge<R: Rows> {
4444
schema: DataSchemaRef,
4545
enable_loser_tree: bool,
46-
46+
limit: Option<usize>,
4747
block_size: usize,
4848
buffer: Vec<Option<(DataBlock, Column)>>,
4949

@@ -62,10 +62,12 @@ impl<R: Rows> TransformSortMerge<R> {
6262
_sort_desc: Arc<Vec<SortColumnDescription>>,
6363
block_size: usize,
6464
enable_loser_tree: bool,
65+
limit: Option<usize>,
6566
) -> Self {
6667
TransformSortMerge {
6768
schema,
6869
enable_loser_tree,
70+
limit,
6971
block_size,
7072
buffer: vec![],
7173
aborting: Arc::new(AtomicBool::new(false)),
@@ -171,7 +173,8 @@ impl<R: Rows> TransformSortMerge<R> {
171173
let streams = self.buffer.drain(..).collect::<Vec<BlockStream>>();
172174
let mut result = Vec::with_capacity(size_hint);
173175

174-
let mut merger = Merger::<A, _>::create(self.schema.clone(), streams, batch_size, None);
176+
let mut merger =
177+
Merger::<A, _>::create(self.schema.clone(), streams, batch_size, self.limit);
175178

176179
while let Some(block) = merger.next_block()? {
177180
if unlikely(self.aborting.load(Ordering::Relaxed)) {
@@ -218,7 +221,7 @@ pub fn sort_merge(
218221
0,
219222
0,
220223
sort_spilling_batch_bytes,
221-
MergeSortCommonImpl::create(schema, sort_desc, block_size, enable_loser_tree),
224+
MergeSortCommonImpl::create(schema, sort_desc, block_size, enable_loser_tree, None),
222225
)?;
223226
for block in data_blocks {
224227
processor.transform(block)?;

src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge_base.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -337,7 +337,7 @@ impl TransformSortMergeBuilder {
337337
!self.schema.has_field(ORDER_COL_NAME)
338338
});
339339

340-
if self.limit.is_some() {
340+
if self.limit.map(|limit| limit < 10000).unwrap_or_default() {
341341
self.build_sort_limit()
342342
} else {
343343
self.build_sort()
@@ -400,6 +400,7 @@ impl TransformSortMergeBuilder {
400400
self.sort_desc,
401401
self.block_size,
402402
self.enable_loser_tree,
403+
self.limit,
403404
),
404405
)?,
405406
))

src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge_limit.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,6 @@ impl<R: Rows> CursorOrder<R> for LocalCursorOrder {
127127

128128
impl<R: Rows> TransformSortMergeLimit<R> {
129129
pub fn create(block_size: usize, limit: usize) -> Self {
130-
debug_assert!(limit <= 10000, "Too large sort merge limit: {}", limit);
131130
TransformSortMergeLimit {
132131
heap: FixedHeap::new(limit),
133132
buffer: HashMap::with_capacity(limit),

src/query/service/src/pipelines/builders/builder_window.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,9 @@ impl PipelineBuilder {
170170
})
171171
.collect::<Result<Vec<_>>>()?;
172172

173-
if let Some(top_n) = &window_partition.top_n {
173+
if let Some(top_n) = &window_partition.top_n
174+
&& top_n.top < 10000
175+
{
174176
self.main_pipeline.exchange(
175177
num_processors,
176178
WindowPartitionTopNExchange::create(

src/query/service/tests/it/pipelines/transforms/sort/spill.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ fn create_sort_spill_pipeline(
7171
sort_desc.clone(),
7272
block_size,
7373
enable_loser_tree,
74+
None,
7475
),
7576
)
7677
})?;

src/query/settings/src/settings_default.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -324,6 +324,13 @@ impl DefaultSettings {
324324
scope: SettingScope::Both,
325325
range: Some(SettingRange::Numeric(0..=1)),
326326
}),
327+
("max_push_down_limit", DefaultSettingValue {
328+
value: UserSettingValue::UInt64(10000),
329+
desc: "Sets the maximum number of rows limit that can be pushed down to the leaf operator.",
330+
mode: SettingMode::Both,
331+
scope: SettingScope::Both,
332+
range: Some(SettingRange::Numeric(0..=u64::MAX)),
333+
}),
327334
("join_spilling_memory_ratio", DefaultSettingValue {
328335
value: UserSettingValue::UInt64(60),
329336
desc: "Sets the maximum memory ratio in bytes that hash join can use before spilling data to storage during query execution, 0 is unlimited",

src/query/settings/src/settings_getter_setter.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -319,6 +319,10 @@ impl Settings {
319319
Ok(self.unchecked_try_get_u64("disable_join_reorder")? != 0)
320320
}
321321

322+
pub fn get_max_push_down_limit(&self) -> Result<usize> {
323+
Ok(self.try_get_u64("max_push_down_limit")? as usize)
324+
}
325+
322326
pub fn get_join_spilling_memory_ratio(&self) -> Result<usize> {
323327
Ok(self.try_get_u64("join_spilling_memory_ratio")? as usize)
324328
}

src/query/sql/src/planner/optimizer/optimizer.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ pub struct OptimizerContext {
7171
pub(crate) enable_distributed_optimization: bool,
7272
enable_join_reorder: bool,
7373
enable_dphyp: bool,
74+
pub(crate) max_push_down_limit: usize,
7475
planning_agg_index: bool,
7576
#[educe(Debug(ignore))]
7677
pub(crate) sample_executor: Option<Arc<dyn QueryExecutor>>,
@@ -85,6 +86,7 @@ impl OptimizerContext {
8586
enable_distributed_optimization: false,
8687
enable_join_reorder: true,
8788
enable_dphyp: true,
89+
max_push_down_limit: 10000,
8890
sample_executor: None,
8991
planning_agg_index: false,
9092
}
@@ -114,6 +116,11 @@ impl OptimizerContext {
114116
self.planning_agg_index = true;
115117
self
116118
}
119+
120+
pub fn with_max_push_down_limit(mut self, max_push_down_limit: usize) -> Self {
121+
self.max_push_down_limit = max_push_down_limit;
122+
self
123+
}
117124
}
118125

119126
/// A recursive optimizer that will apply the given rules recursively.

src/query/sql/src/planner/optimizer/rule/factory.rs

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,6 @@ use crate::optimizer::OptimizerContext;
5656

5757
pub struct RuleFactory;
5858

59-
pub const MAX_PUSH_DOWN_LIMIT: usize = 10000;
60-
6159
impl RuleFactory {
6260
pub fn create_rule(id: RuleID, ctx: OptimizerContext) -> Result<RulePtr> {
6361
match id {
@@ -81,15 +79,15 @@ impl RuleFactory {
8179
}
8280
RuleID::PushDownLimitOuterJoin => Ok(Box::new(RulePushDownLimitOuterJoin::new())),
8381
RuleID::PushDownLimitEvalScalar => Ok(Box::new(RulePushDownLimitEvalScalar::new())),
84-
RuleID::PushDownLimitSort => {
85-
Ok(Box::new(RulePushDownLimitSort::new(MAX_PUSH_DOWN_LIMIT)))
86-
}
87-
RuleID::PushDownLimitWindow => {
88-
Ok(Box::new(RulePushDownLimitWindow::new(MAX_PUSH_DOWN_LIMIT)))
89-
}
90-
RuleID::RulePushDownRankLimitAggregate => {
91-
Ok(Box::new(RulePushDownRankLimitAggregate::new()))
92-
}
82+
RuleID::PushDownLimitSort => Ok(Box::new(RulePushDownLimitSort::new(
83+
ctx.max_push_down_limit,
84+
))),
85+
RuleID::PushDownLimitWindow => Ok(Box::new(RulePushDownLimitWindow::new(
86+
ctx.max_push_down_limit,
87+
))),
88+
RuleID::RulePushDownRankLimitAggregate => Ok(Box::new(
89+
RulePushDownRankLimitAggregate::new(ctx.max_push_down_limit),
90+
)),
9391
RuleID::PushDownFilterAggregate => Ok(Box::new(RulePushDownFilterAggregate::new())),
9492
RuleID::PushDownFilterWindow => Ok(Box::new(RulePushDownFilterWindow::new())),
9593
RuleID::PushDownFilterWindowTopN => {

src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_aggregate.rs

Lines changed: 82 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,11 @@ use crate::plans::SortItem;
4141
pub struct RulePushDownRankLimitAggregate {
4242
id: RuleID,
4343
matchers: Vec<Matcher>,
44+
max_limit: usize,
4445
}
4546

4647
impl RulePushDownRankLimitAggregate {
47-
pub fn new() -> Self {
48+
pub fn new(max_limit: usize) -> Self {
4849
Self {
4950
id: RuleID::RulePushDownRankLimitAggregate,
5051
matchers: vec![
@@ -73,6 +74,7 @@ impl RulePushDownRankLimitAggregate {
7374
}],
7475
},
7576
],
77+
max_limit,
7678
}
7779
}
7880

@@ -84,40 +86,44 @@ impl RulePushDownRankLimitAggregate {
8486
state: &mut TransformResult,
8587
) -> databend_common_exception::Result<()> {
8688
let limit: Limit = s_expr.plan().clone().try_into()?;
87-
if let Some(mut count) = limit.limit {
88-
count += limit.offset;
89-
let agg = s_expr.child(0)?;
90-
let mut agg_limit: Aggregate = agg.plan().clone().try_into()?;
91-
92-
let sort_items = agg_limit
93-
.group_items
94-
.iter()
95-
.map(|g| SortItem {
96-
index: g.index,
97-
asc: true,
98-
nulls_first: false,
99-
})
100-
.collect::<Vec<_>>();
101-
agg_limit.rank_limit = Some((sort_items.clone(), count));
102-
103-
let sort = Sort {
104-
items: sort_items.clone(),
105-
limit: Some(count),
106-
after_exchange: None,
107-
pre_projection: None,
108-
window_partition: None,
109-
};
110-
111-
let agg = SExpr::create_unary(
112-
Arc::new(RelOperator::Aggregate(agg_limit)),
113-
Arc::new(agg.child(0)?.clone()),
114-
);
115-
let sort = SExpr::create_unary(Arc::new(RelOperator::Sort(sort)), agg.into());
116-
let mut result = s_expr.replace_children(vec![Arc::new(sort)]);
117-
118-
result.set_applied_rule(&self.id);
119-
state.add_result(result);
89+
let Some(mut count) = limit.limit else {
90+
return Ok(());
91+
};
92+
count += limit.offset;
93+
if count > self.max_limit {
94+
return Ok(());
12095
}
96+
let agg = s_expr.child(0)?;
97+
let mut agg_limit: Aggregate = agg.plan().clone().try_into()?;
98+
99+
let sort_items = agg_limit
100+
.group_items
101+
.iter()
102+
.map(|g| SortItem {
103+
index: g.index,
104+
asc: true,
105+
nulls_first: false,
106+
})
107+
.collect::<Vec<_>>();
108+
agg_limit.rank_limit = Some((sort_items.clone(), count));
109+
110+
let sort = Sort {
111+
items: sort_items.clone(),
112+
limit: Some(count),
113+
after_exchange: None,
114+
pre_projection: None,
115+
window_partition: None,
116+
};
117+
118+
let agg = SExpr::create_unary(
119+
Arc::new(RelOperator::Aggregate(agg_limit)),
120+
Arc::new(agg.child(0)?.clone()),
121+
);
122+
let sort = SExpr::create_unary(Arc::new(RelOperator::Sort(sort)), agg.into());
123+
let mut result = s_expr.replace_children(vec![Arc::new(sort)]);
124+
125+
result.set_applied_rule(&self.id);
126+
state.add_result(result);
121127
Ok(())
122128
}
123129

@@ -137,53 +143,55 @@ impl RulePushDownRankLimitAggregate {
137143
_ => return Ok(()),
138144
};
139145

146+
let Some(limit) = sort.limit else {
147+
return Ok(());
148+
};
149+
140150
let mut agg_limit: Aggregate = agg_limit_expr.plan().clone().try_into()?;
141151

142-
if let Some(limit) = sort.limit {
143-
let is_order_subset = sort
144-
.items
145-
.iter()
146-
.all(|k| agg_limit.group_items.iter().any(|g| g.index == k.index));
152+
let is_order_subset = sort
153+
.items
154+
.iter()
155+
.all(|k| agg_limit.group_items.iter().any(|g| g.index == k.index));
156+
if !is_order_subset {
157+
return Ok(());
158+
}
147159

148-
if !is_order_subset {
149-
return Ok(());
150-
}
151-
let mut sort_items = Vec::with_capacity(agg_limit.group_items.len());
152-
let mut not_found_sort_items = vec![];
153-
for i in 0..agg_limit.group_items.len() {
154-
let group_item = &agg_limit.group_items[i];
155-
if let Some(sort_item) = sort.items.iter().find(|k| k.index == group_item.index) {
156-
sort_items.push(SortItem {
157-
index: group_item.index,
158-
asc: sort_item.asc,
159-
nulls_first: sort_item.nulls_first,
160-
});
161-
} else {
162-
not_found_sort_items.push(SortItem {
163-
index: group_item.index,
164-
asc: true,
165-
nulls_first: false,
166-
});
167-
}
160+
let mut sort_items = Vec::with_capacity(agg_limit.group_items.len());
161+
let mut not_found_sort_items = vec![];
162+
for i in 0..agg_limit.group_items.len() {
163+
let group_item = &agg_limit.group_items[i];
164+
if let Some(sort_item) = sort.items.iter().find(|k| k.index == group_item.index) {
165+
sort_items.push(SortItem {
166+
index: group_item.index,
167+
asc: sort_item.asc,
168+
nulls_first: sort_item.nulls_first,
169+
});
170+
} else {
171+
not_found_sort_items.push(SortItem {
172+
index: group_item.index,
173+
asc: true,
174+
nulls_first: false,
175+
});
168176
}
169-
sort_items.extend(not_found_sort_items);
177+
}
178+
sort_items.extend(not_found_sort_items);
170179

171-
agg_limit.rank_limit = Some((sort_items, limit));
180+
agg_limit.rank_limit = Some((sort_items, limit));
172181

173-
let agg = SExpr::create_unary(
174-
Arc::new(RelOperator::Aggregate(agg_limit)),
175-
Arc::new(agg_limit_expr.child(0)?.clone()),
176-
);
182+
let agg = SExpr::create_unary(
183+
Arc::new(RelOperator::Aggregate(agg_limit)),
184+
Arc::new(agg_limit_expr.child(0)?.clone()),
185+
);
177186

178-
let mut result = if has_eval_scalar {
179-
let eval_scalar = s_expr.child(0)?.replace_children(vec![Arc::new(agg)]);
180-
s_expr.replace_children(vec![Arc::new(eval_scalar)])
181-
} else {
182-
s_expr.replace_children(vec![Arc::new(agg)])
183-
};
184-
result.set_applied_rule(&self.id);
185-
state.add_result(result);
186-
}
187+
let mut result = if has_eval_scalar {
188+
let eval_scalar = s_expr.child(0)?.replace_children(vec![Arc::new(agg)]);
189+
s_expr.replace_children(vec![Arc::new(eval_scalar)])
190+
} else {
191+
s_expr.replace_children(vec![Arc::new(agg)])
192+
};
193+
result.set_applied_rule(&self.id);
194+
state.add_result(result);
187195
Ok(())
188196
}
189197
}

0 commit comments

Comments
 (0)