Skip to content

Commit b699799

Browse files
author
赵海源
committed
address comments
1 parent a0435cd commit b699799

File tree

9 files changed

+296
-312
lines changed

9 files changed

+296
-312
lines changed

crates/examples/Cargo.toml

Lines changed: 1 addition & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -32,28 +32,4 @@ serde_json = { workspace = true }
3232
opendal = { workspace = true }
3333
[[example]]
3434
name = "example-table"
35-
path = "src/example_table.rs"
36-
37-
[[example]]
38-
name = "scan-log-table"
39-
path = "src/scan_log_table.rs"
40-
41-
[[example]]
42-
name = "scan-from-earliest"
43-
path = "src/scan_from_earliest.rs"
44-
45-
[[example]]
46-
name = "scan-from-ts"
47-
path = "src/scan_from_ts.rs"
48-
49-
[[example]]
50-
name = "scan-local-only"
51-
path = "src/scan_local_only.rs"
52-
53-
[[example]]
54-
name = "debug-credentials"
55-
path = "src/debug_credentials.rs"
56-
57-
[[example]]
58-
name = "debug-s3-download"
59-
path = "src/debug_s3_download.rs"
35+
path = "src/example_table.rs"

crates/examples/src/scan_from_ts.rs

Lines changed: 0 additions & 152 deletions
This file was deleted.

crates/fluss/src/client/credentials.rs

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717

