diff --git a/Cargo.lock b/Cargo.lock index e778bf359c6d9..80d38f10cba70 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -331,9 +331,9 @@ dependencies = [ [[package]] name = "arrow-arith" -version = "53.2.0" +version = "53.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91f2dfd1a7ec0aca967dfaa616096aec49779adc8eccec005e2f5e4111b1192a" +checksum = "31dce77d2985522288edae7206bffd5fc4996491841dda01a13a58415867e681" dependencies = [ "arrow-array", "arrow-buffer", @@ -346,9 +346,9 @@ dependencies = [ [[package]] name = "arrow-array" -version = "53.2.0" +version = "53.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d39387ca628be747394890a6e47f138ceac1aa912eab64f02519fed24b637af8" +checksum = "2d45fe6d3faed0435b7313e59a02583b14c6c6339fa7729e94c32a20af319a79" dependencies = [ "ahash 0.8.11", "arrow-buffer", @@ -357,15 +357,15 @@ dependencies = [ "chrono", "chrono-tz 0.10.0", "half", - "hashbrown 0.14.5", + "hashbrown 0.15.2", "num", ] [[package]] name = "arrow-buffer" -version = "53.2.0" +version = "53.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e51e05228852ffe3eb391ce7178a0f97d2cf80cc6ef91d3c4a6b3cb688049ec" +checksum = "2b02656a35cc103f28084bc80a0159668e0a680d919cef127bd7e0aaccb06ec1" dependencies = [ "bytes", "half", @@ -374,9 +374,9 @@ dependencies = [ [[package]] name = "arrow-cast" -version = "53.2.0" +version = "53.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d09aea56ec9fa267f3f3f6cdab67d8a9974cbba90b3aa38c8fe9d0bb071bd8c1" +checksum = "c73c6233c5b5d635a56f6010e6eb1ab9e30e94707db21cea03da317f67d84cf3" dependencies = [ "arrow-array", "arrow-buffer", @@ -414,9 +414,9 @@ dependencies = [ [[package]] name = "arrow-data" -version = "53.2.0" +version = "53.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b98ae0af50890b494cebd7d6b04b35e896205c1d1df7b29a6272c5d0d0249ef5" +checksum = "b7f2861ffa86f107b8ab577d86cff7c7a490243eabe961ba1e1af4f27542bb79" dependencies = [ "arrow-buffer", "arrow-schema", @@ -454,9 +454,9 @@ dependencies = [ [[package]] name = "arrow-ipc" -version = "53.2.0" +version = "53.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ed91bdeaff5a1c00d28d8f73466bcb64d32bbd7093b5a30156b4b9f4dba3eee" +checksum = "0270dc511f11bb5fa98a25020ad51a99ca5b08d8a8dfbd17503bb9dba0388f0b" dependencies = [ "arrow-array", "arrow-buffer", @@ -490,9 +490,9 @@ dependencies = [ [[package]] name = "arrow-ord" -version = "53.2.0" +version = "53.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2883d7035e0b600fb4c30ce1e50e66e53d8656aa729f2bfa4b51d359cf3ded52" +checksum = "c6f202a879d287099139ff0d121e7f55ae5e0efe634b8cf2106ebc27a8715dee" dependencies = [ "arrow-array", "arrow-buffer", @@ -519,18 +519,18 @@ dependencies = [ [[package]] name = "arrow-schema" -version = "53.2.0" +version = "53.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "539ada65246b949bd99ffa0881a9a15a4a529448af1a07a9838dd78617dafab1" +checksum = "9579b9d8bce47aa41389fe344f2c6758279983b7c0ebb4013e283e3e91bb450e" dependencies = [ "serde", ] [[package]] name = "arrow-select" -version = "53.2.0" +version = "53.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6259e566b752da6dceab91766ed8b2e67bf6270eb9ad8a6e07a33c1bede2b125" +checksum = "7471ba126d0b0aaa24b50a36bc6c25e4e74869a1fd1a5553357027a0b1c8d1f1" dependencies = [ "ahash 0.8.11", 
"arrow-array", @@ -542,9 +542,9 @@ dependencies = [ [[package]] name = "arrow-string" -version = "53.2.0" +version = "53.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f3179ccbd18ebf04277a095ba7321b93fd1f774f18816bd5f6b3ce2f594edb6c" +checksum = "72993b01cb62507b06f1fb49648d7286c8989ecfabdb7b77a750fcb54410731b" dependencies = [ "arrow-array", "arrow-buffer", @@ -620,7 +620,7 @@ dependencies = [ "dashmap 5.5.3", "futures", "itertools 0.10.5", - "loom", + "loom 0.5.6", "once_cell", "pin-project-lite", "rustc-hash 1.1.0", @@ -768,9 +768,9 @@ checksum = "8b75356056920673b02621b35afd0f7dda9306d03c79a30f5c56c44cf256e3de" [[package]] name = "async-trait" -version = "0.1.81" +version = "0.1.85" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e0c28dcc82d7c8ead5cb13beb15405b57b8546e93215673ff8ca0349a028107" +checksum = "3f934833b4b7233644e5848f235df3f57ed8c80f1528a26c3dfa13d2147fa056" dependencies = [ "proc-macro2", "quote", @@ -4224,6 +4224,7 @@ dependencies = [ "databend-common-meta-store", "databend-common-meta-types", "databend-common-pipeline-core", + "databend-common-storage", "databend-common-storages-parquet", "databend-storages-common-table-meta", "fastrace", @@ -4232,9 +4233,11 @@ dependencies = [ "iceberg-catalog-glue", "iceberg-catalog-hms", "iceberg-catalog-rest", + "log", "serde", "serde_json", "typetag", + "uuid", ] [[package]] @@ -6724,6 +6727,19 @@ dependencies = [ "windows 0.48.0", ] +[[package]] +name = "generator" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc6bd114ceda131d3b1d665eba35788690ad37f5916457286b32ab6fd3c438dd" +dependencies = [ + "cfg-if", + "libc", + "log", + "rustversion", + "windows 0.58.0", +] + [[package]] name = "generic-array" version = "0.14.7" @@ -8329,7 +8345,7 @@ dependencies = [ "iana-time-zone-haiku", "js-sys", "wasm-bindgen", - "windows-core", + "windows-core 0.52.0", ] [[package]] @@ -8343,8 +8359,8 @@ dependencies = [ [[package]] name = "iceberg" -version = "0.3.0" -source = "git+https://github.com/Xuanwo/iceberg-rust/?rev=01d706a1#01d706a1b01a3f93cbc9f4a43b3030b963104e47" +version = "0.4.0" +source = "git+https://github.com/Xuanwo/iceberg-rust/?rev=56fa9d11#56fa9d1196fe33b80726ee8195ba21320b5f8952" dependencies = [ "anyhow", "apache-avro", @@ -8369,7 +8385,7 @@ dependencies = [ "murmur3", "num-bigint", "once_cell", - "opendal 0.50.1", + "opendal 0.51.1", "ordered-float 4.5.0", "parquet", "paste", @@ -8391,8 +8407,8 @@ dependencies = [ [[package]] name = "iceberg-catalog-glue" -version = "0.3.0" -source = "git+https://github.com/Xuanwo/iceberg-rust/?rev=01d706a1#01d706a1b01a3f93cbc9f4a43b3030b963104e47" +version = "0.4.0" +source = "git+https://github.com/Xuanwo/iceberg-rust/?rev=56fa9d11#56fa9d1196fe33b80726ee8195ba21320b5f8952" dependencies = [ "anyhow", "async-trait", @@ -8408,8 +8424,8 @@ dependencies = [ [[package]] name = "iceberg-catalog-hms" -version = "0.3.0" -source = "git+https://github.com/Xuanwo/iceberg-rust/?rev=01d706a1#01d706a1b01a3f93cbc9f4a43b3030b963104e47" +version = "0.4.0" +source = "git+https://github.com/Xuanwo/iceberg-rust/?rev=56fa9d11#56fa9d1196fe33b80726ee8195ba21320b5f8952" dependencies = [ "anyhow", "async-trait", @@ -8427,8 +8443,8 @@ dependencies = [ [[package]] name = "iceberg-catalog-rest" -version = "0.3.0" -source = "git+https://github.com/Xuanwo/iceberg-rust/?rev=01d706a1#01d706a1b01a3f93cbc9f4a43b3030b963104e47" +version = "0.4.0" +source = 
"git+https://github.com/Xuanwo/iceberg-rust/?rev=56fa9d11#56fa9d1196fe33b80726ee8195ba21320b5f8952" dependencies = [ "async-trait", "chrono", @@ -9379,7 +9395,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff50ecb28bb86013e935fb6683ab1f6d3a20016f123c76fd4c27470076ac30f5" dependencies = [ "cfg-if", - "generator", + "generator 0.7.5", "scoped-tls", "serde", "serde_json", @@ -9387,6 +9403,19 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "loom" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "419e0dc8046cb947daa77eb95ae174acfbddb7673b4151f56d1eed8e93fbfaca" +dependencies = [ + "cfg-if", + "generator 0.8.4", + "scoped-tls", + "tracing", + "tracing-subscriber", +] + [[package]] name = "lru" version = "0.12.4" @@ -9697,25 +9726,23 @@ dependencies = [ [[package]] name = "moka" -version = "0.12.8" +version = "0.12.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32cf62eb4dd975d2dde76432fb1075c49e3ee2331cf36f1f8fd4b66550d32b6f" +checksum = "a9321642ca94a4282428e6ea4af8cc2ca4eac48ac7a6a4ea8f33f76d0ce70926" dependencies = [ "async-lock", - "async-trait", "crossbeam-channel", "crossbeam-epoch", "crossbeam-utils", "event-listener 5.3.1", "futures-util", - "once_cell", + "loom 0.7.2", "parking_lot 0.12.3", - "quanta", + "portable-atomic", "rustc_version", "smallvec", "tagptr", "thiserror", - "triomphe", "uuid", ] @@ -10277,9 +10304,8 @@ dependencies = [ [[package]] name = "opendal" -version = "0.50.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "213222b6c86949314d8f51acb26d8241e7c8dd0879b016a79471d49f21ee592f" +version = "0.51.1" +source = "git+https://github.com/apache/opendal?rev=b8a3b7a#b8a3b7aa093d8695d89d579e146907850bb6565c" dependencies = [ "anyhow", "async-trait", @@ -10288,7 +10314,6 @@ dependencies = [ "bytes", "chrono", "crc32c", - "flagset", "futures", "getrandom", "http 1.1.0", @@ -10709,9 +10734,9 @@ dependencies = [ [[package]] name = "parquet" -version = "53.2.0" +version = "53.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dea02606ba6f5e856561d8d507dba8bac060aefca2a6c0f1aa1d361fed91ff3e" +checksum = "8957c0c95a6a1804f3e51a18f69df29be53856a8c5768cc9b6d00fcafcd2917c" dependencies = [ "ahash 0.8.11", "arrow-array", @@ -10728,7 +10753,7 @@ dependencies = [ "flate2", "futures", "half", - "hashbrown 0.14.5", + "hashbrown 0.15.2", "lz4_flex", "num", "num-bigint", @@ -11676,21 +11701,6 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "658fa1faf7a4cc5f057c9ee5ef560f717ad9d8dc66d975267f709624d6e1ab88" -[[package]] -name = "quanta" -version = "0.12.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e5167a477619228a0b284fac2674e3c388cba90631d7b7de620e6f1fcd08da5" -dependencies = [ - "crossbeam-utils", - "libc", - "once_cell", - "raw-cpuid", - "wasi", - "web-sys", - "winapi", -] - [[package]] name = "quick-error" version = "1.2.3" @@ -11915,15 +11925,6 @@ dependencies = [ "random-number", ] -[[package]] -name = "raw-cpuid" -version = "11.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cb9ee317cfe3fbd54b36a511efc1edd42e216903c9cd575e686dd68a2ba90d8d" -dependencies = [ - "bitflags 2.6.0", -] - [[package]] name = "rawpointer" version = "0.2.1" @@ -13609,7 +13610,7 @@ version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"2b8c4a4445d81357df8b1a650d0d0d6fbbbfe99d064aa5e02f3e4022061476d8" dependencies = [ - "loom", + "loom 0.5.6", ] [[package]] @@ -14787,12 +14788,6 @@ version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "343e926fc669bc8cde4fa3129ab681c63671bae288b1f1081ceee6d9d37904fc" -[[package]] -name = "triomphe" -version = "0.1.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "859eb650cfee7434994602c3a68b25d77ad9e68c8a6cd491616ef86661382eb3" - [[package]] name = "triple_accel" version = "0.3.4" @@ -15893,7 +15888,17 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e48a53791691ab099e5e2ad123536d0fff50652600abaf43bbf952894110d0be" dependencies = [ - "windows-core", + "windows-core 0.52.0", + "windows-targets 0.52.6", +] + +[[package]] +name = "windows" +version = "0.58.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd04d41d93c4992d421894c18c8b43496aa748dd4c081bac0dc93eb0489272b6" +dependencies = [ + "windows-core 0.58.0", "windows-targets 0.52.6", ] @@ -15906,6 +15911,60 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows-core" +version = "0.58.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ba6d44ec8c2591c134257ce647b7ea6b20335bf6379a27dac5f1641fcf59f99" +dependencies = [ + "windows-implement", + "windows-interface", + "windows-result", + "windows-strings", + "windows-targets 0.52.6", +] + +[[package]] +name = "windows-implement" +version = "0.58.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2bbd5b46c938e506ecbce286b6628a02171d56153ba733b6c741fc627ec9579b" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.95", +] + +[[package]] +name = "windows-interface" +version = "0.58.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "053c4c462dc91d3b1504c6fe5a726dd15e216ba718e84a0e46a88fbe5ded3515" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.95", +] + +[[package]] +name = "windows-result" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d1043d8214f791817bab27572aaa8af63732e11bf84aa21a45a78d6c317ae0e" +dependencies = [ + "windows-targets 0.52.6", +] + +[[package]] +name = "windows-strings" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4cd9b125c486025df0eabcb585e62173c6c9eddcec5d117d3b6e8c30e2ee4d10" +dependencies = [ + "windows-result", + "windows-targets 0.52.6", +] + [[package]] name = "windows-sys" version = "0.42.0" diff --git a/Cargo.toml b/Cargo.toml index c63ff5e1a0be9..5b6d5adc1762b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -227,7 +227,7 @@ arrow-data = { version = "53" } arrow-flight = { version = "53", features = ["flight-sql-experimental", "tls"] } arrow-ipc = { version = "53" } arrow-ord = { version = "53" } -arrow-schema = { version = "53", features = ["serde"] } +arrow-schema = { version = "53.4", features = ["serde"] } arrow-select = { version = "53" } arrow-udf-js = { version = "0.5.0" } arrow-udf-python = { version = "0.4.0" } @@ -323,10 +323,12 @@ http = "1" humantime = "2.1.0" hyper = "1" hyper-util = { version = "0.1.9", features = ["client", "client-legacy", "tokio", "service"] } -iceberg = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "01d706a1" } -iceberg-catalog-glue = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "01d706a1" } 
-iceberg-catalog-hms = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "01d706a1" }
-iceberg-catalog-rest = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "01d706a1" }
+iceberg = { version = "0.4.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "56fa9d11", features = [
+ "storage-all",
+] }
+iceberg-catalog-glue = { version = "0.4.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "56fa9d11" }
+iceberg-catalog-hms = { version = "0.4.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "56fa9d11" }
+iceberg-catalog-rest = { version = "0.4.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "56fa9d11" }
 indexmap = "2.0.0"
 indicatif = "0.17.5"
 itertools = "0.13.0"
diff --git a/src/query/functions/src/scalars/other.rs b/src/query/functions/src/scalars/other.rs
index 4fe4b43cd17d1..62e5657d16e1b 100644
--- a/src/query/functions/src/scalars/other.rs
+++ b/src/query/functions/src/scalars/other.rs
@@ -383,7 +383,7 @@ fn register_grouping(registry: &mut FunctionRegistry) {
 .collect::<Vec<_>>();
 Value::Column(Column::Number(NumberColumn::UInt32(output.into())))
 }
- _ => unreachable!(),
+ v => unreachable!("unexpected value type for grouping function: {:?}", v),
 }),
 },
 }))
diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/inner_join.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/inner_join.rs
index 51bcbbbb4cc6b..f0fd7ccc95c6a 100644
--- a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/inner_join.rs
+++ b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/inner_join.rs
@@ -90,9 +90,10 @@ impl HashJoinProbeState {
 }
 if FROM_LEFT_SINGLE && match_count > 1 {
- return Err(ErrorCode::Internal(
- "Scalar subquery can't return more than one row",
- ));
+ return Err(ErrorCode::Internal(format!(
+ "Scalar subquery can't return more than one row, but got {}",
+ match_count
+ )));
 }
 // Fill `probe_indexes`.
diff --git a/src/query/sql/src/planner/binder/ddl/dictionary.rs b/src/query/sql/src/planner/binder/ddl/dictionary.rs
index 94099664073da..be52a193143a6 100644
--- a/src/query/sql/src/planner/binder/ddl/dictionary.rs
+++ b/src/query/sql/src/planner/binder/ddl/dictionary.rs
@@ -400,7 +400,7 @@ impl Binder {
 ) -> Result<Plan> {
 let ShowDictionariesStmt { database, limit } = stmt;
- let mut select_builder = SelectBuilder::from("system.dictionaries");
+ let mut select_builder = SelectBuilder::from("default.system.dictionaries");
 select_builder
 .with_column("database AS Database")
diff --git a/src/query/sql/src/planner/binder/ddl/stream.rs b/src/query/sql/src/planner/binder/ddl/stream.rs
index a18914afd7615..74137ec63de7f 100644
--- a/src/query/sql/src/planner/binder/ddl/stream.rs
+++ b/src/query/sql/src/planner/binder/ddl/stream.rs
@@ -127,9 +127,9 @@ impl Binder {
 let database = self.check_database_exist(catalog, database).await?;
 let mut select_builder = if *full {
- SelectBuilder::from("system.streams")
+ SelectBuilder::from("default.system.streams")
 } else {
- SelectBuilder::from("system.streams_terse")
+ SelectBuilder::from("default.system.streams_terse")
 };
 if *full {
@@ -200,7 +200,7 @@ impl Binder {
 let (catalog, database, stream) =
 self.normalize_object_identifier_triple(catalog, database, stream);
- let mut select_builder = SelectBuilder::from("system.streams");
+ let mut select_builder = SelectBuilder::from("default.system.streams");
 select_builder
 .with_column("created_on")
 .with_column("name")
diff --git a/src/query/sql/src/planner/binder/ddl/table.rs b/src/query/sql/src/planner/binder/ddl/table.rs
index 401a03d381898..6a38f06de3b6c 100644
--- a/src/query/sql/src/planner/binder/ddl/table.rs
+++ b/src/query/sql/src/planner/binder/ddl/table.rs
@@ -174,9 +174,9 @@ impl Binder {
 let database = self.check_database_exist(catalog, database).await?;
 let mut select_builder = if stmt.with_history {
- SelectBuilder::from("system.tables_with_history")
+ SelectBuilder::from("default.system.tables_with_history")
 } else {
- SelectBuilder::from("system.tables")
+ SelectBuilder::from("default.system.tables")
 };
 if *full {
@@ -318,21 +318,21 @@ impl Binder {
 // Use `system.tables` AS the "base" table to construct the result-set of `SHOW TABLE STATUS ..`
 //
 // To constraint the schema of the final result-set,
- `(select ${select_cols} from system.tables where ..)`
+ `(select ${select_cols} from default.system.tables where ..)`
 // is used AS a derived table.
 // (unlike mysql, alias of derived table is not required in databend).
 let query = match limit {
 None => format!(
- "SELECT {} FROM system.tables WHERE database = '{}' ORDER BY Name",
+ "SELECT {} FROM default.system.tables WHERE database = '{}' ORDER BY Name",
 select_cols, database
 ),
 Some(ShowLimit::Like { pattern }) => format!(
- "SELECT * from (SELECT {} FROM system.tables WHERE database = '{}') \
+ "SELECT * from (SELECT {} FROM default.system.tables WHERE database = '{}') \
 WHERE Name LIKE '{}' ORDER BY Name",
 select_cols, database, pattern
 ),
 Some(ShowLimit::Where { selection }) => format!(
- "SELECT * from (SELECT {} FROM system.tables WHERE database = '{}') \
+ "SELECT * from (SELECT {} FROM default.system.tables WHERE database = '{}') \
 WHERE ({}) ORDER BY Name",
 select_cols, database, selection
 ),
@@ -353,7 +353,7 @@ impl Binder {
 let database = self.check_database_exist(&None, database).await?;
- let mut select_builder = SelectBuilder::from("system.tables_with_history");
+ let mut select_builder = SelectBuilder::from("default.system.tables_with_history");
 select_builder
 .with_column("name AS Tables")
diff --git a/src/query/sql/src/planner/binder/ddl/view.rs b/src/query/sql/src/planner/binder/ddl/view.rs
index 7a8321cc05a09..031f60e63bbee 100644
--- a/src/query/sql/src/planner/binder/ddl/view.rs
+++ b/src/query/sql/src/planner/binder/ddl/view.rs
@@ -157,9 +157,9 @@ impl Binder {
 let database = self.check_database_exist(catalog, database).await?;
 let mut select_builder = if stmt.with_history {
- SelectBuilder::from("system.views_with_history")
+ SelectBuilder::from("default.system.views_with_history")
 } else {
- SelectBuilder::from("system.views")
+ SelectBuilder::from("default.system.views")
 };
 if *full {
diff --git a/src/query/sql/src/planner/binder/ddl/virtual_column.rs b/src/query/sql/src/planner/binder/ddl/virtual_column.rs
index 02386502a8d0d..b73cc0ff9df71 100644
--- a/src/query/sql/src/planner/binder/ddl/virtual_column.rs
+++ b/src/query/sql/src/planner/binder/ddl/virtual_column.rs
@@ -385,7 +385,7 @@ impl Binder {
 }
 };
- let mut select_builder = SelectBuilder::from("system.virtual_columns");
+ let mut select_builder = SelectBuilder::from("default.system.virtual_columns");
 select_builder
 .with_column("database")
 .with_column("table")
diff --git a/src/query/sql/src/planner/plans/scan.rs b/src/query/sql/src/planner/plans/scan.rs
index 67d250067d7a7..1040bf97049d2 100644
--- a/src/query/sql/src/planner/plans/scan.rs
+++ b/src/query/sql/src/planner/plans/scan.rs
@@ -234,10 +234,15 @@ impl Operator for Scan {
 continue;
 }
 if let Some(col_stat) = v.clone() {
- // Safe to unwrap: min, max and ndv are all `Some(_)`.
+ // Safe to unwrap: `min` and `max` are both `Some(_)`.
 let min = col_stat.min.unwrap();
 let max = col_stat.max.unwrap();
- let ndv = col_stat.ndv.unwrap();
+ // ndv could be `None`; in that case, use `num_rows - null_count` as the ndv instead.
+ //
+ // NOTE: don't modify the original `num_rows`, since it is still used elsewhere.
+ let ndv = col_stat + .ndv + .unwrap_or_else(|| num_rows.saturating_sub(col_stat.null_count)); let histogram = if let Some(histogram) = self.statistics.histograms.get(k) && histogram.is_some() { diff --git a/src/query/storages/iceberg/Cargo.toml b/src/query/storages/iceberg/Cargo.toml index a1a3a34d6d200..e1a8547496774 100644 --- a/src/query/storages/iceberg/Cargo.toml +++ b/src/query/storages/iceberg/Cargo.toml @@ -20,6 +20,7 @@ databend-common-meta-app = { workspace = true } databend-common-meta-store = { workspace = true } databend-common-meta-types = { workspace = true } databend-common-pipeline-core = { workspace = true } +databend-common-storage = { workspace = true } databend-common-storages-parquet = { workspace = true } databend-storages-common-table-meta = { workspace = true } fastrace = { workspace = true } @@ -28,9 +29,11 @@ iceberg = { workspace = true } iceberg-catalog-glue = { workspace = true } iceberg-catalog-hms = { workspace = true } iceberg-catalog-rest = { workspace = true } +log = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } typetag = { workspace = true } +uuid = { workspace = true } [lints] workspace = true diff --git a/src/query/storages/iceberg/src/lib.rs b/src/query/storages/iceberg/src/lib.rs index 8949eee3c52d1..c423cd8d0925e 100644 --- a/src/query/storages/iceberg/src/lib.rs +++ b/src/query/storages/iceberg/src/lib.rs @@ -23,6 +23,7 @@ mod catalog; mod database; mod partition; mod predicate; +mod statistics; mod table; mod table_source; diff --git a/src/query/storages/iceberg/src/predicate.rs b/src/query/storages/iceberg/src/predicate.rs index 11a69727710cc..059b4a20e1787 100644 --- a/src/query/storages/iceberg/src/predicate.rs +++ b/src/query/storages/iceberg/src/predicate.rs @@ -19,14 +19,12 @@ use databend_common_expression::Scalar; use iceberg::expr::Predicate; use iceberg::expr::Reference; use iceberg::spec::Datum; +use log::debug; -#[derive(Default, Copy, Clone, Debug)] -pub struct PredicateBuilder { - uncertain: bool, -} +pub struct PredicateBuilder; impl PredicateBuilder { - pub fn build(&mut self, expr: &RemoteExpr) -> Predicate { + pub fn build(expr: &RemoteExpr) -> (bool, Predicate) { match expr { RemoteExpr::Constant { span: _, @@ -36,9 +34,9 @@ impl PredicateBuilder { let value = scalar.as_boolean(); let is_true = value.copied().unwrap_or(false); if is_true { - Predicate::AlwaysTrue + (false, Predicate::AlwaysTrue) } else { - Predicate::AlwaysFalse + (false, Predicate::AlwaysFalse) } } @@ -50,14 +48,14 @@ impl PredicateBuilder { args, return_type: _, } if args.len() == 1 && id.name().as_ref() == "is_true" => { - let predicate = self.build(&args[0]); - if self.uncertain { - return Predicate::AlwaysTrue; + let (uncertain, predicate) = Self::build(&args[0]); + if uncertain { + return (uncertain, Predicate::AlwaysTrue); } match predicate { - Predicate::AlwaysTrue => Predicate::AlwaysTrue, - Predicate::AlwaysFalse => Predicate::AlwaysFalse, - _ => predicate, + Predicate::AlwaysTrue => (false, Predicate::AlwaysTrue), + Predicate::AlwaysFalse => (false, Predicate::AlwaysFalse), + _ => (false, predicate), } } @@ -72,10 +70,9 @@ impl PredicateBuilder { let (_, name, _, _) = args[0].as_column_ref().unwrap(); let r = Reference::new(name); if let Some(op) = build_unary(r, id.name().as_ref()) { - return op; + return (false, op); } - self.uncertain = true; - Predicate::AlwaysTrue + (true, Predicate::AlwaysTrue) } // not @@ -86,15 +83,18 @@ impl PredicateBuilder { args, return_type: _, } if args.len() == 1 && 
id.name().as_ref() == "not" => { - let predicate = self.build(&args[0]); - if self.uncertain { - return Predicate::AlwaysTrue; + let (uncertain, predicate) = Self::build(&args[0]); + if uncertain { + return (true, Predicate::AlwaysTrue); } - match predicate { + + let predicate = match predicate { Predicate::AlwaysTrue => Predicate::AlwaysFalse, Predicate::AlwaysFalse => Predicate::AlwaysTrue, _ => predicate.negate(), - } + }; + + (false, predicate) } // binary {a op datum} @@ -105,16 +105,18 @@ impl PredicateBuilder { args, return_type: _, } if args.len() == 2 && ["and", "and_filters", "or"].contains(&id.name().as_ref()) => { - let left = self.build(&args[0]); - let right = self.build(&args[1]); - if self.uncertain { - return Predicate::AlwaysTrue; + let (left_uncertain, left) = Self::build(&args[0]); + let (right_uncertain, right) = Self::build(&args[1]); + if left_uncertain || right_uncertain { + return (true, Predicate::AlwaysTrue); } - match id.name().as_ref() { + let predicate = match id.name().as_ref() { "and" | "and_filters" => left.and(right), "or" => left.or(right), _ => unreachable!(), - } + }; + + (false, predicate) } // binary {a op datum} @@ -135,11 +137,10 @@ impl PredicateBuilder { let r = Reference::new(name); let p = build_binary(r, id.name().as_ref(), datum); if let Some(op) = p { - return op; + return (false, op); } } - self.uncertain = true; - Predicate::AlwaysTrue + (true, Predicate::AlwaysTrue) } // binary {datum op a} @@ -160,16 +161,15 @@ impl PredicateBuilder { let r = Reference::new(name); let p = build_reverse_binary(r, id.name().as_ref(), datum); if let Some(op) = p { - return op; + return (false, op); } } - self.uncertain = true; - Predicate::AlwaysTrue + (true, Predicate::AlwaysTrue) } - _ => { - self.uncertain = true; - Predicate::AlwaysTrue + v => { + debug!("predicate build for {v:?} is nit supported yet"); + (true, Predicate::AlwaysTrue) } } } diff --git a/src/query/storages/iceberg/src/statistics.rs b/src/query/storages/iceberg/src/statistics.rs new file mode 100644 index 0000000000000..0956dd4b6eb40 --- /dev/null +++ b/src/query/storages/iceberg/src/statistics.rs @@ -0,0 +1,253 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+use std::collections::HashMap;
+
+use databend_common_base::base::OrderedFloat;
+use databend_common_catalog::statistics;
+use databend_common_catalog::statistics::BasicColumnStatistics;
+use databend_common_catalog::table::ColumnStatisticsProvider;
+use databend_common_exception::ErrorCode;
+use databend_common_exception::Result;
+use databend_common_expression::types::Decimal;
+use databend_common_expression::types::DecimalSize;
+use databend_common_expression::types::Number;
+use databend_common_expression::types::F32;
+use databend_common_expression::types::F64;
+use databend_common_expression::ColumnId;
+use databend_common_expression::Scalar;
+use databend_common_expression::TableField;
+use databend_common_storage::Datum as DatabendDatum;
+use iceberg::spec::DataContentType;
+use iceberg::spec::Datum;
+use iceberg::spec::ManifestStatus;
+use iceberg::spec::PrimitiveLiteral;
+use iceberg::spec::PrimitiveType;
+use uuid::Uuid;
+
+use crate::IcebergTable;
+
+#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, Default)]
+pub struct IcebergStatistics {
+ /// Number of records in this table
+ pub record_count: u64,
+ /// Total table size in bytes
+ pub file_size_in_bytes: u64,
+ /// Number of manifest files in this table
+ pub number_of_manifest_files: u64,
+ /// Number of data files in this table
+ pub number_of_data_files: u64,
+
+ /// Computed statistics for each column
+ pub computed_statistics: HashMap<ColumnId, BasicColumnStatistics>,
+}
+
+impl IcebergStatistics {
+ /// Get statistics of an Iceberg table.
+ pub async fn parse(table: &iceberg::table::Table) -> Result<Self> {
+ let Some(snapshot) = table.metadata().current_snapshot() else {
+ return Ok(IcebergStatistics::default());
+ };
+
+ // Map from column id to the total size on disk of all regions that
+ // store the column. Does not include bytes necessary to read other
+ // columns, like footers. Leave null for row-oriented formats (Avro).
+ let mut column_sizes: HashMap<i32, u64> = HashMap::new();
+ // Map from column id to number of values in the column (including null
+ // and NaN values)
+ let mut value_counts: HashMap<i32, u64> = HashMap::new();
+ // Map from column id to number of null values in the column
+ let mut null_value_counts: HashMap<i32, u64> = HashMap::new();
+ // Map from column id to number of NaN values in the column
+ let mut nan_value_counts: HashMap<i32, u64> = HashMap::new();
+ // Map from column id to lower bound in the column serialized as binary.
+ // Each value must be less than or equal to all non-null, non-NaN values
+ // in the column for the file.
+ // Reference:
+ //
+ // - [Binary single-value serialization](https://iceberg.apache.org/spec/#binary-single-value-serialization)
+ let mut lower_bounds: HashMap<i32, Datum> = HashMap::new();
+ // Map from column id to upper bound in the column serialized as binary.
+ // Each value must be greater than or equal to all non-null, non-NaN
+ // values in the column for the file.
+ //
+ // Reference:
+ //
+ // - [Binary single-value serialization](https://iceberg.apache.org/spec/#binary-single-value-serialization)
+ let mut upper_bounds: HashMap<i32, Datum> = HashMap::new();
+
+ let mut statistics = IcebergStatistics::default();
+
+ let manifest = snapshot
+ .load_manifest_list(table.file_io(), table.metadata())
+ .await
+ .map_err(|err| ErrorCode::Internal(format!("load manifest list error: {err:?}")))?;
+
+ for manifest_file in manifest.entries() {
+ statistics.number_of_manifest_files += 1;
+
+ let manifest = manifest_file
+ .load_manifest(table.file_io())
+ .await
+ .map_err(|err| ErrorCode::Internal(format!("load manifest file error: {err:?}")))?;
+
+ manifest.entries().iter().for_each(|entry| {
+ if entry.status() == ManifestStatus::Deleted {
+ return;
+ }
+ let data_file = entry.data_file();
+ if data_file.content_type() != DataContentType::Data {
+ return;
+ }
+
+ statistics.record_count += data_file.record_count();
+ statistics.file_size_in_bytes += data_file.file_size_in_bytes();
+ statistics.number_of_data_files += 1;
+
+ data_file.column_sizes().iter().for_each(|(col_id, size)| {
+ *column_sizes.entry(*col_id).or_default() += size;
+ });
+ data_file.value_counts().iter().for_each(|(col_id, count)| {
+ *value_counts.entry(*col_id).or_default() += count;
+ });
+ data_file
+ .null_value_counts()
+ .iter()
+ .for_each(|(col_id, count)| {
+ *null_value_counts.entry(*col_id).or_default() += count;
+ });
+ data_file
+ .nan_value_counts()
+ .iter()
+ .for_each(|(col_id, count)| {
+ *nan_value_counts.entry(*col_id).or_default() += *count;
+ });
+
+ data_file
+ .lower_bounds()
+ .iter()
+ .for_each(|(&col_id, new_value)| {
+ lower_bounds
+ .entry(col_id)
+ .and_modify(|existing_value| {
+ if new_value < existing_value {
+ *existing_value = new_value.clone();
+ }
+ })
+ .or_insert_with(|| new_value.clone());
+ });
+
+ data_file
+ .upper_bounds()
+ .iter()
+ .for_each(|(&col_id, new_value)| {
+ upper_bounds
+ .entry(col_id)
+ .and_modify(|existing_value| {
+ if new_value > existing_value {
+ *existing_value = new_value.clone();
+ }
+ })
+ .or_insert_with(|| new_value.clone());
+ });
+ });
+ }
+
+ let mut computed_statistics = HashMap::new();
+ for field in IcebergTable::get_schema(table)?.fields() {
+ let column_stats = get_column_stats(
+ field,
+ &column_sizes,
+ &lower_bounds,
+ &upper_bounds,
+ &null_value_counts,
+ );
+ computed_statistics.insert(field.column_id, column_stats);
+ }
+
+ statistics.computed_statistics = computed_statistics;
+ Ok(statistics)
+ }
+}
+
+impl ColumnStatisticsProvider for IcebergStatistics {
+ fn column_statistics(&self, column_id: ColumnId) -> Option<&statistics::BasicColumnStatistics> {
+ self.computed_statistics.get(&column_id)
+ }
+
+ fn num_rows(&self) -> Option<u64> {
+ Some(self.record_count)
+ }
+}
+
+/// Try to get [`BasicColumnStatistics`] for one column.
+fn get_column_stats(
+ field: &TableField,
+ _column_size: &HashMap<i32, u64>,
+ lower: &HashMap<i32, Datum>,
+ upper: &HashMap<i32, Datum>,
+ null_counts: &HashMap<i32, u64>,
+) -> BasicColumnStatistics {
+ // The column id in iceberg is 1-based while the column id in Databend is 0-based.
+ let iceberg_col_id = field.column_id as i32 + 1;
+ BasicColumnStatistics {
+ min: lower
+ .get(&iceberg_col_id)
+ .and_then(parse_datum)
+ .and_then(DatabendDatum::from_scalar),
+ max: upper
+ .get(&iceberg_col_id)
+ .and_then(parse_datum)
+ .and_then(DatabendDatum::from_scalar),
+ ndv: None,
+ null_count: null_counts
+ .get(&iceberg_col_id)
+ .copied()
+ .unwrap_or_default(),
+ }
+}
+
+/// Try to parse an Iceberg [`Datum`] into a Databend [`Scalar`].
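+///
+/// Returns `None` for literals that have no Databend counterpart (for
+/// example the `AboveMax`/`BelowMin` sentinels, or an `Int128` literal
+/// whose type is not a decimal), so a missing bound simply means "no
+/// statistics" for that column rather than an error.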
+pub fn parse_datum(data: &Datum) -> Option<Scalar> {
+ match data.literal() {
+ PrimitiveLiteral::Boolean(v) => Some(Scalar::Boolean(*v)),
+ PrimitiveLiteral::Int(v) => Some(Scalar::Number(i32::upcast_scalar(*v))),
+ PrimitiveLiteral::Long(v) => Some(Scalar::Number(i64::upcast_scalar(*v))),
+ PrimitiveLiteral::Float(v) => {
+ Some(Scalar::Number(F32::upcast_scalar(OrderedFloat::from(v.0))))
+ }
+ PrimitiveLiteral::Double(v) => {
+ Some(Scalar::Number(F64::upcast_scalar(OrderedFloat::from(v.0))))
+ }
+ PrimitiveLiteral::String(v) => Some(Scalar::String(v.clone())),
+ PrimitiveLiteral::Binary(v) => Some(Scalar::Binary(v.clone())),
+ // Iceberg uses i128 to represent decimals
+ PrimitiveLiteral::Int128(v) => {
+ if let PrimitiveType::Decimal { precision, scale } = data.data_type() {
+ Some(Scalar::Decimal(v.to_scalar(DecimalSize {
+ precision: *precision as u8,
+ scale: *scale as u8,
+ })))
+ } else {
+ None
+ }
+ }
+ // Iceberg uses u128 to represent UUIDs
+ PrimitiveLiteral::UInt128(v) => {
+ Some(Scalar::String(Uuid::from_u128(*v).as_simple().to_string()))
+ }
+ PrimitiveLiteral::AboveMax => None,
+ PrimitiveLiteral::BelowMin => None,
+ }
+}
diff --git a/src/query/storages/iceberg/src/stats.rs b/src/query/storages/iceberg/src/stats.rs
deleted file mode 100644
index 95c9dc65bf4fb..0000000000000
--- a/src/query/storages/iceberg/src/stats.rs
+++ /dev/null
@@ -1,87 +0,0 @@
-// Copyright 2021 Datafuse Labs
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-use std::collections::HashMap;
-
-use databend_common_expression::types::Number;
-use databend_common_expression::types::F32;
-use databend_common_expression::types::F64;
-use databend_common_expression::Scalar;
-use databend_common_expression::TableField;
-use databend_common_expression::TableSchema;
-use databend_storages_common_table_meta::meta::ColumnStatistics;
-use databend_storages_common_table_meta::meta::StatisticsOfColumns;
-use iceberg::spec::DataFile;
-use iceberg::spec::Datum;
-use iceberg::spec::PrimitiveLiteral;
-use ordered_float::OrderedFloat;
-
-/// Try to convert statistics in [`DataFile`] to [`StatisticsOfColumns`].
-pub fn get_stats_of_data_file(schema: &TableSchema, df: &DataFile) -> Option<StatisticsOfColumns> {
- let mut stats: HashMap<ColumnId, ColumnStatistics> = HashMap::with_capacity(schema.num_fields());
- for field in schema.fields.iter() {
- if let Some(stat) = get_column_stats(
- field,
- df.lower_bounds(),
- df.upper_bounds(),
- df.null_value_counts(),
- ) {
- stats.insert(field.column_id, stat);
- }
- }
- Some(stats)
-}
-
-/// Try get [`ColumnStatistics`] for one column.
-fn get_column_stats(
- field: &TableField,
- lower: &HashMap<i32, Datum>,
- upper: &HashMap<i32, Datum>,
- null_counts: &HashMap<i32, u64>,
-) -> Option<ColumnStatistics> {
- // The column id in iceberg is 1-based while the column id in Databend is 0-based.
- let iceberg_col_id = field.column_id as i32 + 1;
- match (
- lower.get(&iceberg_col_id),
- upper.get(&iceberg_col_id),
- null_counts.get(&iceberg_col_id),
- ) {
- (Some(lo), Some(up), Some(nc)) => {
- let min = parse_datum(lo)?;
- let max = parse_datum(up)?;
- Some(ColumnStatistics::new(
- min, max, *nc, 0, // this field is not used.
- None,
- ))
- }
- (_, _, _) => None,
- }
-}
-
-/// TODO: we need to support more types.
-fn parse_datum(data: &Datum) -> Option<Scalar> {
- match data.literal() {
- PrimitiveLiteral::Boolean(v) => Some(Scalar::Boolean(*v)),
- PrimitiveLiteral::Int(v) => Some(Scalar::Number(i32::upcast_scalar(*v))),
- PrimitiveLiteral::Long(v) => Some(Scalar::Number(i64::upcast_scalar(*v))),
- PrimitiveLiteral::Float(OrderedFloat(v)) => {
- Some(Scalar::Number(F32::upcast_scalar(F32::from(*v))))
- }
- PrimitiveLiteral::Double(OrderedFloat(v)) => {
- Some(Scalar::Number(F64::upcast_scalar(F64::from(*v))))
- }
- PrimitiveLiteral::String(v) => Some(Scalar::String(v.clone())),
- _ => None,
- }
-}
diff --git a/src/query/storages/iceberg/src/table.rs b/src/query/storages/iceberg/src/table.rs
index 47fbacb5b79de..d08c96eb20dee 100644
--- a/src/query/storages/iceberg/src/table.rs
+++ b/src/query/storages/iceberg/src/table.rs
@@ -28,6 +28,7 @@ use databend_common_catalog::plan::PartStatistics;
 use databend_common_catalog::plan::Partitions;
 use databend_common_catalog::plan::PartitionsShuffleKind;
 use databend_common_catalog::plan::PushDownInfo;
+use databend_common_catalog::table::ColumnStatisticsProvider;
 use databend_common_catalog::table::DistributionLevel;
 use databend_common_catalog::table::Table;
 use databend_common_catalog::table::TableStatistics;
@@ -47,6 +48,8 @@ use iceberg::io::FileIOBuilder;
 use crate::partition::IcebergPartInfo;
 use crate::predicate::PredicateBuilder;
+use crate::statistics;
+use crate::statistics::IcebergStatistics;
 use crate::table_source::IcebergTableSource;
 use crate::IcebergCatalog;
@@ -58,13 +61,18 @@ pub struct IcebergTable {
 info: TableInfo,
 pub table: iceberg::table::Table,
+ statistics: IcebergStatistics,
 }
 impl IcebergTable {
 /// create a new table on the table directory
 pub fn try_create(info: TableInfo) -> Result<Box<dyn Table>> {
- let table = Self::parse_engine_options(&info.meta.engine_options)?;
- Ok(Box::new(Self { info, table }))
+ let (table, statistics) = Self::parse_engine_options(&info.meta.engine_options)?;
+ Ok(Box::new(Self {
+ info,
+ table,
+ statistics,
+ }))
 }
 pub fn description() -> StorageDescription {
@@ -106,8 +114,11 @@ impl IcebergTable {
 ///
 /// We will never persist the `engine_options` to storage, so it's safe to change the implementation.
 /// As long as you make sure both [`build_engine_options`] and [`parse_engine_options`] been updated.
- pub fn build_engine_options(table: &iceberg::table::Table) -> Result<BTreeMap<String, String>> {
- let (file_io_scheme, file_io_props) = table.file_io().clone().into_props();
+ pub fn build_engine_options(
+ table: &iceberg::table::Table,
+ statistics: &statistics::IcebergStatistics,
+ ) -> Result<BTreeMap<String, String>> {
+ let (file_io_scheme, file_io_props) = table.file_io().clone().into_builder().into_parts();
 let file_io_props = serde_json::to_string(&file_io_props)?;
 let metadata_location = table
 .metadata_location()
 .unwrap_or_default();
 let metadata = serde_json::to_string(table.metadata())?;
 let identifier = serde_json::to_string(table.identifier())?;
+ let statistics = serde_json::to_string(statistics)?;
 Ok(BTreeMap::from_iter([
 ("iceberg.file_io.scheme".to_string(), file_io_scheme),
@@ -122,6 +134,7 @@ impl IcebergTable {
 ("iceberg.metadata_location".to_string(), metadata_location),
 ("iceberg.metadata".to_string(), metadata),
 ("iceberg.identifier".to_string(), identifier),
+ ("iceberg.statistics".to_string(), statistics),
 ]))
 }
@@ -130,7 +143,7 @@ impl IcebergTable {
 /// See [`build_engine_options`] for more information.
 pub fn parse_engine_options(
 options: &BTreeMap<String, String>,
- ) -> Result<iceberg::table::Table> {
+ ) -> Result<(iceberg::table::Table, statistics::IcebergStatistics)> {
 let file_io_scheme = options.get("iceberg.file_io.scheme").ok_or_else(|| {
 ErrorCode::ReadTableDataError(
 "Rebuild iceberg table failed: Missing iceberg.file_io.scheme",
 )
 })?;
@@ -163,6 +176,13 @@ impl IcebergTable {
 )
 })?)?;
+ let statistics: statistics::IcebergStatistics =
+ serde_json::from_str(options.get("iceberg.statistics").ok_or_else(|| {
+ ErrorCode::ReadTableDataError(
+ "Rebuild iceberg table failed: Missing iceberg.statistics",
+ )
+ })?)?;
+
 let file_io = FileIOBuilder::new(file_io_scheme)
 .with_props(file_io_props)
 .build()
 .map_err(|err| {
 ErrorCode::ReadTableDataError(format!(
 ))
 })?;
- iceberg::table::Table::builder()
+ let table = iceberg::table::Table::builder()
 .identifier(identifier)
 .metadata(metadata)
 .metadata_location(metadata_location)
 .file_io(file_io)
 .build()
 .map_err(|err| {
 ErrorCode::ReadTableDataError(format!("Rebuild iceberg table failed: {err:?}"))
- })
+ })?;
+
+ Ok((table, statistics))
 }
 /// create a new table on the table directory
@@ -192,8 +214,9 @@
 ) -> Result<Self> {
 let table = Self::load_iceberg_table(&ctl, database_name, table_name).await?;
 let table_schema = Self::get_schema(&table)?;
+ let statistics = statistics::IcebergStatistics::parse(&table).await?;
- let engine_options = Self::build_engine_options(&table)?;
+ let engine_options = Self::build_engine_options(&table, &statistics)?;
 // construct table info
 let info = TableInfo {
@@ -211,7 +234,11 @@
 ..Default::default()
 };
- Ok(Self { info, table })
+ Ok(Self {
+ info,
+ table,
+ statistics,
+ })
 }
 pub fn do_read_data(
@@ -253,7 +280,7 @@
 );
 }
 if let Some(filter) = &push_downs.filters {
- let predicate = PredicateBuilder::default().build(&filter.filter);
+ let (_, predicate) = PredicateBuilder::build(&filter.filter);
 scan = scan.with_filter(predicate)
 }
 }
@@ -281,7 +308,7 @@
 .collect();
 Ok((
- PartStatistics::new_estimated(None, read_rows, read_bytes, parts.len(), total_files),
+ PartStatistics::new_exact(read_rows, read_bytes, parts.len(), total_files),
 Partitions::create(PartitionsShuffleKind::Mod, parts),
 ))
 }
@@ -305,14 +332,32 @@ impl Table for IcebergTable {
 &self.get_table_info().name
 }
- // TODO load summary
 async fn table_statistics(
 &self,
 _ctx: Arc<dyn TableContext>,
 _require_fresh: bool,
 _change_type: Option<ChangeType>,
 ) -> Result<Option<TableStatistics>> {
- Ok(None)
+ if self.table.metadata().current_snapshot().is_none() {
+ return Ok(None);
+ }
+
+ let mut statistics = TableStatistics::default();
+ statistics.num_rows = Some(self.statistics.record_count);
+ statistics.data_size_compressed = Some(self.statistics.file_size_in_bytes);
+ statistics.number_of_segments = Some(self.statistics.number_of_manifest_files);
+ statistics.number_of_blocks = Some(self.statistics.number_of_data_files);
+
+ Ok(Some(statistics))
+ }
+
+ #[async_backtrace::framed]
+ async fn column_statistics_provider(
+ &self,
+ _ctx: Arc<dyn TableContext>,
+ ) -> Result<Box<dyn ColumnStatisticsProvider>> {
+ Ok(Box::new(self.statistics.clone()))
 }
 #[async_backtrace::framed]
@@ -347,4 +392,8 @@ impl Table for IcebergTable {
 fn support_prewhere(&self) -> bool {
 false
 }
+
+ fn has_exact_total_row_count(&self) -> bool {
+ true
+ }
 }
diff --git a/tests/sqllogictests/suites/tpch_iceberg/prune.test b/tests/sqllogictests/suites/tpch_iceberg/prune.test
index c32b0ae6101f3..b0a6d04646405 100644
--- a/tests/sqllogictests/suites/tpch_iceberg/prune.test
+++ b/tests/sqllogictests/suites/tpch_iceberg/prune.test
@@ -32,7 +32,7 @@ EvalScalar
 ├── partitions total: 0
 ├── partitions scanned: 0
 ├── push downs: [filters: [is_true(lineitem.l_orderkey (#0) < 1)], limit: NONE]
- └── estimated rows: 0.00
+ └── estimated rows: 600572.00
 query T
 explain select 1 from ctl.tpch.lineitem where l_orderkey < 1 or l_commitdate < '1992-01-31';
@@ -53,7 +53,7 @@ EvalScalar
 ├── partitions total: 0
 ├── partitions scanned: 0
 ├── push downs: [filters: [is_true((lineitem.l_orderkey (#0) < 1 OR lineitem.l_commitdate (#11) < '1992-01-31'))], limit: NONE]
- └── estimated rows: 0.00
+ └── estimated rows: 600572.00
 query T
 explain select 1 from ctl.tpch.lineitem where l_orderkey < 1 and l_commitdate > '1992-01-31';
@@ -74,7 +74,7 @@ EvalScalar
 ├── partitions total: 0
 ├── partitions scanned: 0
 ├── push downs: [filters: [and_filters(lineitem.l_orderkey (#0) < 1, lineitem.l_commitdate (#11) > '1992-01-31')], limit: NONE]
- └── estimated rows: 0.00
+ └── estimated rows: 600572.00
 query T
 explain select 1 from ctl.tpch.lineitem where l_orderkey > 1 and l_commitdate = '1992-01-22';
@@ -82,11 +82,11 @@ explain select 1 from ctl.tpch.lineitem where l_orderkey > 1 and l_commitdate =
 ----
 EvalScalar
 ├── output columns: [1 (#16)]
 ├── expressions: [1]
-├── estimated rows: 0.00
+├── estimated rows: 1.00
 └── Filter
 ├── output columns: []
 ├── filters: [is_true(lineitem.l_orderkey (#0) > 1), is_true(lineitem.l_commitdate (#11) = '1992-01-22')]
- ├── estimated rows: 0.00
+ ├── estimated rows: 1.00
 └── TableScan
 ├── table: ctl.tpch.lineitem
 ├── output columns: [l_orderkey (#0), l_commitdate (#11)]
@@ -95,7 +95,7 @@ EvalScalar
 ├── partitions total: 0
 ├── partitions scanned: 0
 ├── push downs: [filters: [and_filters(lineitem.l_orderkey (#0) > 1, lineitem.l_commitdate (#11) = '1992-01-22')], limit: NONE]
- └── estimated rows: 0.00
+ └── estimated rows: 600572.00
 query T
@@ -117,7 +117,7 @@ EvalScalar
 ├── partitions total: 0
 ├── partitions scanned: 0
 ├── push downs: [filters: [NOT is_not_null(lineitem.l_orderkey (#0))], limit: NONE]
- └── estimated rows: 0.00
+ └── estimated rows: 600572.00
 query T
 explain select 1 from ctl.tpch.lineitem where l_orderkey is null or l_commitdate is not null;
 ----
 EvalScalar
 ├── output columns: [1 (#16)]
 ├── expressions: [1]
-├── estimated rows: 0.00
+├── estimated rows: 600572.00
└── Filter ├── output columns: [] ├── filters: [(NOT is_not_null(lineitem.l_orderkey (#0)) OR is_not_null(lineitem.l_commitdate (#11)))] - ├── estimated rows: 0.00 + ├── estimated rows: 600572.00 └── TableScan ├── table: ctl.tpch.lineitem ├── output columns: [l_orderkey (#0), l_commitdate (#11)] @@ -138,4 +138,4 @@ EvalScalar ├── partitions total: 4 ├── partitions scanned: 4 ├── push downs: [filters: [(NOT is_not_null(lineitem.l_orderkey (#0)) OR is_not_null(lineitem.l_commitdate (#11)))], limit: NONE] - └── estimated rows: 0.00 + └── estimated rows: 600572.00