Skip to content

Commit 19c8921

Browse files
authored
feat: Add hdfs support in iceberg and fill iceberg statistics (#17352)
* feat: Add hdfs support in iceberg Signed-off-by: Xuanwo <[email protected]> * Enable webhdfs support Signed-off-by: Xuanwo <[email protected]> * Fix iceberg support Signed-off-by: Xuanwo <[email protected]> * Add table statistics for iceberg Signed-off-by: Xuanwo <[email protected]> * Also implement column statistics Signed-off-by: Xuanwo <[email protected]> * Fix scan Signed-off-by: Xuanwo <[email protected]> * Fix ndv is wrong Signed-off-by: Xuanwo <[email protected]> * Add more info in iceberg table Signed-off-by: Xuanwo <[email protected]> * Fix column id Signed-off-by: Xuanwo <[email protected]> * Add debug for inner join Signed-off-by: Xuanwo <[email protected]> * Try fix tests Signed-off-by: Xuanwo <[email protected]> * Fix tests Signed-off-by: Xuanwo <[email protected]> * Fix system tables query Signed-off-by: Xuanwo <[email protected]> * Fix stats not calculated correctly Signed-off-by: Xuanwo <[email protected]> * Fix stats Signed-off-by: Xuanwo <[email protected]> * Fix stats Signed-off-by: Xuanwo <[email protected]> * Try fix predicate Signed-off-by: Xuanwo <[email protected]> * Fix lock not updated Signed-off-by: Xuanwo <[email protected]> * Fix cargo Signed-off-by: Xuanwo <[email protected]> * Fix tests Signed-off-by: Xuanwo <[email protected]> * revert scan changes Signed-off-by: Xuanwo <[email protected]> * Revert changes to tests Signed-off-by: Xuanwo <[email protected]> * Revert change to tests Signed-off-by: Xuanwo <[email protected]> --------- Signed-off-by: Xuanwo <[email protected]>
1 parent ded3893 commit 19c8921

File tree

17 files changed

+537
-251
lines changed

17 files changed

+537
-251
lines changed

Cargo.lock

Lines changed: 139 additions & 80 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,7 @@ arrow-data = { version = "53" }
227227
arrow-flight = { version = "53", features = ["flight-sql-experimental", "tls"] }
228228
arrow-ipc = { version = "53" }
229229
arrow-ord = { version = "53" }
230-
arrow-schema = { version = "53", features = ["serde"] }
230+
arrow-schema = { version = "53.4", features = ["serde"] }
231231
arrow-select = { version = "53" }
232232
arrow-udf-js = { version = "0.5.0" }
233233
arrow-udf-python = { version = "0.4.0" }
@@ -323,10 +323,12 @@ http = "1"
323323
humantime = "2.1.0"
324324
hyper = "1"
325325
hyper-util = { version = "0.1.9", features = ["client", "client-legacy", "tokio", "service"] }
326-
iceberg = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "01d706a1" }
327-
iceberg-catalog-glue = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "01d706a1" }
328-
iceberg-catalog-hms = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "01d706a1" }
329-
iceberg-catalog-rest = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "01d706a1" }
326+
iceberg = { version = "0.4.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "56fa9d11", features = [
327+
"storage-all",
328+
] }
329+
iceberg-catalog-glue = { version = "0.4.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "56fa9d11" }
330+
iceberg-catalog-hms = { version = "0.4.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "56fa9d11" }
331+
iceberg-catalog-rest = { version = "0.4.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "56fa9d11" }
330332
indexmap = "2.0.0"
331333
indicatif = "0.17.5"
332334
itertools = "0.13.0"

src/query/functions/src/scalars/other.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -383,7 +383,7 @@ fn register_grouping(registry: &mut FunctionRegistry) {
383383
.collect::<Vec<_>>();
384384
Value::Column(Column::Number(NumberColumn::UInt32(output.into())))
385385
}
386-
_ => unreachable!(),
386+
v => unreachable!("unexpected value type for grouping function: {:?}", v),
387387
}),
388388
},
389389
}))

src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/inner_join.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -90,9 +90,10 @@ impl HashJoinProbeState {
9090
}
9191

9292
if FROM_LEFT_SINGLE && match_count > 1 {
93-
return Err(ErrorCode::Internal(
94-
"Scalar subquery can't return more than one row",
95-
));
93+
return Err(ErrorCode::Internal(format!(
94+
"Scalar subquery can't return more than one row, but got {}",
95+
match_count
96+
)));
9697
}
9798

9899
// Fill `probe_indexes`.

