Skip to content

Commit 322390a

Browse files
author
赵海源
committed
address comments
1 parent a0435cd commit 322390a

File tree

9 files changed

+144
-287
lines changed

9 files changed

+144
-287
lines changed

crates/examples/Cargo.toml

Lines changed: 1 addition & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -32,28 +32,4 @@ serde_json = { workspace = true }
3232
opendal = { workspace = true }
3333
[[example]]
3434
name = "example-table"
35-
path = "src/example_table.rs"
36-
37-
[[example]]
38-
name = "scan-log-table"
39-
path = "src/scan_log_table.rs"
40-
41-
[[example]]
42-
name = "scan-from-earliest"
43-
path = "src/scan_from_earliest.rs"
44-
45-
[[example]]
46-
name = "scan-from-ts"
47-
path = "src/scan_from_ts.rs"
48-
49-
[[example]]
50-
name = "scan-local-only"
51-
path = "src/scan_local_only.rs"
52-
53-
[[example]]
54-
name = "debug-credentials"
55-
path = "src/debug_credentials.rs"
56-
57-
[[example]]
58-
name = "debug-s3-download"
59-
path = "src/debug_s3_download.rs"
35+
path = "src/example_table.rs"

crates/examples/src/scan_from_ts.rs

Lines changed: 0 additions & 152 deletions
This file was deleted.

crates/fluss/src/client/credentials.rs

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717

