diff --git a/Cargo.lock b/Cargo.lock index 05ebe1a21d02f..ff025f9ecf169 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10227,7 +10227,7 @@ dependencies = [ [[package]] name = "object_store_opendal" version = "0.49.0" -source = "git+https://github.com/apache/opendal?rev=78b6a9f#78b6a9f26dbeb5118990d3140f1c636d8e54e719" +source = "git+https://github.com/apache/opendal?rev=b8a3b7a#b8a3b7aa093d8695d89d579e146907850bb6565c" dependencies = [ "async-trait", "bytes", @@ -10308,7 +10308,7 @@ dependencies = [ [[package]] name = "opendal" version = "0.51.1" -source = "git+https://github.com/apache/opendal?rev=78b6a9f#78b6a9f26dbeb5118990d3140f1c636d8e54e719" +source = "git+https://github.com/apache/opendal?rev=b8a3b7a#b8a3b7aa093d8695d89d579e146907850bb6565c" dependencies = [ "anyhow", "async-backtrace", diff --git a/Cargo.toml b/Cargo.toml index 628c92bd82f45..06d5accb6bd7b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -365,10 +365,10 @@ num-derive = "0.3.3" num-traits = "0.2.19" num_cpus = "1.13.1" object = "0.36.5" -object_store_opendal = { version = "0.49.0", package = "object_store_opendal", git = "https://github.com/apache/opendal", rev = "78b6a9f" } +object_store_opendal = { version = "0.49.0", package = "object_store_opendal", git = "https://github.com/apache/opendal", rev = "b8a3b7a" } once_cell = "1.15.0" openai_api_rust = "0.1" -opendal = { version = "0.51.1", package = "opendal", git = "https://github.com/apache/opendal", rev = "78b6a9f", features = [ +opendal = { version = "0.51.1", package = "opendal", git = "https://github.com/apache/opendal", rev = "b8a3b7a", features = [ "layers-fastrace", "layers-prometheus-client", "layers-async-backtrace", diff --git a/src/common/storage/src/operator.rs b/src/common/storage/src/operator.rs index 98b312543a970..68126cbf7b8b7 100644 --- a/src/common/storage/src/operator.rs +++ b/src/common/storage/src/operator.rs @@ -358,7 +358,8 @@ fn init_webhdfs_operator(v: &StorageWebhdfsConfig) -> Result { let mut builder = services::Webhdfs::default() .endpoint(&v.endpoint_url) .root(&v.root) - .delegation(&v.delegation); + .delegation(&v.delegation) + .user_name(&v.user_name); if v.disable_list_batch { builder = builder.disable_list_batch(); diff --git a/src/meta/app/src/storage/storage_params.rs b/src/meta/app/src/storage/storage_params.rs index ec9a7c5007158..92489a4fce0dc 100644 --- a/src/meta/app/src/storage/storage_params.rs +++ b/src/meta/app/src/storage/storage_params.rs @@ -523,6 +523,7 @@ pub struct StorageWebhdfsConfig { pub root: String, pub delegation: String, pub disable_list_batch: bool, + pub user_name: String, } impl Debug for StorageWebhdfsConfig { @@ -531,7 +532,8 @@ impl Debug for StorageWebhdfsConfig { ds.field("endpoint_url", &self.endpoint_url) .field("root", &self.root) - .field("disable_list_batch", &self.disable_list_batch); + .field("disable_list_batch", &self.disable_list_batch) + .field("user_name", &self.user_name); ds.field("delegation", &mask_string(&self.delegation, 3)); diff --git a/src/meta/proto-conv/src/config_from_to_protobuf_impl.rs b/src/meta/proto-conv/src/config_from_to_protobuf_impl.rs index fc70c55f7d519..342b62138ef24 100644 --- a/src/meta/proto-conv/src/config_from_to_protobuf_impl.rs +++ b/src/meta/proto-conv/src/config_from_to_protobuf_impl.rs @@ -262,6 +262,7 @@ impl FromToProto for StorageWebhdfsConfig { root: p.root, delegation: p.delegation, disable_list_batch: p.disable_list_batch, + user_name: p.user_name, }) } @@ -274,8 +275,7 @@ impl FromToProto for StorageWebhdfsConfig { delegation: self.delegation.clone(), disable_list_batch: self.disable_list_batch, - username: String::new(), // reserved for future use - password: String::new(), // reserved for future use + user_name: self.user_name.clone(), }) } } diff --git a/src/meta/proto-conv/src/util.rs b/src/meta/proto-conv/src/util.rs index 2488187e21d8a..4c8c8db4e656b 100644 --- a/src/meta/proto-conv/src/util.rs +++ b/src/meta/proto-conv/src/util.rs @@ -146,7 +146,8 @@ const META_CHANGE_LOG: &[(u64, &str)] = &[ (114, "2024-12-12: Add: New DataType Interval."), (115, "2024-12-16: Add: udf.proto: add UDAFScript and UDAFServer"), (116, "2025-01-09: Add: MarkedDeletedIndexMeta"), - (117, "2025-01-21: Add: config.proto: add disable_list_batch in WebhdfsConfig") + (117, "2025-01-21: Add: config.proto: add disable_list_batch in WebhdfsConfig"), + (118, "2025-01-22: Add: config.proto: add user_name in WebhdfsConfig") // Dear developer: // If you're gonna add a new metadata version, you'll have to add a test for it. // You could just copy an existing test file(e.g., `../tests/it/v024_table_meta.rs`) diff --git a/src/meta/proto-conv/tests/it/main.rs b/src/meta/proto-conv/tests/it/main.rs index ce2a46f0c2521..aadf166f7041b 100644 --- a/src/meta/proto-conv/tests/it/main.rs +++ b/src/meta/proto-conv/tests/it/main.rs @@ -115,3 +115,4 @@ mod v114_interval_datatype; mod v115_add_udaf_script; mod v116_marked_deleted_index_meta; mod v117_webhdfs_add_disable_list_batch; +mod v118_webhdfs_add_user_name; diff --git a/src/meta/proto-conv/tests/it/user_proto_conv.rs b/src/meta/proto-conv/tests/it/user_proto_conv.rs index 277314937762b..e72db4d47ca65 100644 --- a/src/meta/proto-conv/tests/it/user_proto_conv.rs +++ b/src/meta/proto-conv/tests/it/user_proto_conv.rs @@ -322,6 +322,7 @@ pub(crate) fn test_webhdfs_stage_info() -> mt::principal::StageInfo { root: "/path/to/stage/files".to_string(), delegation: "".to_string(), disable_list_batch: false, + user_name: String::new(), }), }, is_temporary: false, diff --git a/src/meta/proto-conv/tests/it/user_stage.rs b/src/meta/proto-conv/tests/it/user_stage.rs index e8dad60286c1c..97b78b5c30be8 100644 --- a/src/meta/proto-conv/tests/it/user_stage.rs +++ b/src/meta/proto-conv/tests/it/user_stage.rs @@ -95,6 +95,7 @@ fn test_user_stage_webhdfs_v30() -> anyhow::Result<()> { root: "/path/to/stage/files".to_string(), delegation: "".to_string(), disable_list_batch: false, + user_name: String::new(), }), }, file_format_params: mt::principal::FileFormatParams::Json( diff --git a/src/meta/proto-conv/tests/it/v030_user_stage.rs b/src/meta/proto-conv/tests/it/v030_user_stage.rs index f19500592ea59..a993bcb9dfcf2 100644 --- a/src/meta/proto-conv/tests/it/v030_user_stage.rs +++ b/src/meta/proto-conv/tests/it/v030_user_stage.rs @@ -54,6 +54,7 @@ fn test_decode_v30_user_stage() -> anyhow::Result<()> { root: "/path/to/stage/files".to_string(), delegation: "".to_string(), disable_list_batch: false, + user_name: String::new(), }), }, file_format_params: mt::principal::FileFormatParams::Json( diff --git a/src/meta/proto-conv/tests/it/v031_copy_max_file.rs b/src/meta/proto-conv/tests/it/v031_copy_max_file.rs index 768a941da3039..ffd14b0925e32 100644 --- a/src/meta/proto-conv/tests/it/v031_copy_max_file.rs +++ b/src/meta/proto-conv/tests/it/v031_copy_max_file.rs @@ -52,6 +52,7 @@ fn test_decode_v31_copy_max_file() -> anyhow::Result<()> { root: "/path/to/stage/files".to_string(), delegation: "".to_string(), disable_list_batch: false, + user_name: String::new(), }), }, file_format_params: mt::principal::FileFormatParams::Json( diff --git a/src/meta/proto-conv/tests/it/v118_webhdfs_add_user_name.rs b/src/meta/proto-conv/tests/it/v118_webhdfs_add_user_name.rs new file mode 100644 index 0000000000000..c8903a0181c75 --- /dev/null +++ b/src/meta/proto-conv/tests/it/v118_webhdfs_add_user_name.rs @@ -0,0 +1,90 @@ +// Copyright 2023 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use chrono::DateTime; +use chrono::Utc; +use databend_common_meta_app as mt; +use databend_common_meta_app::principal::UserIdentity; +use databend_common_meta_app::storage::StorageParams; +use databend_common_meta_app::storage::StorageWebhdfsConfig; +use fastrace::func_name; + +use crate::common; + +// These bytes are built when a new version in introduced, +// and are kept for backward compatibility test. +// +// ************************************************************* +// * These messages should never be updated, * +// * only be added when a new version is added, * +// * or be removed when an old version is no longer supported. * +// ************************************************************* +// +#[test] +fn test_v118_webhdfs_add_user_name() -> anyhow::Result<()> { + // Encoded data of version 118 of databend_common_meta_app::principal::user_stage::StageInfo: + // It is generated with common::test_pb_from_to(). + let stage_info_v118 = vec![ + 10, 22, 119, 101, 98, 104, 100, 102, 115, 58, 47, 47, 100, 105, 114, 47, 116, 111, 47, 102, + 105, 108, 101, 115, 16, 1, 26, 34, 10, 32, 42, 30, 34, 20, 100, 97, 116, 97, 98, 101, 110, + 100, 95, 105, 115, 95, 112, 111, 119, 101, 114, 102, 117, 108, 48, 1, 160, 6, 118, 168, 6, + 24, 42, 11, 10, 2, 48, 2, 16, 142, 8, 24, 1, 56, 1, 50, 4, 116, 101, 115, 116, 56, 100, 66, + 29, 10, 8, 100, 97, 116, 97, 98, 101, 110, 100, 18, 11, 100, 97, 116, 97, 98, 101, 110, + 100, 46, 114, 115, 160, 6, 118, 168, 6, 24, 74, 10, 34, 8, 8, 2, 160, 6, 118, 168, 6, 24, + 82, 23, 49, 57, 55, 48, 45, 48, 49, 45, 48, 49, 32, 48, 48, 58, 48, 48, 58, 48, 48, 32, 85, + 84, 67, 160, 6, 118, 168, 6, 24, + ]; + + let want = || mt::principal::StageInfo { + stage_name: "webhdfs://dir/to/files".to_string(), + stage_type: mt::principal::StageType::External, + stage_params: mt::principal::StageParams { + storage: StorageParams::Webhdfs(StorageWebhdfsConfig { + disable_list_batch: true, + user_name: "databend_is_powerful".to_string(), + ..Default::default() + }), + }, + is_temporary: false, + file_format_params: mt::principal::FileFormatParams::Json( + mt::principal::JsonFileFormatParams { + compression: mt::principal::StageFileCompression::Bz2, + }, + ), + copy_options: mt::principal::CopyOptions { + on_error: mt::principal::OnErrorMode::AbortNum(2), + size_limit: 1038, + max_files: 0, + split_size: 0, + purge: true, + single: false, + max_file_size: 0, + disable_variant_check: true, + return_failed_only: false, + detailed_output: false, + }, + comment: "test".to_string(), + number_of_files: 100, + creator: Some(UserIdentity { + username: "databend".to_string(), + hostname: "databend.rs".to_string(), + }), + created_on: DateTime::::default(), + }; + + common::test_pb_from_to(func_name!(), want())?; + common::test_load_old(func_name!(), stage_info_v118.as_slice(), 118, want())?; + + Ok(()) +} diff --git a/src/meta/protos/proto/config.proto b/src/meta/protos/proto/config.proto index 6c7647e9f2cf3..f1fed8ddfea45 100644 --- a/src/meta/protos/proto/config.proto +++ b/src/meta/protos/proto/config.proto @@ -96,8 +96,11 @@ message WebhdfsStorageConfig { string root = 2; string delegation = 3; - string username = 4; // reserved for future use - string password = 5; // reserved for future use + string user_name = 4; + + // We used to have password (5) but removed later, keep those index reserved. + // string password = 5; + reserved 5; bool disable_list_batch = 6; } diff --git a/src/query/config/src/config.rs b/src/query/config/src/config.rs index 2a52a2ff23bf6..1fac7a6454ec5 100644 --- a/src/query/config/src/config.rs +++ b/src/query/config/src/config.rs @@ -1198,6 +1198,13 @@ pub struct WebhdfsStorageConfig { )] #[serde(rename = "disable_list_batch")] pub webhdfs_disable_list_batch: bool, + #[clap( + long = "storage-webhdfs-user-name", + value_name = "VALUE", + default_value_t + )] + #[serde(rename = "user_name")] + pub webhdfs_user_name: String, } impl Default for WebhdfsStorageConfig { @@ -1210,7 +1217,8 @@ impl Debug for WebhdfsStorageConfig { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("WebhdfsStorageConfig") .field("endpoint_url", &self.webhdfs_endpoint_url) - .field("webhdfs_root", &self.webhdfs_root) + .field("root", &self.webhdfs_root) + .field("user_name", &self.webhdfs_user_name) .field("delegation", &mask_string(&self.webhdfs_delegation, 3)) .finish() } @@ -1223,6 +1231,7 @@ impl From for WebhdfsStorageConfig { webhdfs_endpoint_url: v.endpoint_url, webhdfs_root: v.root, webhdfs_disable_list_batch: v.disable_list_batch, + webhdfs_user_name: v.user_name, } } } @@ -1236,6 +1245,7 @@ impl TryFrom for InnerStorageWebhdfsConfig { endpoint_url: value.webhdfs_endpoint_url, root: value.webhdfs_root, disable_list_batch: value.webhdfs_disable_list_batch, + user_name: value.webhdfs_user_name, }) } } diff --git a/src/query/service/tests/it/storages/testdata/configs_table_basic.txt b/src/query/service/tests/it/storages/testdata/configs_table_basic.txt index a65f7c0100666..6cd0d750e7f7d 100644 --- a/src/query/service/tests/it/storages/testdata/configs_table_basic.txt +++ b/src/query/service/tests/it/storages/testdata/configs_table_basic.txt @@ -197,6 +197,7 @@ DB.Table: 'system'.'configs', Table: configs-table_id:1, ver:0, Engine: SystemCo | 'storage' | 'webhdfs.disable_list_batch' | 'false' | '' | | 'storage' | 'webhdfs.endpoint_url' | '' | '' | | 'storage' | 'webhdfs.root' | '' | '' | +| 'storage' | 'webhdfs.user_name' | '' | '' | +-----------+-------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------+ diff --git a/src/query/sql/src/planner/binder/location.rs b/src/query/sql/src/planner/binder/location.rs index 0e71bb8fd7ecf..49c37e7dffd43 100644 --- a/src/query/sql/src/planner/binder/location.rs +++ b/src/query/sql/src/planner/binder/location.rs @@ -411,12 +411,14 @@ fn parse_webhdfs_params(l: &mut UriLocation, root: String) -> Result Result<()> { ("https", "TrUE"), ("delegation", "databendthebest"), ("disable_list_batch", "true"), + ("user_name", "test"), ] .into_iter() .map(|(k, v)| (k.to_string(), v.to_string())) @@ -401,6 +402,7 @@ async fn test_parse_uri_location() -> Result<()> { endpoint_url: "https://example.com".to_string(), delegation: "databendthebest".to_string(), disable_list_batch: true, + user_name: "test".to_string(), }), "/".to_string(), ),