diff --git a/src/common/storage/src/operator.rs b/src/common/storage/src/operator.rs index 8b77fab3b10a1..98b312543a970 100644 --- a/src/common/storage/src/operator.rs +++ b/src/common/storage/src/operator.rs @@ -355,11 +355,15 @@ fn init_moka_operator(v: &StorageMokaConfig) -> Result { /// init_webhdfs_operator will init a WebHDFS operator fn init_webhdfs_operator(v: &StorageWebhdfsConfig) -> Result { - let builder = services::Webhdfs::default() + let mut builder = services::Webhdfs::default() .endpoint(&v.endpoint_url) .root(&v.root) .delegation(&v.delegation); + if v.disable_list_batch { + builder = builder.disable_list_batch(); + } + Ok(builder) } diff --git a/src/meta/app/src/storage/storage_params.rs b/src/meta/app/src/storage/storage_params.rs index b0a3cd653e10b..ec9a7c5007158 100644 --- a/src/meta/app/src/storage/storage_params.rs +++ b/src/meta/app/src/storage/storage_params.rs @@ -522,6 +522,7 @@ pub struct StorageWebhdfsConfig { pub endpoint_url: String, pub root: String, pub delegation: String, + pub disable_list_batch: bool, } impl Debug for StorageWebhdfsConfig { @@ -529,7 +530,8 @@ impl Debug for StorageWebhdfsConfig { let mut ds = f.debug_struct("StorageWebhdfsConfig"); ds.field("endpoint_url", &self.endpoint_url) - .field("root", &self.root); + .field("root", &self.root) + .field("disable_list_batch", &self.disable_list_batch); ds.field("delegation", &mask_string(&self.delegation, 3)); diff --git a/src/meta/proto-conv/src/config_from_to_protobuf_impl.rs b/src/meta/proto-conv/src/config_from_to_protobuf_impl.rs index bf29c0e2ed3a9..fc70c55f7d519 100644 --- a/src/meta/proto-conv/src/config_from_to_protobuf_impl.rs +++ b/src/meta/proto-conv/src/config_from_to_protobuf_impl.rs @@ -261,6 +261,7 @@ impl FromToProto for StorageWebhdfsConfig { endpoint_url: p.endpoint_url, root: p.root, delegation: p.delegation, + disable_list_batch: p.disable_list_batch, }) } @@ -271,6 +272,7 @@ impl FromToProto for StorageWebhdfsConfig { endpoint_url: self.endpoint_url.clone(), root: self.root.clone(), delegation: self.delegation.clone(), + disable_list_batch: self.disable_list_batch, username: String::new(), // reserved for future use password: String::new(), // reserved for future use diff --git a/src/meta/proto-conv/src/util.rs b/src/meta/proto-conv/src/util.rs index a9ef7e617502d..2488187e21d8a 100644 --- a/src/meta/proto-conv/src/util.rs +++ b/src/meta/proto-conv/src/util.rs @@ -146,6 +146,7 @@ const META_CHANGE_LOG: &[(u64, &str)] = &[ (114, "2024-12-12: Add: New DataType Interval."), (115, "2024-12-16: Add: udf.proto: add UDAFScript and UDAFServer"), (116, "2025-01-09: Add: MarkedDeletedIndexMeta"), + (117, "2025-01-21: Add: config.proto: add disable_list_batch in WebhdfsConfig") // Dear developer: // If you're gonna add a new metadata version, you'll have to add a test for it. // You could just copy an existing test file(e.g., `../tests/it/v024_table_meta.rs`) diff --git a/src/meta/proto-conv/tests/it/main.rs b/src/meta/proto-conv/tests/it/main.rs index 432c9c0e087da..ce2a46f0c2521 100644 --- a/src/meta/proto-conv/tests/it/main.rs +++ b/src/meta/proto-conv/tests/it/main.rs @@ -114,3 +114,4 @@ mod v113_warehouse_grantobject; mod v114_interval_datatype; mod v115_add_udaf_script; mod v116_marked_deleted_index_meta; +mod v117_webhdfs_add_disable_list_batch; diff --git a/src/meta/proto-conv/tests/it/user_proto_conv.rs b/src/meta/proto-conv/tests/it/user_proto_conv.rs index 9311068844603..277314937762b 100644 --- a/src/meta/proto-conv/tests/it/user_proto_conv.rs +++ b/src/meta/proto-conv/tests/it/user_proto_conv.rs @@ -321,6 +321,7 @@ pub(crate) fn test_webhdfs_stage_info() -> mt::principal::StageInfo { endpoint_url: "https://webhdfs.example.com".to_string(), root: "/path/to/stage/files".to_string(), delegation: "".to_string(), + disable_list_batch: false, }), }, is_temporary: false, diff --git a/src/meta/proto-conv/tests/it/user_stage.rs b/src/meta/proto-conv/tests/it/user_stage.rs index 61dfcd8310dec..e8dad60286c1c 100644 --- a/src/meta/proto-conv/tests/it/user_stage.rs +++ b/src/meta/proto-conv/tests/it/user_stage.rs @@ -94,6 +94,7 @@ fn test_user_stage_webhdfs_v30() -> anyhow::Result<()> { endpoint_url: "https://webhdfs.example.com".to_string(), root: "/path/to/stage/files".to_string(), delegation: "".to_string(), + disable_list_batch: false, }), }, file_format_params: mt::principal::FileFormatParams::Json( diff --git a/src/meta/proto-conv/tests/it/v030_user_stage.rs b/src/meta/proto-conv/tests/it/v030_user_stage.rs index b52ec2b46d14c..f19500592ea59 100644 --- a/src/meta/proto-conv/tests/it/v030_user_stage.rs +++ b/src/meta/proto-conv/tests/it/v030_user_stage.rs @@ -53,6 +53,7 @@ fn test_decode_v30_user_stage() -> anyhow::Result<()> { endpoint_url: "https://webhdfs.example.com".to_string(), root: "/path/to/stage/files".to_string(), delegation: "".to_string(), + disable_list_batch: false, }), }, file_format_params: mt::principal::FileFormatParams::Json( diff --git a/src/meta/proto-conv/tests/it/v031_copy_max_file.rs b/src/meta/proto-conv/tests/it/v031_copy_max_file.rs index dd03c215ba1da..768a941da3039 100644 --- a/src/meta/proto-conv/tests/it/v031_copy_max_file.rs +++ b/src/meta/proto-conv/tests/it/v031_copy_max_file.rs @@ -51,6 +51,7 @@ fn test_decode_v31_copy_max_file() -> anyhow::Result<()> { endpoint_url: "https://webhdfs.example.com".to_string(), root: "/path/to/stage/files".to_string(), delegation: "".to_string(), + disable_list_batch: false, }), }, file_format_params: mt::principal::FileFormatParams::Json( diff --git a/src/meta/proto-conv/tests/it/v117_webhdfs_add_disable_list_batch.rs b/src/meta/proto-conv/tests/it/v117_webhdfs_add_disable_list_batch.rs new file mode 100644 index 0000000000000..b7b034c8a66ee --- /dev/null +++ b/src/meta/proto-conv/tests/it/v117_webhdfs_add_disable_list_batch.rs @@ -0,0 +1,88 @@ +// Copyright 2023 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use chrono::DateTime; +use chrono::Utc; +use databend_common_meta_app as mt; +use databend_common_meta_app::principal::UserIdentity; +use databend_common_meta_app::storage::StorageParams; +use databend_common_meta_app::storage::StorageWebhdfsConfig; +use fastrace::func_name; + +use crate::common; + +// These bytes are built when a new version in introduced, +// and are kept for backward compatibility test. +// +// ************************************************************* +// * These messages should never be updated, * +// * only be added when a new version is added, * +// * or be removed when an old version is no longer supported. * +// ************************************************************* +// +#[test] +fn test_v117_webhdfs_add_disable_list_batch() -> anyhow::Result<()> { + // Encoded data of version 117 of databend_common_meta_app::principal::user_stage::StageInfo: + // It is generated with common::test_pb_from_to(). + let stage_info_v117 = vec![ + 10, 22, 119, 101, 98, 104, 100, 102, 115, 58, 47, 47, 100, 105, 114, 47, 116, 111, 47, 102, + 105, 108, 101, 115, 16, 1, 26, 12, 10, 10, 42, 8, 48, 1, 160, 6, 117, 168, 6, 24, 42, 11, + 10, 2, 48, 2, 16, 142, 8, 24, 1, 56, 1, 50, 4, 116, 101, 115, 116, 56, 100, 66, 29, 10, 8, + 100, 97, 116, 97, 98, 101, 110, 100, 18, 11, 100, 97, 116, 97, 98, 101, 110, 100, 46, 114, + 115, 160, 6, 117, 168, 6, 24, 74, 10, 34, 8, 8, 2, 160, 6, 117, 168, 6, 24, 82, 23, 49, 57, + 55, 48, 45, 48, 49, 45, 48, 49, 32, 48, 48, 58, 48, 48, 58, 48, 48, 32, 85, 84, 67, 160, 6, + 117, 168, 6, 24, + ]; + + let want = || mt::principal::StageInfo { + stage_name: "webhdfs://dir/to/files".to_string(), + stage_type: mt::principal::StageType::External, + stage_params: mt::principal::StageParams { + storage: StorageParams::Webhdfs(StorageWebhdfsConfig { + disable_list_batch: true, + ..Default::default() + }), + }, + is_temporary: false, + file_format_params: mt::principal::FileFormatParams::Json( + mt::principal::JsonFileFormatParams { + compression: mt::principal::StageFileCompression::Bz2, + }, + ), + copy_options: mt::principal::CopyOptions { + on_error: mt::principal::OnErrorMode::AbortNum(2), + size_limit: 1038, + max_files: 0, + split_size: 0, + purge: true, + single: false, + max_file_size: 0, + disable_variant_check: true, + return_failed_only: false, + detailed_output: false, + }, + comment: "test".to_string(), + number_of_files: 100, + creator: Some(UserIdentity { + username: "databend".to_string(), + hostname: "databend.rs".to_string(), + }), + created_on: DateTime::::default(), + }; + + common::test_pb_from_to(func_name!(), want())?; + common::test_load_old(func_name!(), stage_info_v117.as_slice(), 117, want())?; + + Ok(()) +} diff --git a/src/meta/protos/proto/config.proto b/src/meta/protos/proto/config.proto index 9d0f87614a6a5..6c7647e9f2cf3 100644 --- a/src/meta/protos/proto/config.proto +++ b/src/meta/protos/proto/config.proto @@ -98,6 +98,8 @@ message WebhdfsStorageConfig { string username = 4; // reserved for future use string password = 5; // reserved for future use + + bool disable_list_batch = 6; } message ObsStorageConfig { diff --git a/src/query/config/src/config.rs b/src/query/config/src/config.rs index 4c6b9c3fec636..2a52a2ff23bf6 100644 --- a/src/query/config/src/config.rs +++ b/src/query/config/src/config.rs @@ -1190,6 +1190,14 @@ pub struct WebhdfsStorageConfig { #[clap(long = "storage-webhdfs-root", value_name = "VALUE", default_value_t)] #[serde(rename = "root")] pub webhdfs_root: String, + /// Disable list batch if hdfs doesn't support yet. + #[clap( + long = "storage-webhdfs-disable-list-batch", + value_name = "VALUE", + default_value_t + )] + #[serde(rename = "disable_list_batch")] + pub webhdfs_disable_list_batch: bool, } impl Default for WebhdfsStorageConfig { @@ -1214,6 +1222,7 @@ impl From for WebhdfsStorageConfig { webhdfs_delegation: v.delegation, webhdfs_endpoint_url: v.endpoint_url, webhdfs_root: v.root, + webhdfs_disable_list_batch: v.disable_list_batch, } } } @@ -1226,6 +1235,7 @@ impl TryFrom for InnerStorageWebhdfsConfig { delegation: value.webhdfs_delegation, endpoint_url: value.webhdfs_endpoint_url, root: value.webhdfs_root, + disable_list_batch: value.webhdfs_disable_list_batch, }) } } diff --git a/src/query/service/tests/it/storages/testdata/configs_table_basic.txt b/src/query/service/tests/it/storages/testdata/configs_table_basic.txt index 44d919f0e3c33..a65f7c0100666 100644 --- a/src/query/service/tests/it/storages/testdata/configs_table_basic.txt +++ b/src/query/service/tests/it/storages/testdata/configs_table_basic.txt @@ -194,6 +194,7 @@ DB.Table: 'system'.'configs', Table: configs-table_id:1, ver:0, Engine: SystemCo | 'storage' | 'storage_type' | 'null' | '' | | 'storage' | 'type' | 'fs' | '' | | 'storage' | 'webhdfs.delegation' | '' | '' | +| 'storage' | 'webhdfs.disable_list_batch' | 'false' | '' | | 'storage' | 'webhdfs.endpoint_url' | '' | '' | | 'storage' | 'webhdfs.root' | '' | '' | +-----------+-------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------+ diff --git a/src/query/sql/src/planner/binder/location.rs b/src/query/sql/src/planner/binder/location.rs index 5d74c26dcf4bc..0e71bb8fd7ecf 100644 --- a/src/query/sql/src/planner/binder/location.rs +++ b/src/query/sql/src/planner/binder/location.rs @@ -397,11 +397,26 @@ fn parse_webhdfs_params(l: &mut UriLocation, root: String) -> Result()) + .unwrap_or(Ok(true)) + .map_err(|e| { + Error::new( + ErrorKind::InvalidInput, + format!( + "disable_list_batch should be `TRUE` or `FALSE`, parse error with: {:?}", + e, + ), + ) + })?; let sp = StorageParams::Webhdfs(StorageWebhdfsConfig { endpoint_url, root, delegation, + disable_list_batch, }); l.connection diff --git a/src/query/sql/tests/location.rs b/src/query/sql/tests/location.rs index 106cca1ed476e..6b869c1de00be 100644 --- a/src/query/sql/tests/location.rs +++ b/src/query/sql/tests/location.rs @@ -386,16 +386,21 @@ async fn test_parse_uri_location() -> Result<()> { "webhdfs".to_string(), "example.com".to_string(), "/path/to/dir/".to_string(), - vec![("https", "TrUE"), ("delegation", "databendthebest")] - .into_iter() - .map(|(k, v)| (k.to_string(), v.to_string())) - .collect::>(), + vec![ + ("https", "TrUE"), + ("delegation", "databendthebest"), + ("disable_list_batch", "true"), + ] + .into_iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect::>(), ), ( StorageParams::Webhdfs(StorageWebhdfsConfig { root: "/path/to/dir/".to_string(), endpoint_url: "https://example.com".to_string(), delegation: "databendthebest".to_string(), + disable_list_batch: true, }), "/".to_string(), ),