From 338bc811c37123f53b2dde4af848c263b2fc3eba Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Wed, 22 Jan 2025 17:11:13 +0800 Subject: [PATCH 01/23] feat: Add hdfs support in iceberg Signed-off-by: Xuanwo --- Cargo.lock | 274 +++++++++++++----------- Cargo.toml | 10 +- src/query/storages/iceberg/src/table.rs | 2 +- 3 files changed, 157 insertions(+), 129 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ff025f9ecf169..5dcaadd0f295d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -331,9 +331,9 @@ dependencies = [ [[package]] name = "arrow-arith" -version = "53.2.0" +version = "53.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91f2dfd1a7ec0aca967dfaa616096aec49779adc8eccec005e2f5e4111b1192a" +checksum = "31dce77d2985522288edae7206bffd5fc4996491841dda01a13a58415867e681" dependencies = [ "arrow-array", "arrow-buffer", @@ -346,9 +346,9 @@ dependencies = [ [[package]] name = "arrow-array" -version = "53.2.0" +version = "53.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d39387ca628be747394890a6e47f138ceac1aa912eab64f02519fed24b637af8" +checksum = "2d45fe6d3faed0435b7313e59a02583b14c6c6339fa7729e94c32a20af319a79" dependencies = [ "ahash 0.8.11", "arrow-buffer", @@ -357,15 +357,15 @@ dependencies = [ "chrono", "chrono-tz 0.10.0", "half", - "hashbrown 0.14.5", + "hashbrown 0.15.2", "num", ] [[package]] name = "arrow-buffer" -version = "53.2.0" +version = "53.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e51e05228852ffe3eb391ce7178a0f97d2cf80cc6ef91d3c4a6b3cb688049ec" +checksum = "2b02656a35cc103f28084bc80a0159668e0a680d919cef127bd7e0aaccb06ec1" dependencies = [ "bytes", "half", @@ -374,9 +374,9 @@ dependencies = [ [[package]] name = "arrow-cast" -version = "53.2.0" +version = "53.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d09aea56ec9fa267f3f3f6cdab67d8a9974cbba90b3aa38c8fe9d0bb071bd8c1" +checksum = 
"c73c6233c5b5d635a56f6010e6eb1ab9e30e94707db21cea03da317f67d84cf3" dependencies = [ "arrow-array", "arrow-buffer", @@ -414,9 +414,9 @@ dependencies = [ [[package]] name = "arrow-data" -version = "53.2.0" +version = "53.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b98ae0af50890b494cebd7d6b04b35e896205c1d1df7b29a6272c5d0d0249ef5" +checksum = "b7f2861ffa86f107b8ab577d86cff7c7a490243eabe961ba1e1af4f27542bb79" dependencies = [ "arrow-buffer", "arrow-schema", @@ -454,9 +454,9 @@ dependencies = [ [[package]] name = "arrow-ipc" -version = "53.2.0" +version = "53.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ed91bdeaff5a1c00d28d8f73466bcb64d32bbd7093b5a30156b4b9f4dba3eee" +checksum = "0270dc511f11bb5fa98a25020ad51a99ca5b08d8a8dfbd17503bb9dba0388f0b" dependencies = [ "arrow-array", "arrow-buffer", @@ -490,9 +490,9 @@ dependencies = [ [[package]] name = "arrow-ord" -version = "53.2.0" +version = "53.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2883d7035e0b600fb4c30ce1e50e66e53d8656aa729f2bfa4b51d359cf3ded52" +checksum = "c6f202a879d287099139ff0d121e7f55ae5e0efe634b8cf2106ebc27a8715dee" dependencies = [ "arrow-array", "arrow-buffer", @@ -519,18 +519,18 @@ dependencies = [ [[package]] name = "arrow-schema" -version = "53.2.0" +version = "53.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "539ada65246b949bd99ffa0881a9a15a4a529448af1a07a9838dd78617dafab1" +checksum = "9579b9d8bce47aa41389fe344f2c6758279983b7c0ebb4013e283e3e91bb450e" dependencies = [ "serde", ] [[package]] name = "arrow-select" -version = "53.2.0" +version = "53.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6259e566b752da6dceab91766ed8b2e67bf6270eb9ad8a6e07a33c1bede2b125" +checksum = "7471ba126d0b0aaa24b50a36bc6c25e4e74869a1fd1a5553357027a0b1c8d1f1" dependencies = [ "ahash 0.8.11", "arrow-array", @@ -542,9 +542,9 @@ dependencies = [ 
[[package]] name = "arrow-string" -version = "53.2.0" +version = "53.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f3179ccbd18ebf04277a095ba7321b93fd1f774f18816bd5f6b3ce2f594edb6c" +checksum = "72993b01cb62507b06f1fb49648d7286c8989ecfabdb7b77a750fcb54410731b" dependencies = [ "arrow-array", "arrow-buffer", @@ -620,7 +620,7 @@ dependencies = [ "dashmap 5.5.3", "futures", "itertools 0.10.5", - "loom", + "loom 0.5.6", "once_cell", "pin-project-lite", "rustc-hash 1.1.0", @@ -768,9 +768,9 @@ checksum = "8b75356056920673b02621b35afd0f7dda9306d03c79a30f5c56c44cf256e3de" [[package]] name = "async-trait" -version = "0.1.81" +version = "0.1.85" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e0c28dcc82d7c8ead5cb13beb15405b57b8546e93215673ff8ca0349a028107" +checksum = "3f934833b4b7233644e5848f235df3f57ed8c80f1528a26c3dfa13d2147fa056" dependencies = [ "proc-macro2", "quote", @@ -2931,7 +2931,7 @@ dependencies = [ "databend-storages-common-table-meta", "limits-rs", "log", - "opendal 0.51.1", + "opendal", "serde", "serde_json", "serfig", @@ -3209,7 +3209,7 @@ dependencies = [ "libc", "object", "once_cell", - "opendal 0.51.1", + "opendal", "parquet", "paste", "prost", @@ -3560,7 +3560,7 @@ dependencies = [ "maplit", "num-derive", "num-traits", - "opendal 0.51.1", + "opendal", "paste", "prost", "serde", @@ -3798,7 +3798,7 @@ dependencies = [ "lz4", "match-template", "num", - "opendal 0.51.1", + "opendal", "rand", "ringbuffer", "roaring", @@ -4009,7 +4009,7 @@ dependencies = [ "log", "num-derive", "num-traits", - "opendal 0.51.1", + "opendal", "parking_lot 0.12.3", "prqlc", "rand", @@ -4045,7 +4045,7 @@ dependencies = [ "futures", "http 1.1.0", "log", - "opendal 0.51.1", + "opendal", "parquet", "prometheus-client", "regex", @@ -4154,7 +4154,7 @@ dependencies = [ "itertools 0.13.0", "jsonb", "log", - "opendal 0.51.1", + "opendal", "parking_lot 0.12.3", "parquet", "rand", @@ -4201,7 +4201,7 @@ dependencies = [ 
"futures", "hive_metastore", "log", - "opendal 0.51.1", + "opendal", "parquet", "recursive", "serde", @@ -4311,7 +4311,7 @@ dependencies = [ "databend-storages-common-table-meta", "futures-util", "log", - "opendal 0.51.1", + "opendal", "orc-rust", "serde", "serde_json", @@ -4347,7 +4347,7 @@ dependencies = [ "ethnum", "futures", "log", - "opendal 0.51.1", + "opendal", "parquet", "rand", "serde", @@ -4392,7 +4392,7 @@ dependencies = [ "databend-common-storages-parquet", "databend-storages-common-blocks", "databend-storages-common-table-meta", - "opendal 0.51.1", + "opendal", "parquet", "serde", "serde_json", @@ -4433,7 +4433,7 @@ dependencies = [ "enum-as-inner", "futures", "log", - "opendal 0.51.1", + "opendal", "parquet", "serde", "serde_json", @@ -4498,7 +4498,7 @@ dependencies = [ "jsonb", "log", "once_cell", - "opendal 0.51.1", + "opendal", "parking_lot 0.12.3", "regex", "serde", @@ -4735,7 +4735,7 @@ dependencies = [ "jsonb", "jwt-simple", "log", - "opendal 0.51.1", + "opendal", "tantivy", "tempfile", ] @@ -5113,7 +5113,7 @@ dependencies = [ "mysql_async", "naive-cityhash", "num_cpus", - "opendal 0.51.1", + "opendal", "opensrv-mysql", "opentelemetry", "opentelemetry_sdk", @@ -5297,7 +5297,7 @@ dependencies = [ "fastrace", "futures", "log", - "opendal 0.51.1", + "opendal", ] [[package]] @@ -6728,6 +6728,19 @@ dependencies = [ "windows 0.48.0", ] +[[package]] +name = "generator" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc6bd114ceda131d3b1d665eba35788690ad37f5916457286b32ab6fd3c438dd" +dependencies = [ + "cfg-if", + "libc", + "log", + "rustversion", + "windows 0.58.0", +] + [[package]] name = "generic-array" version = "0.14.7" @@ -8333,7 +8346,7 @@ dependencies = [ "iana-time-zone-haiku", "js-sys", "wasm-bindgen", - "windows-core", + "windows-core 0.52.0", ] [[package]] @@ -8347,8 +8360,8 @@ dependencies = [ [[package]] name = "iceberg" -version = "0.3.0" -source = 
"git+https://github.com/Xuanwo/iceberg-rust/?rev=01d706a1#01d706a1b01a3f93cbc9f4a43b3030b963104e47" +version = "0.4.0" +source = "git+https://github.com/Xuanwo/iceberg-rust/?rev=7322183#73221833f15c70c5d73e5c7fa9f09006d9609474" dependencies = [ "anyhow", "apache-avro", @@ -8373,7 +8386,7 @@ dependencies = [ "murmur3", "num-bigint", "once_cell", - "opendal 0.50.1", + "opendal", "ordered-float 4.5.0", "parquet", "paste", @@ -8395,8 +8408,8 @@ dependencies = [ [[package]] name = "iceberg-catalog-glue" -version = "0.3.0" -source = "git+https://github.com/Xuanwo/iceberg-rust/?rev=01d706a1#01d706a1b01a3f93cbc9f4a43b3030b963104e47" +version = "0.4.0" +source = "git+https://github.com/Xuanwo/iceberg-rust/?rev=7322183#73221833f15c70c5d73e5c7fa9f09006d9609474" dependencies = [ "anyhow", "async-trait", @@ -8412,8 +8425,8 @@ dependencies = [ [[package]] name = "iceberg-catalog-hms" -version = "0.3.0" -source = "git+https://github.com/Xuanwo/iceberg-rust/?rev=01d706a1#01d706a1b01a3f93cbc9f4a43b3030b963104e47" +version = "0.4.0" +source = "git+https://github.com/Xuanwo/iceberg-rust/?rev=7322183#73221833f15c70c5d73e5c7fa9f09006d9609474" dependencies = [ "anyhow", "async-trait", @@ -8431,8 +8444,8 @@ dependencies = [ [[package]] name = "iceberg-catalog-rest" -version = "0.3.0" -source = "git+https://github.com/Xuanwo/iceberg-rust/?rev=01d706a1#01d706a1b01a3f93cbc9f4a43b3030b963104e47" +version = "0.4.0" +source = "git+https://github.com/Xuanwo/iceberg-rust/?rev=7322183#73221833f15c70c5d73e5c7fa9f09006d9609474" dependencies = [ "async-trait", "chrono", @@ -9380,7 +9393,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff50ecb28bb86013e935fb6683ab1f6d3a20016f123c76fd4c27470076ac30f5" dependencies = [ "cfg-if", - "generator", + "generator 0.7.5", "scoped-tls", "serde", "serde_json", @@ -9388,6 +9401,19 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "loom" +version = "0.7.2" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "419e0dc8046cb947daa77eb95ae174acfbddb7673b4151f56d1eed8e93fbfaca" +dependencies = [ + "cfg-if", + "generator 0.8.4", + "scoped-tls", + "tracing", + "tracing-subscriber", +] + [[package]] name = "lru" version = "0.12.4" @@ -9698,25 +9724,23 @@ dependencies = [ [[package]] name = "moka" -version = "0.12.8" +version = "0.12.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32cf62eb4dd975d2dde76432fb1075c49e3ee2331cf36f1f8fd4b66550d32b6f" +checksum = "a9321642ca94a4282428e6ea4af8cc2ca4eac48ac7a6a4ea8f33f76d0ce70926" dependencies = [ "async-lock", - "async-trait", "crossbeam-channel", "crossbeam-epoch", "crossbeam-utils", "event-listener 5.3.1", "futures-util", - "once_cell", + "loom 0.7.2", "parking_lot 0.12.3", - "quanta", + "portable-atomic", "rustc_version", "smallvec", "tagptr", "thiserror", - "triomphe", "uuid", ] @@ -10235,7 +10259,7 @@ dependencies = [ "futures", "futures-util", "object_store", - "opendal 0.51.1", + "opendal", "pin-project", "tokio", ] @@ -10275,36 +10299,6 @@ dependencies = [ "ureq", ] -[[package]] -name = "opendal" -version = "0.50.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "213222b6c86949314d8f51acb26d8241e7c8dd0879b016a79471d49f21ee592f" -dependencies = [ - "anyhow", - "async-trait", - "backon", - "base64 0.22.1", - "bytes", - "chrono", - "crc32c", - "flagset", - "futures", - "getrandom", - "http 1.1.0", - "log", - "md-5", - "once_cell", - "percent-encoding", - "quick-xml 0.36.1", - "reqsign", - "reqwest", - "serde", - "serde_json", - "tokio", - "uuid", -] - [[package]] name = "opendal" version = "0.51.1" @@ -10708,9 +10702,9 @@ dependencies = [ [[package]] name = "parquet" -version = "53.2.0" +version = "53.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dea02606ba6f5e856561d8d507dba8bac060aefca2a6c0f1aa1d361fed91ff3e" +checksum = 
"8957c0c95a6a1804f3e51a18f69df29be53856a8c5768cc9b6d00fcafcd2917c" dependencies = [ "ahash 0.8.11", "arrow-array", @@ -10727,7 +10721,7 @@ dependencies = [ "flate2", "futures", "half", - "hashbrown 0.14.5", + "hashbrown 0.15.2", "lz4_flex", "num", "num-bigint", @@ -11666,21 +11660,6 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "658fa1faf7a4cc5f057c9ee5ef560f717ad9d8dc66d975267f709624d6e1ab88" -[[package]] -name = "quanta" -version = "0.12.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e5167a477619228a0b284fac2674e3c388cba90631d7b7de620e6f1fcd08da5" -dependencies = [ - "crossbeam-utils", - "libc", - "once_cell", - "raw-cpuid", - "wasi", - "web-sys", - "winapi", -] - [[package]] name = "quick-error" version = "1.2.3" @@ -11905,15 +11884,6 @@ dependencies = [ "random-number", ] -[[package]] -name = "raw-cpuid" -version = "11.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cb9ee317cfe3fbd54b36a511efc1edd42e216903c9cd575e686dd68a2ba90d8d" -dependencies = [ - "bitflags 2.6.0", -] - [[package]] name = "rawpointer" version = "0.2.1" @@ -13599,7 +13569,7 @@ version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b8c4a4445d81357df8b1a650d0d0d6fbbbfe99d064aa5e02f3e4022061476d8" dependencies = [ - "loom", + "loom 0.5.6", ] [[package]] @@ -14777,12 +14747,6 @@ version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "343e926fc669bc8cde4fa3129ab681c63671bae288b1f1081ceee6d9d37904fc" -[[package]] -name = "triomphe" -version = "0.1.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "859eb650cfee7434994602c3a68b25d77ad9e68c8a6cd491616ef86661382eb3" - [[package]] name = "triple_accel" version = "0.3.4" @@ -15883,7 +15847,17 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"e48a53791691ab099e5e2ad123536d0fff50652600abaf43bbf952894110d0be" dependencies = [ - "windows-core", + "windows-core 0.52.0", + "windows-targets 0.52.6", +] + +[[package]] +name = "windows" +version = "0.58.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd04d41d93c4992d421894c18c8b43496aa748dd4c081bac0dc93eb0489272b6" +dependencies = [ + "windows-core 0.58.0", "windows-targets 0.52.6", ] @@ -15896,6 +15870,60 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows-core" +version = "0.58.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ba6d44ec8c2591c134257ce647b7ea6b20335bf6379a27dac5f1641fcf59f99" +dependencies = [ + "windows-implement", + "windows-interface", + "windows-result", + "windows-strings", + "windows-targets 0.52.6", +] + +[[package]] +name = "windows-implement" +version = "0.58.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2bbd5b46c938e506ecbce286b6628a02171d56153ba733b6c741fc627ec9579b" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.95", +] + +[[package]] +name = "windows-interface" +version = "0.58.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "053c4c462dc91d3b1504c6fe5a726dd15e216ba718e84a0e46a88fbe5ded3515" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.95", +] + +[[package]] +name = "windows-result" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d1043d8214f791817bab27572aaa8af63732e11bf84aa21a45a78d6c317ae0e" +dependencies = [ + "windows-targets 0.52.6", +] + +[[package]] +name = "windows-strings" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4cd9b125c486025df0eabcb585e62173c6c9eddcec5d117d3b6e8c30e2ee4d10" +dependencies = [ + "windows-result", + "windows-targets 0.52.6", +] + [[package]] name = "windows-sys" version = "0.42.0" diff --git a/Cargo.toml b/Cargo.toml 
index 06d5accb6bd7b..9c6b3e7794829 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -227,7 +227,7 @@ arrow-data = { version = "53" } arrow-flight = { version = "53", features = ["flight-sql-experimental", "tls"] } arrow-ipc = { version = "53" } arrow-ord = { version = "53" } -arrow-schema = { version = "53", features = ["serde"] } +arrow-schema = { version = "53.4", features = ["serde"] } arrow-select = { version = "53" } arrow-udf-js = { version = "0.5.0" } arrow-udf-python = { version = "0.4.0" } @@ -323,10 +323,10 @@ http = "1" humantime = "2.1.0" hyper = "1" hyper-util = { version = "0.1.9", features = ["client", "client-legacy", "tokio", "service"] } -iceberg = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "01d706a1" } -iceberg-catalog-glue = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "01d706a1" } -iceberg-catalog-hms = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "01d706a1" } -iceberg-catalog-rest = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "01d706a1" } +iceberg = { version = "0.4.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "7322183" } +iceberg-catalog-glue = { version = "0.4.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "7322183" } +iceberg-catalog-hms = { version = "0.4.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "7322183" } +iceberg-catalog-rest = { version = "0.4.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "7322183" } indexmap = "2.0.0" indicatif = "0.17.5" itertools = "0.13.0" diff --git a/src/query/storages/iceberg/src/table.rs b/src/query/storages/iceberg/src/table.rs index 47fbacb5b79de..e9369f42d19d2 100644 --- a/src/query/storages/iceberg/src/table.rs +++ b/src/query/storages/iceberg/src/table.rs @@ -107,7 +107,7 @@ impl IcebergTable { /// We will never persist the `engine_options` to storage, so it's safe to change the implementation. 
/// As long as you make sure both [`build_engine_options`] and [`parse_engine_options`] been updated. pub fn build_engine_options(table: &iceberg::table::Table) -> Result> { - let (file_io_scheme, file_io_props) = table.file_io().clone().into_props(); + let (file_io_scheme, file_io_props) = table.file_io().clone().into_builder().into_parts(); let file_io_props = serde_json::to_string(&file_io_props)?; let metadata_location = table .metadata_location() From 7906c26efa8611206efd860416d449a45446efc0 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Fri, 24 Jan 2025 14:30:01 +0800 Subject: [PATCH 02/23] Enable webhdfs support Signed-off-by: Xuanwo --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 9c6b3e7794829..06f2ff62a97a9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -323,7 +323,7 @@ http = "1" humantime = "2.1.0" hyper = "1" hyper-util = { version = "0.1.9", features = ["client", "client-legacy", "tokio", "service"] } -iceberg = { version = "0.4.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "7322183" } +iceberg = { version = "0.4.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "7322183", features = ["storage-all"] } iceberg-catalog-glue = { version = "0.4.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "7322183" } iceberg-catalog-hms = { version = "0.4.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "7322183" } iceberg-catalog-rest = { version = "0.4.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "7322183" } From 88d96afbe5cc3cacf2ae1e23f501b4a7fcc3b5e7 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Mon, 10 Feb 2025 20:22:35 +0800 Subject: [PATCH 03/23] Fix iceberg support Signed-off-by: Xuanwo --- Cargo.lock | 8 ++++---- Cargo.toml | 10 ++++++---- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5dcaadd0f295d..0470c2809060b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8361,7 +8361,7 @@ dependencies = [ [[package]] 
name = "iceberg" version = "0.4.0" -source = "git+https://github.com/Xuanwo/iceberg-rust/?rev=7322183#73221833f15c70c5d73e5c7fa9f09006d9609474" +source = "git+https://github.com/Xuanwo/iceberg-rust/?rev=56fa9d11#56fa9d1196fe33b80726ee8195ba21320b5f8952" dependencies = [ "anyhow", "apache-avro", @@ -8409,7 +8409,7 @@ dependencies = [ [[package]] name = "iceberg-catalog-glue" version = "0.4.0" -source = "git+https://github.com/Xuanwo/iceberg-rust/?rev=7322183#73221833f15c70c5d73e5c7fa9f09006d9609474" +source = "git+https://github.com/Xuanwo/iceberg-rust/?rev=56fa9d11#56fa9d1196fe33b80726ee8195ba21320b5f8952" dependencies = [ "anyhow", "async-trait", @@ -8426,7 +8426,7 @@ dependencies = [ [[package]] name = "iceberg-catalog-hms" version = "0.4.0" -source = "git+https://github.com/Xuanwo/iceberg-rust/?rev=7322183#73221833f15c70c5d73e5c7fa9f09006d9609474" +source = "git+https://github.com/Xuanwo/iceberg-rust/?rev=56fa9d11#56fa9d1196fe33b80726ee8195ba21320b5f8952" dependencies = [ "anyhow", "async-trait", @@ -8445,7 +8445,7 @@ dependencies = [ [[package]] name = "iceberg-catalog-rest" version = "0.4.0" -source = "git+https://github.com/Xuanwo/iceberg-rust/?rev=7322183#73221833f15c70c5d73e5c7fa9f09006d9609474" +source = "git+https://github.com/Xuanwo/iceberg-rust/?rev=56fa9d11#56fa9d1196fe33b80726ee8195ba21320b5f8952" dependencies = [ "async-trait", "chrono", diff --git a/Cargo.toml b/Cargo.toml index 06f2ff62a97a9..50b1bd6b2328b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -323,10 +323,12 @@ http = "1" humantime = "2.1.0" hyper = "1" hyper-util = { version = "0.1.9", features = ["client", "client-legacy", "tokio", "service"] } -iceberg = { version = "0.4.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "7322183", features = ["storage-all"] } -iceberg-catalog-glue = { version = "0.4.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "7322183" } -iceberg-catalog-hms = { version = "0.4.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "7322183" } 
-iceberg-catalog-rest = { version = "0.4.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "7322183" } +iceberg = { version = "0.4.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "56fa9d11", features = [ + "storage-all", +] } +iceberg-catalog-glue = { version = "0.4.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "56fa9d11" } +iceberg-catalog-hms = { version = "0.4.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "56fa9d11" } +iceberg-catalog-rest = { version = "0.4.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "56fa9d11" } indexmap = "2.0.0" indicatif = "0.17.5" itertools = "0.13.0" From d610d76f444112bbfc3e362c75429c41b99cc32a Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Tue, 11 Feb 2025 12:41:34 +0800 Subject: [PATCH 04/23] Add table statistics for iceberg Signed-off-by: Xuanwo --- src/query/functions/src/scalars/other.rs | 2 +- src/query/storages/iceberg/src/table.rs | 49 +++++++++++++++++++++++- 2 files changed, 48 insertions(+), 3 deletions(-) diff --git a/src/query/functions/src/scalars/other.rs b/src/query/functions/src/scalars/other.rs index 4fe4b43cd17d1..62e5657d16e1b 100644 --- a/src/query/functions/src/scalars/other.rs +++ b/src/query/functions/src/scalars/other.rs @@ -383,7 +383,7 @@ fn register_grouping(registry: &mut FunctionRegistry) { .collect::>(); Value::Column(Column::Number(NumberColumn::UInt32(output.into()))) } - _ => unreachable!(), + v => unreachable!("unexpected value type for grouping function: {:?}", v), }), }, })) diff --git a/src/query/storages/iceberg/src/table.rs b/src/query/storages/iceberg/src/table.rs index e9369f42d19d2..ccdef1750ad39 100644 --- a/src/query/storages/iceberg/src/table.rs +++ b/src/query/storages/iceberg/src/table.rs @@ -44,6 +44,8 @@ use databend_common_pipeline_core::Pipeline; use databend_storages_common_table_meta::table::ChangeType; use futures::TryStreamExt; use iceberg::io::FileIOBuilder; +use iceberg::spec::DataContentType; +use 
iceberg::spec::ManifestStatus; use crate::partition::IcebergPartInfo; use crate::predicate::PredicateBuilder; @@ -305,14 +307,57 @@ impl Table for IcebergTable { &self.get_table_info().name } - // TODO load summary async fn table_statistics( &self, _ctx: Arc, _require_fresh: bool, _change_type: Option, ) -> Result> { - Ok(None) + let table = self.table.clone(); + let Some(snapshot) = table.metadata().current_snapshot() else { + return Ok(None); + }; + + let mut statistics = TableStatistics::default(); + let mut num_rows = 0; + let mut data_size_compressed = 0; + let mut number_of_segments = 0; + let mut number_of_blocks = 0; + + let manifest = snapshot + .load_manifest_list(table.file_io(), table.metadata()) + .await + .map_err(|err| ErrorCode::Internal(format!("load manifest list error: {err:?}")))?; + + for manifest_file in manifest.entries() { + number_of_segments += 1; + + let manifest = manifest_file + .load_manifest(table.file_io()) + .await + .map_err(|err| ErrorCode::Internal(format!("load manifest file error: {err:?}")))?; + + manifest.entries().iter().for_each(|entry| { + if entry.status() == ManifestStatus::Deleted { + return; + } + let data_file = entry.data_file(); + if data_file.content_type() != DataContentType::Data { + return; + } + + num_rows += data_file.record_count(); + data_size_compressed += data_file.file_size_in_bytes(); + number_of_blocks += 1; + }); + } + + statistics.num_rows = Some(num_rows); + statistics.data_size_compressed = Some(data_size_compressed); + statistics.number_of_segments = Some(number_of_segments); + statistics.number_of_blocks = Some(number_of_blocks); + + Ok(Some(statistics)) } #[async_backtrace::framed] From ed5b3647c156320da597bd8880d6bfc86234b23b Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Tue, 11 Feb 2025 14:58:21 +0800 Subject: [PATCH 05/23] Also implement column statistics Signed-off-by: Xuanwo --- Cargo.lock | 2 + src/query/storages/iceberg/Cargo.toml | 2 + src/query/storages/iceberg/src/lib.rs | 1 + 
src/query/storages/iceberg/src/statistics.rs | 256 +++++++++++++++++++ src/query/storages/iceberg/src/stats.rs | 87 ------- src/query/storages/iceberg/src/table.rs | 96 +++---- 6 files changed, 309 insertions(+), 135 deletions(-) create mode 100644 src/query/storages/iceberg/src/statistics.rs delete mode 100644 src/query/storages/iceberg/src/stats.rs diff --git a/Cargo.lock b/Cargo.lock index 4761f060882ed..e1da0f87854dd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4224,6 +4224,7 @@ dependencies = [ "databend-common-meta-store", "databend-common-meta-types", "databend-common-pipeline-core", + "databend-common-storage", "databend-common-storages-parquet", "databend-storages-common-table-meta", "fastrace", @@ -4235,6 +4236,7 @@ dependencies = [ "serde", "serde_json", "typetag", + "uuid", ] [[package]] diff --git a/src/query/storages/iceberg/Cargo.toml b/src/query/storages/iceberg/Cargo.toml index a1a3a34d6d200..226fd5f6f3733 100644 --- a/src/query/storages/iceberg/Cargo.toml +++ b/src/query/storages/iceberg/Cargo.toml @@ -20,6 +20,7 @@ databend-common-meta-app = { workspace = true } databend-common-meta-store = { workspace = true } databend-common-meta-types = { workspace = true } databend-common-pipeline-core = { workspace = true } +databend-common-storage = { workspace = true } databend-common-storages-parquet = { workspace = true } databend-storages-common-table-meta = { workspace = true } fastrace = { workspace = true } @@ -31,6 +32,7 @@ iceberg-catalog-rest = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } typetag = { workspace = true } +uuid = { workspace = true } [lints] workspace = true diff --git a/src/query/storages/iceberg/src/lib.rs b/src/query/storages/iceberg/src/lib.rs index 8949eee3c52d1..c423cd8d0925e 100644 --- a/src/query/storages/iceberg/src/lib.rs +++ b/src/query/storages/iceberg/src/lib.rs @@ -23,6 +23,7 @@ mod catalog; mod database; mod partition; mod predicate; +mod statistics; mod table; mod table_source; 
diff --git a/src/query/storages/iceberg/src/statistics.rs b/src/query/storages/iceberg/src/statistics.rs new file mode 100644 index 0000000000000..34761c4d9cc71 --- /dev/null +++ b/src/query/storages/iceberg/src/statistics.rs @@ -0,0 +1,256 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; + +use databend_common_base::base::OrderedFloat; +use databend_common_catalog::statistics; +use databend_common_catalog::statistics::BasicColumnStatistics; +use databend_common_catalog::table::ColumnStatisticsProvider; +use databend_common_exception::ErrorCode; +use databend_common_exception::Result; +use databend_common_expression::types::Decimal; +use databend_common_expression::types::DecimalSize; +use databend_common_expression::types::Number; +use databend_common_expression::types::F32; +use databend_common_expression::types::F64; +use databend_common_expression::ColumnId; +use databend_common_expression::Scalar; +use databend_common_expression::TableField; +use databend_common_storage::Datum as DatabendDatum; +use iceberg::spec::DataContentType; +use iceberg::spec::Datum; +use iceberg::spec::ManifestStatus; +use iceberg::spec::PrimitiveLiteral; +use iceberg::spec::PrimitiveType; +use uuid::Uuid; + +use crate::IcebergTable; + +#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, Default)] +pub struct IcebergStatistics { + /// Number of records in this table + pub record_count: u64, + /// 
Total table size in bytes + pub file_size_in_bytes: u64, + /// Number of manifest files in this table + pub number_of_manifest_files: u64, + /// Number of data files in this table + pub number_of_data_files: u64, + + /// Computed statistics for each column + pub computed_statistics: HashMap, +} + +impl IcebergStatistics { + /// Get statistics of an iceberg table. + pub async fn parse(table: &iceberg::table::Table) -> Result { + let Some(snapshot) = table.metadata().current_snapshot() else { + return Ok(IcebergStatistics::default()); + }; + + // Map from column id to the total size on disk of all regions that + // store the column. Does not include bytes necessary to read other + // columns, like footers. Leave null for row-oriented formats (Avro) + let mut column_sizes: HashMap = HashMap::new(); + // Map from column id to number of values in the column (including null + // and NaN values) + let mut value_counts: HashMap = HashMap::new(); + // Map from column id to number of null values in the column + let mut null_value_counts: HashMap = HashMap::new(); + // Map from column id to number of NaN values in the column + let mut nan_value_counts: HashMap = HashMap::new(); + // Map from column id to lower bound in the column serialized as biary. + // Each value must be less than or equal to all non-null, non-NaN values + // in the column for the file. + // Reference: + // + // - [Binary single-value serialization](https://iceberg.apache.org/spec/#binary-single-value-serialization) + let mut lower_bounds: HashMap = HashMap::new(); + // Map from column id to upper bound in the column serialized as binary. + // Each value must be greater than or equal to all non-null, non-Nan + // values in the column for the file. 
+ // + // Reference: + // + // - [Binary single-value serialization](https://iceberg.apache.org/spec/#binary-single-value-serialization) + let mut upper_bounds: HashMap = HashMap::new(); + + let mut statistics = IcebergStatistics::default(); + + let manifest = snapshot + .load_manifest_list(table.file_io(), table.metadata()) + .await + .map_err(|err| ErrorCode::Internal(format!("load manifest list error: {err:?}")))?; + + for manifest_file in manifest.entries() { + statistics.number_of_manifest_files += 1; + + let manifest = manifest_file + .load_manifest(table.file_io()) + .await + .map_err(|err| ErrorCode::Internal(format!("load manifest file error: {err:?}")))?; + + manifest.entries().iter().for_each(|entry| { + if entry.status() == ManifestStatus::Deleted { + return; + } + let data_file = entry.data_file(); + if data_file.content_type() != DataContentType::Data { + return; + } + + statistics.record_count += data_file.record_count(); + statistics.file_size_in_bytes += data_file.file_size_in_bytes(); + statistics.number_of_data_files += 1; + + data_file.column_sizes().iter().for_each(|(col_id, size)| { + column_sizes.insert(*col_id, *size); + }); + data_file.value_counts().iter().for_each(|(col_id, count)| { + value_counts.insert(*col_id, *count); + }); + data_file + .null_value_counts() + .iter() + .for_each(|(col_id, count)| { + null_value_counts.insert(*col_id, *count); + }); + data_file + .nan_value_counts() + .iter() + .for_each(|(col_id, count)| { + nan_value_counts.insert(*col_id, *count); + }); + + data_file + .lower_bounds() + .iter() + .for_each(|(&col_id, new_value)| { + lower_bounds + .entry(col_id) + .and_modify(|existing_value| { + if new_value < existing_value { + *existing_value = new_value.clone(); + } + }) + .or_insert_with(|| new_value.clone()); + }); + + data_file + .upper_bounds() + .iter() + .for_each(|(&col_id, new_value)| { + upper_bounds + .entry(col_id) + .and_modify(|existing_value| { + if new_value > existing_value { + *existing_value 
= new_value.clone(); + } + }) + .or_insert_with(|| new_value.clone()); + }); + }); + } + + let mut computed_statistics = HashMap::new(); + for field in IcebergTable::get_schema(table)?.fields() { + // The column id in iceberg is 1-based while the column id in Databend is 0-based. + let iceberg_col_id = field.column_id as i32 + 1; + let column_stats = get_column_stats( + field, + &column_sizes, + &lower_bounds, + &upper_bounds, + &null_value_counts, + ); + if let Some(stats) = column_stats { + computed_statistics.insert(iceberg_col_id as ColumnId, stats); + } + } + + statistics.computed_statistics = computed_statistics; + Ok(statistics) + } +} + +impl ColumnStatisticsProvider for IcebergStatistics { + fn column_statistics(&self, column_id: ColumnId) -> Option<&statistics::BasicColumnStatistics> { + self.computed_statistics.get(&column_id) + } + + fn num_rows(&self) -> Option { + Some(self.record_count) + } +} + +/// Try get [`ColumnStatistics`] for one column. +fn get_column_stats( + field: &TableField, + column_size: &HashMap, + lower: &HashMap, + upper: &HashMap, + null_counts: &HashMap, +) -> Option { + // The column id in iceberg is 1-based while the column id in Databend is 0-based. + let iceberg_col_id = field.column_id as i32 + 1; + match ( + column_size.get(&iceberg_col_id), + lower.get(&iceberg_col_id), + upper.get(&iceberg_col_id), + null_counts.get(&iceberg_col_id), + ) { + (Some(_size), Some(lo), Some(up), Some(nc)) => Some(BasicColumnStatistics { + min: parse_datum(lo).and_then(DatabendDatum::from_scalar), + max: parse_datum(up).and_then(DatabendDatum::from_scalar), + ndv: None, + null_count: *nc, + }), + _ => None, + } +} + +/// Try to parse iceberg [`Datum`] to databend [`Scalar`]. 
+pub fn parse_datum(data: &Datum) -> Option { + match data.literal() { + PrimitiveLiteral::Boolean(v) => Some(Scalar::Boolean(*v)), + PrimitiveLiteral::Int(v) => Some(Scalar::Number(i32::upcast_scalar(*v))), + PrimitiveLiteral::Long(v) => Some(Scalar::Number(i64::upcast_scalar(*v))), + PrimitiveLiteral::Float(v) => { + Some(Scalar::Number(F32::upcast_scalar(OrderedFloat::from(v.0)))) + } + PrimitiveLiteral::Double(v) => { + Some(Scalar::Number(F64::upcast_scalar(OrderedFloat::from(v.0)))) + } + PrimitiveLiteral::String(v) => Some(Scalar::String(v.clone())), + PrimitiveLiteral::Binary(v) => Some(Scalar::Binary(v.clone())), + // Iceberg use i128 to represent decimal + PrimitiveLiteral::Int128(v) => { + if let PrimitiveType::Decimal { precision, scale } = data.data_type() { + Some(Scalar::Decimal(v.to_scalar(DecimalSize { + precision: *precision as u8, + scale: *scale as u8, + }))) + } else { + None + } + } + // Iceberg use u128 to represent uuid + PrimitiveLiteral::UInt128(v) => { + Some(Scalar::String(Uuid::from_u128(*v).as_simple().to_string())) + } + PrimitiveLiteral::AboveMax => None, + PrimitiveLiteral::BelowMin => None, + } +} diff --git a/src/query/storages/iceberg/src/stats.rs b/src/query/storages/iceberg/src/stats.rs deleted file mode 100644 index 95c9dc65bf4fb..0000000000000 --- a/src/query/storages/iceberg/src/stats.rs +++ /dev/null @@ -1,87 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -use std::collections::HashMap; - -use databend_common_expression::types::Number; -use databend_common_expression::types::F32; -use databend_common_expression::types::F64; -use databend_common_expression::Scalar; -use databend_common_expression::TableField; -use databend_common_expression::TableSchema; -use databend_storages_common_table_meta::meta::ColumnStatistics; -use databend_storages_common_table_meta::meta::StatisticsOfColumns; -use iceberg::spec::DataFile; -use iceberg::spec::Datum; -use iceberg::spec::PrimitiveLiteral; -use ordered_float::OrderedFloat; - -/// Try to convert statistics in [`DataFile`] to [`StatisticsOfColumns`]. -pub fn get_stats_of_data_file(schema: &TableSchema, df: &DataFile) -> Option { - let mut stats: HashMap = HashMap::with_capacity(schema.num_fields()); - for field in schema.fields.iter() { - if let Some(stat) = get_column_stats( - field, - df.lower_bounds(), - df.upper_bounds(), - df.null_value_counts(), - ) { - stats.insert(field.column_id, stat); - } - } - Some(stats) -} - -/// Try get [`ColumnStatistics`] for one column. -fn get_column_stats( - field: &TableField, - lower: &HashMap, - upper: &HashMap, - null_counts: &HashMap, -) -> Option { - // The column id in iceberg is 1-based while the column id in Databend is 0-based. - let iceberg_col_id = field.column_id as i32 + 1; - match ( - lower.get(&iceberg_col_id), - upper.get(&iceberg_col_id), - null_counts.get(&iceberg_col_id), - ) { - (Some(lo), Some(up), Some(nc)) => { - let min = parse_datum(lo)?; - let max = parse_datum(up)?; - Some(ColumnStatistics::new( - min, max, *nc, 0, // this field is not used. - None, - )) - } - (_, _, _) => None, - } -} - -/// TODO: we need to support more types. 
-fn parse_datum(data: &Datum) -> Option { - match data.literal() { - PrimitiveLiteral::Boolean(v) => Some(Scalar::Boolean(*v)), - PrimitiveLiteral::Int(v) => Some(Scalar::Number(i32::upcast_scalar(*v))), - PrimitiveLiteral::Long(v) => Some(Scalar::Number(i64::upcast_scalar(*v))), - PrimitiveLiteral::Float(OrderedFloat(v)) => { - Some(Scalar::Number(F32::upcast_scalar(F32::from(*v)))) - } - PrimitiveLiteral::Double(OrderedFloat(v)) => { - Some(Scalar::Number(F64::upcast_scalar(F64::from(*v)))) - } - PrimitiveLiteral::String(v) => Some(Scalar::String(v.clone())), - _ => None, - } -} diff --git a/src/query/storages/iceberg/src/table.rs b/src/query/storages/iceberg/src/table.rs index ccdef1750ad39..7f4c0cb0f932e 100644 --- a/src/query/storages/iceberg/src/table.rs +++ b/src/query/storages/iceberg/src/table.rs @@ -28,6 +28,7 @@ use databend_common_catalog::plan::PartStatistics; use databend_common_catalog::plan::Partitions; use databend_common_catalog::plan::PartitionsShuffleKind; use databend_common_catalog::plan::PushDownInfo; +use databend_common_catalog::table::ColumnStatisticsProvider; use databend_common_catalog::table::DistributionLevel; use databend_common_catalog::table::Table; use databend_common_catalog::table::TableStatistics; @@ -44,11 +45,11 @@ use databend_common_pipeline_core::Pipeline; use databend_storages_common_table_meta::table::ChangeType; use futures::TryStreamExt; use iceberg::io::FileIOBuilder; -use iceberg::spec::DataContentType; -use iceberg::spec::ManifestStatus; use crate::partition::IcebergPartInfo; use crate::predicate::PredicateBuilder; +use crate::statistics; +use crate::statistics::IcebergStatistics; use crate::table_source::IcebergTableSource; use crate::IcebergCatalog; @@ -60,13 +61,18 @@ pub struct IcebergTable { info: TableInfo, pub table: iceberg::table::Table, + statistics: IcebergStatistics, } impl IcebergTable { /// create a new table on the table directory pub fn try_create(info: TableInfo) -> Result> { - let table = 
Self::parse_engine_options(&info.meta.engine_options)?; - Ok(Box::new(Self { info, table })) + let (table, statistics) = Self::parse_engine_options(&info.meta.engine_options)?; + Ok(Box::new(Self { + info, + table, + statistics, + })) } pub fn description() -> StorageDescription { @@ -108,7 +114,10 @@ impl IcebergTable { /// /// We will never persist the `engine_options` to storage, so it's safe to change the implementation. /// As long as you make sure both [`build_engine_options`] and [`parse_engine_options`] been updated. - pub fn build_engine_options(table: &iceberg::table::Table) -> Result> { + pub fn build_engine_options( + table: &iceberg::table::Table, + statistics: &statistics::IcebergStatistics, + ) -> Result> { let (file_io_scheme, file_io_props) = table.file_io().clone().into_builder().into_parts(); let file_io_props = serde_json::to_string(&file_io_props)?; let metadata_location = table @@ -117,6 +126,7 @@ impl IcebergTable { .unwrap_or_default(); let metadata = serde_json::to_string(table.metadata())?; let identifier = serde_json::to_string(table.identifier())?; + let statistics = serde_json::to_string(statistics)?; Ok(BTreeMap::from_iter([ ("iceberg.file_io.scheme".to_string(), file_io_scheme), @@ -124,6 +134,7 @@ impl IcebergTable { ("iceberg.metadata_location".to_string(), metadata_location), ("iceberg.metadata".to_string(), metadata), ("iceberg.identifier".to_string(), identifier), + ("iceberg.statistics".to_string(), statistics), ])) } @@ -132,7 +143,7 @@ impl IcebergTable { /// See [`build_engine_options`] for more information. 
pub fn parse_engine_options( options: &BTreeMap, - ) -> Result { + ) -> Result<(iceberg::table::Table, statistics::IcebergStatistics)> { let file_io_scheme = options.get("iceberg.file_io.scheme").ok_or_else(|| { ErrorCode::ReadTableDataError( "Rebuild iceberg table failed: Missing iceberg.file_io.scheme", @@ -165,6 +176,13 @@ impl IcebergTable { ) })?)?; + let statistics: statistics::IcebergStatistics = + serde_json::from_str(options.get("iceberg.statistics").ok_or_else(|| { + ErrorCode::ReadTableDataError( + "Rebuild iceberg table failed: Missing iceberg.statistics", + ) + })?)?; + let file_io = FileIOBuilder::new(file_io_scheme) .with_props(file_io_props) .build() @@ -174,7 +192,7 @@ impl IcebergTable { )) })?; - iceberg::table::Table::builder() + let table = iceberg::table::Table::builder() .identifier(identifier) .metadata(metadata) .metadata_location(metadata_location) @@ -182,7 +200,9 @@ impl IcebergTable { .build() .map_err(|err| { ErrorCode::ReadTableDataError(format!("Rebuild iceberg table failed: {err:?}")) - }) + })?; + + Ok((table, statistics)) } /// create a new table on the table directory @@ -194,8 +214,9 @@ impl IcebergTable { ) -> Result { let table = Self::load_iceberg_table(&ctl, database_name, table_name).await?; let table_schema = Self::get_schema(&table)?; + let statistics = statistics::IcebergStatistics::parse(&table).await?; - let engine_options = Self::build_engine_options(&table)?; + let engine_options = Self::build_engine_options(&table, &statistics)?; // construct table info let info = TableInfo { @@ -213,7 +234,11 @@ impl IcebergTable { ..Default::default() }; - Ok(Self { info, table }) + Ok(Self { + info, + table, + statistics, + }) } pub fn do_read_data( @@ -314,52 +339,27 @@ impl Table for IcebergTable { _change_type: Option, ) -> Result> { let table = self.table.clone(); - let Some(snapshot) = table.metadata().current_snapshot() else { + if table.metadata().current_snapshot().is_none() { return Ok(None); }; let mut statistics = 
TableStatistics::default(); - let mut num_rows = 0; - let mut data_size_compressed = 0; - let mut number_of_segments = 0; - let mut number_of_blocks = 0; - - let manifest = snapshot - .load_manifest_list(table.file_io(), table.metadata()) - .await - .map_err(|err| ErrorCode::Internal(format!("load manifest list error: {err:?}")))?; - - for manifest_file in manifest.entries() { - number_of_segments += 1; - - let manifest = manifest_file - .load_manifest(table.file_io()) - .await - .map_err(|err| ErrorCode::Internal(format!("load manifest file error: {err:?}")))?; - - manifest.entries().iter().for_each(|entry| { - if entry.status() == ManifestStatus::Deleted { - return; - } - let data_file = entry.data_file(); - if data_file.content_type() != DataContentType::Data { - return; - } - - num_rows += data_file.record_count(); - data_size_compressed += data_file.file_size_in_bytes(); - number_of_blocks += 1; - }); - } - - statistics.num_rows = Some(num_rows); - statistics.data_size_compressed = Some(data_size_compressed); - statistics.number_of_segments = Some(number_of_segments); - statistics.number_of_blocks = Some(number_of_blocks); + statistics.num_rows = Some(self.statistics.record_count); + statistics.data_size_compressed = Some(self.statistics.file_size_in_bytes); + statistics.number_of_segments = Some(self.statistics.number_of_manifest_files); + statistics.number_of_blocks = Some(self.statistics.number_of_data_files); Ok(Some(statistics)) } + #[async_backtrace::framed] + async fn column_statistics_provider( + &self, + _ctx: Arc, + ) -> Result> { + Ok(Box::new(self.statistics.clone())) + } + #[async_backtrace::framed] async fn read_partitions( &self, From 6ea601c5a1e27729e8038ca949d33b01386a76b8 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Tue, 11 Feb 2025 15:21:46 +0800 Subject: [PATCH 06/23] Fix scan Signed-off-by: Xuanwo --- src/query/sql/src/planner/plans/scan.rs | 27 +++++++++++-------------- 1 file changed, 12 insertions(+), 15 deletions(-) diff --git 
a/src/query/sql/src/planner/plans/scan.rs b/src/query/sql/src/planner/plans/scan.rs index 67d250067d7a7..1bf62d836ac36 100644 --- a/src/query/sql/src/planner/plans/scan.rs +++ b/src/query/sql/src/planner/plans/scan.rs @@ -234,28 +234,25 @@ impl Operator for Scan { continue; } if let Some(col_stat) = v.clone() { - // Safe to unwrap: min, max and ndv are all `Some(_)`. + // Safe to unwrap: min, max are all `Some(_)`. let min = col_stat.min.unwrap(); let max = col_stat.max.unwrap(); - let ndv = col_stat.ndv.unwrap(); + let num_rows = num_rows.saturating_sub(col_stat.null_count); + let ndv = col_stat.ndv.unwrap_or(num_rows); let histogram = if let Some(histogram) = self.statistics.histograms.get(k) && histogram.is_some() { histogram.clone() + } else if num_rows != 0 { + histogram_from_ndv( + ndv, + num_rows, + Some((min.clone(), max.clone())), + DEFAULT_HISTOGRAM_BUCKETS, + ) + .ok() } else { - let num_rows = num_rows.saturating_sub(col_stat.null_count); - let ndv = std::cmp::min(num_rows, ndv); - if num_rows != 0 { - histogram_from_ndv( - ndv, - num_rows, - Some((min.clone(), max.clone())), - DEFAULT_HISTOGRAM_BUCKETS, - ) - .ok() - } else { - None - } + None }; let column_stat = ColumnStat { min, From fd154378f80441554fe8f7cc158f69e1ffd791fd Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Tue, 11 Feb 2025 16:10:14 +0800 Subject: [PATCH 07/23] Fix ndv is wrong Signed-off-by: Xuanwo --- src/query/sql/src/planner/plans/scan.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/query/sql/src/planner/plans/scan.rs b/src/query/sql/src/planner/plans/scan.rs index 1bf62d836ac36..fa006b7d6e4a8 100644 --- a/src/query/sql/src/planner/plans/scan.rs +++ b/src/query/sql/src/planner/plans/scan.rs @@ -239,6 +239,7 @@ impl Operator for Scan { let max = col_stat.max.unwrap(); let num_rows = num_rows.saturating_sub(col_stat.null_count); let ndv = col_stat.ndv.unwrap_or(num_rows); + let ndv = std::cmp::min(num_rows, ndv); let histogram = if let Some(histogram) = 
self.statistics.histograms.get(k) && histogram.is_some() { From db6540313c5c0e2b8df524819d2069ca5e4f90aa Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Tue, 11 Feb 2025 16:55:52 +0800 Subject: [PATCH 08/23] Add more info in iceberg table Signed-off-by: Xuanwo --- src/query/storages/iceberg/src/table.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/query/storages/iceberg/src/table.rs b/src/query/storages/iceberg/src/table.rs index 7f4c0cb0f932e..1c7df8822f773 100644 --- a/src/query/storages/iceberg/src/table.rs +++ b/src/query/storages/iceberg/src/table.rs @@ -308,7 +308,7 @@ impl IcebergTable { .collect(); Ok(( - PartStatistics::new_estimated(None, read_rows, read_bytes, parts.len(), total_files), + PartStatistics::new_exact(read_rows, read_bytes, parts.len(), total_files), Partitions::create(PartitionsShuffleKind::Mod, parts), )) } @@ -392,4 +392,8 @@ impl Table for IcebergTable { fn support_prewhere(&self) -> bool { false } + + fn has_exact_total_row_count(&self) -> bool { + true + } } From 40db9fd0e64a75a6bb187a420ceca86cd4dfd00f Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Tue, 11 Feb 2025 17:34:26 +0800 Subject: [PATCH 09/23] Fix column id Signed-off-by: Xuanwo --- src/query/storages/iceberg/src/statistics.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/query/storages/iceberg/src/statistics.rs b/src/query/storages/iceberg/src/statistics.rs index 34761c4d9cc71..b0794381a24df 100644 --- a/src/query/storages/iceberg/src/statistics.rs +++ b/src/query/storages/iceberg/src/statistics.rs @@ -166,9 +166,7 @@ impl IcebergStatistics { let mut computed_statistics = HashMap::new(); for field in IcebergTable::get_schema(table)?.fields() { - // The column id in iceberg is 1-based while the column id in Databend is 0-based. 
- let iceberg_col_id = field.column_id as i32 + 1; - let column_stats = get_column_stats( + let column_stats: Option = get_column_stats( field, &column_sizes, &lower_bounds, @@ -176,7 +174,7 @@ impl IcebergStatistics { &null_value_counts, ); if let Some(stats) = column_stats { - computed_statistics.insert(iceberg_col_id as ColumnId, stats); + computed_statistics.insert(field.column_id, stats); } } From 6e26b301e8b6fdf9985cc33c289be29997910f09 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Tue, 11 Feb 2025 19:07:55 +0800 Subject: [PATCH 10/23] Add debug for inner join Signed-off-by: Xuanwo --- .../transforms/hash_join/probe_join/inner_join.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/inner_join.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/inner_join.rs index 51bcbbbb4cc6b..f0fd7ccc95c6a 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/inner_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/inner_join.rs @@ -90,9 +90,10 @@ impl HashJoinProbeState { } if FROM_LEFT_SINGLE && match_count > 1 { - return Err(ErrorCode::Internal( - "Scalar subquery can't return more than one row", - )); + return Err(ErrorCode::Internal(format!( + "Scalar subquery can't return more than one row, but got {}", + match_count + ))); } // Fill `probe_indexes`. 
From 7b852ad0336ff14c8b3a6c7a1eb6782e989c9d85 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Wed, 12 Feb 2025 15:17:35 +0800 Subject: [PATCH 11/23] Try fix tests Signed-off-by: Xuanwo --- .../suites/mode/standalone/explain/bloom_filter.test | 4 ++-- tests/sqllogictests/suites/mode/standalone/explain/join.test | 2 +- tests/sqllogictests/suites/tpch_iceberg/prune.test | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/sqllogictests/suites/mode/standalone/explain/bloom_filter.test b/tests/sqllogictests/suites/mode/standalone/explain/bloom_filter.test index 18f12f481efac..c73973512c169 100644 --- a/tests/sqllogictests/suites/mode/standalone/explain/bloom_filter.test +++ b/tests/sqllogictests/suites/mode/standalone/explain/bloom_filter.test @@ -128,11 +128,11 @@ explain select 1 from bloom_test_t where c2=3; EvalScalar ├── output columns: [1 (#3)] ├── expressions: [1] -├── estimated rows: 2.67 +├── estimated rows: 4.00 └── Filter ├── output columns: [] ├── filters: [is_true(bloom_test_t.c2 (#1) = 3)] - ├── estimated rows: 2.67 + ├── estimated rows: 4.00 └── TableScan ├── table: default.default.bloom_test_t ├── output columns: [c2 (#1)] diff --git a/tests/sqllogictests/suites/mode/standalone/explain/join.test b/tests/sqllogictests/suites/mode/standalone/explain/join.test index 86fdb263a1615..5aa020019970b 100644 --- a/tests/sqllogictests/suites/mode/standalone/explain/join.test +++ b/tests/sqllogictests/suites/mode/standalone/explain/join.test @@ -238,7 +238,7 @@ HashJoin ├── build keys: [b.x (#1)] ├── probe keys: [a.x (#0)] ├── filters: [] -├── estimated rows: 2.37 +├── estimated rows: 3.56 ├── Filter(Build) │ ├── output columns: [b.x (#1), b.y (#2)] │ ├── filters: [is_true(b.x (#1) > 42)] diff --git a/tests/sqllogictests/suites/tpch_iceberg/prune.test b/tests/sqllogictests/suites/tpch_iceberg/prune.test index c32b0ae6101f3..1885d47473370 100644 --- a/tests/sqllogictests/suites/tpch_iceberg/prune.test +++ 
b/tests/sqllogictests/suites/tpch_iceberg/prune.test @@ -32,7 +32,7 @@ EvalScalar ├── partitions total: 0 ├── partitions scanned: 0 ├── push downs: [filters: [is_true(lineitem.l_orderkey (#0) < 1)], limit: NONE] - └── estimated rows: 0.00 + └── estimated rows: 600572.00 query T explain select 1 from ctl.tpch.lineitem where l_orderkey < 1 or l_commitdate < '1992-01-31'; From 8fad3d838230ac7fb5cd5eda07adf37de774c232 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Wed, 12 Feb 2025 15:42:45 +0800 Subject: [PATCH 12/23] Fix tests Signed-off-by: Xuanwo --- .../suites/tpch_iceberg/prune.test | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/tests/sqllogictests/suites/tpch_iceberg/prune.test b/tests/sqllogictests/suites/tpch_iceberg/prune.test index 1885d47473370..8417b66e4d212 100644 --- a/tests/sqllogictests/suites/tpch_iceberg/prune.test +++ b/tests/sqllogictests/suites/tpch_iceberg/prune.test @@ -53,7 +53,7 @@ EvalScalar ├── partitions total: 0 ├── partitions scanned: 0 ├── push downs: [filters: [is_true((lineitem.l_orderkey (#0) < 1 OR lineitem.l_commitdate (#11) < '1992-01-31'))], limit: NONE] - └── estimated rows: 0.00 + └── estimated rows: 600572.00 query T explain select 1 from ctl.tpch.lineitem where l_orderkey < 1 and l_commitdate > '1992-01-31'; @@ -74,7 +74,7 @@ EvalScalar ├── partitions total: 0 ├── partitions scanned: 0 ├── push downs: [filters: [and_filters(lineitem.l_orderkey (#0) < 1, lineitem.l_commitdate (#11) > '1992-01-31')], limit: NONE] - └── estimated rows: 0.00 + └── estimated rows: 600572.00 query T explain select 1 from ctl.tpch.lineitem where l_orderkey > 1 and l_commitdate = '1992-01-22'; @@ -82,11 +82,11 @@ explain select 1 from ctl.tpch.lineitem where l_orderkey > 1 and l_commitdate = EvalScalar ├── output columns: [1 (#16)] ├── expressions: [1] -├── estimated rows: 0.00 +├── estimated rows: 1.00 └── Filter ├── output columns: [] ├── filters: [is_true(lineitem.l_orderkey (#0) > 1), 
is_true(lineitem.l_commitdate (#11) = '1992-01-22')] - ├── estimated rows: 0.00 + ├── estimated rows: 1.00 └── TableScan ├── table: ctl.tpch.lineitem ├── output columns: [l_orderkey (#0), l_commitdate (#11)] @@ -95,7 +95,7 @@ EvalScalar ├── partitions total: 0 ├── partitions scanned: 0 ├── push downs: [filters: [and_filters(lineitem.l_orderkey (#0) > 1, lineitem.l_commitdate (#11) = '1992-01-22')], limit: NONE] - └── estimated rows: 0.00 + └── estimated rows: 600572.00 query T @@ -117,7 +117,7 @@ EvalScalar ├── partitions total: 0 ├── partitions scanned: 0 ├── push downs: [filters: [NOT is_not_null(lineitem.l_orderkey (#0))], limit: NONE] - └── estimated rows: 0.00 + └── estimated rows: 600572.00 query T explain select 1 from ctl.tpch.lineitem where l_orderkey is null or l_commitdate is not null; @@ -125,17 +125,17 @@ explain select 1 from ctl.tpch.lineitem where l_orderkey is null or l_commitdat EvalScalar ├── output columns: [1 (#16)] ├── expressions: [1] -├── estimated rows: 0.00 +├── estimated rows: 600572.00 └── Filter ├── output columns: [] ├── filters: [(NOT is_not_null(lineitem.l_orderkey (#0)) OR is_not_null(lineitem.l_commitdate (#11)))] - ├── estimated rows: 0.00 + ├── estimated rows: 600572.00 └── TableScan ├── table: ctl.tpch.lineitem ├── output columns: [l_orderkey (#0), l_commitdate (#11)] ├── read rows: 600572 - ├── read size: 14.27 MiB - ├── partitions total: 4 - ├── partitions scanned: 4 + ├── read size: 14.54 MiB + ├── partitions total: 18 + ├── partitions scanned: 18 ├── push downs: [filters: [(NOT is_not_null(lineitem.l_orderkey (#0)) OR is_not_null(lineitem.l_commitdate (#11)))], limit: NONE] - └── estimated rows: 0.00 + └── estimated rows: 600572.00 From 02008f5c7a60b57fd78ae9247641de3c4b68ab81 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Wed, 12 Feb 2025 22:35:36 +0800 Subject: [PATCH 13/23] Fix system tables query Signed-off-by: Xuanwo --- src/query/sql/src/planner/binder/ddl/dictionary.rs | 2 +- 
src/query/sql/src/planner/binder/ddl/stream.rs | 6 +++--- src/query/sql/src/planner/binder/ddl/table.rs | 14 +++++++------- src/query/sql/src/planner/binder/ddl/view.rs | 4 ++-- .../sql/src/planner/binder/ddl/virtual_column.rs | 2 +- 5 files changed, 14 insertions(+), 14 deletions(-) diff --git a/src/query/sql/src/planner/binder/ddl/dictionary.rs b/src/query/sql/src/planner/binder/ddl/dictionary.rs index 94099664073da..be52a193143a6 100644 --- a/src/query/sql/src/planner/binder/ddl/dictionary.rs +++ b/src/query/sql/src/planner/binder/ddl/dictionary.rs @@ -400,7 +400,7 @@ impl Binder { ) -> Result { let ShowDictionariesStmt { database, limit } = stmt; - let mut select_builder = SelectBuilder::from("system.dictionaries"); + let mut select_builder = SelectBuilder::from("default.system.dictionaries"); select_builder .with_column("database AS Database") diff --git a/src/query/sql/src/planner/binder/ddl/stream.rs b/src/query/sql/src/planner/binder/ddl/stream.rs index a18914afd7615..74137ec63de7f 100644 --- a/src/query/sql/src/planner/binder/ddl/stream.rs +++ b/src/query/sql/src/planner/binder/ddl/stream.rs @@ -127,9 +127,9 @@ impl Binder { let database = self.check_database_exist(catalog, database).await?; let mut select_builder = if *full { - SelectBuilder::from("system.streams") + SelectBuilder::from("default.system.streams") } else { - SelectBuilder::from("system.streams_terse") + SelectBuilder::from("default.system.streams_terse") }; if *full { @@ -200,7 +200,7 @@ impl Binder { let (catalog, database, stream) = self.normalize_object_identifier_triple(catalog, database, stream); - let mut select_builder = SelectBuilder::from("system.streams"); + let mut select_builder = SelectBuilder::from("default.system.streams"); select_builder .with_column("created_on") .with_column("name") diff --git a/src/query/sql/src/planner/binder/ddl/table.rs b/src/query/sql/src/planner/binder/ddl/table.rs index 401a03d381898..6a38f06de3b6c 100644 --- 
a/src/query/sql/src/planner/binder/ddl/table.rs +++ b/src/query/sql/src/planner/binder/ddl/table.rs @@ -174,9 +174,9 @@ impl Binder { let database = self.check_database_exist(catalog, database).await?; let mut select_builder = if stmt.with_history { - SelectBuilder::from("system.tables_with_history") + SelectBuilder::from("default.system.tables_with_history") } else { - SelectBuilder::from("system.tables") + SelectBuilder::from("default.system.tables") }; if *full { @@ -318,21 +318,21 @@ impl Binder { // Use `system.tables` AS the "base" table to construct the result-set of `SHOW TABLE STATUS ..` // // To constraint the schema of the final result-set, - // `(select ${select_cols} from system.tables where ..)` + // `(select ${select_cols} from default.system.tables where ..)` // is used AS a derived table. // (unlike mysql, alias of derived table is not required in databend). let query = match limit { None => format!( - "SELECT {} FROM system.tables WHERE database = '{}' ORDER BY Name", + "SELECT {} FROM default.system.tables WHERE database = '{}' ORDER BY Name", select_cols, database ), Some(ShowLimit::Like { pattern }) => format!( - "SELECT * from (SELECT {} FROM system.tables WHERE database = '{}') \ + "SELECT * from (SELECT {} FROM default.system.tables WHERE database = '{}') \ WHERE Name LIKE '{}' ORDER BY Name", select_cols, database, pattern ), Some(ShowLimit::Where { selection }) => format!( - "SELECT * from (SELECT {} FROM system.tables WHERE database = '{}') \ + "SELECT * from (SELECT {} FROM default.system.tables WHERE database = '{}') \ WHERE ({}) ORDER BY Name", select_cols, database, selection ), @@ -353,7 +353,7 @@ impl Binder { let database = self.check_database_exist(&None, database).await?; - let mut select_builder = SelectBuilder::from("system.tables_with_history"); + let mut select_builder = SelectBuilder::from("default.system.tables_with_history"); select_builder .with_column("name AS Tables") diff --git 
a/src/query/sql/src/planner/binder/ddl/view.rs b/src/query/sql/src/planner/binder/ddl/view.rs index 7a8321cc05a09..031f60e63bbee 100644 --- a/src/query/sql/src/planner/binder/ddl/view.rs +++ b/src/query/sql/src/planner/binder/ddl/view.rs @@ -157,9 +157,9 @@ impl Binder { let database = self.check_database_exist(catalog, database).await?; let mut select_builder = if stmt.with_history { - SelectBuilder::from("system.views_with_history") + SelectBuilder::from("default.system.views_with_history") } else { - SelectBuilder::from("system.views") + SelectBuilder::from("default.system.views") }; if *full { diff --git a/src/query/sql/src/planner/binder/ddl/virtual_column.rs b/src/query/sql/src/planner/binder/ddl/virtual_column.rs index 02386502a8d0d..b73cc0ff9df71 100644 --- a/src/query/sql/src/planner/binder/ddl/virtual_column.rs +++ b/src/query/sql/src/planner/binder/ddl/virtual_column.rs @@ -385,7 +385,7 @@ impl Binder { } }; - let mut select_builder = SelectBuilder::from("system.virtual_columns"); + let mut select_builder = SelectBuilder::from("default.system.virtual_columns"); select_builder .with_column("database") .with_column("table") From 864a81009c65e80e4e4fbc0157c00a312f1fc776 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 13 Feb 2025 22:29:29 +0800 Subject: [PATCH 14/23] Fix stats not calculated correctly Signed-off-by: Xuanwo --- src/query/storages/iceberg/src/statistics.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/query/storages/iceberg/src/statistics.rs b/src/query/storages/iceberg/src/statistics.rs index b0794381a24df..323510f81544a 100644 --- a/src/query/storages/iceberg/src/statistics.rs +++ b/src/query/storages/iceberg/src/statistics.rs @@ -116,16 +116,16 @@ impl IcebergStatistics { statistics.number_of_data_files += 1; data_file.column_sizes().iter().for_each(|(col_id, size)| { - column_sizes.insert(*col_id, *size); + *column_sizes.entry(*col_id).or_default() += size; }); 
data_file.value_counts().iter().for_each(|(col_id, count)| { - value_counts.insert(*col_id, *count); + *value_counts.entry(*col_id).or_default() += count; }); data_file .null_value_counts() .iter() .for_each(|(col_id, count)| { - null_value_counts.insert(*col_id, *count); + *null_value_counts.entry(*col_id).or_default() += count; }); data_file .nan_value_counts() From 4fa9ca0b588c68d4081a407f72a66f3721b4e810 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 13 Feb 2025 22:37:38 +0800 Subject: [PATCH 15/23] Fix stats Signed-off-by: Xuanwo --- src/query/storages/iceberg/src/statistics.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/query/storages/iceberg/src/statistics.rs b/src/query/storages/iceberg/src/statistics.rs index 323510f81544a..69262c611b311 100644 --- a/src/query/storages/iceberg/src/statistics.rs +++ b/src/query/storages/iceberg/src/statistics.rs @@ -131,7 +131,7 @@ impl IcebergStatistics { .nan_value_counts() .iter() .for_each(|(col_id, count)| { - nan_value_counts.insert(*col_id, *count); + *nan_value_counts.entry(*col_id).or_default() += *count; }); data_file From 1427581459a7519613cb40d5825dbde4899a09f3 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 13 Feb 2025 22:51:39 +0800 Subject: [PATCH 16/23] Fix stats Signed-off-by: Xuanwo --- src/query/storages/iceberg/src/statistics.rs | 37 ++++++++++---------- 1 file changed, 18 insertions(+), 19 deletions(-) diff --git a/src/query/storages/iceberg/src/statistics.rs b/src/query/storages/iceberg/src/statistics.rs index 69262c611b311..0956dd4b6eb40 100644 --- a/src/query/storages/iceberg/src/statistics.rs +++ b/src/query/storages/iceberg/src/statistics.rs @@ -166,16 +166,14 @@ impl IcebergStatistics { let mut computed_statistics = HashMap::new(); for field in IcebergTable::get_schema(table)?.fields() { - let column_stats: Option = get_column_stats( + let column_stats = get_column_stats( field, &column_sizes, &lower_bounds, &upper_bounds, &null_value_counts, ); - if let Some(stats) = 
column_stats { - computed_statistics.insert(field.column_id, stats); - } + computed_statistics.insert(field.column_id, column_stats); } statistics.computed_statistics = computed_statistics; @@ -196,26 +194,27 @@ impl ColumnStatisticsProvider for IcebergStatistics { /// Try get [`ColumnStatistics`] for one column. fn get_column_stats( field: &TableField, - column_size: &HashMap, + _column_size: &HashMap, lower: &HashMap, upper: &HashMap, null_counts: &HashMap, -) -> Option { +) -> BasicColumnStatistics { // The column id in iceberg is 1-based while the column id in Databend is 0-based. let iceberg_col_id = field.column_id as i32 + 1; - match ( - column_size.get(&iceberg_col_id), - lower.get(&iceberg_col_id), - upper.get(&iceberg_col_id), - null_counts.get(&iceberg_col_id), - ) { - (Some(_size), Some(lo), Some(up), Some(nc)) => Some(BasicColumnStatistics { - min: parse_datum(lo).and_then(DatabendDatum::from_scalar), - max: parse_datum(up).and_then(DatabendDatum::from_scalar), - ndv: None, - null_count: *nc, - }), - _ => None, + BasicColumnStatistics { + min: lower + .get(&iceberg_col_id) + .and_then(parse_datum) + .and_then(DatabendDatum::from_scalar), + max: upper + .get(&iceberg_col_id) + .and_then(parse_datum) + .and_then(DatabendDatum::from_scalar), + ndv: None, + null_count: null_counts + .get(&iceberg_col_id) + .copied() + .unwrap_or_default(), } } From d4b335214d0ecaabe0baed168152a7bd9d9f4410 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 13 Feb 2025 23:08:34 +0800 Subject: [PATCH 17/23] Try fix predicate Signed-off-by: Xuanwo --- src/query/storages/iceberg/Cargo.toml | 1 + src/query/storages/iceberg/src/predicate.rs | 72 ++++++++++----------- src/query/storages/iceberg/src/table.rs | 2 +- 3 files changed, 38 insertions(+), 37 deletions(-) diff --git a/src/query/storages/iceberg/Cargo.toml b/src/query/storages/iceberg/Cargo.toml index 226fd5f6f3733..ddd7a76e07e6e 100644 --- a/src/query/storages/iceberg/Cargo.toml +++ b/src/query/storages/iceberg/Cargo.toml 
@@ -33,6 +33,7 @@ serde = { workspace = true } serde_json = { workspace = true } typetag = { workspace = true } uuid = { workspace = true } +log = "0.4.22" [lints] workspace = true diff --git a/src/query/storages/iceberg/src/predicate.rs b/src/query/storages/iceberg/src/predicate.rs index 11a69727710cc..059b4a20e1787 100644 --- a/src/query/storages/iceberg/src/predicate.rs +++ b/src/query/storages/iceberg/src/predicate.rs @@ -19,14 +19,12 @@ use databend_common_expression::Scalar; use iceberg::expr::Predicate; use iceberg::expr::Reference; use iceberg::spec::Datum; +use log::debug; -#[derive(Default, Copy, Clone, Debug)] -pub struct PredicateBuilder { - uncertain: bool, -} +pub struct PredicateBuilder; impl PredicateBuilder { - pub fn build(&mut self, expr: &RemoteExpr) -> Predicate { + pub fn build(expr: &RemoteExpr) -> (bool, Predicate) { match expr { RemoteExpr::Constant { span: _, @@ -36,9 +34,9 @@ impl PredicateBuilder { let value = scalar.as_boolean(); let is_true = value.copied().unwrap_or(false); if is_true { - Predicate::AlwaysTrue + (false, Predicate::AlwaysTrue) } else { - Predicate::AlwaysFalse + (false, Predicate::AlwaysFalse) } } @@ -50,14 +48,14 @@ impl PredicateBuilder { args, return_type: _, } if args.len() == 1 && id.name().as_ref() == "is_true" => { - let predicate = self.build(&args[0]); - if self.uncertain { - return Predicate::AlwaysTrue; + let (uncertain, predicate) = Self::build(&args[0]); + if uncertain { + return (uncertain, Predicate::AlwaysTrue); } match predicate { - Predicate::AlwaysTrue => Predicate::AlwaysTrue, - Predicate::AlwaysFalse => Predicate::AlwaysFalse, - _ => predicate, + Predicate::AlwaysTrue => (false, Predicate::AlwaysTrue), + Predicate::AlwaysFalse => (false, Predicate::AlwaysFalse), + _ => (false, predicate), } } @@ -72,10 +70,9 @@ impl PredicateBuilder { let (_, name, _, _) = args[0].as_column_ref().unwrap(); let r = Reference::new(name); if let Some(op) = build_unary(r, id.name().as_ref()) { - return op; + return 
(false, op); } - self.uncertain = true; - Predicate::AlwaysTrue + (true, Predicate::AlwaysTrue) } // not @@ -86,15 +83,18 @@ impl PredicateBuilder { args, return_type: _, } if args.len() == 1 && id.name().as_ref() == "not" => { - let predicate = self.build(&args[0]); - if self.uncertain { - return Predicate::AlwaysTrue; + let (uncertain, predicate) = Self::build(&args[0]); + if uncertain { + return (true, Predicate::AlwaysTrue); } - match predicate { + + let predicate = match predicate { Predicate::AlwaysTrue => Predicate::AlwaysFalse, Predicate::AlwaysFalse => Predicate::AlwaysTrue, _ => predicate.negate(), - } + }; + + (false, predicate) } // binary {a op datum} @@ -105,16 +105,18 @@ impl PredicateBuilder { args, return_type: _, } if args.len() == 2 && ["and", "and_filters", "or"].contains(&id.name().as_ref()) => { - let left = self.build(&args[0]); - let right = self.build(&args[1]); - if self.uncertain { - return Predicate::AlwaysTrue; + let (left_uncertain, left) = Self::build(&args[0]); + let (right_uncertain, right) = Self::build(&args[1]); + if left_uncertain || right_uncertain { + return (true, Predicate::AlwaysTrue); } - match id.name().as_ref() { + let predicate = match id.name().as_ref() { "and" | "and_filters" => left.and(right), "or" => left.or(right), _ => unreachable!(), - } + }; + + (false, predicate) } // binary {a op datum} @@ -135,11 +137,10 @@ impl PredicateBuilder { let r = Reference::new(name); let p = build_binary(r, id.name().as_ref(), datum); if let Some(op) = p { - return op; + return (false, op); } } - self.uncertain = true; - Predicate::AlwaysTrue + (true, Predicate::AlwaysTrue) } // binary {datum op a} @@ -160,16 +161,15 @@ impl PredicateBuilder { let r = Reference::new(name); let p = build_reverse_binary(r, id.name().as_ref(), datum); if let Some(op) = p { - return op; + return (false, op); } } - self.uncertain = true; - Predicate::AlwaysTrue + (true, Predicate::AlwaysTrue) } - _ => { - self.uncertain = true; - Predicate::AlwaysTrue + 
v => { + debug!("predicate build for {v:?} is not supported yet"); + (true, Predicate::AlwaysTrue) } } } diff --git a/src/query/storages/iceberg/src/table.rs b/src/query/storages/iceberg/src/table.rs index 1c7df8822f773..d08c96eb20dee 100644 --- a/src/query/storages/iceberg/src/table.rs +++ b/src/query/storages/iceberg/src/table.rs @@ -280,7 +280,7 @@ impl IcebergTable { ); } if let Some(filter) = &push_downs.filters { - let predicate = PredicateBuilder::default().build(&filter.filter); + let (_, predicate) = PredicateBuilder::build(&filter.filter); scan = scan.with_filter(predicate) } } From 52fc72e6c45b3e8bf676defdc0cfce73bcde7336 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 13 Feb 2025 23:27:11 +0800 Subject: [PATCH 18/23] Fix lock not updated Signed-off-by: Xuanwo --- Cargo.lock | 1 + 1 file changed, 1 insertion(+) diff --git a/Cargo.lock b/Cargo.lock index e1da0f87854dd..80d38f10cba70 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4233,6 +4233,7 @@ dependencies = [ "iceberg-catalog-glue", "iceberg-catalog-hms", "iceberg-catalog-rest", + "log", "serde", "serde_json", "typetag", From 9199c193f33cea36b7aa5c724b64f32e68b3597d Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Fri, 14 Feb 2025 19:42:55 +0800 Subject: [PATCH 19/23] Fix cargo Signed-off-by: Xuanwo --- src/query/storages/iceberg/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/query/storages/iceberg/Cargo.toml b/src/query/storages/iceberg/Cargo.toml index ddd7a76e07e6e..e1a8547496774 100644 --- a/src/query/storages/iceberg/Cargo.toml +++ b/src/query/storages/iceberg/Cargo.toml @@ -29,11 +29,11 @@ iceberg = { workspace = true } iceberg-catalog-glue = { workspace = true } iceberg-catalog-hms = { workspace = true } iceberg-catalog-rest = { workspace = true } +log = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } typetag = { workspace = true } uuid = { workspace = true } -log = "0.4.22" [lints] workspace = true From 
526f55a0b88e0d59aa870604421bd2cf9f137522 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Fri, 14 Feb 2025 20:46:41 +0800 Subject: [PATCH 20/23] Fix tests Signed-off-by: Xuanwo --- .../suites/mode/standalone/explain/bloom_filter.test | 4 ++-- .../sqllogictests/suites/mode/standalone/explain/join.test | 4 ++-- .../explain/auto_rebuild_missing_bloom_index.test | 2 +- tests/sqllogictests/suites/tpch_iceberg/prune.test | 6 +++--- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/tests/sqllogictests/suites/mode/standalone/explain/bloom_filter.test b/tests/sqllogictests/suites/mode/standalone/explain/bloom_filter.test index c73973512c169..a2c316538d348 100644 --- a/tests/sqllogictests/suites/mode/standalone/explain/bloom_filter.test +++ b/tests/sqllogictests/suites/mode/standalone/explain/bloom_filter.test @@ -150,11 +150,11 @@ explain select 1 from bloom_test_t where c3=12345; EvalScalar ├── output columns: [1 (#3)] ├── expressions: [1] -├── estimated rows: 1.00 +├── estimated rows: 4.00 └── Filter ├── output columns: [] ├── filters: [is_true(bloom_test_t.c3 (#2) = 12345)] - ├── estimated rows: 1.00 + ├── estimated rows: 4.00 └── TableScan ├── table: default.default.bloom_test_t ├── output columns: [c3 (#2)] diff --git a/tests/sqllogictests/suites/mode/standalone/explain/join.test b/tests/sqllogictests/suites/mode/standalone/explain/join.test index 5aa020019970b..48904b26f3573 100644 --- a/tests/sqllogictests/suites/mode/standalone/explain/join.test +++ b/tests/sqllogictests/suites/mode/standalone/explain/join.test @@ -316,11 +316,11 @@ HashJoin ├── build keys: [b.x (#1)] ├── probe keys: [a.x (#0)] ├── filters: [] -├── estimated rows: 2.37 +├── estimated rows: 2.67 ├── Filter(Build) │ ├── output columns: [b.x (#1), b.y (#2)] │ ├── filters: [is_true(b.x (#1) > 42), is_true(b.x (#1) < 45)] -│ ├── estimated rows: 2.67 +│ ├── estimated rows: 2.00 │ └── TableScan │ ├── table: default.default.twocolumn │ ├── output columns: [x (#1), y (#2)] diff --git 
a/tests/sqllogictests/suites/no_table_meta_cache/explain/auto_rebuild_missing_bloom_index.test b/tests/sqllogictests/suites/no_table_meta_cache/explain/auto_rebuild_missing_bloom_index.test index 18c1544db968e..04d0d1b59f703 100644 --- a/tests/sqllogictests/suites/no_table_meta_cache/explain/auto_rebuild_missing_bloom_index.test +++ b/tests/sqllogictests/suites/no_table_meta_cache/explain/auto_rebuild_missing_bloom_index.test @@ -122,7 +122,7 @@ explain select * from t where s = '12'; Filter ├── output columns: [t.c (#0), t.s (#1)] ├── filters: [is_true(t.s (#1) = '12')] -├── estimated rows: 2.00 +├── estimated rows: 2.67 └── TableScan ├── table: default.test_auto_rebuild_missing.t ├── output columns: [c (#0), s (#1)] diff --git a/tests/sqllogictests/suites/tpch_iceberg/prune.test b/tests/sqllogictests/suites/tpch_iceberg/prune.test index 8417b66e4d212..b0a6d04646405 100644 --- a/tests/sqllogictests/suites/tpch_iceberg/prune.test +++ b/tests/sqllogictests/suites/tpch_iceberg/prune.test @@ -134,8 +134,8 @@ EvalScalar ├── table: ctl.tpch.lineitem ├── output columns: [l_orderkey (#0), l_commitdate (#11)] ├── read rows: 600572 - ├── read size: 14.54 MiB - ├── partitions total: 18 - ├── partitions scanned: 18 + ├── read size: 14.27 MiB + ├── partitions total: 4 + ├── partitions scanned: 4 ├── push downs: [filters: [(NOT is_not_null(lineitem.l_orderkey (#0)) OR is_not_null(lineitem.l_commitdate (#11)))], limit: NONE] └── estimated rows: 600572.00 From 74488d74babe91b93054d1524cd1de883f228667 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Fri, 14 Feb 2025 22:35:47 +0800 Subject: [PATCH 21/23] revert scan changes Signed-off-by: Xuanwo --- src/query/sql/src/planner/plans/scan.rs | 31 +++++++++++++++---------- 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/src/query/sql/src/planner/plans/scan.rs b/src/query/sql/src/planner/plans/scan.rs index fa006b7d6e4a8..1040bf97049d2 100644 --- a/src/query/sql/src/planner/plans/scan.rs +++ 
b/src/query/sql/src/planner/plans/scan.rs @@ -237,23 +237,30 @@ impl Operator for Scan { // Safe to unwrap: min, max are all `Some(_)`. let min = col_stat.min.unwrap(); let max = col_stat.max.unwrap(); - let num_rows = num_rows.saturating_sub(col_stat.null_count); - let ndv = col_stat.ndv.unwrap_or(num_rows); - let ndv = std::cmp::min(num_rows, ndv); + // ndv could be `None`, we will use `num_rows - null_count` as ndv instead. + // + // NOTE: don't touch the original num_rows, since it will be used in other places. + let ndv = col_stat + .ndv + .unwrap_or_else(|| num_rows.saturating_sub(col_stat.null_count)); let histogram = if let Some(histogram) = self.statistics.histograms.get(k) && histogram.is_some() { histogram.clone() - } else if num_rows != 0 { - histogram_from_ndv( - ndv, - num_rows, - Some((min.clone(), max.clone())), - DEFAULT_HISTOGRAM_BUCKETS, - ) - .ok() } else { - None + let num_rows = num_rows.saturating_sub(col_stat.null_count); + let ndv = std::cmp::min(num_rows, ndv); + if num_rows != 0 { + histogram_from_ndv( + ndv, + num_rows, + Some((min.clone(), max.clone())), + DEFAULT_HISTOGRAM_BUCKETS, + ) + .ok() + } else { + None + } }; let column_stat = ColumnStat { min, From d931da89d67f4dfaac322d3e4cc1f42343c0199b Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Fri, 14 Feb 2025 22:36:56 +0800 Subject: [PATCH 22/23] Revert changes to tests Signed-off-by: Xuanwo --- .../mode/standalone/explain/bloom_filter.test | 8 +- .../mode/standalone/explain/explain.test | 80 ++++++++++--------- .../suites/mode/standalone/explain/join.test | 6 +- .../auto_rebuild_missing_bloom_index.test | 2 +- 4 files changed, 52 insertions(+), 44 deletions(-) diff --git a/tests/sqllogictests/suites/mode/standalone/explain/bloom_filter.test b/tests/sqllogictests/suites/mode/standalone/explain/bloom_filter.test index a2c316538d348..18f12f481efac 100644 --- a/tests/sqllogictests/suites/mode/standalone/explain/bloom_filter.test +++ 
b/tests/sqllogictests/suites/mode/standalone/explain/bloom_filter.test @@ -128,11 +128,11 @@ explain select 1 from bloom_test_t where c2=3; EvalScalar ├── output columns: [1 (#3)] ├── expressions: [1] -├── estimated rows: 4.00 +├── estimated rows: 2.67 └── Filter ├── output columns: [] ├── filters: [is_true(bloom_test_t.c2 (#1) = 3)] - ├── estimated rows: 4.00 + ├── estimated rows: 2.67 └── TableScan ├── table: default.default.bloom_test_t ├── output columns: [c2 (#1)] @@ -150,11 +150,11 @@ explain select 1 from bloom_test_t where c3=12345; EvalScalar ├── output columns: [1 (#3)] ├── expressions: [1] -├── estimated rows: 4.00 +├── estimated rows: 1.00 └── Filter ├── output columns: [] ├── filters: [is_true(bloom_test_t.c3 (#2) = 12345)] - ├── estimated rows: 4.00 + ├── estimated rows: 1.00 └── TableScan ├── table: default.default.bloom_test_t ├── output columns: [c3 (#2)] diff --git a/tests/sqllogictests/suites/mode/standalone/explain/explain.test b/tests/sqllogictests/suites/mode/standalone/explain/explain.test index b532ad1fb8bba..15401b3b5a769 100644 --- a/tests/sqllogictests/suites/mode/standalone/explain/explain.test +++ b/tests/sqllogictests/suites/mode/standalone/explain/explain.test @@ -1330,31 +1330,35 @@ HashJoin ├── filters: [t2.c (#2) > scalar_subquery_5 (#5)] ├── estimated rows: 0.00 ├── Sort(Build) -│ ├── output columns: [count(a) (#6), t1.c (#5), t1.a (#3)] +│ ├── output columns: [t1.c (#5), t1.a (#3), count(a) (#7)] │ ├── sort keys: [count(a) ASC NULLS LAST] │ ├── estimated rows: 0.00 -│ └── AggregateFinal -│ ├── output columns: [count(a) (#6), t1.c (#5), t1.a (#3)] -│ ├── group by: [c, a] -│ ├── aggregate functions: [count(a)] +│ └── EvalScalar +│ ├── output columns: [t1.c (#5), t1.a (#3), count(a) (#7)] +│ ├── expressions: [count(a) (#6)] │ ├── estimated rows: 0.00 -│ └── AggregatePartial +│ └── AggregateFinal +│ ├── output columns: [count(a) (#6), t1.c (#5), t1.a (#3)] │ ├── group by: [c, a] │ ├── aggregate functions: [count(a)] │ ├── estimated 
rows: 0.00 -│ └── Filter -│ ├── output columns: [t1.a (#3), t1.c (#5)] -│ ├── filters: [is_true(t1.a (#3) = t1.a (#3))] +│ └── AggregatePartial +│ ├── group by: [c, a] +│ ├── aggregate functions: [count(a)] │ ├── estimated rows: 0.00 -│ └── TableScan -│ ├── table: default.default.t1 -│ ├── output columns: [a (#3), c (#5)] -│ ├── read rows: 0 -│ ├── read size: 0 -│ ├── partitions total: 0 -│ ├── partitions scanned: 0 -│ ├── push downs: [filters: [is_true(t1.a (#3) = t1.a (#3))], limit: NONE] -│ └── estimated rows: 0.00 +│ └── Filter +│ ├── output columns: [t1.a (#3), t1.c (#5)] +│ ├── filters: [is_true(t1.a (#3) = t1.a (#3))] +│ ├── estimated rows: 0.00 +│ └── TableScan +│ ├── table: default.default.t1 +│ ├── output columns: [a (#3), c (#5)] +│ ├── read rows: 0 +│ ├── read size: 0 +│ ├── partitions total: 0 +│ ├── partitions scanned: 0 +│ ├── push downs: [filters: [is_true(t1.a (#3) = t1.a (#3))], limit: NONE] +│ └── estimated rows: 0.00 └── TableScan(Probe) ├── table: default.default.t2 ├── output columns: [a (#0), b (#1), c (#2)] @@ -1376,31 +1380,35 @@ HashJoin ├── filters: [t2.c (#2) > scalar_subquery_5 (#5)] ├── estimated rows: 0.00 ├── Sort(Build) -│ ├── output columns: [COUNT(*) (#6), t1.c (#5), t1.a (#3)] +│ ├── output columns: [t1.c (#5), t1.a (#3), COUNT(*) (#7)] │ ├── sort keys: [COUNT(*) ASC NULLS LAST] │ ├── estimated rows: 0.00 -│ └── AggregateFinal -│ ├── output columns: [COUNT(*) (#6), t1.c (#5), t1.a (#3)] -│ ├── group by: [c, a] -│ ├── aggregate functions: [count()] +│ └── EvalScalar +│ ├── output columns: [t1.c (#5), t1.a (#3), COUNT(*) (#7)] +│ ├── expressions: [COUNT(*) (#6)] │ ├── estimated rows: 0.00 -│ └── AggregatePartial +│ └── AggregateFinal +│ ├── output columns: [COUNT(*) (#6), t1.c (#5), t1.a (#3)] │ ├── group by: [c, a] │ ├── aggregate functions: [count()] │ ├── estimated rows: 0.00 -│ └── Filter -│ ├── output columns: [t1.a (#3), t1.c (#5)] -│ ├── filters: [is_true(a (#3) = a (#3))] +│ └── AggregatePartial +│ ├── group by: [c, a] +│ 
├── aggregate functions: [count()] │ ├── estimated rows: 0.00 -│ └── TableScan -│ ├── table: default.default.t1 -│ ├── output columns: [a (#3), c (#5)] -│ ├── read rows: 0 -│ ├── read size: 0 -│ ├── partitions total: 0 -│ ├── partitions scanned: 0 -│ ├── push downs: [filters: [is_true(t1.a (#3) = t1.a (#3))], limit: NONE] -│ └── estimated rows: 0.00 +│ └── Filter +│ ├── output columns: [t1.a (#3), t1.c (#5)] +│ ├── filters: [is_true(a (#3) = a (#3))] +│ ├── estimated rows: 0.00 +│ └── TableScan +│ ├── table: default.default.t1 +│ ├── output columns: [a (#3), c (#5)] +│ ├── read rows: 0 +│ ├── read size: 0 +│ ├── partitions total: 0 +│ ├── partitions scanned: 0 +│ ├── push downs: [filters: [is_true(t1.a (#3) = t1.a (#3))], limit: NONE] +│ └── estimated rows: 0.00 └── TableScan(Probe) ├── table: default.default.t2 ├── output columns: [a (#0), b (#1), c (#2)] diff --git a/tests/sqllogictests/suites/mode/standalone/explain/join.test b/tests/sqllogictests/suites/mode/standalone/explain/join.test index 48904b26f3573..86fdb263a1615 100644 --- a/tests/sqllogictests/suites/mode/standalone/explain/join.test +++ b/tests/sqllogictests/suites/mode/standalone/explain/join.test @@ -238,7 +238,7 @@ HashJoin ├── build keys: [b.x (#1)] ├── probe keys: [a.x (#0)] ├── filters: [] -├── estimated rows: 3.56 +├── estimated rows: 2.37 ├── Filter(Build) │ ├── output columns: [b.x (#1), b.y (#2)] │ ├── filters: [is_true(b.x (#1) > 42)] @@ -316,11 +316,11 @@ HashJoin ├── build keys: [b.x (#1)] ├── probe keys: [a.x (#0)] ├── filters: [] -├── estimated rows: 2.67 +├── estimated rows: 2.37 ├── Filter(Build) │ ├── output columns: [b.x (#1), b.y (#2)] │ ├── filters: [is_true(b.x (#1) > 42), is_true(b.x (#1) < 45)] -│ ├── estimated rows: 2.00 +│ ├── estimated rows: 2.67 │ └── TableScan │ ├── table: default.default.twocolumn │ ├── output columns: [x (#1), y (#2)] diff --git a/tests/sqllogictests/suites/no_table_meta_cache/explain/auto_rebuild_missing_bloom_index.test 
b/tests/sqllogictests/suites/no_table_meta_cache/explain/auto_rebuild_missing_bloom_index.test index 04d0d1b59f703..18c1544db968e 100644 --- a/tests/sqllogictests/suites/no_table_meta_cache/explain/auto_rebuild_missing_bloom_index.test +++ b/tests/sqllogictests/suites/no_table_meta_cache/explain/auto_rebuild_missing_bloom_index.test @@ -122,7 +122,7 @@ explain select * from t where s = '12'; Filter ├── output columns: [t.c (#0), t.s (#1)] ├── filters: [is_true(t.s (#1) = '12')] -├── estimated rows: 2.67 +├── estimated rows: 2.00 └── TableScan ├── table: default.test_auto_rebuild_missing.t ├── output columns: [c (#0), s (#1)] From d530039a02371a6f5d0464859e5d79bb8c80f47a Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Fri, 14 Feb 2025 22:38:44 +0800 Subject: [PATCH 23/23] Revert change to tests Signed-off-by: Xuanwo --- .../mode/standalone/explain/explain.test | 80 +++++++++---------- 1 file changed, 36 insertions(+), 44 deletions(-) diff --git a/tests/sqllogictests/suites/mode/standalone/explain/explain.test b/tests/sqllogictests/suites/mode/standalone/explain/explain.test index 15401b3b5a769..b532ad1fb8bba 100644 --- a/tests/sqllogictests/suites/mode/standalone/explain/explain.test +++ b/tests/sqllogictests/suites/mode/standalone/explain/explain.test @@ -1330,35 +1330,31 @@ HashJoin ├── filters: [t2.c (#2) > scalar_subquery_5 (#5)] ├── estimated rows: 0.00 ├── Sort(Build) -│ ├── output columns: [t1.c (#5), t1.a (#3), count(a) (#7)] +│ ├── output columns: [count(a) (#6), t1.c (#5), t1.a (#3)] │ ├── sort keys: [count(a) ASC NULLS LAST] │ ├── estimated rows: 0.00 -│ └── EvalScalar -│ ├── output columns: [t1.c (#5), t1.a (#3), count(a) (#7)] -│ ├── expressions: [count(a) (#6)] +│ └── AggregateFinal +│ ├── output columns: [count(a) (#6), t1.c (#5), t1.a (#3)] +│ ├── group by: [c, a] +│ ├── aggregate functions: [count(a)] │ ├── estimated rows: 0.00 -│ └── AggregateFinal -│ ├── output columns: [count(a) (#6), t1.c (#5), t1.a (#3)] +│ └── AggregatePartial │ ├── group by: [c, 
a] │ ├── aggregate functions: [count(a)] │ ├── estimated rows: 0.00 -│ └── AggregatePartial -│ ├── group by: [c, a] -│ ├── aggregate functions: [count(a)] +│ └── Filter +│ ├── output columns: [t1.a (#3), t1.c (#5)] +│ ├── filters: [is_true(t1.a (#3) = t1.a (#3))] │ ├── estimated rows: 0.00 -│ └── Filter -│ ├── output columns: [t1.a (#3), t1.c (#5)] -│ ├── filters: [is_true(t1.a (#3) = t1.a (#3))] -│ ├── estimated rows: 0.00 -│ └── TableScan -│ ├── table: default.default.t1 -│ ├── output columns: [a (#3), c (#5)] -│ ├── read rows: 0 -│ ├── read size: 0 -│ ├── partitions total: 0 -│ ├── partitions scanned: 0 -│ ├── push downs: [filters: [is_true(t1.a (#3) = t1.a (#3))], limit: NONE] -│ └── estimated rows: 0.00 +│ └── TableScan +│ ├── table: default.default.t1 +│ ├── output columns: [a (#3), c (#5)] +│ ├── read rows: 0 +│ ├── read size: 0 +│ ├── partitions total: 0 +│ ├── partitions scanned: 0 +│ ├── push downs: [filters: [is_true(t1.a (#3) = t1.a (#3))], limit: NONE] +│ └── estimated rows: 0.00 └── TableScan(Probe) ├── table: default.default.t2 ├── output columns: [a (#0), b (#1), c (#2)] @@ -1380,35 +1376,31 @@ HashJoin ├── filters: [t2.c (#2) > scalar_subquery_5 (#5)] ├── estimated rows: 0.00 ├── Sort(Build) -│ ├── output columns: [t1.c (#5), t1.a (#3), COUNT(*) (#7)] +│ ├── output columns: [COUNT(*) (#6), t1.c (#5), t1.a (#3)] │ ├── sort keys: [COUNT(*) ASC NULLS LAST] │ ├── estimated rows: 0.00 -│ └── EvalScalar -│ ├── output columns: [t1.c (#5), t1.a (#3), COUNT(*) (#7)] -│ ├── expressions: [COUNT(*) (#6)] +│ └── AggregateFinal +│ ├── output columns: [COUNT(*) (#6), t1.c (#5), t1.a (#3)] +│ ├── group by: [c, a] +│ ├── aggregate functions: [count()] │ ├── estimated rows: 0.00 -│ └── AggregateFinal -│ ├── output columns: [COUNT(*) (#6), t1.c (#5), t1.a (#3)] +│ └── AggregatePartial │ ├── group by: [c, a] │ ├── aggregate functions: [count()] │ ├── estimated rows: 0.00 -│ └── AggregatePartial -│ ├── group by: [c, a] -│ ├── aggregate functions: [count()] +│ └── 
Filter +│ ├── output columns: [t1.a (#3), t1.c (#5)] +│ ├── filters: [is_true(a (#3) = a (#3))] │ ├── estimated rows: 0.00 -│ └── Filter -│ ├── output columns: [t1.a (#3), t1.c (#5)] -│ ├── filters: [is_true(a (#3) = a (#3))] -│ ├── estimated rows: 0.00 -│ └── TableScan -│ ├── table: default.default.t1 -│ ├── output columns: [a (#3), c (#5)] -│ ├── read rows: 0 -│ ├── read size: 0 -│ ├── partitions total: 0 -│ ├── partitions scanned: 0 -│ ├── push downs: [filters: [is_true(t1.a (#3) = t1.a (#3))], limit: NONE] -│ └── estimated rows: 0.00 +│ └── TableScan +│ ├── table: default.default.t1 +│ ├── output columns: [a (#3), c (#5)] +│ ├── read rows: 0 +│ ├── read size: 0 +│ ├── partitions total: 0 +│ ├── partitions scanned: 0 +│ ├── push downs: [filters: [is_true(t1.a (#3) = t1.a (#3))], limit: NONE] +│ └── estimated rows: 0.00 └── TableScan(Probe) ├── table: default.default.t2 ├── output columns: [a (#0), b (#1), c (#2)]