Skip to content

Commit fcf8e09

Browse files
authored
chore: resolve todo in dynamic sample (#16629)
1 parent ac03117 commit fcf8e09

File tree

6 files changed

+318
-96
lines changed

6 files changed

+318
-96
lines changed

src/query/sql/src/planner/optimizer/dynamic_sample/dynamic_sample.rs

Lines changed: 73 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::ops::Deref;
1516
use std::sync::Arc;
1617
use std::time::Duration;
1718

@@ -25,8 +26,13 @@ use crate::optimizer::RelExpr;
2526
use crate::optimizer::SExpr;
2627
use crate::optimizer::StatInfo;
2728
use crate::planner::query_executor::QueryExecutor;
29+
use crate::plans::Aggregate;
30+
use crate::plans::AggregateMode;
31+
use crate::plans::Limit;
2832
use crate::plans::Operator;
33+
use crate::plans::ProjectSet;
2934
use crate::plans::RelOperator;
35+
use crate::plans::UnionAll;
3036
use crate::MetadataRef;
3137

3238
#[async_recursion::async_recursion(#[recursive::recursive])]
@@ -78,11 +84,73 @@ pub async fn dynamic_sample(
7884
RelOperator::Join(_) => {
7985
join_selectivity_sample(ctx, metadata, s_expr, sample_executor).await
8086
}
81-
RelOperator::Scan(_) => s_expr.plan().derive_stats(&RelExpr::with_s_expr(s_expr)),
82-
// Todo: add more operators here, and support more query patterns.
83-
_ => {
84-
let rel_expr = RelExpr::with_s_expr(s_expr);
85-
rel_expr.derive_cardinality()
87+
RelOperator::Scan(_)
88+
| RelOperator::DummyTableScan(_)
89+
| RelOperator::CteScan(_)
90+
| RelOperator::ConstantTableScan(_)
91+
| RelOperator::CacheScan(_)
92+
| RelOperator::ExpressionScan(_)
93+
| RelOperator::RecursiveCteScan(_)
94+
| RelOperator::Mutation(_)
95+
| RelOperator::Recluster(_)
96+
| RelOperator::CompactBlock(_)
97+
| RelOperator::MutationSource(_) => {
98+
s_expr.plan().derive_stats(&RelExpr::with_s_expr(s_expr))
99+
}
100+
101+
RelOperator::Aggregate(agg) => {
102+
let child_stat_info =
103+
dynamic_sample(ctx, metadata, s_expr.child(0)?, sample_executor).await?;
104+
if agg.mode == AggregateMode::Final {
105+
return Ok(child_stat_info);
106+
}
107+
let agg = Aggregate::try_from(s_expr.plan().clone())?;
108+
agg.derive_agg_stats(child_stat_info)
109+
}
110+
RelOperator::Limit(_) => {
111+
let child_stat_info =
112+
dynamic_sample(ctx, metadata, s_expr.child(0)?, sample_executor).await?;
113+
let limit = Limit::try_from(s_expr.plan().clone())?;
114+
limit.derive_limit_stats(child_stat_info)
115+
}
116+
RelOperator::UnionAll(_) => {
117+
let left_stat_info = dynamic_sample(
118+
ctx.clone(),
119+
metadata.clone(),
120+
s_expr.child(0)?,
121+
sample_executor.clone(),
122+
)
123+
.await?;
124+
let right_stat_info =
125+
dynamic_sample(ctx, metadata, s_expr.child(1)?, sample_executor).await?;
126+
let union = UnionAll::try_from(s_expr.plan().clone())?;
127+
union.derive_union_stats(left_stat_info, right_stat_info)
128+
}
129+
RelOperator::ProjectSet(_) => {
130+
let mut child_stat_info =
131+
dynamic_sample(ctx, metadata, s_expr.child(0)?, sample_executor)
132+
.await?
133+
.deref()
134+
.clone();
135+
let project_set = ProjectSet::try_from(s_expr.plan().clone())?;
136+
project_set.derive_project_set_stats(&mut child_stat_info)
137+
}
138+
RelOperator::MaterializedCte(_) => {
139+
let right_stat_info =
140+
dynamic_sample(ctx, metadata, s_expr.child(1)?, sample_executor).await?;
141+
Ok(Arc::new(StatInfo {
142+
cardinality: right_stat_info.cardinality,
143+
statistics: right_stat_info.statistics.clone(),
144+
}))
145+
}
146+
147+
RelOperator::EvalScalar(_)
148+
| RelOperator::Sort(_)
149+
| RelOperator::Exchange(_)
150+
| RelOperator::Window(_)
151+
| RelOperator::Udf(_)
152+
| RelOperator::AsyncFunction(_) => {
153+
dynamic_sample(ctx, metadata, s_expr.child(0)?, sample_executor).await
86154
}
87155
}
88156
}

src/query/sql/src/planner/plans/aggregate.rs

Lines changed: 54 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,59 @@ impl Aggregate {
104104
}
105105
Ok(col_set)
106106
}
107+
108+
pub fn derive_agg_stats(&self, stat_info: Arc<StatInfo>) -> Result<Arc<StatInfo>> {
109+
let (cardinality, mut statistics) = (stat_info.cardinality, stat_info.statistics.clone());
110+
let cardinality = if self.group_items.is_empty() {
111+
// Scalar aggregation
112+
1.0
113+
} else if self
114+
.group_items
115+
.iter()
116+
.any(|item| !statistics.column_stats.contains_key(&item.index))
117+
{
118+
cardinality
119+
} else {
120+
// A upper bound
121+
let res = self.group_items.iter().fold(1.0, |acc, item| {
122+
let item_stat = statistics.column_stats.get(&item.index).unwrap();
123+
acc * item_stat.ndv
124+
});
125+
for item in self.group_items.iter() {
126+
let item_stat = statistics.column_stats.get_mut(&item.index).unwrap();
127+
if let Some(histogram) = &mut item_stat.histogram {
128+
let mut num_values = 0.0;
129+
let mut num_distinct = 0.0;
130+
for bucket in histogram.buckets.iter() {
131+
num_distinct += bucket.num_distinct();
132+
num_values += bucket.num_values();
133+
}
134+
// When there is a high probability that eager aggregation
135+
// is better, we will update the histogram.
136+
if num_values / num_distinct >= 10.0 {
137+
for bucket in histogram.buckets.iter_mut() {
138+
bucket.aggregate_values();
139+
}
140+
}
141+
}
142+
}
143+
// To avoid res is very large
144+
f64::min(res, cardinality)
145+
};
146+
147+
let precise_cardinality = if self.group_items.is_empty() {
148+
Some(1)
149+
} else {
150+
None
151+
};
152+
Ok(Arc::new(StatInfo {
153+
cardinality,
154+
statistics: Statistics {
155+
precise_cardinality,
156+
column_stats: statistics.column_stats,
157+
},
158+
}))
159+
}
107160
}
108161

