Skip to content

Commit 12414d3

Browse files
authored
feat: Add disable_list_batch for webhdfs (#17345)
* feat: Add disable_list_batch for webhdfs Signed-off-by: Xuanwo <[email protected]> * Fix test Signed-off-by: Xuanwo <[email protected]> --------- Signed-off-by: Xuanwo <[email protected]>
1 parent 0154424 commit 12414d3

File tree

15 files changed

+141
-6
lines changed

15 files changed

+141
-6
lines changed

src/common/storage/src/operator.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -355,11 +355,15 @@ fn init_moka_operator(v: &StorageMokaConfig) -> Result<impl Builder> {
355355

356356
/// init_webhdfs_operator will init a WebHDFS operator
357357
fn init_webhdfs_operator(v: &StorageWebhdfsConfig) -> Result<impl Builder> {
358-
let builder = services::Webhdfs::default()
358+
let mut builder = services::Webhdfs::default()
359359
.endpoint(&v.endpoint_url)
360360
.root(&v.root)
361361
.delegation(&v.delegation);
362362

363+
if v.disable_list_batch {
364+
builder = builder.disable_list_batch();
365+
}
366+
363367
Ok(builder)
364368
}
365369

src/meta/app/src/storage/storage_params.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -522,14 +522,16 @@ pub struct StorageWebhdfsConfig {
522522
pub endpoint_url: String,
523523
pub root: String,
524524
pub delegation: String,
525+
pub disable_list_batch: bool,
525526
}
526527

527528
impl Debug for StorageWebhdfsConfig {
528529
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
529530
let mut ds = f.debug_struct("StorageWebhdfsConfig");
530531

531532
ds.field("endpoint_url", &self.endpoint_url)
532-
.field("root", &self.root);
533+
.field("root", &self.root)
534+
.field("disable_list_batch", &self.disable_list_batch);
533535

534536
ds.field("delegation", &mask_string(&self.delegation, 3));
535537

src/meta/proto-conv/src/config_from_to_protobuf_impl.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,7 @@ impl FromToProto for StorageWebhdfsConfig {
261261
endpoint_url: p.endpoint_url,
262262
root: p.root,
263263
delegation: p.delegation,
264+
disable_list_batch: p.disable_list_batch,
264265
})
265266
}
266267

@@ -271,6 +272,7 @@ impl FromToProto for StorageWebhdfsConfig {
271272
endpoint_url: self.endpoint_url.clone(),
272273
root: self.root.clone(),
273274
delegation: self.delegation.clone(),
275+
disable_list_batch: self.disable_list_batch,
274276

275277
username: String::new(), // reserved for future use
276278
password: String::new(), // reserved for future use

src/meta/proto-conv/src/util.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ const META_CHANGE_LOG: &[(u64, &str)] = &[
146146
(114, "2024-12-12: Add: New DataType Interval."),
147147
(115, "2024-12-16: Add: udf.proto: add UDAFScript and UDAFServer"),
148148
(116, "2025-01-09: Add: MarkedDeletedIndexMeta"),
149+
(117, "2025-01-21: Add: config.proto: add disable_list_batch in WebhdfsConfig")
149150
// Dear developer:
150151
// If you're gonna add a new metadata version, you'll have to add a test for it.
151152
// You could just copy an existing test file(e.g., `../tests/it/v024_table_meta.rs`)

src/meta/proto-conv/tests/it/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,3 +114,4 @@ mod v113_warehouse_grantobject;
114114
mod v114_interval_datatype;
115115
mod v115_add_udaf_script;
116116
mod v116_marked_deleted_index_meta;
117+
mod v117_webhdfs_add_disable_list_batch;

src/meta/proto-conv/tests/it/user_proto_conv.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,7 @@ pub(crate) fn test_webhdfs_stage_info() -> mt::principal::StageInfo {
321321
endpoint_url: "https://webhdfs.example.com".to_string(),
322322
root: "/path/to/stage/files".to_string(),
323323
delegation: "<delegation_token>".to_string(),
324+
disable_list_batch: false,
324325
}),
325326
},
326327
is_temporary: false,

src/meta/proto-conv/tests/it/user_stage.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ fn test_user_stage_webhdfs_v30() -> anyhow::Result<()> {
9494
endpoint_url: "https://webhdfs.example.com".to_string(),
9595
root: "/path/to/stage/files".to_string(),
9696
delegation: "<delegation_token>".to_string(),
97+
disable_list_batch: false,
9798
}),
9899
},
99100
file_format_params: mt::principal::FileFormatParams::Json(

src/meta/proto-conv/tests/it/v030_user_stage.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ fn test_decode_v30_user_stage() -> anyhow::Result<()> {
5353
endpoint_url: "https://webhdfs.example.com".to_string(),
5454
root: "/path/to/stage/files".to_string(),
5555
delegation: "<delegation_token>".to_string(),
56+
disable_list_batch: false,
5657
}),
5758
},
5859
file_format_params: mt::principal::FileFormatParams::Json(

src/meta/proto-conv/tests/it/v031_copy_max_file.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ fn test_decode_v31_copy_max_file() -> anyhow::Result<()> {
5151
endpoint_url: "https://webhdfs.example.com".to_string(),
5252
root: "/path/to/stage/files".to_string(),
5353
delegation: "<delegation_token>".to_string(),
54+
disable_list_batch: false,
5455
}),
5556
},
5657
file_format_params: mt::principal::FileFormatParams::Json(
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
// Copyright 2023 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use chrono::DateTime;
16+
use chrono::Utc;
17+
use databend_common_meta_app as mt;
18+
use databend_common_meta_app::principal::UserIdentity;
19+
use databend_common_meta_app::storage::StorageParams;
20+
use databend_common_meta_app::storage::StorageWebhdfsConfig;
21+
use fastrace::func_name;
22+
23+
use crate::common;
24+
25+
// These bytes are built when a new version in introduced,
26+
// and are kept for backward compatibility test.
27+
//
28+
// *************************************************************
29+
// * These messages should never be updated, *
30+
// * only be added when a new version is added, *
31+
// * or be removed when an old version is no longer supported. *
32+
// *************************************************************
33+
//
34+
#[test]
35+
fn test_v117_webhdfs_add_disable_list_batch() -> anyhow::Result<()> {
36+
// Encoded data of version 117 of databend_common_meta_app::principal::user_stage::StageInfo:
37+
// It is generated with common::test_pb_from_to().
38+
let stage_info_v117 = vec![
39+
10, 22, 119, 101, 98, 104, 100, 102, 115, 58, 47, 47, 100, 105, 114, 47, 116, 111, 47, 102,
40+
105, 108, 101, 115, 16, 1, 26, 12, 10, 10, 42, 8, 48, 1, 160, 6, 117, 168, 6, 24, 42, 11,
41+
10, 2, 48, 2, 16, 142, 8, 24, 1, 56, 1, 50, 4, 116, 101, 115, 116, 56, 100, 66, 29, 10, 8,
42+
100, 97, 116, 97, 98, 101, 110, 100, 18, 11, 100, 97, 116, 97, 98, 101, 110, 100, 46, 114,
43+
115, 160, 6, 117, 168, 6, 24, 74, 10, 34, 8, 8, 2, 160, 6, 117, 168, 6, 24, 82, 23, 49, 57,
44+
55, 48, 45, 48, 49, 45, 48, 49, 32, 48, 48, 58, 48, 48, 58, 48, 48, 32, 85, 84, 67, 160, 6,
45+
117, 168, 6, 24,
46+
];
47+
48+
let want = || mt::principal::StageInfo {
49+
stage_name: "webhdfs://dir/to/files".to_string(),
50+
stage_type: mt::principal::StageType::External,
51+
stage_params: mt::principal::StageParams {
52+
storage: StorageParams::Webhdfs(StorageWebhdfsConfig {
53+
disable_list_batch: true,
54+
..Default::default()
55+
}),
56+
},
57+
is_temporary: false,
58+
file_format_params: mt::principal::FileFormatParams::Json(
59+
mt::principal::JsonFileFormatParams {
60+
compression: mt::principal::StageFileCompression::Bz2,
61+
},
62+
),
63+
copy_options: mt::principal::CopyOptions {
64+
on_error: mt::principal::OnErrorMode::AbortNum(2),
65+
size_limit: 1038,
66+
max_files: 0,
67+
split_size: 0,
68+
purge: true,
69+
single: false,
70+
max_file_size: 0,
71+
disable_variant_check: true,
72+
return_failed_only: false,
73+
detailed_output: false,
74+
},
75+
comment: "test".to_string(),
76+
number_of_files: 100,
77+
creator: Some(UserIdentity {
78+
username: "databend".to_string(),
79+
hostname: "databend.rs".to_string(),
80+
}),
81+
created_on: DateTime::<Utc>::default(),
82+
};
83+
84+
common::test_pb_from_to(func_name!(), want())?;
85+
common::test_load_old(func_name!(), stage_info_v117.as_slice(), 117, want())?;
86+
87+
Ok(())
88+
}

0 commit comments

Comments
 (0)