Merged
Commits
338bc81
feat: Add hdfs support in iceberg
Xuanwo Jan 22, 2025
7906c26
Enable webhdfs support
Xuanwo Jan 24, 2025
88d96af
Fix iceberg support
Xuanwo Feb 10, 2025
c2d5357
Merge remote-tracking branch 'origin/main' into iceberg-hdfs
Xuanwo Feb 10, 2025
d610d76
Add table statistics for iceberg
Xuanwo Feb 11, 2025
ed5b364
Also implement column statistics
Xuanwo Feb 11, 2025
6ea601c
Fix scan
Xuanwo Feb 11, 2025
fd15437
Fix ndv is wrong
Xuanwo Feb 11, 2025
db65403
Add more info in iceberg table
Xuanwo Feb 11, 2025
40db9fd
Fix column id
Xuanwo Feb 11, 2025
6e26b30
Add debug for inner join
Xuanwo Feb 11, 2025
fd438d4
Merge remote-tracking branch 'origin' into iceberg-hdfs
Xuanwo Feb 12, 2025
7b852ad
Try fix tests
Xuanwo Feb 12, 2025
8fad3d8
Fix tests
Xuanwo Feb 12, 2025
ebd9afb
Merge branch 'main' into iceberg-hdfs
Xuanwo Feb 12, 2025
02008f5
Fix system tables query
Xuanwo Feb 12, 2025
864a810
Fix stats not calculated correctly
Xuanwo Feb 13, 2025
4fa9ca0
Fix stats
Xuanwo Feb 13, 2025
1427581
Fix stats
Xuanwo Feb 13, 2025
d4b3352
Try fix predicate
Xuanwo Feb 13, 2025
52fc72e
Fix lock not updated
Xuanwo Feb 13, 2025
5fc5ee1
Merge branch 'main' into iceberg-hdfs
Xuanwo Feb 14, 2025
9199c19
Fix cargo
Xuanwo Feb 14, 2025
526f55a
Fix tests
Xuanwo Feb 14, 2025
a3dffc5
Merge branch 'main' into iceberg-hdfs
Xuanwo Feb 14, 2025
74488d7
revert scan changes
Xuanwo Feb 14, 2025
d931da8
Revert changes to tests
Xuanwo Feb 14, 2025
812ee03
Merge remote-tracking branch 'refs/remotes/xuanwo/iceberg-hdfs' into …
Xuanwo Feb 14, 2025
2d0797e
Merge remote-tracking branch 'origin/main' into iceberg-hdfs
Xuanwo Feb 14, 2025
d530039
Revert change to tests
Xuanwo Feb 14, 2025
e46820f
Merge branch 'main' into iceberg-hdfs
Xuanwo Feb 14, 2025
218 changes: 138 additions & 80 deletions Cargo.lock

Large diffs are not rendered by default.

12 changes: 7 additions & 5 deletions Cargo.toml
@@ -227,7 +227,7 @@ arrow-data = { version = "53" }
arrow-flight = { version = "53", features = ["flight-sql-experimental", "tls"] }
arrow-ipc = { version = "53" }
arrow-ord = { version = "53" }
-arrow-schema = { version = "53", features = ["serde"] }
+arrow-schema = { version = "53.4", features = ["serde"] }
arrow-select = { version = "53" }
arrow-udf-js = { version = "0.5.0" }
arrow-udf-python = { version = "0.4.0" }
@@ -323,10 +323,12 @@ http = "1"
humantime = "2.1.0"
hyper = "1"
hyper-util = { version = "0.1.9", features = ["client", "client-legacy", "tokio", "service"] }
-iceberg = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "01d706a1" }
-iceberg-catalog-glue = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "01d706a1" }
-iceberg-catalog-hms = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "01d706a1" }
-iceberg-catalog-rest = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "01d706a1" }
+iceberg = { version = "0.4.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "56fa9d11", features = [
+    "storage-all",
+] }
+iceberg-catalog-glue = { version = "0.4.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "56fa9d11" }
+iceberg-catalog-hms = { version = "0.4.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "56fa9d11" }
+iceberg-catalog-rest = { version = "0.4.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "56fa9d11" }
indexmap = "2.0.0"
indicatif = "0.17.5"
itertools = "0.13.0"
2 changes: 1 addition & 1 deletion src/query/functions/src/scalars/other.rs
@@ -383,7 +383,7 @@ fn register_grouping(registry: &mut FunctionRegistry) {
.collect::<Vec<_>>();
Value::Column(Column::Number(NumberColumn::UInt32(output.into())))
}
-_ => unreachable!(),
+v => unreachable!("unexpected value type for grouping function: {:?}", v),
}),
},
}))
@@ -90,9 +90,10 @@ impl HashJoinProbeState {
}

if FROM_LEFT_SINGLE && match_count > 1 {
-return Err(ErrorCode::Internal(
-    "Scalar subquery can't return more than one row",
-));
+return Err(ErrorCode::Internal(format!(
+    "Scalar subquery can't return more than one row, but got {}",
+    match_count
+)));
}

// Fill `probe_indexes`.
28 changes: 13 additions & 15 deletions src/query/sql/src/planner/plans/scan.rs
@@ -234,28 +234,26 @@ impl Operator for Scan {
continue;
}
if let Some(col_stat) = v.clone() {
-// Safe to unwrap: min, max and ndv are all `Some(_)`.
+// Safe to unwrap: min and max are both `Some(_)`.
let min = col_stat.min.unwrap();
let max = col_stat.max.unwrap();
-let ndv = col_stat.ndv.unwrap();
+let num_rows = num_rows.saturating_sub(col_stat.null_count);
+let ndv = col_stat.ndv.unwrap_or(num_rows);
+let ndv = std::cmp::min(num_rows, ndv);
let histogram = if let Some(histogram) = self.statistics.histograms.get(k)
    && histogram.is_some()
{
    histogram.clone()
+} else if num_rows != 0 {
+    histogram_from_ndv(
+        ndv,
+        num_rows,
+        Some((min.clone(), max.clone())),
+        DEFAULT_HISTOGRAM_BUCKETS,
+    )
+    .ok()
} else {
-    let num_rows = num_rows.saturating_sub(col_stat.null_count);
-    let ndv = std::cmp::min(num_rows, ndv);
-    if num_rows != 0 {
-        histogram_from_ndv(
-            ndv,
-            num_rows,
-            Some((min.clone(), max.clone())),
-            DEFAULT_HISTOGRAM_BUCKETS,
-        )
-        .ok()
-    } else {
-        None
-    }
+    None
};
let column_stat = ColumnStat {
min,
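The scan.rs change above reworks how NDV (number of distinct values) is derived when column statistics are incomplete: a missing NDV now falls back to the non-null row count, and any NDV is clamped to that count. A minimal, self-contained sketch of that fallback, using a hypothetical standalone helper rather than Databend's actual types:

```rust
// A standalone sketch of the ndv fallback above (hypothetical helper,
// not part of the PR): when column stats carry no NDV, assume every
// non-null row is distinct, and never let NDV exceed that count.
fn effective_ndv(num_rows: u64, null_count: u64, ndv: Option<u64>) -> u64 {
    // Only non-null rows can contribute distinct values.
    let non_null_rows = num_rows.saturating_sub(null_count);
    std::cmp::min(non_null_rows, ndv.unwrap_or(non_null_rows))
}

fn main() {
    assert_eq!(effective_ndv(1000, 100, None), 900); // missing NDV -> non-null rows
    assert_eq!(effective_ndv(1000, 100, Some(5000)), 900); // inflated NDV clamped
    assert_eq!(effective_ndv(1000, 100, Some(42)), 42); // plausible NDV kept
}
```

Clamping from above guards against stale or inflated NDV values coming from table metadata, which would otherwise skew histogram construction.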
2 changes: 2 additions & 0 deletions src/query/storages/iceberg/Cargo.toml
@@ -20,6 +20,7 @@ databend-common-meta-app = { workspace = true }
databend-common-meta-store = { workspace = true }
databend-common-meta-types = { workspace = true }
databend-common-pipeline-core = { workspace = true }
+databend-common-storage = { workspace = true }
databend-common-storages-parquet = { workspace = true }
databend-storages-common-table-meta = { workspace = true }
fastrace = { workspace = true }
@@ -31,6 +32,7 @@ iceberg-catalog-rest = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
typetag = { workspace = true }
+uuid = { workspace = true }

[lints]
workspace = true
1 change: 1 addition & 0 deletions src/query/storages/iceberg/src/lib.rs
@@ -23,6 +23,7 @@ mod catalog;
mod database;
mod partition;
mod predicate;
+mod statistics;
mod table;
mod table_source;

254 changes: 254 additions & 0 deletions src/query/storages/iceberg/src/statistics.rs
@@ -0,0 +1,254 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use databend_common_base::base::OrderedFloat;
use databend_common_catalog::statistics;
use databend_common_catalog::statistics::BasicColumnStatistics;
use databend_common_catalog::table::ColumnStatisticsProvider;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::types::Decimal;
use databend_common_expression::types::DecimalSize;
use databend_common_expression::types::Number;
use databend_common_expression::types::F32;
use databend_common_expression::types::F64;
use databend_common_expression::ColumnId;
use databend_common_expression::Scalar;
use databend_common_expression::TableField;
use databend_common_storage::Datum as DatabendDatum;
use iceberg::spec::DataContentType;
use iceberg::spec::Datum;
use iceberg::spec::ManifestStatus;
use iceberg::spec::PrimitiveLiteral;
use iceberg::spec::PrimitiveType;
use uuid::Uuid;

use crate::IcebergTable;

#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, Default)]
pub struct IcebergStatistics {
/// Number of records in this table
pub record_count: u64,
/// Total table size in bytes
pub file_size_in_bytes: u64,
/// Number of manifest files in this table
pub number_of_manifest_files: u64,
/// Number of data files in this table
pub number_of_data_files: u64,

/// Computed statistics for each column
pub computed_statistics: HashMap<ColumnId, statistics::BasicColumnStatistics>,
}

impl IcebergStatistics {
/// Get statistics of an iceberg table.
pub async fn parse(table: &iceberg::table::Table) -> Result<IcebergStatistics> {
let Some(snapshot) = table.metadata().current_snapshot() else {
return Ok(IcebergStatistics::default());
};

// Map from column id to the total size on disk of all regions that
// store the column. Does not include bytes necessary to read other
// columns, like footers. Leave null for row-oriented formats (Avro)
let mut column_sizes: HashMap<i32, u64> = HashMap::new();
// Map from column id to number of values in the column (including null
// and NaN values)
let mut value_counts: HashMap<i32, u64> = HashMap::new();
// Map from column id to number of null values in the column
let mut null_value_counts: HashMap<i32, u64> = HashMap::new();
// Map from column id to number of NaN values in the column
let mut nan_value_counts: HashMap<i32, u64> = HashMap::new();
// Map from column id to lower bound in the column serialized as binary.
// Each value must be less than or equal to all non-null, non-NaN values
// in the column for the file.
// Reference:
//
// - [Binary single-value serialization](https://iceberg.apache.org/spec/#binary-single-value-serialization)
let mut lower_bounds: HashMap<i32, Datum> = HashMap::new();
// Map from column id to upper bound in the column serialized as binary.
// Each value must be greater than or equal to all non-null, non-NaN
// values in the column for the file.
//
// Reference:
//
// - [Binary single-value serialization](https://iceberg.apache.org/spec/#binary-single-value-serialization)
let mut upper_bounds: HashMap<i32, Datum> = HashMap::new();

let mut statistics = IcebergStatistics::default();

let manifest = snapshot
.load_manifest_list(table.file_io(), table.metadata())
.await
.map_err(|err| ErrorCode::Internal(format!("load manifest list error: {err:?}")))?;

for manifest_file in manifest.entries() {
statistics.number_of_manifest_files += 1;

let manifest = manifest_file
.load_manifest(table.file_io())
.await
.map_err(|err| ErrorCode::Internal(format!("load manifest file error: {err:?}")))?;

manifest.entries().iter().for_each(|entry| {
if entry.status() == ManifestStatus::Deleted {
return;
}
let data_file = entry.data_file();
if data_file.content_type() != DataContentType::Data {
return;
}

statistics.record_count += data_file.record_count();
statistics.file_size_in_bytes += data_file.file_size_in_bytes();
statistics.number_of_data_files += 1;

data_file.column_sizes().iter().for_each(|(col_id, size)| {
column_sizes.insert(*col_id, *size);
});
data_file.value_counts().iter().for_each(|(col_id, count)| {
value_counts.insert(*col_id, *count);
});
data_file
.null_value_counts()
.iter()
.for_each(|(col_id, count)| {
null_value_counts.insert(*col_id, *count);
});
data_file
.nan_value_counts()
.iter()
.for_each(|(col_id, count)| {
nan_value_counts.insert(*col_id, *count);
});

data_file
.lower_bounds()
.iter()
.for_each(|(&col_id, new_value)| {
lower_bounds
.entry(col_id)
.and_modify(|existing_value| {
if new_value < existing_value {
*existing_value = new_value.clone();
}
})
.or_insert_with(|| new_value.clone());
});

data_file
.upper_bounds()
.iter()
.for_each(|(&col_id, new_value)| {
upper_bounds
.entry(col_id)
.and_modify(|existing_value| {
if new_value > existing_value {
*existing_value = new_value.clone();
}
})
.or_insert_with(|| new_value.clone());
});
});
}

let mut computed_statistics = HashMap::new();
for field in IcebergTable::get_schema(table)?.fields() {
let column_stats: Option<BasicColumnStatistics> = get_column_stats(
field,
&column_sizes,
&lower_bounds,
&upper_bounds,
&null_value_counts,
);
if let Some(stats) = column_stats {
computed_statistics.insert(field.column_id, stats);
}
}

statistics.computed_statistics = computed_statistics;
Ok(statistics)
}
}

impl ColumnStatisticsProvider for IcebergStatistics {
fn column_statistics(&self, column_id: ColumnId) -> Option<&statistics::BasicColumnStatistics> {
self.computed_statistics.get(&column_id)
}

fn num_rows(&self) -> Option<u64> {
Some(self.record_count)
}
}

/// Try to get [`BasicColumnStatistics`] for one column.
fn get_column_stats(
field: &TableField,
column_size: &HashMap<i32, u64>,
lower: &HashMap<i32, Datum>,
upper: &HashMap<i32, Datum>,
null_counts: &HashMap<i32, u64>,
) -> Option<BasicColumnStatistics> {
// The column id in iceberg is 1-based while the column id in Databend is 0-based.
let iceberg_col_id = field.column_id as i32 + 1;
match (
column_size.get(&iceberg_col_id),
lower.get(&iceberg_col_id),
upper.get(&iceberg_col_id),
null_counts.get(&iceberg_col_id),
) {
(Some(_size), Some(lo), Some(up), Some(nc)) => Some(BasicColumnStatistics {
min: parse_datum(lo).and_then(DatabendDatum::from_scalar),
max: parse_datum(up).and_then(DatabendDatum::from_scalar),
ndv: None,
null_count: *nc,
}),
_ => None,
}
}

/// Try to parse iceberg [`Datum`] to databend [`Scalar`].
pub fn parse_datum(data: &Datum) -> Option<Scalar> {
match data.literal() {
PrimitiveLiteral::Boolean(v) => Some(Scalar::Boolean(*v)),
PrimitiveLiteral::Int(v) => Some(Scalar::Number(i32::upcast_scalar(*v))),
PrimitiveLiteral::Long(v) => Some(Scalar::Number(i64::upcast_scalar(*v))),
PrimitiveLiteral::Float(v) => {
Some(Scalar::Number(F32::upcast_scalar(OrderedFloat::from(v.0))))
}
PrimitiveLiteral::Double(v) => {
Some(Scalar::Number(F64::upcast_scalar(OrderedFloat::from(v.0))))
}
PrimitiveLiteral::String(v) => Some(Scalar::String(v.clone())),
PrimitiveLiteral::Binary(v) => Some(Scalar::Binary(v.clone())),
// Iceberg uses i128 to represent decimals
PrimitiveLiteral::Int128(v) => {
if let PrimitiveType::Decimal { precision, scale } = data.data_type() {
Some(Scalar::Decimal(v.to_scalar(DecimalSize {
precision: *precision as u8,
scale: *scale as u8,
})))
} else {
None
}
}
// Iceberg uses u128 to represent UUIDs
PrimitiveLiteral::UInt128(v) => {
Some(Scalar::String(Uuid::from_u128(*v).as_simple().to_string()))
}
PrimitiveLiteral::AboveMax => None,
PrimitiveLiteral::BelowMin => None,
}
}
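To make the manifest-scanning loop in statistics.rs easier to follow, here is a minimal, self-contained sketch of its bounds-merging step, with plain i64 values standing in for iceberg::spec::Datum and a hypothetical merge_bounds helper:

```rust
use std::collections::HashMap;

// A sketch (not the PR's code) of how the loop folds per-data-file
// bounds into table-wide bounds: keep the smallest lower bound and the
// largest upper bound seen across all live data files.
fn merge_bounds(files: &[(i32, i64, i64)]) -> (HashMap<i32, i64>, HashMap<i32, i64>) {
    let (mut lower, mut upper) = (HashMap::new(), HashMap::new());
    for &(col_id, lo, hi) in files {
        lower
            .entry(col_id)
            .and_modify(|v: &mut i64| *v = (*v).min(lo))
            .or_insert(lo);
        upper
            .entry(col_id)
            .and_modify(|v: &mut i64| *v = (*v).max(hi))
            .or_insert(hi);
    }
    (lower, upper)
}

fn main() {
    // Two data files, both containing Iceberg column id 1 (which
    // get_column_stats would map from Databend's 0-based column_id 0).
    let files = [(1, 10, 50), (1, 5, 40)];
    let (lower, upper) = merge_bounds(&files);
    assert_eq!(lower[&1], 5); // min of per-file lower bounds
    assert_eq!(upper[&1], 50); // max of per-file upper bounds
}
```

The resulting per-column maps, keyed by Iceberg's 1-based column ids, are what get_column_stats later looks up after adding 1 to Databend's 0-based column_id.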