1818
use crate::client::metadata::Metadata;
1919
use crate::error::{Error, Result};
20-
use crate::rpc::message::GetSecurityTokenRequest;
2120
use crate::rpc::RpcClient;
21+
use crate::rpc::message::GetSecurityTokenRequest;
2222
use parking_lot::RwLock;
2323
use serde::Deserialize;
2424
use std::collections::HashMap;
@@ -60,7 +60,11 @@ impl CachedToken {
6060
if let Some((opendal_key, transform)) = convert_hadoop_key_to_opendal(key) {
6161
let final_value = if transform {
6262
// Invert boolean value (path_style_access -> enable_virtual_host_style)
63-
if value == "true" { "false".to_string() } else { "true".to_string() }
63+
if value == "true" {
64+
"false".to_string()
65+
} else {
66+
"true".to_string()
67+
}
6468
} else {
6569
value.clone()
6670
};
@@ -76,17 +80,10 @@ impl CachedToken {
7680
/// needs_inversion is true for path_style_access -> enable_virtual_host_style conversion
7781
fn convert_hadoop_key_to_opendal(hadoop_key: &str) -> Option<(String, bool)> {
7882
match hadoop_key {
79-
// Standard S3A keys
8083
"fs.s3a.endpoint" => Some(("endpoint".to_string(), false)),
8184
"fs.s3a.endpoint.region" => Some(("region".to_string(), false)),
82-
// path.style.access = false means virtual_host_style = true (inverted)
8385
"fs.s3a.path.style.access" => Some(("enable_virtual_host_style".to_string(), true)),
8486
"fs.s3a.connection.ssl.enabled" => None,
85-
// Red-S3 keys (Fluss custom format)
86-
"fs.red-s3.endpoint" => Some(("endpoint".to_string(), false)),
87-
"fs.red-s3.region" => Some(("region".to_string(), false)),
88-
"fs.red-s3.path-style-access" => Some(("enable_virtual_host_style".to_string(), true)),
89-
"fs.red-s3.connection.ssl.enabled" => None,
9087
_ => None,
9188
}
9289
}
@@ -162,10 +159,3 @@ impl Default for CredentialsCache {
162159
Self::new()
163160
}
164161
}
165-
166-
167-
168-
169-
170-
171-

crates/fluss/src/client/table/remote_log.rs

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -175,16 +175,6 @@ impl RemoteLogDownloader {
175175
local_path: &Path,
176176
s3_props: &HashMap<String, String>,
177177
) -> Result<PathBuf> {
178-
eprintln!("[DEBUG] download_file called: remote_path={}", remote_path);
179-
eprintln!("[DEBUG] s3_props count: {}", s3_props.len());
180-
for (k, v) in s3_props {
181-
if k.contains("key") || k.contains("secret") {
182-
eprintln!("[DEBUG] {} = {}...", k, &v[..std::cmp::min(8, v.len())]);
183-
} else {
184-
eprintln!("[DEBUG] {} = {}", k, v);
185-
}
186-
}
187-
188178
// Handle both URL (e.g., "s3://bucket/path") and local file paths
189179
// If the path doesn't contain "://", treat it as a local file path
190180
let remote_log_tablet_dir_url = if remote_log_tablet_dir.contains("://") {
@@ -212,17 +202,20 @@ impl RemoteLogDownloader {
212202
// Timeout for remote storage operations (30 seconds)
213203
const REMOTE_OP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
214204

215-
eprintln!("[DEBUG] Calling stat on: {}", relative_path);
216205
// Get file metadata to know the size with timeout
217206
let stat_future = op.stat(relative_path);
218207
let meta = tokio::time::timeout(REMOTE_OP_TIMEOUT, stat_future)
219208
.await
220-
.map_err(|_| Error::Io(io::Error::new(
221-
io::ErrorKind::TimedOut,
222-
format!("Timeout getting file metadata from remote storage: {}", remote_path)
223-
)))??;
209+
.map_err(|_| {
210+
Error::Io(io::Error::new(
211+
io::ErrorKind::TimedOut,
212+
format!(
213+
"Timeout getting file metadata from remote storage: {}",
214+
remote_path
215+
),
216+
))
217+
})??;
224218
let file_size = meta.content_length();
225-
eprintln!("[DEBUG] stat succeeded, file_size={}", file_size);
226219

227220
// Create local file for writing
228221
let mut local_file = tokio::fs::File::create(local_path).await?;
@@ -232,26 +225,33 @@ impl RemoteLogDownloader {
232225
const CHUNK_SIZE: u64 = 8 * 1024 * 1024; // 8MB chunks for efficient streaming
233226
let mut offset = 0u64;
234227
let mut chunk_count = 0u64;
235-
let total_chunks = (file_size + CHUNK_SIZE - 1) / CHUNK_SIZE;
236-
eprintln!("[DEBUG] Starting download: {} bytes in {} chunks", file_size, total_chunks);
228+
let total_chunks = file_size.div_ceil(CHUNK_SIZE);
237229

238230
while offset < file_size {
239231
let end = std::cmp::min(offset + CHUNK_SIZE, file_size);
240232
let range = offset..end;
241233
chunk_count += 1;
242234

243235
if chunk_count <= 3 || chunk_count % 10 == 0 {
244-
eprintln!("[DEBUG] Reading chunk {}/{} (offset {})", chunk_count, total_chunks, offset);
236+
eprintln!(
237+
"Remote log download: reading chunk {}/{} (offset {})",
238+
chunk_count, total_chunks, offset
239+
);
245240
}
246241

247242
// Read chunk from remote storage with timeout
248243
let read_future = op.read_with(relative_path).range(range.clone());
249244
let chunk = tokio::time::timeout(REMOTE_OP_TIMEOUT, read_future)
250245
.await
251-
.map_err(|_| Error::Io(io::Error::new(
252-
io::ErrorKind::TimedOut,
253-
format!("Timeout reading chunk from remote storage: {} at offset {}", remote_path, offset)
254-
)))??;
246+
.map_err(|_| {
247+
Error::Io(io::Error::new(
248+
io::ErrorKind::TimedOut,
249+
format!(
250+
"Timeout reading chunk from remote storage: {} at offset {}",
251+
remote_path, offset
252+
),
253+
))
254+
})??;
255255
let bytes = chunk.to_bytes();
256256

257257
// Write chunk to local file

crates/fluss/src/client/table/scanner.rs

Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -235,21 +235,17 @@ impl LogFetcher {
235235

236236
async fn send_fetches_and_collect(&self) -> Result<HashMap<TableBucket, Vec<ScanRecord>>> {
237237
let fetch_request = self.prepare_fetch_log_requests().await;
238-
eprintln!("[DEBUG] prepare_fetch_log_requests returned {} requests", fetch_request.len());
239238
let mut result: HashMap<TableBucket, Vec<ScanRecord>> = HashMap::new();
240239
for (leader, fetch_request) in fetch_request {
241-
eprintln!("[DEBUG] Sending fetch request to leader {}", leader);
242240
let cluster = self.metadata.get_cluster();
243241
let server_node = cluster
244242
.get_tablet_server(leader)
245243
.expect("todo: handle leader not exist.");
246244
let con = self.conns.get_connection(server_node).await?;
247245

248-
eprintln!("[DEBUG] Sending RPC request...");
249246
let fetch_response = con
250247
.request(crate::rpc::message::FetchLogRequest::new(fetch_request))
251248
.await?;
252-
eprintln!("[DEBUG] Got RPC response with {} tables", fetch_response.tables_resp.len());
253249

254250
for pb_fetch_log_resp in fetch_response.tables_resp {
255251
let table_id = pb_fetch_log_resp.table_id;
@@ -263,12 +259,10 @@ impl LogFetcher {
263259
if let Some(ref remote_log_fetch_info) =
264260
fetch_log_for_bucket.remote_log_fetch_info
265261
{
266-
eprintln!("[DEBUG] Bucket {} requires remote log fetch with {} segments",
267-
bucket, remote_log_fetch_info.remote_log_segments.len());
268-
eprintln!("[DEBUG] Getting S3 credentials...");
269-
let s3_props =
270-
self.credentials_cache.get_or_refresh(&self.conns, &self.metadata).await?;
271-
eprintln!("[DEBUG] Got {} S3 props", s3_props.len());
262+
let s3_props = self
263+
.credentials_cache
264+
.get_or_refresh(&self.conns, &self.metadata)
265+
.await?;
272266
self.remote_log_downloader.set_s3_props(s3_props);
273267
let remote_fetch_info = RemoteLogFetchInfo::from_proto(
274268
remote_log_fetch_info,
@@ -291,8 +285,6 @@ impl LogFetcher {
291285
current_fetch_offset = segment.start_offset;
292286
}
293287

294-
eprintln!("[DEBUG] Downloading segment {} from {}",
295-
segment.segment_id, remote_fetch_info.remote_log_tablet_dir);
296288
let download_future =
297289
self.remote_log_downloader.request_remote_log(
298290
&remote_fetch_info.remote_log_tablet_dir,
@@ -306,11 +298,8 @@ impl LogFetcher {
306298
high_watermark,
307299
self.read_context.clone(),
308300
);
309-
eprintln!("[DEBUG] Waiting for download to complete...");
310301
let remote_records =
311302
pending_fetch.convert_to_completed_fetch().await?;
312-
eprintln!("[DEBUG] Download completed, got {} records",
313-
remote_records.values().map(|v| v.len()).sum::<usize>());
314303
// Update offset and merge results
315304
for (tb, records) in remote_records {
316305
if let Some(last_record) = records.last() {

0 commit comments

Comments
 (0)