feat: Add hdfs support in iceberg and fill iceberg statistics #17352
Merged
Commits (31 total; the diff below shows changes from 15 of them)
- 338bc81 feat: Add hdfs support in iceberg (Xuanwo)
- 7906c26 Enable webhdfs support (Xuanwo)
- 88d96af Fix iceberg support (Xuanwo)
- c2d5357 Merge remote-tracking branch 'origin/main' into iceberg-hdfs (Xuanwo)
- d610d76 Add table statistics for iceberg (Xuanwo)
- ed5b364 Also implement column statistics (Xuanwo)
- 6ea601c Fix scan (Xuanwo)
- fd15437 Fix ndv is wrong (Xuanwo)
- db65403 Add more info in iceberg table (Xuanwo)
- 40db9fd Fix column id (Xuanwo)
- 6e26b30 Add debug for inner join (Xuanwo)
- fd438d4 Merge remote-tracking branch 'origin' into iceberg-hdfs (Xuanwo)
- 7b852ad Try fix tests (Xuanwo)
- 8fad3d8 Fix tests (Xuanwo)
- ebd9afb Merge branch 'main' into iceberg-hdfs (Xuanwo)
- 02008f5 Fix system tables query (Xuanwo)
- 864a810 Fix stats not calculated correctly (Xuanwo)
- 4fa9ca0 Fix stats (Xuanwo)
- 1427581 Fix stats (Xuanwo)
- d4b3352 Try fix predicate (Xuanwo)
- 52fc72e Fix lock not updated (Xuanwo)
- 5fc5ee1 Merge branch 'main' into iceberg-hdfs (Xuanwo)
- 9199c19 Fix cargo (Xuanwo)
- 526f55a Fix tests (Xuanwo)
- a3dffc5 Merge branch 'main' into iceberg-hdfs (Xuanwo)
- 74488d7 revert scan changes (Xuanwo)
- d931da8 Revert changes to tests (Xuanwo)
- 812ee03 Merge remote-tracking branch 'refs/remotes/xuanwo/iceberg-hdfs' into … (Xuanwo)
- 2d0797e Merge remote-tracking branch 'origin/main' into iceberg-hdfs (Xuanwo)
- d530039 Revert change to tests (Xuanwo)
- e46820f Merge branch 'main' into iceberg-hdfs (Xuanwo)
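
The diff below covers the statistics half of the PR title; the HDFS and WebHDFS wiring lives in the storage layer and is not shown here. As a rough, illustrative sketch of what a WebHDFS backend looks like through OpenDAL (the storage abstraction Databend builds on), something like the following could be used. This is not code from this PR: the endpoint and root are placeholder values, and on older opendal releases the builder methods mutate in place rather than chain.

```rust
use opendal::services::Webhdfs;
use opendal::Operator;

// Build an OpenDAL operator against a WebHDFS REST endpoint (sketch only;
// the endpoint and root below are placeholders, not values from this PR).
fn build_webhdfs_operator() -> opendal::Result<Operator> {
    let builder = Webhdfs::default()
        .endpoint("http://127.0.0.1:9870") // placeholder NameNode HTTP address
        .root("/warehouse/iceberg"); // placeholder warehouse root
    Ok(Operator::new(builder)?.finish())
}
```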
```rust
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use databend_common_base::base::OrderedFloat;
use databend_common_catalog::statistics;
use databend_common_catalog::statistics::BasicColumnStatistics;
use databend_common_catalog::table::ColumnStatisticsProvider;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::types::Decimal;
use databend_common_expression::types::DecimalSize;
use databend_common_expression::types::Number;
use databend_common_expression::types::F32;
use databend_common_expression::types::F64;
use databend_common_expression::ColumnId;
use databend_common_expression::Scalar;
use databend_common_expression::TableField;
use databend_common_storage::Datum as DatabendDatum;
use iceberg::spec::DataContentType;
use iceberg::spec::Datum;
use iceberg::spec::ManifestStatus;
use iceberg::spec::PrimitiveLiteral;
use iceberg::spec::PrimitiveType;
use uuid::Uuid;

use crate::IcebergTable;

#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, Default)]
pub struct IcebergStatistics {
    /// Number of records in this table
    pub record_count: u64,
    /// Total table size in bytes
    pub file_size_in_bytes: u64,
    /// Number of manifest files in this table
    pub number_of_manifest_files: u64,
    /// Number of data files in this table
    pub number_of_data_files: u64,

    /// Computed statistics for each column
    pub computed_statistics: HashMap<ColumnId, statistics::BasicColumnStatistics>,
}

impl IcebergStatistics {
    /// Get statistics of an iceberg table.
    pub async fn parse(table: &iceberg::table::Table) -> Result<IcebergStatistics> {
        let Some(snapshot) = table.metadata().current_snapshot() else {
            return Ok(IcebergStatistics::default());
        };

        // Map from column id to the total size on disk of all regions that
        // store the column. Does not include bytes necessary to read other
        // columns, like footers. Leave null for row-oriented formats (Avro).
        let mut column_sizes: HashMap<i32, u64> = HashMap::new();
        // Map from column id to number of values in the column (including null
        // and NaN values).
        let mut value_counts: HashMap<i32, u64> = HashMap::new();
        // Map from column id to number of null values in the column.
        let mut null_value_counts: HashMap<i32, u64> = HashMap::new();
        // Map from column id to number of NaN values in the column.
        let mut nan_value_counts: HashMap<i32, u64> = HashMap::new();
        // Map from column id to lower bound in the column serialized as binary.
        // Each value must be less than or equal to all non-null, non-NaN values
        // in the column for the file.
        //
        // Reference:
        //
        // - [Binary single-value serialization](https://iceberg.apache.org/spec/#binary-single-value-serialization)
        let mut lower_bounds: HashMap<i32, Datum> = HashMap::new();
        // Map from column id to upper bound in the column serialized as binary.
        // Each value must be greater than or equal to all non-null, non-NaN
        // values in the column for the file.
        //
        // Reference:
        //
        // - [Binary single-value serialization](https://iceberg.apache.org/spec/#binary-single-value-serialization)
        let mut upper_bounds: HashMap<i32, Datum> = HashMap::new();

        let mut statistics = IcebergStatistics::default();

        let manifest = snapshot
            .load_manifest_list(table.file_io(), table.metadata())
            .await
            .map_err(|err| ErrorCode::Internal(format!("load manifest list error: {err:?}")))?;

        for manifest_file in manifest.entries() {
            statistics.number_of_manifest_files += 1;

            let manifest = manifest_file
                .load_manifest(table.file_io())
                .await
                .map_err(|err| ErrorCode::Internal(format!("load manifest file error: {err:?}")))?;

            manifest.entries().iter().for_each(|entry| {
                if entry.status() == ManifestStatus::Deleted {
                    return;
                }
                let data_file = entry.data_file();
                if data_file.content_type() != DataContentType::Data {
                    return;
                }

                statistics.record_count += data_file.record_count();
                statistics.file_size_in_bytes += data_file.file_size_in_bytes();
                statistics.number_of_data_files += 1;

                data_file.column_sizes().iter().for_each(|(col_id, size)| {
                    column_sizes.insert(*col_id, *size);
                });
                data_file.value_counts().iter().for_each(|(col_id, count)| {
                    value_counts.insert(*col_id, *count);
                });
                data_file
                    .null_value_counts()
                    .iter()
                    .for_each(|(col_id, count)| {
                        null_value_counts.insert(*col_id, *count);
                    });
                data_file
                    .nan_value_counts()
                    .iter()
                    .for_each(|(col_id, count)| {
                        nan_value_counts.insert(*col_id, *count);
                    });

                data_file
                    .lower_bounds()
                    .iter()
                    .for_each(|(&col_id, new_value)| {
                        lower_bounds
                            .entry(col_id)
                            .and_modify(|existing_value| {
                                if *new_value < *existing_value {
                                    *existing_value = new_value.clone();
                                }
                            })
                            .or_insert_with(|| new_value.clone());
                    });

                data_file
                    .upper_bounds()
                    .iter()
                    .for_each(|(&col_id, new_value)| {
                        upper_bounds
                            .entry(col_id)
                            .and_modify(|existing_value| {
                                if *new_value > *existing_value {
                                    *existing_value = new_value.clone();
                                }
                            })
                            .or_insert_with(|| new_value.clone());
                    });
            });
        }

        let mut computed_statistics = HashMap::new();
        for field in IcebergTable::get_schema(table)?.fields() {
            let column_stats: Option<BasicColumnStatistics> = get_column_stats(
                field,
                &column_sizes,
                &lower_bounds,
                &upper_bounds,
                &null_value_counts,
            );
            if let Some(stats) = column_stats {
                computed_statistics.insert(field.column_id, stats);
            }
        }

        statistics.computed_statistics = computed_statistics;
        Ok(statistics)
    }
}
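
// `ColumnStatisticsProvider` is the Databend catalog trait through which the
// planner reads row counts and per-column statistics, so implementing it for
// `IcebergStatistics` is what makes the values collected above visible to the
// optimizer (for example, for selectivity estimates).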

impl ColumnStatisticsProvider for IcebergStatistics {
    fn column_statistics(&self, column_id: ColumnId) -> Option<&statistics::BasicColumnStatistics> {
        self.computed_statistics.get(&column_id)
    }

    fn num_rows(&self) -> Option<u64> {
        Some(self.record_count)
    }
}
/// Try to get [`ColumnStatistics`] for one column.
fn get_column_stats(
    field: &TableField,
    column_size: &HashMap<i32, u64>,
    lower: &HashMap<i32, Datum>,
    upper: &HashMap<i32, Datum>,
    null_counts: &HashMap<i32, u64>,
) -> Option<BasicColumnStatistics> {
    // The column id in iceberg is 1-based while the column id in Databend is 0-based.
    let iceberg_col_id = field.column_id as i32 + 1;
    match (
        column_size.get(&iceberg_col_id),
        lower.get(&iceberg_col_id),
        upper.get(&iceberg_col_id),
        null_counts.get(&iceberg_col_id),
    ) {
        (Some(_size), Some(lo), Some(up), Some(nc)) => Some(BasicColumnStatistics {
            min: parse_datum(lo).and_then(DatabendDatum::from_scalar),
            max: parse_datum(up).and_then(DatabendDatum::from_scalar),
            ndv: None,
            null_count: *nc,
        }),
        _ => None,
    }
}
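
// Notes on `get_column_stats` above: the lookup uses the Iceberg id, so the
// Databend field with `column_id` 0 is found under Iceberg field id 1. A
// column only gets statistics when all four inputs (size, lower bound, upper
// bound, null count) are present; if any one is missing, the column reports
// no stats at all.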

/// Try to parse an iceberg [`Datum`] into a databend [`Scalar`].
pub fn parse_datum(data: &Datum) -> Option<Scalar> {
    match data.literal() {
        PrimitiveLiteral::Boolean(v) => Some(Scalar::Boolean(*v)),
        PrimitiveLiteral::Int(v) => Some(Scalar::Number(i32::upcast_scalar(*v))),
        PrimitiveLiteral::Long(v) => Some(Scalar::Number(i64::upcast_scalar(*v))),
        PrimitiveLiteral::Float(v) => {
            Some(Scalar::Number(F32::upcast_scalar(OrderedFloat::from(v.0))))
        }
        PrimitiveLiteral::Double(v) => {
            Some(Scalar::Number(F64::upcast_scalar(OrderedFloat::from(v.0))))
        }
        PrimitiveLiteral::String(v) => Some(Scalar::String(v.clone())),
        PrimitiveLiteral::Binary(v) => Some(Scalar::Binary(v.clone())),
        // Iceberg uses i128 to represent decimals.
        PrimitiveLiteral::Int128(v) => {
            if let PrimitiveType::Decimal { precision, scale } = data.data_type() {
                Some(Scalar::Decimal(v.to_scalar(DecimalSize {
                    precision: *precision as u8,
                    scale: *scale as u8,
                })))
            } else {
                None
            }
        }
        // Iceberg uses u128 to represent uuids.
        PrimitiveLiteral::UInt128(v) => {
            Some(Scalar::String(Uuid::from_u128(*v).as_simple().to_string()))
        }
        PrimitiveLiteral::AboveMax => None,
        PrimitiveLiteral::BelowMin => None,
    }
}
```
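
To make the `Datum` to `Scalar` mapping concrete, here is a minimal sketch (not part of the PR) of how `parse_datum` could be exercised; it assumes the `Datum::bool`, `Datum::long`, and `Datum::string` constructors from the `iceberg` crate:

```rust
use databend_common_expression::Scalar;
use iceberg::spec::Datum;

// Sketch: spot-check a few PrimitiveLiteral -> Scalar conversions.
fn demo_parse_datum() {
    // Booleans map straight to boolean scalars.
    assert!(matches!(
        parse_datum(&Datum::bool(true)),
        Some(Scalar::Boolean(true))
    ));
    // Longs become 64-bit number scalars.
    assert!(matches!(
        parse_datum(&Datum::long(42)),
        Some(Scalar::Number(_))
    ));
    // Strings are cloned into string scalars.
    assert!(matches!(
        parse_datum(&Datum::string("abc")),
        Some(Scalar::String(_))
    ));
}
```

Note that `AboveMax` and `BelowMin` deliberately map to `None`, so an out-of-range bound drops the min or max instead of reporting a misleading value.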