src/query/sql/src/planner/binder/ddl/dictionary.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -400,7 +400,7 @@ impl Binder {
400400
) -> Result<Plan> {
401401
let ShowDictionariesStmt { database, limit } = stmt;
402402

403-
let mut select_builder = SelectBuilder::from("system.dictionaries");
403+
let mut select_builder = SelectBuilder::from("default.system.dictionaries");
404404

405405
select_builder
406406
.with_column("database AS Database")

src/query/sql/src/planner/binder/ddl/stream.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -127,9 +127,9 @@ impl Binder {
127127
let database = self.check_database_exist(catalog, database).await?;
128128

129129
let mut select_builder = if *full {
130-
SelectBuilder::from("system.streams")
130+
SelectBuilder::from("default.system.streams")
131131
} else {
132-
SelectBuilder::from("system.streams_terse")
132+
SelectBuilder::from("default.system.streams_terse")
133133
};
134134

135135
if *full {
@@ -200,7 +200,7 @@ impl Binder {
200200
let (catalog, database, stream) =
201201
self.normalize_object_identifier_triple(catalog, database, stream);
202202

203-
let mut select_builder = SelectBuilder::from("system.streams");
203+
let mut select_builder = SelectBuilder::from("default.system.streams");
204204
select_builder
205205
.with_column("created_on")
206206
.with_column("name")

src/query/sql/src/planner/binder/ddl/table.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -174,9 +174,9 @@ impl Binder {
174174
let database = self.check_database_exist(catalog, database).await?;
175175

176176
let mut select_builder = if stmt.with_history {
177-
SelectBuilder::from("system.tables_with_history")
177+
SelectBuilder::from("default.system.tables_with_history")
178178
} else {
179-
SelectBuilder::from("system.tables")
179+
SelectBuilder::from("default.system.tables")
180180
};
181181

182182
if *full {
@@ -318,21 +318,21 @@ impl Binder {
318318
// Use `system.tables` AS the "base" table to construct the result-set of `SHOW TABLE STATUS ..`
319319
//
320320
// To constraint the schema of the final result-set,
321-
// `(select ${select_cols} from system.tables where ..)`
321+
// `(select ${select_cols} from default.system.tables where ..)`
322322
// is used AS a derived table.
323323
// (unlike mysql, alias of derived table is not required in databend).
324324
let query = match limit {
325325
None => format!(
326-
"SELECT {} FROM system.tables WHERE database = '{}' ORDER BY Name",
326+
"SELECT {} FROM default.system.tables WHERE database = '{}' ORDER BY Name",
327327
select_cols, database
328328
),
329329
Some(ShowLimit::Like { pattern }) => format!(
330-
"SELECT * from (SELECT {} FROM system.tables WHERE database = '{}') \
330+
"SELECT * from (SELECT {} FROM default.system.tables WHERE database = '{}') \
331331
WHERE Name LIKE '{}' ORDER BY Name",
332332
select_cols, database, pattern
333333
),
334334
Some(ShowLimit::Where { selection }) => format!(
335-
"SELECT * from (SELECT {} FROM system.tables WHERE database = '{}') \
335+
"SELECT * from (SELECT {} FROM default.system.tables WHERE database = '{}') \
336336
WHERE ({}) ORDER BY Name",
337337
select_cols, database, selection
338338
),
@@ -353,7 +353,7 @@ impl Binder {
353353

354354
let database = self.check_database_exist(&None, database).await?;
355355

356-
let mut select_builder = SelectBuilder::from("system.tables_with_history");
356+
let mut select_builder = SelectBuilder::from("default.system.tables_with_history");
357357

358358
select_builder
359359
.with_column("name AS Tables")

src/query/sql/src/planner/binder/ddl/view.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -157,9 +157,9 @@ impl Binder {
157157
let database = self.check_database_exist(catalog, database).await?;
158158

159159
let mut select_builder = if stmt.with_history {
160-
SelectBuilder::from("system.views_with_history")
160+
SelectBuilder::from("default.system.views_with_history")
161161
} else {
162-
SelectBuilder::from("system.views")
162+
SelectBuilder::from("default.system.views")
163163
};
164164

165165
if *full {

src/query/sql/src/planner/binder/ddl/virtual_column.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -385,7 +385,7 @@ impl Binder {
385385
}
386386
};
387387

388-
let mut select_builder = SelectBuilder::from("system.virtual_columns");
388+
let mut select_builder = SelectBuilder::from("default.system.virtual_columns");
389389
select_builder
390390
.with_column("database")
391391
.with_column("table")

src/query/sql/src/planner/plans/scan.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -234,10 +234,15 @@ impl Operator for Scan {
234234
continue;
235235
}
236236
if let Some(col_stat) = v.clone() {
237-
// Safe to unwrap: min, max and ndv are all `Some(_)`.
237+
// Safe to unwrap: min, max are all `Some(_)`.
238238
let min = col_stat.min.unwrap();
239239
let max = col_stat.max.unwrap();
240-
let ndv = col_stat.ndv.unwrap();
240+
// ndv could be `None`, we will use `num_rows - null_count` as ndv instead.
241+
//
242+
// NOTE: don't touch the original num_rows, since it will be used in other places.
243+
let ndv = col_stat
244+
.ndv
245+
.unwrap_or_else(|| num_rows.saturating_sub(col_stat.null_count));
241246
let histogram = if let Some(histogram) = self.statistics.histograms.get(k)
242247
&& histogram.is_some()
243248
{

0 commit comments

Comments
 (0)