
Commit a0435cd

Author: 赵海源
Message: feat: support read s3
Parent: fe72d85

File tree

17 files changed (+725, -270 lines)


Cargo.toml

Lines changed: 5 additions & 0 deletions
```diff
@@ -36,3 +36,8 @@ tokio = { version = "1.44.2", features = ["full"] }
 clap = { version = "4.5.37", features = ["derive"] }
 arrow = { version = "57.0.0", features = ["ipc_compression"] }
 chrono = { version = "0.4", features = ["clock", "std", "wasmbind"] }
+
+serde = { version = "1.0", features = ["derive"] }
+serde_json = "1.0"
+opendal = { version = "0.53", features = ["services-s3"] }
+
```
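
The serde/serde_json additions back the credential parsing introduced later in this commit. A minimal standalone sketch of that pattern, reusing the field names from the Credentials struct added below (the JSON payload here is hypothetical; the real one arrives over RPC):

```rust
use serde::Deserialize;

// Same shape as the Credentials struct added by this commit.
#[derive(Debug, Deserialize)]
struct Credentials {
    access_key_id: String,
    access_key_secret: String,
    security_token: Option<String>,
}

fn main() -> serde_json::Result<()> {
    // Hypothetical token payload for illustration only.
    let raw = r#"{"access_key_id":"AK","access_key_secret":"SK","security_token":"STS"}"#;
    let creds: Credentials = serde_json::from_str(raw)?;
    println!("access_key_id = {}", creds.access_key_id);
    Ok(())
}
```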

crates/examples/Cargo.toml

Lines changed: 29 additions & 2 deletions
```diff
@@ -26,7 +26,34 @@ version = { workspace = true }
 [dependencies]
 fluss = { workspace = true }
 tokio = { workspace = true }
-clap = { workspace = true}
+clap = { workspace = true }
+serde = { workspace = true }
+serde_json = { workspace = true }
+opendal = { workspace = true }
 [[example]]
 name = "example-table"
-path = "src/example_table.rs"
+path = "src/example_table.rs"
+
+[[example]]
+name = "scan-log-table"
+path = "src/scan_log_table.rs"
+
+[[example]]
+name = "scan-from-earliest"
+path = "src/scan_from_earliest.rs"
+
+[[example]]
+name = "scan-from-ts"
+path = "src/scan_from_ts.rs"
+
+[[example]]
+name = "scan-local-only"
+path = "src/scan_local_only.rs"
+
+[[example]]
+name = "debug-credentials"
+path = "src/debug_credentials.rs"
+
+[[example]]
+name = "debug-s3-download"
+path = "src/debug_s3_download.rs"
```
crates/examples/src/scan_from_earliest.rs — new file (path not shown on this page; inferred from the scan-from-earliest entry registered above)

Lines changed: 152 additions & 0 deletions

```rust
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use fluss::client::{FlussConnection, EARLIEST_OFFSET};
use fluss::config::Config;
use fluss::error::Result;
use fluss::metadata::TablePath;
use fluss::row::InternalRow;
use fluss::rpc::message::OffsetSpec;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

#[tokio::main]
pub async fn main() -> Result<()> {
    println!("Starting Rust scan from earliest example...");
    println!("Using EARLIEST_OFFSET = {}", EARLIEST_OFFSET);

    let mut config = Config::default();
    config.bootstrap_server = Some("10.147.136.86:9123".to_string());

    println!("1) Connecting to Fluss...");
    let conn = FlussConnection::new(config).await?;
    println!("   Connected successfully!");

    let table_path = TablePath::new(
        "fluss".to_owned(),
        "mahong_log_table_cpp_test_1212".to_owned(),
    );
    println!("2) Getting admin...");
    let admin = conn.get_admin().await?;
    println!("   Admin obtained successfully!");

    println!("3) Getting table: {}", table_path);
    let table = conn.get_table(&table_path).await?;
    println!("   Table obtained successfully!");

    let table_info = table.table_info();
    let num_buckets = table_info.num_buckets;
    println!("   Table has {} buckets", num_buckets);

    // Step 1: look up the offsets that correspond to a timestamp via the admin API.
    let now = SystemTime::now();
    let twenty_minutes_ago = now - Duration::from_secs(20 * 60);

    // list_offsets takes milliseconds; the committed code called as_secs()
    // here, which silently put a seconds value into a *_ms variable.
    let timestamp_ms = twenty_minutes_ago
        .duration_since(UNIX_EPOCH)
        .expect("Time went backwards")
        .as_millis() as i64;
    // `offsets` is only consumed by the commented-out steps 4-6 below.
    let offsets = admin
        .list_offsets(
            &table_path,
            &(0..num_buckets).collect::<Vec<i32>>(),
            OffsetSpec::Timestamp(timestamp_ms),
        )
        .await?;

    // println!("4) Creating log scanner...");
    // let log_scanner = table.new_scan().create_log_scanner()?;
    // println!("   Log scanner created successfully!");

    // println!("5) Subscribing to all buckets from timestamp {}...", timestamp_ms);
    // for (bucket_id, offset) in offsets {
    //     log_scanner.subscribe(bucket_id, offset).await?;
    //     println!("   Subscribed to bucket {} from timestamp {} offset {}", bucket_id, timestamp_ms, offset);
    // }

    // println!("6) Polling records (timeout: 10 seconds)...");
    // let scan_records = log_scanner.poll(Duration::from_secs(10)).await?;

    // let record_count = scan_records.count();
    // println!("Scanned records: {}", record_count);
    // for (i, record) in scan_records.into_iter().enumerate() {
    //     let row = record.row();
    //     if i < 10 {
    //         println!(
    //             "   offset={} id={} number={} value={} ts={}",
    //             record.offset(),
    //             row.get_long(0),
    //             row.get_int(1),
    //             row.get_string(2),
    //             record.timestamp()
    //         );
    //     }
    // }

    // if record_count > 10 {
    //     println!("   ... and {} more records", record_count - 10);
    // }

    println!("\n7) Creating log scanner with projection (columns 0, 2)...");
    let projected_scanner = table.new_scan().project(&[0, 2])?.create_log_scanner()?;
    println!("   Projected scanner created successfully!");

    println!("8) Subscribing projected scanner to all buckets from EARLIEST_OFFSET...");
    for bucket_id in 0..num_buckets {
        // The commit left a hardcoded debug offset (278807821) active here;
        // the example's stated intent is to start from the earliest offset.
        projected_scanner.subscribe(bucket_id, EARLIEST_OFFSET).await?;
        println!("   Subscribed to bucket {} from earliest", bucket_id);
    }

    loop {
        println!("9) Polling projected records (timeout: 10 seconds)...");
        let projected_records = projected_scanner.poll(Duration::from_secs(10)).await?;

        let projected_count = projected_records.count();
        println!("Projected records: {}", projected_count);

        let mut projection_verified = true;
        for (i, record) in projected_records.into_iter().enumerate() {
            let row = record.row();
            let field_count = row.get_field_count();

            if field_count != 2 {
                eprintln!("ERROR: Record {} has {} fields, expected 2", i, field_count);
                projection_verified = false;
                continue;
            }

            if i < 10 {
                println!("   Record {}: id={}, name={}", i, row.get_long(0), row.get_string(1));
            }
        }

        if projected_count > 10 {
            println!("   ... and {} more records", projected_count - 10);
        }

        if projection_verified {
            println!("\nColumn pruning verification passed!");
        } else {
            eprintln!("\nColumn pruning verification failed!");
            std::process::exit(1);
        }
    }
}
```
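
Since chrono is already a workspace dependency in this commit, the 20-minutes-ago millisecond timestamp above can also be computed without SystemTime arithmetic. A small sketch, not part of the commit:

```rust
use chrono::{Duration, Utc};

fn main() {
    // timestamp_millis() yields i64 milliseconds since the Unix epoch,
    // matching what OffsetSpec::Timestamp expects in the example above.
    let timestamp_ms: i64 = (Utc::now() - Duration::minutes(20)).timestamp_millis();
    println!("listing offsets at t-20m = {timestamp_ms}");
}
```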

crates/fluss/Cargo.toml

Lines changed: 3 additions & 2 deletions
```diff
@@ -23,11 +23,12 @@ name = "fluss"
 build = "src/build.rs"
 
 [features]
-default = ["storage-memory", "storage-fs"]
-storage-all = ["storage-memory", "storage-fs"]
+default = ["storage-memory", "storage-fs", "storage-s3"]
+storage-all = ["storage-memory", "storage-fs", "storage-s3"]
 
 storage-memory = ["opendal/services-memory"]
 storage-fs = ["opendal/services-fs"]
+storage-s3 = ["opendal/services-s3"]
 integration_tests = []
 
 [dependencies]
```
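
Because storage-s3 joins the default feature set, downstream builds now pull in OpenDAL's S3 service automatically; opting out requires `--no-default-features` with an explicit feature list (standard Cargo semantics). Code gated on the new flag would follow the same pattern as the existing storage features, roughly (hypothetical illustration; fluss's actual module layout is not visible in this excerpt):

```rust
// Compiles only when the storage-s3 feature is enabled.
#[cfg(feature = "storage-s3")]
mod s3_storage {
    // S3-backed remote log reading would live behind this feature.
}
```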
New file (path not shown in this page excerpt)

Lines changed: 171 additions & 0 deletions

```rust
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::client::metadata::Metadata;
use crate::error::{Error, Result};
use crate::rpc::message::GetSecurityTokenRequest;
use crate::rpc::RpcClient;
use parking_lot::RwLock;
use serde::Deserialize;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};

const CACHE_TTL: Duration = Duration::from_secs(3600);

#[derive(Debug, Deserialize)]
struct Credentials {
    access_key_id: String,
    access_key_secret: String,
    security_token: Option<String>,
}

struct CachedToken {
    access_key_id: String,
    secret_access_key: String,
    security_token: Option<String>,
    addition_infos: HashMap<String, String>,
    cached_at: Instant,
}

impl CachedToken {
    fn to_s3_props(&self) -> HashMap<String, String> {
        let mut props = HashMap::new();

        props.insert("access_key_id".to_string(), self.access_key_id.clone());
        props.insert(
            "secret_access_key".to_string(),
            self.secret_access_key.clone(),
        );

        if let Some(token) = &self.security_token {
            props.insert("security_token".to_string(), token.clone());
        }

        for (key, value) in &self.addition_infos {
            if let Some((opendal_key, transform)) = convert_hadoop_key_to_opendal(key) {
                let final_value = if transform {
                    // Invert boolean value (path_style_access -> enable_virtual_host_style)
                    if value == "true" { "false".to_string() } else { "true".to_string() }
                } else {
                    value.clone()
                };
                props.insert(opendal_key, final_value);
            }
        }

        props
    }
}

/// Returns (opendal_key, needs_inversion).
/// needs_inversion is true for the path_style_access -> enable_virtual_host_style conversion.
fn convert_hadoop_key_to_opendal(hadoop_key: &str) -> Option<(String, bool)> {
    match hadoop_key {
        // Standard S3A keys
        "fs.s3a.endpoint" => Some(("endpoint".to_string(), false)),
        "fs.s3a.endpoint.region" => Some(("region".to_string(), false)),
        // path.style.access = false means virtual_host_style = true (inverted)
        "fs.s3a.path.style.access" => Some(("enable_virtual_host_style".to_string(), true)),
        "fs.s3a.connection.ssl.enabled" => None,
        // Red-S3 keys (Fluss custom format)
        "fs.red-s3.endpoint" => Some(("endpoint".to_string(), false)),
        "fs.red-s3.region" => Some(("region".to_string(), false)),
        "fs.red-s3.path-style-access" => Some(("enable_virtual_host_style".to_string(), true)),
        "fs.red-s3.connection.ssl.enabled" => None,
        _ => None,
    }
}

pub struct CredentialsCache {
    inner: RwLock<Option<CachedToken>>,
}

impl CredentialsCache {
    pub fn new() -> Self {
        Self {
            inner: RwLock::new(None),
        }
    }

    pub async fn get_or_refresh(
        &self,
        rpc_client: &Arc<RpcClient>,
        metadata: &Arc<Metadata>,
    ) -> Result<HashMap<String, String>> {
        {
            let guard = self.inner.read();
            if let Some(cached) = guard.as_ref() {
                if cached.cached_at.elapsed() < CACHE_TTL {
                    return Ok(cached.to_s3_props());
                }
            }
        }

        self.refresh_from_server(rpc_client, metadata).await
    }

    async fn refresh_from_server(
        &self,
        rpc_client: &Arc<RpcClient>,
        metadata: &Arc<Metadata>,
    ) -> Result<HashMap<String, String>> {
        let cluster = metadata.get_cluster();
        let server_node = cluster
            .get_coordinator_server()
            .or_else(|| Some(cluster.get_one_available_server()))
            .expect("no available server to fetch security token");
        let conn = rpc_client.get_connection(server_node).await?;

        let request = GetSecurityTokenRequest::new();
        let response = conn.request(request).await?;

        let credentials: Credentials = serde_json::from_slice(&response.token)
            .map_err(|e| Error::JsonSerdeError(e.to_string()))?;

        let mut addition_infos = HashMap::new();
        for kv in &response.addition_info {
            addition_infos.insert(kv.key.clone(), kv.value.clone());
        }

        let cached = CachedToken {
            access_key_id: credentials.access_key_id,
            secret_access_key: credentials.access_key_secret,
            security_token: credentials.security_token,
            addition_infos,
            cached_at: Instant::now(),
        };

        let props = cached.to_s3_props();
        *self.inner.write() = Some(cached);

        Ok(props)
    }
}

impl Default for CredentialsCache {
    fn default() -> Self {
        Self::new()
    }
}
```
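
For orientation, the property map produced by to_s3_props lines up with OpenDAL's S3 configuration keys. A minimal sketch of feeding it into an operator, assuming opendal 0.53's chain-style S3 builder; this wiring is not part of the excerpt shown, and the helper function and bucket argument are hypothetical:

```rust
use std::collections::HashMap;

use opendal::services::S3;
use opendal::Operator;

// Hypothetical helper: build an S3 operator from the props returned by
// CredentialsCache::get_or_refresh. Key names mirror to_s3_props above.
fn s3_operator_from_props(
    props: &HashMap<String, String>,
    bucket: &str,
) -> opendal::Result<Operator> {
    let mut builder = S3::default().bucket(bucket);
    if let Some(v) = props.get("endpoint") {
        builder = builder.endpoint(v);
    }
    if let Some(v) = props.get("region") {
        builder = builder.region(v);
    }
    if let Some(v) = props.get("access_key_id") {
        builder = builder.access_key_id(v);
    }
    if let Some(v) = props.get("secret_access_key") {
        builder = builder.secret_access_key(v);
    }
    // OpenDAL's S3 service calls the STS token a session token.
    if let Some(v) = props.get("security_token") {
        builder = builder.session_token(v);
    }
    if props.get("enable_virtual_host_style").map(String::as_str) == Some("true") {
        builder = builder.enable_virtual_host_style();
    }
    Ok(Operator::new(builder)?.finish())
}
```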
