Skip to content

Commit 244f1d0

Browse files
author
赵海源
committed
fix cmt
1 parent a0435cd commit 244f1d0

File tree

8 files changed

+147
-103
lines changed

8 files changed

+147
-103
lines changed

crates/examples/Cargo.toml

Lines changed: 1 addition & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -32,28 +32,4 @@ serde_json = { workspace = true }
3232
opendal = { workspace = true }
3333
[[example]]
3434
name = "example-table"
35-
path = "src/example_table.rs"
36-
37-
[[example]]
38-
name = "scan-log-table"
39-
path = "src/scan_log_table.rs"
40-
41-
[[example]]
42-
name = "scan-from-earliest"
43-
path = "src/scan_from_earliest.rs"
44-
45-
[[example]]
46-
name = "scan-from-ts"
47-
path = "src/scan_from_ts.rs"
48-
49-
[[example]]
50-
name = "scan-local-only"
51-
path = "src/scan_local_only.rs"
52-
53-
[[example]]
54-
name = "debug-credentials"
55-
path = "src/debug_credentials.rs"
56-
57-
[[example]]
58-
name = "debug-s3-download"
59-
path = "src/debug_s3_download.rs"
35+
path = "src/example_table.rs"

crates/examples/src/scan_from_ts.rs

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,13 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use fluss::client::{FlussConnection, EARLIEST_OFFSET};
18+
use fluss::client::{EARLIEST_OFFSET, FlussConnection};
1919
use fluss::config::Config;
2020
use fluss::error::Result;
2121
use fluss::metadata::TablePath;
2222
use fluss::row::InternalRow;
23-
use std::time::Duration;
2423
use fluss::rpc::message::OffsetSpec;
24+
use std::time::Duration;
2525
use std::time::SystemTime;
2626
use std::time::UNIX_EPOCH;
2727

@@ -37,14 +37,15 @@ pub async fn main() -> Result<()> {
3737
let conn = FlussConnection::new(config).await?;
3838
println!(" Connected successfully!");
3939

40-
let table_path = TablePath::new("fluss".to_owned(), "mahong_log_table_cpp_test_1212".to_owned());
40+
let table_path = TablePath::new(
41+
"fluss".to_owned(),
42+
"mahong_log_table_cpp_test_1212".to_owned(),
43+
);
4144
println!("2) Getting admin...");
4245
let admin = conn.get_admin().await?;
4346
println!(" Admin obtained successfully!");
4447
// Step 1: 通过 admin API 查询时间戳对应的 offset
4548

46-
47-
4849
println!("3) Getting table: {}", table_path);
4950
let table = conn.get_table(&table_path).await?;
5051
println!(" Table obtained successfully!");
@@ -54,16 +55,18 @@ pub async fn main() -> Result<()> {
5455
println!(" Table has {} buckets", num_buckets);
5556
let now = SystemTime::now();
5657
let twenty_minutes_ago = now - Duration::from_secs(20 * 60);
57-
58+
5859
let timestamp_ms = twenty_minutes_ago
5960
.duration_since(UNIX_EPOCH)
6061
.expect("Time went backwards")
6162
.as_secs();
62-
let offsets = admin.list_offsets(
63-
&table_path,
64-
&(0..num_buckets).collect::<Vec<i32>>(),
65-
OffsetSpec::Timestamp(timestamp_ms as i64)
66-
).await?;
63+
let offsets = admin
64+
.list_offsets(
65+
&table_path,
66+
&(0..num_buckets).collect::<Vec<i32>>(),
67+
OffsetSpec::Timestamp(timestamp_ms as i64),
68+
)
69+
.await?;
6770

6871
// println!("4) Creating log scanner...");
6972
// let log_scanner = table.new_scan().create_log_scanner()?;
@@ -99,9 +102,7 @@ pub async fn main() -> Result<()> {
99102
// }
100103

101104
println!("\n7) Creating log scanner with projection (columns 0, 1)...");
102-
let projected_scanner = table.new_scan()
103-
.project(&[0, 2])?
104-
.create_log_scanner()?;
105+
let projected_scanner = table.new_scan().project(&[0, 2])?.create_log_scanner()?;
105106
println!(" Projected scanner created successfully!");
106107

107108
println!("8) Subscribing projected scanner to all buckets from EARLIEST_OFFSET...");
@@ -130,7 +131,12 @@ pub async fn main() -> Result<()> {
130131
}
131132

132133
if i < 10 {
133-
println!(" Record {}: id={}, name={}", i, row.get_long(0), row.get_string(1));
134+
println!(
135+
" Record {}: id={}, name={}",
136+
i,
137+
row.get_long(0),
138+
row.get_string(1)
139+
);
134140
}
135141
}
136142

@@ -144,9 +150,5 @@ pub async fn main() -> Result<()> {
144150
eprintln!("\nColumn pruning verification failed!");
145151
std::process::exit(1);
146152
}
147-
148153
}
149-
150-
151-
152154
}

