Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
338bc81
feat: Add hdfs support in iceberg
Xuanwo Jan 22, 2025
7906c26
Enable webhdfs support
Xuanwo Jan 24, 2025
88d96af
Fix iceberg support
Xuanwo Feb 10, 2025
c2d5357
Merge remote-tracking branch 'origin/main' into iceberg-hdfs
Xuanwo Feb 10, 2025
d610d76
Add table statistics for iceberg
Xuanwo Feb 11, 2025
ed5b364
Also implement column statistics
Xuanwo Feb 11, 2025
6ea601c
Fix scan
Xuanwo Feb 11, 2025
fd15437
Fix ndv is wrong
Xuanwo Feb 11, 2025
db65403
Add more info in iceberg table
Xuanwo Feb 11, 2025
40db9fd
Fix column id
Xuanwo Feb 11, 2025
6e26b30
Add debug for inner join
Xuanwo Feb 11, 2025
fd438d4
Merge remote-tracking branch 'origin' into iceberg-hdfs
Xuanwo Feb 12, 2025
7b852ad
Try fix tests
Xuanwo Feb 12, 2025
8fad3d8
Fix tests
Xuanwo Feb 12, 2025
ebd9afb
Merge branch 'main' into iceberg-hdfs
Xuanwo Feb 12, 2025
02008f5
Fix system tables query
Xuanwo Feb 12, 2025
864a810
Fix stats not calculated correctly
Xuanwo Feb 13, 2025
4fa9ca0
Fix stats
Xuanwo Feb 13, 2025
1427581
Fix stats
Xuanwo Feb 13, 2025
d4b3352
Try fix predicate
Xuanwo Feb 13, 2025
52fc72e
Fix lock not updated
Xuanwo Feb 13, 2025
5fc5ee1
Merge branch 'main' into iceberg-hdfs
Xuanwo Feb 14, 2025
9199c19
Fix cargo
Xuanwo Feb 14, 2025
526f55a
Fix tests
Xuanwo Feb 14, 2025
a3dffc5
Merge branch 'main' into iceberg-hdfs
Xuanwo Feb 14, 2025
74488d7
revert scan changes
Xuanwo Feb 14, 2025
d931da8
Revert changes to tests
Xuanwo Feb 14, 2025
812ee03
Merge remote-tracking branch 'refs/remotes/xuanwo/iceberg-hdfs' into …
Xuanwo Feb 14, 2025
2d0797e
Merge remote-tracking branch 'origin/main' into iceberg-hdfs
Xuanwo Feb 14, 2025
d530039
Revert change to tests
Xuanwo Feb 14, 2025
e46820f
Merge branch 'main' into iceberg-hdfs
Xuanwo Feb 14, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
219 changes: 139 additions & 80 deletions Cargo.lock

Large diffs are not rendered by default.

12 changes: 7 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ arrow-data = { version = "53" }
arrow-flight = { version = "53", features = ["flight-sql-experimental", "tls"] }
arrow-ipc = { version = "53" }
arrow-ord = { version = "53" }
arrow-schema = { version = "53", features = ["serde"] }
arrow-schema = { version = "53.4", features = ["serde"] }
arrow-select = { version = "53" }
arrow-udf-js = { version = "0.5.0" }
arrow-udf-python = { version = "0.4.0" }
Expand Down Expand Up @@ -323,10 +323,12 @@ http = "1"
humantime = "2.1.0"
hyper = "1"
hyper-util = { version = "0.1.9", features = ["client", "client-legacy", "tokio", "service"] }
iceberg = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "01d706a1" }
iceberg-catalog-glue = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "01d706a1" }
iceberg-catalog-hms = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "01d706a1" }
iceberg-catalog-rest = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "01d706a1" }
iceberg = { version = "0.4.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "56fa9d11", features = [
"storage-all",
] }
iceberg-catalog-glue = { version = "0.4.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "56fa9d11" }
iceberg-catalog-hms = { version = "0.4.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "56fa9d11" }
iceberg-catalog-rest = { version = "0.4.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "56fa9d11" }
indexmap = "2.0.0"
indicatif = "0.17.5"
itertools = "0.13.0"
Expand Down
2 changes: 1 addition & 1 deletion src/query/functions/src/scalars/other.rs
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ fn register_grouping(registry: &mut FunctionRegistry) {
.collect::<Vec<_>>();
Value::Column(Column::Number(NumberColumn::UInt32(output.into())))
}
_ => unreachable!(),
v => unreachable!("unexpected value type for grouping function: {:?}", v),
}),
},
}))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,10 @@ impl HashJoinProbeState {
}

