Skip to content

Commit fe72d85

Browse files
authored
feat: support subscribe from remote (#76)
1 parent 1503050 commit fe72d85

File tree

20 files changed

+1019
-49
lines changed

20 files changed

+1019
-49
lines changed

bindings/cpp/src/lib.rs

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,7 @@ fn new_connection(bootstrap_server: &str) -> Result<*mut Connection, String> {
243243
}));
244244
Ok(conn)
245245
}
246-
Err(e) => Err(format!("Failed to connect: {}", e)),
246+
Err(e) => Err(format!("Failed to connect: {e}")),
247247
}
248248
}
249249

@@ -264,7 +264,7 @@ impl Connection {
264264
let admin = Box::into_raw(Box::new(Admin { inner: admin }));
265265
Ok(admin)
266266
}
267-
Err(e) => Err(format!("Failed to get admin: {}", e)),
267+
Err(e) => Err(format!("Failed to get admin: {e}")),
268268
}
269269
}
270270

@@ -287,7 +287,7 @@ impl Connection {
287287
}));
288288
Ok(table)
289289
}
290-
Err(e) => Err(format!("Failed to get table: {}", e)),
290+
Err(e) => Err(format!("Failed to get table: {e}")),
291291
}
292292
}
293293
}
@@ -398,7 +398,7 @@ impl Table {
398398

399399
let table_append = match fluss_table.new_append() {
400400
Ok(a) => a,
401-
Err(e) => return Err(format!("Failed to create append: {}", e)),
401+
Err(e) => return Err(format!("Failed to create append: {e}")),
402402
};
403403

404404
let writer = table_append.create_writer();
@@ -413,7 +413,10 @@ impl Table {
413413
self.table_info.clone(),
414414
);
415415

416-
let scanner = fluss_table.new_scan().create_log_scanner();
416+
let scanner = match fluss_table.new_scan().create_log_scanner() {
417+
Ok(a) => a,
418+
Err(e) => return Err(format!("Failed to create log scanner: {e}")),
419+
};
417420
let scanner = Box::into_raw(Box::new(LogScanner { inner: scanner }));
418421
Ok(scanner)
419422
}
@@ -431,9 +434,12 @@ impl Table {
431434
let scan = fluss_table.new_scan();
432435
let scan = match scan.project(&column_indices) {
433436
Ok(s) => s,
434-
Err(e) => return Err(format!("Failed to project columns: {}", e)),
437+
Err(e) => return Err(format!("Failed to project columns: {e}")),
438+
};
439+
let scanner = match scan.create_log_scanner() {
440+
Ok(a) => a,
441+
Err(e) => return Err(format!("Failed to create log scanner: {e}")),
435442
};
436-
let scanner = scan.create_log_scanner();
437443
let scanner = Box::into_raw(Box::new(LogScanner { inner: scanner }));
438444
Ok(scanner)
439445
}

bindings/cpp/src/types.rs

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ fn ffi_data_type_to_core(dt: i32) -> Result<fcore::metadata::DataType> {
6464
DATA_TYPE_TIME => Ok(fcore::metadata::DataTypes::time()),
6565
DATA_TYPE_TIMESTAMP => Ok(fcore::metadata::DataTypes::timestamp()),
6666
DATA_TYPE_TIMESTAMP_LTZ => Ok(fcore::metadata::DataTypes::timestamp_ltz()),
67-
_ => Err(anyhow!("Unknown data type: {}", dt)),
67+
_ => Err(anyhow!("Unknown data type: {dt}")),
6868
}
6969
}
7070

@@ -423,10 +423,7 @@ fn core_row_to_ffi_fields(row: &fcore::row::ColumnarRow) -> Vec<ffi::FfiDatum> {
423423
datum.i32_val = array.value(row_id);
424424
datum
425425
}
426-
_ => panic!(
427-
"Will never come here. Unsupported Time32 unit for column {}",
428-
i
429-
),
426+
_ => panic!("Will never come here. Unsupported Time32 unit for column {i}"),
430427
},
431428
ArrowDataType::Time64(unit) => match unit {
432429
TimeUnit::Microsecond => {
@@ -449,14 +446,10 @@ fn core_row_to_ffi_fields(row: &fcore::row::ColumnarRow) -> Vec<ffi::FfiDatum> {
449446
datum.i64_val = array.value(row_id);
450447
datum
451448
}
452-
_ => panic!(
453-
"Will never come here. Unsupported Time64 unit for column {}",
454-
i
455-
),
449+
_ => panic!("Will never come here. Unsupported Time64 unit for column {i}"),
456450
},
457451
other => panic!(
458-
"Will never come here. Unsupported Arrow data type for column {}: {:?}",
459-
i, other
452+
"Will never come here. Unsupported Arrow data type for column {i}: {other:?}"
460453
),
461454
};
462455

bindings/python/src/table.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,11 @@ impl FlussTable {
6767

6868
let table_scan = fluss_table.new_scan();
6969

70-
let rust_scanner = table_scan.create_log_scanner();
70+
let rust_scanner = table_scan.create_log_scanner().map_err(|e| {
71+
PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!(
72+
"Failed to create log scanner: {e:?}"
73+
))
74+
})?;
7175

7276
let admin = conn
7377
.get_admin()

crates/examples/src/example_table.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ pub async fn main() -> Result<()> {
7070
try_join!(f1, f2, append_writer.flush())?;
7171

7272
// scan rows
73-
let log_scanner = table.new_scan().create_log_scanner();
73+
let log_scanner = table.new_scan().create_log_scanner()?;
7474
log_scanner.subscribe(0, 0).await?;
7575

7676
loop {

crates/fluss/Cargo.toml

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,14 @@ version = { workspace = true }
2222
name = "fluss"
2323
build = "src/build.rs"
2424

25+
[features]
26+
default = ["storage-memory", "storage-fs"]
27+
storage-all = ["storage-memory", "storage-fs"]
28+
29+
storage-memory = ["opendal/services-memory"]
30+
storage-fs = ["opendal/services-fs"]
31+
integration_tests = []
32+
2533
[dependencies]
2634
arrow = { workspace = true }
2735
arrow-schema = "57.0.0"
@@ -45,16 +53,17 @@ ordered-float = { version = "4", features = ["serde"] }
4553
parse-display = "0.10"
4654
ref-cast = "1.0"
4755
chrono = { workspace = true }
48-
oneshot = "0.1.11"
56+
opendal = "0.53.3"
57+
url = "2.5.7"
58+
async-trait = "0.1.89"
59+
uuid = { version = "1.10", features = ["v4"] }
60+
tempfile= "3.23.0"
4961

5062
[dev-dependencies]
5163
testcontainers = "0.25.0"
5264
once_cell = "1.19"
5365
test-env-helpers = "0.2.2"
5466

55-
[features]
56-
integration_tests = []
57-
5867

5968
[build-dependencies]
6069
prost-build = { version = "0.13.5" }

crates/fluss/src/client/table/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ pub const EARLIEST_OFFSET: i64 = -2;
2626

2727
mod append;
2828

29+
mod remote_log;
2930
mod scanner;
3031
mod writer;
3132

0 commit comments

Comments
 (0)