crates/fluss/src/client/credentials.rs

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717

1818
use crate::client::metadata::Metadata;
1919
use crate::error::{Error, Result};
20-
use crate::rpc::message::GetSecurityTokenRequest;
2120
use crate::rpc::RpcClient;
21+
use crate::rpc::message::GetSecurityTokenRequest;
2222
use parking_lot::RwLock;
2323
use serde::Deserialize;
2424
use std::collections::HashMap;
@@ -60,7 +60,11 @@ impl CachedToken {
6060
if let Some((opendal_key, transform)) = convert_hadoop_key_to_opendal(key) {
6161
let final_value = if transform {
6262
// Invert boolean value (path_style_access -> enable_virtual_host_style)
63-
if value == "true" { "false".to_string() } else { "true".to_string() }
63+
if value == "true" {
64+
"false".to_string()
65+
} else {
66+
"true".to_string()
67+
}
6468
} else {
6569
value.clone()
6670
};
@@ -162,10 +166,3 @@ impl Default for CredentialsCache {
162166
Self::new()
163167
}
164168
}
165-
166-
167-
168-
169-
170-
171-

crates/fluss/src/client/table/remote_log.rs

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ impl RemoteLogDownloader {
184184
eprintln!("[DEBUG] {} = {}", k, v);
185185
}
186186
}
187-
187+
188188
// Handle both URL (e.g., "s3://bucket/path") and local file paths
189189
// If the path doesn't contain "://", treat it as a local file path
190190
let remote_log_tablet_dir_url = if remote_log_tablet_dir.contains("://") {
@@ -217,10 +217,15 @@ impl RemoteLogDownloader {
217217
let stat_future = op.stat(relative_path);
218218
let meta = tokio::time::timeout(REMOTE_OP_TIMEOUT, stat_future)
219219
.await
220-
.map_err(|_| Error::Io(io::Error::new(
221-
io::ErrorKind::TimedOut,
222-
format!("Timeout getting file metadata from remote storage: {}", remote_path)
223-
)))??;
220+
.map_err(|_| {
221+
Error::Io(io::Error::new(
222+
io::ErrorKind::TimedOut,
223+
format!(
224+
"Timeout getting file metadata from remote storage: {}",
225+
remote_path
226+
),
227+
))
228+
})??;
224229
let file_size = meta.content_length();
225230
eprintln!("[DEBUG] stat succeeded, file_size={}", file_size);
226231

@@ -232,26 +237,37 @@ impl RemoteLogDownloader {
232237
const CHUNK_SIZE: u64 = 8 * 1024 * 1024; // 8MB chunks for efficient streaming
233238
let mut offset = 0u64;
234239
let mut chunk_count = 0u64;
235-
let total_chunks = (file_size + CHUNK_SIZE - 1) / CHUNK_SIZE;
236-
eprintln!("[DEBUG] Starting download: {} bytes in {} chunks", file_size, total_chunks);
240+
let total_chunks = file_size.div_ceil(CHUNK_SIZE);
241+
eprintln!(
242+
"[DEBUG] Starting download: {} bytes in {} chunks",
243+
file_size, total_chunks
244+
);
237245

238246
while offset < file_size {
239247
let end = std::cmp::min(offset + CHUNK_SIZE, file_size);
240248
let range = offset..end;
241249
chunk_count += 1;
242250

243251
if chunk_count <= 3 || chunk_count % 10 == 0 {
244-
eprintln!("[DEBUG] Reading chunk {}/{} (offset {})", chunk_count, total_chunks, offset);
252+
eprintln!(
253+
"[DEBUG] Reading chunk {}/{} (offset {})",
254+
chunk_count, total_chunks, offset
255+
);
245256
}
246257

247258
// Read chunk from remote storage with timeout
248259
let read_future = op.read_with(relative_path).range(range.clone());
249260
let chunk = tokio::time::timeout(REMOTE_OP_TIMEOUT, read_future)
250261
.await
251-
.map_err(|_| Error::Io(io::Error::new(
252-
io::ErrorKind::TimedOut,
253-
format!("Timeout reading chunk from remote storage: {} at offset {}", remote_path, offset)
254-
)))??;
262+
.map_err(|_| {
263+
Error::Io(io::Error::new(
264+
io::ErrorKind::TimedOut,
265+
format!(
266+
"Timeout reading chunk from remote storage: {} at offset {}",
267+
remote_path, offset
268+
),
269+
))
270+
})??;
255271
let bytes = chunk.to_bytes();
256272

257273
// Write chunk to local file

crates/fluss/src/client/table/scanner.rs

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,10 @@ impl LogFetcher {
235235

236236
async fn send_fetches_and_collect(&self) -> Result<HashMap<TableBucket, Vec<ScanRecord>>> {
237237
let fetch_request = self.prepare_fetch_log_requests().await;
238-
eprintln!("[DEBUG] prepare_fetch_log_requests returned {} requests", fetch_request.len());
238+
eprintln!(
239+
"[DEBUG] prepare_fetch_log_requests returned {} requests",
240+
fetch_request.len()
241+
);
239242
let mut result: HashMap<TableBucket, Vec<ScanRecord>> = HashMap::new();
240243
for (leader, fetch_request) in fetch_request {
241244
eprintln!("[DEBUG] Sending fetch request to leader {}", leader);
@@ -249,7 +252,10 @@ impl LogFetcher {
249252
let fetch_response = con
250253
.request(crate::rpc::message::FetchLogRequest::new(fetch_request))
251254
.await?;
252-
eprintln!("[DEBUG] Got RPC response with {} tables", fetch_response.tables_resp.len());
255+
eprintln!(
256+
"[DEBUG] Got RPC response with {} tables",
257+
fetch_response.tables_resp.len()
258+
);
253259

254260
for pb_fetch_log_resp in fetch_response.tables_resp {
255261
let table_id = pb_fetch_log_resp.table_id;
@@ -263,11 +269,16 @@ impl LogFetcher {
263269
if let Some(ref remote_log_fetch_info) =
264270
fetch_log_for_bucket.remote_log_fetch_info
265271
{
266-
eprintln!("[DEBUG] Bucket {} requires remote log fetch with {} segments",
267-
bucket, remote_log_fetch_info.remote_log_segments.len());
272+
eprintln!(
273+
"[DEBUG] Bucket {} requires remote log fetch with {} segments",
274+
bucket,
275+
remote_log_fetch_info.remote_log_segments.len()
276+
);
268277
eprintln!("[DEBUG] Getting S3 credentials...");
269-
let s3_props =
270-
self.credentials_cache.get_or_refresh(&self.conns, &self.metadata).await?;
278+
let s3_props = self
279+
.credentials_cache
280+
.get_or_refresh(&self.conns, &self.metadata)
281+
.await?;
271282
eprintln!("[DEBUG] Got {} S3 props", s3_props.len());
272283
self.remote_log_downloader.set_s3_props(s3_props);
273284
let remote_fetch_info = RemoteLogFetchInfo::from_proto(
@@ -291,8 +302,10 @@ impl LogFetcher {
291302
current_fetch_offset = segment.start_offset;
292303
}
293304

294-
eprintln!("[DEBUG] Downloading segment {} from {}",
295-
segment.segment_id, remote_fetch_info.remote_log_tablet_dir);
305+
eprintln!(
306+
"[DEBUG] Downloading segment {} from {}",
307+
segment.segment_id, remote_fetch_info.remote_log_tablet_dir
308+
);
296309
let download_future =
297310
self.remote_log_downloader.request_remote_log(
298311
&remote_fetch_info.remote_log_tablet_dir,
@@ -309,8 +322,10 @@ impl LogFetcher {
309322
eprintln!("[DEBUG] Waiting for download to complete...");
310323
let remote_records =
311324
pending_fetch.convert_to_completed_fetch().await?;
312-
eprintln!("[DEBUG] Download completed, got {} records",
313-
remote_records.values().map(|v| v.len()).sum::<usize>());
325+
eprintln!(
326+
"[DEBUG] Download completed, got {} records",
327+
remote_records.values().map(|v| v.len()).sum::<usize>()
328+
);
314329
// Update offset and merge results
315330
for (tb, records) in remote_records {
316331
if let Some(last_record) = records.last() {

crates/fluss/src/io/storage_s3.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,22 +16,22 @@
1616
// under the License.
1717

1818
use crate::error::Result;
19-
use opendal::layers::TimeoutLayer;
20-
use opendal::services::S3Config;
2119
use opendal::Configurator;
2220
use opendal::Operator;
21+
use opendal::layers::TimeoutLayer;
22+
use opendal::services::S3Config;
2323
use std::collections::HashMap;
2424
use std::time::Duration;
2525

2626
pub(crate) fn s3_config_build(props: &HashMap<String, String>) -> Result<Operator> {
2727
let config = S3Config::from_iter(props.clone())?;
2828
let op = Operator::from_config(config)?.finish();
29-
29+
3030
// Add timeout layer to prevent hanging on S3 operations
3131
let timeout_layer = TimeoutLayer::new()
3232
.with_timeout(Duration::from_secs(10))
3333
.with_io_timeout(Duration::from_secs(30));
34-
34+
3535
Ok(op.layer(timeout_layer))
3636
}
3737

@@ -46,4 +46,3 @@ pub(crate) fn parse_s3_path(path: &str) -> (&str, &str) {
4646
None => (path, ""),
4747
}
4848
}
49-

0 commit comments

Comments
 (0)