if FROM_LEFT_SINGLE && match_count > 1 {
return Err(ErrorCode::Internal(
"Scalar subquery can't return more than one row",
));
return Err(ErrorCode::Internal(format!(
"Scalar subquery can't return more than one row, but got {}",
match_count
)));
}

// Fill `probe_indexes`.
Expand Down
2 changes: 1 addition & 1 deletion src/query/sql/src/planner/binder/ddl/dictionary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ impl Binder {
) -> Result<Plan> {
let ShowDictionariesStmt { database, limit } = stmt;

let mut select_builder = SelectBuilder::from("system.dictionaries");
let mut select_builder = SelectBuilder::from("default.system.dictionaries");

select_builder
.with_column("database AS Database")
Expand Down
6 changes: 3 additions & 3 deletions src/query/sql/src/planner/binder/ddl/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,9 @@ impl Binder {
let database = self.check_database_exist(catalog, database).await?;

let mut select_builder = if *full {
SelectBuilder::from("system.streams")
SelectBuilder::from("default.system.streams")
} else {
SelectBuilder::from("system.streams_terse")
SelectBuilder::from("default.system.streams_terse")
};

if *full {
Expand Down Expand Up @@ -200,7 +200,7 @@ impl Binder {
let (catalog, database, stream) =
self.normalize_object_identifier_triple(catalog, database, stream);

let mut select_builder = SelectBuilder::from("system.streams");
let mut select_builder = SelectBuilder::from("default.system.streams");
select_builder
.with_column("created_on")
.with_column("name")
Expand Down
14 changes: 7 additions & 7 deletions src/query/sql/src/planner/binder/ddl/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,9 @@ impl Binder {
let database = self.check_database_exist(catalog, database).await?;

let mut select_builder = if stmt.with_history {
SelectBuilder::from("system.tables_with_history")
SelectBuilder::from("default.system.tables_with_history")
} else {
SelectBuilder::from("system.tables")
SelectBuilder::from("default.system.tables")
};

if *full {
Expand Down Expand Up @@ -318,21 +318,21 @@ impl Binder {
// Use `system.tables` AS the "base" table to construct the result-set of `SHOW TABLE STATUS ..`
//
// To constraint the schema of the final result-set,
// `(select ${select_cols} from system.tables where ..)`
// `(select ${select_cols} from default.system.tables where ..)`
// is used AS a derived table.
// (unlike mysql, alias of derived table is not required in databend).
let query = match limit {
None => format!(
"SELECT {} FROM system.tables WHERE database = '{}' ORDER BY Name",
"SELECT {} FROM default.system.tables WHERE database = '{}' ORDER BY Name",
select_cols, database
),
Some(ShowLimit::Like { pattern }) => format!(
"SELECT * from (SELECT {} FROM system.tables WHERE database = '{}') \
"SELECT * from (SELECT {} FROM default.system.tables WHERE database = '{}') \
WHERE Name LIKE '{}' ORDER BY Name",
select_cols, database, pattern
),
Some(ShowLimit::Where { selection }) => format!(
"SELECT * from (SELECT {} FROM system.tables WHERE database = '{}') \
"SELECT * from (SELECT {} FROM default.system.tables WHERE database = '{}') \
WHERE ({}) ORDER BY Name",
select_cols, database, selection
),
Expand All @@ -353,7 +353,7 @@ impl Binder {

let database = self.check_database_exist(&None, database).await?;

let mut select_builder = SelectBuilder::from("system.tables_with_history");
let mut select_builder = SelectBuilder::from("default.system.tables_with_history");

select_builder
.with_column("name AS Tables")
Expand Down
4 changes: 2 additions & 2 deletions src/query/sql/src/planner/binder/ddl/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,9 @@ impl Binder {
let database = self.check_database_exist(catalog, database).await?;

let mut select_builder = if stmt.with_history {
SelectBuilder::from("system.views_with_history")
SelectBuilder::from("default.system.views_with_history")
} else {
SelectBuilder::from("system.views")
SelectBuilder::from("default.system.views")
};

if *full {
Expand Down
2 changes: 1 addition & 1 deletion src/query/sql/src/planner/binder/ddl/virtual_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ impl Binder {
}
};

let mut select_builder = SelectBuilder::from("system.virtual_columns");
let mut select_builder = SelectBuilder::from("default.system.virtual_columns");
select_builder
.with_column("database")
.with_column("table")
Expand Down
9 changes: 7 additions & 2 deletions src/query/sql/src/planner/plans/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,10 +234,15 @@ impl Operator for Scan {
continue;
}
if let Some(col_stat) = v.clone() {
// Safe to unwrap: min, max and ndv are all `Some(_)`.
// Safe to unwrap: min, max are all `Some(_)`.
let min = col_stat.min.unwrap();
let max = col_stat.max.unwrap();
let ndv = col_stat.ndv.unwrap();
// ndv could be `None`, we will use `num_rows - null_count` as ndv instead.
//
// NOTE: don't touch the original num_rows, since it will be used in other places.
let ndv = col_stat
.ndv
.unwrap_or_else(|| num_rows.saturating_sub(col_stat.null_count));
let histogram = if let Some(histogram) = self.statistics.histograms.get(k)
&& histogram.is_some()
{
Expand Down
3 changes: 3 additions & 0 deletions src/query/storages/iceberg/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ databend-common-meta-app = { workspace = true }
databend-common-meta-store = { workspace = true }
databend-common-meta-types = { workspace = true }
databend-common-pipeline-core = { workspace = true }
databend-common-storage = { workspace = true }
databend-common-storages-parquet = { workspace = true }
databend-storages-common-table-meta = { workspace = true }
fastrace = { workspace = true }
Expand All @@ -28,9 +29,11 @@ iceberg = { workspace = true }
iceberg-catalog-glue = { workspace = true }
iceberg-catalog-hms = { workspace = true }
iceberg-catalog-rest = { workspace = true }
log = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
typetag = { workspace = true }
uuid = { workspace = true }

[lints]
workspace = true
1 change: 1 addition & 0 deletions src/query/storages/iceberg/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ mod catalog;
mod database;
mod partition;
mod predicate;
mod statistics;
mod table;
mod table_source;

Expand Down
72 changes: 36 additions & 36 deletions src/query/storages/iceberg/src/predicate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,12 @@ use databend_common_expression::Scalar;
use iceberg::expr::Predicate;
use iceberg::expr::Reference;
use iceberg::spec::Datum;
use log::debug;

#[derive(Default, Copy, Clone, Debug)]
pub struct PredicateBuilder {
uncertain: bool,
}
pub struct PredicateBuilder;

impl PredicateBuilder {
pub fn build(&mut self, expr: &RemoteExpr<String>) -> Predicate {
pub fn build(expr: &RemoteExpr<String>) -> (bool, Predicate) {
match expr {
RemoteExpr::Constant {
span: _,
Expand All @@ -36,9 +34,9 @@ impl PredicateBuilder {
let value = scalar.as_boolean();
let is_true = value.copied().unwrap_or(false);
if is_true {
Predicate::AlwaysTrue
(false, Predicate::AlwaysTrue)
} else {
Predicate::AlwaysFalse
(false, Predicate::AlwaysFalse)
}
}

Expand All @@ -50,14 +48,14 @@ impl PredicateBuilder {
args,
return_type: _,
} if args.len() == 1 && id.name().as_ref() == "is_true" => {
let predicate = self.build(&args[0]);
if self.uncertain {
return Predicate::AlwaysTrue;
let (uncertain, predicate) = Self::build(&args[0]);
if uncertain {
return (uncertain, Predicate::AlwaysTrue);
}
match predicate {
Predicate::AlwaysTrue => Predicate::AlwaysTrue,
Predicate::AlwaysFalse => Predicate::AlwaysFalse,
_ => predicate,
Predicate::AlwaysTrue => (false, Predicate::AlwaysTrue),
Predicate::AlwaysFalse => (false, Predicate::AlwaysFalse),
_ => (false, predicate),
}
}

Expand All @@ -72,10 +70,9 @@ impl PredicateBuilder {
let (_, name, _, _) = args[0].as_column_ref().unwrap();
let r = Reference::new(name);
if let Some(op) = build_unary(r, id.name().as_ref()) {
return op;
return (false, op);
}
self.uncertain = true;
Predicate::AlwaysTrue
(true, Predicate::AlwaysTrue)
}

// not
Expand All @@ -86,15 +83,18 @@ impl PredicateBuilder {
args,
return_type: _,
} if args.len() == 1 && id.name().as_ref() == "not" => {
let predicate = self.build(&args[0]);
if self.uncertain {
return Predicate::AlwaysTrue;
let (uncertain, predicate) = Self::build(&args[0]);
if uncertain {
return (true, Predicate::AlwaysTrue);
}
match predicate {

let predicate = match predicate {
Predicate::AlwaysTrue => Predicate::AlwaysFalse,
Predicate::AlwaysFalse => Predicate::AlwaysTrue,
_ => predicate.negate(),
}
};

(false, predicate)
}

// binary {a op datum}
Expand All @@ -105,16 +105,18 @@ impl PredicateBuilder {
args,
return_type: _,
} if args.len() == 2 && ["and", "and_filters", "or"].contains(&id.name().as_ref()) => {
let left = self.build(&args[0]);
let right = self.build(&args[1]);
if self.uncertain {
return Predicate::AlwaysTrue;
let (left_uncertain, left) = Self::build(&args[0]);
let (right_uncertain, right) = Self::build(&args[1]);
if left_uncertain || right_uncertain {
return (true, Predicate::AlwaysTrue);
}
match id.name().as_ref() {
let predicate = match id.name().as_ref() {
"and" | "and_filters" => left.and(right),
"or" => left.or(right),
_ => unreachable!(),
}
};

(false, predicate)
}

// binary {a op datum}
Expand All @@ -135,11 +137,10 @@ impl PredicateBuilder {
let r = Reference::new(name);
let p = build_binary(r, id.name().as_ref(), datum);
if let Some(op) = p {
return op;
return (false, op);
}
}
self.uncertain = true;
Predicate::AlwaysTrue
(true, Predicate::AlwaysTrue)
}

// binary {datum op a}
Expand All @@ -160,16 +161,15 @@ impl PredicateBuilder {
let r = Reference::new(name);
let p = build_reverse_binary(r, id.name().as_ref(), datum);
if let Some(op) = p {
return op;
return (false, op);
}
}
self.uncertain = true;
Predicate::AlwaysTrue
(true, Predicate::AlwaysTrue)
}

_ => {
self.uncertain = true;
Predicate::AlwaysTrue
v => {
debug!("predicate build for {v:?} is nit supported yet");
(true, Predicate::AlwaysTrue)
}
}
}
Expand Down
Loading