1818
use crate::client::metadata::Metadata;
1919
use crate::error::{Error, Result};
20-
use crate::rpc::message::GetSecurityTokenRequest;
2120
use crate::rpc::RpcClient;
21+
use crate::rpc::message::GetSecurityTokenRequest;
2222
use parking_lot::RwLock;
2323
use serde::Deserialize;
2424
use std::collections::HashMap;
@@ -60,7 +60,11 @@ impl CachedToken {
6060
if let Some((opendal_key, transform)) = convert_hadoop_key_to_opendal(key) {
6161
let final_value = if transform {
6262
// Invert boolean value (path_style_access -> enable_virtual_host_style)
63-
if value == "true" { "false".to_string() } else { "true".to_string() }
63+
if value == "true" {
64+
"false".to_string()
65+
} else {
66+
"true".to_string()
67+
}
6468
} else {
6569
value.clone()
6670
};
@@ -76,17 +80,10 @@ impl CachedToken {
7680
/// needs_inversion is true for path_style_access -> enable_virtual_host_style conversion
7781
fn convert_hadoop_key_to_opendal(hadoop_key: &str) -> Option<(String, bool)> {
7882
match hadoop_key {
79-
// Standard S3A keys
8083
"fs.s3a.endpoint" => Some(("endpoint".to_string(), false)),
8184
"fs.s3a.endpoint.region" => Some(("region".to_string(), false)),
82-
// path.style.access = false means virtual_host_style = true (inverted)
8385
"fs.s3a.path.style.access" => Some(("enable_virtual_host_style".to_string(), true)),
8486
"fs.s3a.connection.ssl.enabled" => None,
85-
// Red-S3 keys (Fluss custom format)
86-
"fs.red-s3.endpoint" => Some(("endpoint".to_string(), false)),
87-
"fs.red-s3.region" => Some(("region".to_string(), false)),
88-
"fs.red-s3.path-style-access" => Some(("enable_virtual_host_style".to_string(), true)),
89-
"fs.red-s3.connection.ssl.enabled" => None,
9087
_ => None,
9188
}
9289
}
@@ -162,10 +159,3 @@ impl Default for CredentialsCache {
162159
Self::new()
163160
}
164161
}
165-
166-
167-
168-
169-
170-
171-

crates/fluss/src/client/table/remote_log.rs

Lines changed: 26 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ use parking_lot::RwLock;
2424
use std::collections::HashMap;
2525
use std::io;
2626
use std::path::{Path, PathBuf};
27-
use std::sync::Arc;
2827
use tempfile::TempDir;
2928
use tokio::io::AsyncWriteExt;
3029
use tokio::sync::oneshot;
@@ -117,14 +116,14 @@ impl RemoteLogDownloadFuture {
117116
/// Downloader for remote log segment files
118117
pub struct RemoteLogDownloader {
119118
local_log_dir: TempDir,
120-
s3_props: Arc<RwLock<HashMap<String, String>>>,
119+
s3_props: RwLock<HashMap<String, String>>,
121120
}
122121

123122
impl RemoteLogDownloader {
124123
pub fn new(local_log_dir: TempDir) -> Result<Self> {
125124
Ok(Self {
126125
local_log_dir,
127-
s3_props: Arc::new(RwLock::new(HashMap::new())),
126+
s3_props: RwLock::new(HashMap::new()),
128127
})
129128
}
130129

@@ -175,16 +174,6 @@ impl RemoteLogDownloader {
175174
local_path: &Path,
176175
s3_props: &HashMap<String, String>,
177176
) -> Result<PathBuf> {
178-
eprintln!("[DEBUG] download_file called: remote_path={}", remote_path);
179-
eprintln!("[DEBUG] s3_props count: {}", s3_props.len());
180-
for (k, v) in s3_props {
181-
if k.contains("key") || k.contains("secret") {
182-
eprintln!("[DEBUG] {} = {}...", k, &v[..std::cmp::min(8, v.len())]);
183-
} else {
184-
eprintln!("[DEBUG] {} = {}", k, v);
185-
}
186-
}
187-
188177
// Handle both URL (e.g., "s3://bucket/path") and local file paths
189178
// If the path doesn't contain "://", treat it as a local file path
190179
let remote_log_tablet_dir_url = if remote_log_tablet_dir.contains("://") {
@@ -212,46 +201,56 @@ impl RemoteLogDownloader {
212201
// Timeout for remote storage operations (30 seconds)
213202
const REMOTE_OP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
214203

215-
eprintln!("[DEBUG] Calling stat on: {}", relative_path);
216204
// Get file metadata to know the size with timeout
217205
let stat_future = op.stat(relative_path);
218206
let meta = tokio::time::timeout(REMOTE_OP_TIMEOUT, stat_future)
219207
.await
220-
.map_err(|_| Error::Io(io::Error::new(
221-
io::ErrorKind::TimedOut,
222-
format!("Timeout getting file metadata from remote storage: {}", remote_path)
223-
)))??;
208+
.map_err(|_| {
209+
Error::Io(io::Error::new(
210+
io::ErrorKind::TimedOut,
211+
format!(
212+
"Timeout getting file metadata from remote storage: {}",
213+
remote_path
214+
),
215+
))
216+
})??;
224217
let file_size = meta.content_length();
225-
eprintln!("[DEBUG] stat succeeded, file_size={}", file_size);
226218

227219
// Create local file for writing
228220
let mut local_file = tokio::fs::File::create(local_path).await?;
229221

230222
// Stream data from remote to local file in chunks
231223
// opendal::Reader::read accepts a range, so we read in chunks
232-
const CHUNK_SIZE: u64 = 8 * 1024 * 1024; // 8MB chunks for efficient streaming
224+
const CHUNK_SIZE: u64 = 8 * 1024 * 1024; // 8MB chunks for efficient reading
233225
let mut offset = 0u64;
234226
let mut chunk_count = 0u64;
235-
let total_chunks = (file_size + CHUNK_SIZE - 1) / CHUNK_SIZE;
236-
eprintln!("[DEBUG] Starting download: {} bytes in {} chunks", file_size, total_chunks);
227+
let total_chunks = file_size.div_ceil(CHUNK_SIZE);
237228

238229
while offset < file_size {
239230
let end = std::cmp::min(offset + CHUNK_SIZE, file_size);
240231
let range = offset..end;
241232
chunk_count += 1;
242233

243234
if chunk_count <= 3 || chunk_count % 10 == 0 {
244-
eprintln!("[DEBUG] Reading chunk {}/{} (offset {})", chunk_count, total_chunks, offset);
235+
eprintln!(
236+
"Remote log download: reading chunk {}/{} (offset {})",
237+
chunk_count, total_chunks, offset
238+
);
245239
}
246240

247241
// Read chunk from remote storage with timeout
248242
let read_future = op.read_with(relative_path).range(range.clone());
249243
let chunk = tokio::time::timeout(REMOTE_OP_TIMEOUT, read_future)
250244
.await
251-
.map_err(|_| Error::Io(io::Error::new(
252-
io::ErrorKind::TimedOut,
253-
format!("Timeout reading chunk from remote storage: {} at offset {}", remote_path, offset)
254-
)))??;
245+
.map_err(|_| {
246+
Error::Io(io::Error::new(
247+
io::ErrorKind::TimedOut,
248+
format!(
249+
"Timeout reading chunk from remote storage: {} at offset {}",
250+
remote_path, offset
251+
),
252+
))
253+
})??;
255254
let bytes = chunk.to_bytes();
256255

257256
// Write chunk to local file

0 commit comments

Comments
 (0)