33
44use std:: sync:: Arc ;
55
6- use crate :: object_store:: {
7- ObjectStore , ObjectStoreParams , ObjectStoreProvider , StorageOptions ,
8- DEFAULT_CLOUD_IO_PARALLELISM ,
9- } ;
10- use hdfs_native_object_store:: HdfsObjectStoreBuilder ;
116use lance_core:: Error ;
127use object_store:: path:: Path ;
8+ use object_store_opendal:: OpendalStore ;
9+ use opendal:: { Operator , services:: HdfsNative } ;
1310use snafu:: location;
1411
12+ use crate :: object_store:: {
13+ DEFAULT_CLOUD_IO_PARALLELISM , ObjectStore , ObjectStoreParams , ObjectStoreProvider ,
14+ StorageOptions ,
15+ } ;
16+
1517#[ derive( Debug , Clone ) ]
1618pub struct HdfsStoreProvider ;
1719
@@ -22,17 +24,27 @@ impl ObjectStoreProvider for HdfsStoreProvider {
2224 base_path : url:: Url ,
2325 params : & ObjectStoreParams ,
2426 ) -> Result < ObjectStore , lance_core:: Error > {
25- let storage_options = StorageOptions ( params. storage_options . clone ( ) . unwrap_or_default ( ) ) ;
27+ let mut storage_options =
28+ StorageOptions ( params. storage_options . clone ( ) . unwrap_or_default ( ) ) ;
2629
2730 let download_retry_count = storage_options. download_retry_count ( ) ;
28- let hdfs = Arc :: new (
29- HdfsObjectStoreBuilder :: new ( )
30- . with_url ( base_path. clone ( ) )
31- . with_config ( storage_options. 0 . iter ( ) )
32- . build ( ) ?,
31+ storage_options. 0 . insert (
32+ "name_node" . to_string ( ) ,
33+ format ! ( "{}://{}" , base_path. scheme( ) , base_path. authority( ) ) ,
3334 ) ;
35+
36+ let operator = Operator :: from_iter :: < HdfsNative > ( storage_options. 0 . into_iter ( ) )
37+ . map_err ( |e| {
38+ Error :: invalid_input (
39+ format ! ( "Failed to create HDFS native operator: {:?}" , e) ,
40+ location ! ( ) ,
41+ )
42+ } ) ?
43+ . finish ( ) ;
44+
45+ let opendal_store = Arc :: new ( OpendalStore :: new ( operator) ) ;
3446 Ok ( ObjectStore :: new (
35- hdfs ,
47+ opendal_store ,
3648 base_path,
3749 params. block_size ,
3850 None ,
@@ -59,7 +71,6 @@ impl ObjectStoreProvider for HdfsStoreProvider {
5971 } )
6072 }
6173}
62-
6374#[ cfg( test) ]
6475mod tests {
6576 use url:: Url ;
0 commit comments