109162
impl Operator for Aggregate {
@@ -242,56 +295,7 @@ impl Operator for Aggregate {
242295
return rel_expr.derive_cardinality_child(0);
243296
}
244297
let stat_info = rel_expr.derive_cardinality_child(0)?;
245-
let (cardinality, mut statistics) = (stat_info.cardinality, stat_info.statistics.clone());
246-
let cardinality = if self.group_items.is_empty() {
247-
// Scalar aggregation
248-
1.0
249-
} else if self
250-
.group_items
251-
.iter()
252-
.any(|item| !statistics.column_stats.contains_key(&item.index))
253-
{
254-
cardinality
255-
} else {
256-
// A upper bound
257-
let res = self.group_items.iter().fold(1.0, |acc, item| {
258-
let item_stat = statistics.column_stats.get(&item.index).unwrap();
259-
acc * item_stat.ndv
260-
});
261-
for item in self.group_items.iter() {
262-
let item_stat = statistics.column_stats.get_mut(&item.index).unwrap();
263-
if let Some(histogram) = &mut item_stat.histogram {
264-
let mut num_values = 0.0;
265-
let mut num_distinct = 0.0;
266-
for bucket in histogram.buckets.iter() {
267-
num_distinct += bucket.num_distinct();
268-
num_values += bucket.num_values();
269-
}
270-
// When there is a high probability that eager aggregation
271-
// is better, we will update the histogram.
272-
if num_values / num_distinct >= 10.0 {
273-
for bucket in histogram.buckets.iter_mut() {
274-
bucket.aggregate_values();
275-
}
276-
}
277-
}
278-
}
279-
// To avoid res is very large
280-
f64::min(res, cardinality)
281-
};
282-
283-
let precise_cardinality = if self.group_items.is_empty() {
284-
Some(1)
285-
} else {
286-
None
287-
};
288-
Ok(Arc::new(StatInfo {
289-
cardinality,
290-
statistics: Statistics {
291-
precise_cardinality,
292-
column_stats: statistics.column_stats,
293-
},
294-
}))
298+
self.derive_agg_stats(stat_info)
295299
}
296300

