1212// See the License for the specific language governing permissions and
1313// limitations under the License.
1414
15+ use std:: env;
1516use std:: sync:: Arc ;
1617use std:: time:: Duration ;
1718
@@ -22,13 +23,16 @@ use aws_sdk_s3::config::Credentials;
2223use aws_sdk_s3:: config:: Region ;
2324use aws_sdk_s3:: config:: SharedCredentialsProvider ;
2425use aws_sdk_s3:: Client ;
26+ use aws_smithy_runtime:: client:: http:: hyper_014:: HyperClientBuilder ;
2527use databend_common_base:: base:: tokio:: io:: AsyncReadExt ;
2628use databend_common_base:: base:: GlobalInstance ;
2729use databend_common_catalog:: table:: Table ;
30+ use databend_common_catalog:: table_context:: TableContext ;
2831use databend_common_config:: GlobalConfig ;
2932use databend_common_exception:: ErrorCode ;
3033use databend_common_exception:: Result ;
3134use databend_common_expression:: TableSchema ;
35+ use databend_common_grpc:: DNSService ;
3236use databend_common_meta_app:: schema:: TableInfo ;
3337use databend_common_meta_app:: storage:: StorageParams ;
3438use databend_common_storages_fuse:: io:: MetaReaders ;
@@ -38,6 +42,7 @@ use databend_enterprise_fail_safe::FailSafeHandlerWrapper;
3842use databend_storages_common_cache:: LoadParams ;
3943use databend_storages_common_table_meta:: meta:: CompactSegmentInfo ;
4044use databend_storages_common_table_meta:: meta:: Location ;
45+ use hyper_tls:: HttpsConnector ;
4146use log:: info;
4247use log:: warn;
4348use opendal:: ErrorKind ;
@@ -49,7 +54,11 @@ impl RealFailSafeHandler {}
4954
5055#[ async_trait:: async_trait]
5156impl FailSafeHandler for RealFailSafeHandler {
52- async fn recover_table_data ( & self , table_info : TableInfo ) -> Result < ( ) > {
57+ async fn recover_table_data (
58+ & self ,
59+ ctx : Arc < dyn TableContext > ,
60+ table_info : TableInfo ,
61+ ) -> Result < ( ) > {
5362 let storage_params = match & table_info. meta . storage_params {
5463 // External or attached table.
5564 Some ( sp) => sp. clone ( ) ,
@@ -62,7 +71,7 @@ impl FailSafeHandler for RealFailSafeHandler {
6271
6372 let fuse_table = FuseTable :: do_create ( table_info) ?;
6473
65- let amender = Amender :: try_new ( storage_params) . await ?;
74+ let amender = Amender :: try_new ( ctx , storage_params) . await ?;
6675
6776 amender. recover_snapshot ( fuse_table) . await ?;
6877
@@ -86,7 +95,7 @@ struct Amender {
8695}
8796
8897impl Amender {
89- async fn try_new ( storage_param : StorageParams ) -> Result < Self > {
98+ async fn try_new ( ctx : Arc < dyn TableContext > , storage_param : StorageParams ) -> Result < Self > {
9099 // TODO
91100 // - replace client with opendal operator
92101 // - supports other storage types
@@ -116,18 +125,53 @@ impl Amender {
116125 . connect_timeout ( Duration :: from_secs ( 3 ) )
117126 . build ( ) ;
118127
128+ let tls_connector = {
129+ let mut builder = hyper_tls:: native_tls:: TlsConnector :: builder ( ) ;
130+ let allow_invalid_cert = ctx
131+ . get_settings ( )
132+ . get_premise_deploy_danger_amend_accept_invalid_cert ( ) ?;
133+ if allow_invalid_cert {
134+ info ! ( "allows invalid cert, and accepts invalid hostnames in cert validation" ) ;
135+ builder. danger_accept_invalid_certs ( true ) ;
136+ builder. danger_accept_invalid_hostnames ( true ) ;
137+ } ;
138+ builder
139+ . build ( )
140+ . unwrap_or_else ( |e| panic ! ( "error while creating TLS connector: {}" , e) )
141+ } ;
142+
143+ let mut http = hyper_v014:: client:: HttpConnector :: new_with_resolver ( DNSService { } ) ;
144+ // also allows http connection
145+ http. enforce_http ( false ) ;
146+
147+ let connect_timeout = env:: var ( "_DATABEND_INTERNAL_CONNECT_TIMEOUT" )
148+ . ok ( )
149+ . and_then ( |v| v. parse :: < u64 > ( ) . ok ( ) )
150+ . unwrap_or ( 30 ) ;
151+ http. set_connect_timeout ( Some ( Duration :: from_secs ( connect_timeout) ) ) ;
152+
153+ let conn = HttpsConnector :: from ( ( http, tls_connector. into ( ) ) ) ;
154+
119155 let config = aws_config:: from_env ( )
120156 . region ( region_provider)
121157 . endpoint_url ( s3_config. endpoint_url )
122158 . credentials_provider ( SharedCredentialsProvider :: new ( base_credentials) )
123159 . retry_config ( retry_config)
124160 . timeout_config ( timeout_config)
161+ . http_client ( HyperClientBuilder :: new ( ) . build ( conn) )
125162 . load ( )
126163 . await ;
127164
165+ let force_path_style = ctx
166+ . get_settings ( )
167+ . get_premise_deploy_amend_force_path_style ( ) ?;
168+ let sdk_config = aws_sdk_s3:: config:: Builder :: from ( & config)
169+ . force_path_style ( force_path_style)
170+ . build ( ) ;
171+
128172 let root = s3_config. root ;
129173 let bucket = s3_config. bucket ;
130- let client = Client :: new ( & config ) ;
174+ let client = Client :: from_conf ( sdk_config ) ;
131175
132176 Ok ( Self {
133177 client,
@@ -286,7 +330,7 @@ impl Amender {
286330 . send ( )
287331 . await
288332 . map_err ( |e| {
289- ErrorCode :: StorageOther ( format ! ( "failed to list object versions. {}" , e) )
333+ ErrorCode :: StorageOther ( format ! ( "failed to list object versions. {:? }" , e) )
290334 } ) ?;
291335
292336 // find the latest version
0 commit comments