Skip to content

Commit 16b922f

Browse files
committed
feat: use opendal to support hdfs native
1 parent 09c83fa commit 16b922f

File tree

4 files changed

+57
-100
lines changed

4 files changed

+57
-100
lines changed

Cargo.lock

Lines changed: 31 additions & 83 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -150,7 +150,6 @@ moka = { version = "0.12", features = ["future", "sync"] }
150150
ndarray = { version = "0.16.1", features = ["matrixmultiply-threading"] }
151151
num-traits = "0.2"
152152
object_store = { version = "0.12.3" }
153-
hdfs-native-object-store = "0.15"
154153
opendal = { version = "0.54" }
155154
object_store_opendal = { version = "0.54" }
156155
pin-project = "1.0"

rust/lance-io/Cargo.toml

Lines changed: 2 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -16,7 +16,6 @@ rust-version.workspace = true
1616
object_store = { workspace = true }
1717
opendal = { workspace = true, optional = true }
1818
object_store_opendal = { workspace = true, optional = true }
19-
hdfs-native-object-store = { workspace = true, optional = true }
2019
lance-arrow.workspace = true
2120
lance-core.workspace = true
2221
lance-namespace.workspace = true
@@ -64,14 +63,14 @@ name = "scheduler"
6463
harness = false
6564

6665
[features]
67-
default = ["aws", "azure", "gcp"]
66+
default = ["aws", "azure", "gcp", "hdfs"]
6867
gcs-test = []
6968
gcp = ["object_store/gcp", "dep:opendal", "opendal/services-gcs", "dep:object_store_opendal"]
7069
aws = ["object_store/aws", "dep:aws-config", "dep:aws-credential-types", "dep:opendal", "opendal/services-s3", "dep:object_store_opendal"]
7170
azure = ["object_store/azure", "dep:opendal", "opendal/services-azblob", "dep:object_store_opendal"]
7271
oss = ["dep:opendal", "opendal/services-oss", "dep:object_store_opendal"]
7372
huggingface = ["dep:opendal", "opendal/services-huggingface", "dep:object_store_opendal"]
74-
hdfs = ["dep:hdfs-native-object-store"]
73+
hdfs = ["dep:opendal", "opendal/services-hdfs-native", "dep:object_store_opendal" ]
7574
test-util = []
7675

7776
[lints]

rust/lance-io/src/object_store/providers/hdfs.rs

Lines changed: 24 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -3,15 +3,17 @@
33

44
use std::sync::Arc;
55

6-
use crate::object_store::{
7-
ObjectStore, ObjectStoreParams, ObjectStoreProvider, StorageOptions,
8-
DEFAULT_CLOUD_IO_PARALLELISM,
9-
};
10-
use hdfs_native_object_store::HdfsObjectStoreBuilder;
116
use lance_core::Error;
127
use object_store::path::Path;
8+
use object_store_opendal::OpendalStore;
9+
use opendal::{Operator, services::HdfsNative};
1310
use snafu::location;
1411

12+
use crate::object_store::{
13+
DEFAULT_CLOUD_IO_PARALLELISM, ObjectStore, ObjectStoreParams, ObjectStoreProvider,
14+
StorageOptions,
15+
};
16+
1517
#[derive(Debug, Clone)]
1618
pub struct HdfsStoreProvider;
1719

@@ -22,17 +24,27 @@ impl ObjectStoreProvider for HdfsStoreProvider {
2224
base_path: url::Url,
2325
params: &ObjectStoreParams,
2426
) -> Result<ObjectStore, lance_core::Error> {
25-
let storage_options = StorageOptions(params.storage_options.clone().unwrap_or_default());
27+
let mut storage_options =
28+
StorageOptions(params.storage_options.clone().unwrap_or_default());
2629

2730
let download_retry_count = storage_options.download_retry_count();
28-
let hdfs = Arc::new(
29-
HdfsObjectStoreBuilder::new()
30-
.with_url(base_path.clone())
31-
.with_config(storage_options.0.iter())
32-
.build()?,
31+
storage_options.0.insert(
32+
"name_node".to_string(),
33+
format!("{}://{}", base_path.scheme(), base_path.authority()),
3334
);
35+
36+
let operator = Operator::from_iter::<HdfsNative>(storage_options.0.into_iter())
37+
.map_err(|e| {
38+
Error::invalid_input(
39+
format!("Failed to create HDFS native operator: {:?}", e),
40+
location!(),
41+
)
42+
})?
43+
.finish();
44+
45+
let opendal_store = Arc::new(OpendalStore::new(operator));
3446
Ok(ObjectStore::new(
35-
hdfs,
47+
opendal_store,
3648
base_path,
3749
params.block_size,
3850
None,
@@ -59,7 +71,6 @@ impl ObjectStoreProvider for HdfsStoreProvider {
5971
})
6072
}
6173
}
62-
6374
#[cfg(test)]
6475
mod tests {
6576
use url::Url;

0 commit comments

Comments
 (0)