297301
fn compute_required_prop_children(

src/query/sql/src/planner/plans/limit.rs

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,29 @@ pub struct Limit {
3333
pub offset: usize,
3434
}
3535

36+
impl Limit {
37+
pub fn derive_limit_stats(&self, stat_info: Arc<StatInfo>) -> Result<Arc<StatInfo>> {
38+
let cardinality = match self.limit {
39+
Some(limit) if (limit as f64) < stat_info.cardinality => limit as f64,
40+
_ => stat_info.cardinality,
41+
};
42+
let precise_cardinality = match (self.limit, stat_info.statistics.precise_cardinality) {
43+
(Some(limit), Some(pc)) => {
44+
Some((pc.saturating_sub(self.offset as u64)).min(limit as u64))
45+
}
46+
_ => None,
47+
};
48+
49+
Ok(Arc::new(StatInfo {
50+
cardinality,
51+
statistics: Statistics {
52+
precise_cardinality,
53+
column_stats: Default::default(),
54+
},
55+
}))
56+
}
57+
}
58+
3659
impl Operator for Limit {
3760
fn rel_op(&self) -> RelOp {
3861
RelOp::Limit
@@ -67,23 +90,6 @@ impl Operator for Limit {
6790

6891
fn derive_stats(&self, rel_expr: &RelExpr) -> Result<Arc<StatInfo>> {
6992
let stat_info = rel_expr.derive_cardinality_child(0)?;
70-
let cardinality = match self.limit {
71-
Some(limit) if (limit as f64) < stat_info.cardinality => limit as f64,
72-
_ => stat_info.cardinality,
73-
};
74-
let precise_cardinality = match (self.limit, stat_info.statistics.precise_cardinality) {
75-
(Some(limit), Some(pc)) => {
76-
Some((pc.saturating_sub(self.offset as u64)).min(limit as u64))
77-
}
78-
_ => None,
79-
};
80-
81-
Ok(Arc::new(StatInfo {
82-
cardinality,
83-
statistics: Statistics {
84-
precise_cardinality,
85-
column_stats: Default::default(),
86-
},
87-
}))
93+
self.derive_limit_stats(stat_info)
8894
}
8995
}

src/query/sql/src/planner/plans/project_set.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
use std::ops::Deref;
1616
use std::sync::Arc;
1717

18+
use databend_common_exception::Result;
19+
1820
use crate::optimizer::RelExpr;
1921
use crate::optimizer::RelationalProperty;
2022
use crate::optimizer::StatInfo;
@@ -30,6 +32,14 @@ pub struct ProjectSet {
3032
pub srfs: Vec<ScalarItem>,
3133
}
3234

35+
impl ProjectSet {
36+
pub fn derive_project_set_stats(&self, input_stat: &mut StatInfo) -> Result<Arc<StatInfo>> {
37+
// ProjectSet is set-returning functions, precise_cardinality set None
38+
input_stat.statistics.precise_cardinality = None;
39+
Ok(Arc::new(input_stat.clone()))
40+
}
41+
}
42+
3343
impl Operator for ProjectSet {
3444
fn rel_op(&self) -> RelOp {
3545
RelOp::ProjectSet
@@ -75,8 +85,6 @@ impl Operator for ProjectSet {
7585

7686
fn derive_stats(&self, rel_expr: &RelExpr) -> databend_common_exception::Result<Arc<StatInfo>> {
7787
let mut input_stat = rel_expr.derive_cardinality_child(0)?.deref().clone();
78-
// ProjectSet is set-returning functions, precise_cardinality set None
79-
input_stat.statistics.precise_cardinality = None;
80-
Ok(Arc::new(input_stat))
88+
self.derive_project_set_stats(&mut input_stat)
8189
}
8290
}

src/query/sql/src/planner/plans/union_all.rs

Lines changed: 28 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,33 @@ impl UnionAll {
5454
}
5555
Ok(used_columns)
5656
}
57+
58+
pub fn derive_union_stats(
59+
&self,
60+
left_stat_info: Arc<StatInfo>,
61+
right_stat_info: Arc<StatInfo>,
62+
) -> Result<Arc<StatInfo>> {
63+
let cardinality = left_stat_info.cardinality + right_stat_info.cardinality;
64+
65+
let precise_cardinality =
66+
left_stat_info
67+
.statistics
68+
.precise_cardinality
69+
.and_then(|left_cardinality| {
70+
right_stat_info
71+
.statistics
72+
.precise_cardinality
73+
.map(|right_cardinality| left_cardinality + right_cardinality)
74+
});
75+
76+
Ok(Arc::new(StatInfo {
77+
cardinality,
78+
statistics: Statistics {
79+
precise_cardinality,
80+
column_stats: Default::default(),
81+
},
82+
}))
83+
}
5784
}
5885

5986
impl Operator for UnionAll {
@@ -117,26 +144,7 @@ impl Operator for UnionAll {
117144
fn derive_stats(&self, rel_expr: &RelExpr) -> Result<Arc<StatInfo>> {
118145
let left_stat_info = rel_expr.derive_cardinality_child(0)?;
119146
let right_stat_info = rel_expr.derive_cardinality_child(1)?;
120-
let cardinality = left_stat_info.cardinality + right_stat_info.cardinality;
121-
122-
let precise_cardinality =
123-
left_stat_info
124-
.statistics
125-
.precise_cardinality
126-
.and_then(|left_cardinality| {
127-
right_stat_info
128-
.statistics
129-
.precise_cardinality
130-
.map(|right_cardinality| left_cardinality + right_cardinality)
131-
});
132-
133-
Ok(Arc::new(StatInfo {
134-
cardinality,
135-
statistics: Statistics {
136-
precise_cardinality,
137-
column_stats: Default::default(),
138-
},
139-
}))
147+
self.derive_union_stats(left_stat_info, right_stat_info)
140148
}
141149

142150
fn compute_required_prop_child(

0 commit comments

Comments
 (0)