Skip to content

Commit 8a9a0b2

Browse files
[feat] Create Python bindings for table writing and reading (#9)
--------- Co-authored-by: luoyuxia <[email protected]>
1 parent 1e1c3c3 commit 8a9a0b2

File tree

21 files changed

+909
-168
lines changed

21 files changed

+909
-168
lines changed

Cargo.toml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,11 @@ rust-version = "1.85"
2828

2929
[workspace]
3030
resolver = "2"
31-
members = ["crates/fluss", "crates/examples"]
31+
members = ["crates/fluss", "crates/examples", "bindings/python"]
3232

3333
[workspace.dependencies]
3434
fluss = { version = "0.1.0", path = "./crates/fluss" }
3535
tokio = { version = "1.44.2", features = ["full"] }
36-
clap = { version = "4.5.37", features = ["derive"] }
36+
clap = { version = "4.5.37", features = ["derive"] }
37+
arrow = "55.1.0"
38+
chrono = { version = "0.4", features = ["clock", "std", "wasmbind"] }

bindings/python/Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,6 @@ rust-version = "1.85"
2626
name = "fluss"
2727
crate-type = ["cdylib"]
2828

29-
[workspace]
30-
3129
[dependencies]
3230
pyo3 = { version = "0.24", features = ["extension-module"] }
3331
fluss = { path = "../../crates/fluss" }
@@ -36,3 +34,4 @@ arrow = { workspace = true }
3634
arrow-pyarrow = "55.1.0"
3735
pyo3-async-runtimes = { version = "0.24.0", features = ["tokio-runtime"] }
3836
chrono = { workspace = true }
37+
once_cell = "1.21.3"

bindings/python/fluss/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,6 @@
1515
# specific language governing permissions and limitations
1616
# under the License.
1717

18-
from .fluss_python import *
18+
from ._fluss import *
1919

2020
__version__ = "0.1.0"

bindings/python/pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ docs = [
5757
]
5858

5959
[tool.maturin]
60-
python-source = "python"
60+
python-source = "."
6161
module-name = "fluss._fluss"
6262
features = ["pyo3/extension-module"]
6363

bindings/python/src/admin.rs

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,8 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use pyo3::prelude::*;
19-
use pyo3_async_runtimes::tokio::future_into_py;
2018
use crate::*;
19+
use pyo3_async_runtimes::tokio::future_into_py;
2120
use std::sync::Arc;
2221

2322
/// Administrative client for managing Fluss tables
@@ -38,16 +37,17 @@ impl FlussAdmin {
3837
ignore_if_exists: Option<bool>,
3938
) -> PyResult<Bound<'py, PyAny>> {
4039
let ignore = ignore_if_exists.unwrap_or(false);
41-
40+
4241
let core_table_path = table_path.to_core().clone();
4342
let core_descriptor = table_descriptor.to_core().clone();
4443
let admin = self.__admin.clone();
4544

4645
future_into_py(py, async move {
47-
admin.create_table(&core_table_path, &core_descriptor, ignore)
46+
admin
47+
.create_table(&core_table_path, &core_descriptor, ignore)
4848
.await
4949
.map_err(|e| FlussError::new_err(e.to_string()))?;
50-
50+
5151
Python::with_gil(|py| Ok(py.None()))
5252
})
5353
}
@@ -60,10 +60,12 @@ impl FlussAdmin {
6060
) -> PyResult<Bound<'py, PyAny>> {
6161
let core_table_path = table_path.to_core().clone();
6262
let admin = self.__admin.clone();
63-
63+
6464
future_into_py(py, async move {
65-
let core_table_info = admin.get_table(&core_table_path).await
66-
.map_err(|e| FlussError::new_err(format!("Failed to get table: {}", e)))?;
65+
let core_table_info = admin
66+
.get_table(&core_table_path)
67+
.await
68+
.map_err(|e| FlussError::new_err(format!("Failed to get table: {e}")))?;
6769

6870
Python::with_gil(|py| {
6971
let table_info = TableInfo::from_core(core_table_info);
@@ -80,10 +82,12 @@ impl FlussAdmin {
8082
) -> PyResult<Bound<'py, PyAny>> {
8183
let core_table_path = table_path.to_core().clone();
8284
let admin = self.__admin.clone();
83-
85+
8486
future_into_py(py, async move {
85-
let core_lake_snapshot = admin.get_latest_lake_snapshot(&core_table_path).await
86-
.map_err(|e| FlussError::new_err(format!("Failed to get lake snapshot: {}", e)))?;
87+
let core_lake_snapshot = admin
88+
.get_latest_lake_snapshot(&core_table_path)
89+
.await
90+
.map_err(|e| FlussError::new_err(format!("Failed to get lake snapshot: {e}")))?;
8791

8892
Python::with_gil(|py| {
8993
let lake_snapshot = LakeSnapshot::from_core(core_lake_snapshot);

bindings/python/src/config.rs

Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,8 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use pyo3::prelude::*;
19-
use pyo3::types::PyDict;
2018
use crate::*;
19+
use pyo3::types::PyDict;
2120

2221
/// Configuration for Fluss client
2322
#[pyclass]
@@ -33,7 +32,7 @@ impl Config {
3332
#[pyo3(signature = (properties = None))]
3433
fn new(properties: Option<&Bound<'_, PyDict>>) -> PyResult<Self> {
3534
let mut config = fcore::config::Config::default();
36-
35+
3736
if let Some(props) = properties {
3837
for item in props.iter() {
3938
let key: String = item.0.extract()?;
@@ -42,67 +41,65 @@ impl Config {
4241
match key.as_str() {
4342
"bootstrap.servers" => {
4443
config.bootstrap_server = Some(value);
45-
},
44+
}
4645
"request.max.size" => {
4746
if let Ok(size) = value.parse::<i32>() {
4847
config.request_max_size = size;
4948
}
50-
},
49+
}
5150
"writer.acks" => {
5251
config.writer_acks = value;
53-
},
52+
}
5453
"writer.retries" => {
5554
if let Ok(retries) = value.parse::<i32>() {
5655
config.writer_retries = retries;
5756
}
58-
},
57+
}
5958
"writer.batch.size" => {
6059
if let Ok(size) = value.parse::<i32>() {
6160
config.writer_batch_size = size;
6261
}
63-
},
62+
}
6463
_ => {
65-
return Err(FlussError::new_err(format!("Unknown property: {}", key)));
64+
return Err(FlussError::new_err(format!("Unknown property: {key}")));
6665
}
6766
}
6867
}
6968
}
7069

71-
Ok(Self {
72-
inner: config,
73-
})
70+
Ok(Self { inner: config })
7471
}
75-
72+
7673
/// Get the bootstrap server
7774
#[getter]
7875
fn bootstrap_server(&self) -> Option<String> {
7976
self.inner.bootstrap_server.clone()
8077
}
81-
78+
8279
/// Set the bootstrap server
8380
#[setter]
8481
fn set_bootstrap_server(&mut self, server: String) {
8582
self.inner.bootstrap_server = Some(server);
8683
}
87-
84+
8885
/// Get the request max size
8986
#[getter]
9087
fn request_max_size(&self) -> i32 {
9188
self.inner.request_max_size
9289
}
93-
90+
9491
/// Set the request max size
9592
#[setter]
9693
fn set_request_max_size(&mut self, size: i32) {
9794
self.inner.request_max_size = size;
9895
}
99-
96+
10097
/// Get the writer batch size
10198
#[getter]
10299
fn writer_batch_size(&self) -> i32 {
103100
self.inner.writer_batch_size
104101
}
105-
102+
106103
/// Set the writer batch size
107104
#[setter]
108105
fn set_writer_batch_size(&mut self, size: i32) {

bindings/python/src/connection.rs

Lines changed: 22 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,9 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use pyo3::prelude::*;
1918
use crate::*;
20-
use std::sync::Arc;
2119
use pyo3_async_runtimes::tokio::future_into_py;
20+
use std::sync::Arc;
2221

2322
/// Connection to a Fluss cluster
2423
#[pyclass]
@@ -37,55 +36,55 @@ impl FlussConnection {
3736
let connection = fcore::client::FlussConnection::new(rust_config)
3837
.await
3938
.map_err(|e| FlussError::new_err(e.to_string()))?;
40-
39+
4140
let py_connection = FlussConnection {
4241
inner: Arc::new(connection),
4342
};
4443

45-
Python::with_gil(|py| {
46-
Py::new(py, py_connection)
47-
})
44+
Python::with_gil(|py| Py::new(py, py_connection))
4845
})
4946
}
50-
47+
5148
/// Get admin interface
5249
fn get_admin<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
5350
let client = self.inner.clone();
5451

5552
future_into_py(py, async move {
56-
let admin = client.get_admin()
53+
let admin = client
54+
.get_admin()
5755
.await
5856
.map_err(|e| FlussError::new_err(e.to_string()))?;
5957

6058
let py_admin = FlussAdmin::from_core(admin);
6159

62-
Python::with_gil(|py| {
63-
Py::new(py, py_admin)
64-
})
60+
Python::with_gil(|py| Py::new(py, py_admin))
6561
})
6662
}
6763

6864
/// Get a table
69-
fn get_table<'py>(&self, py: Python<'py>, table_path: &TablePath) -> PyResult<Bound<'py, PyAny>> {
65+
fn get_table<'py>(
66+
&self,
67+
py: Python<'py>,
68+
table_path: &TablePath,
69+
) -> PyResult<Bound<'py, PyAny>> {
7070
let client = self.inner.clone();
7171
let core_path = table_path.to_core().clone();
7272

7373
future_into_py(py, async move {
74-
let core_table = client.get_table(&core_path)
74+
let core_table = client
75+
.get_table(&core_path)
7576
.await
7677
.map_err(|e| FlussError::new_err(e.to_string()))?;
77-
78+
7879
let py_table = FlussTable::new_table(
79-
client,
80-
core_table.metadata,
81-
core_table.table_info,
82-
core_table.table_path,
83-
core_table.has_primary_key,
80+
client.clone(),
81+
core_table.metadata().clone(),
82+
core_table.table_info().clone(),
83+
core_table.table_path().clone(),
84+
core_table.has_primary_key(),
8485
);
8586

86-
Python::with_gil(|py| {
87-
Py::new(py, py_table)
88-
})
87+
Python::with_gil(|py| Py::new(py, py_table))
8988
})
9089
}
9190

@@ -98,7 +97,7 @@ impl FlussConnection {
9897
fn __enter__(slf: PyRef<Self>) -> PyRef<Self> {
9998
slf
10099
}
101-
100+
102101
// Exit the runtime context (for 'with' statement)
103102
#[pyo3(signature = (_exc_type=None, _exc_value=None, _traceback=None))]
104103
fn __exit__(

bindings/python/src/error.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use pyo3::exceptions::PyException;
1819
use pyo3::prelude::*;
1920

2021
/// Fluss errors
@@ -27,6 +28,11 @@ pub struct FlussError {
2728

2829
#[pymethods]
2930
impl FlussError {
31+
#[new]
32+
fn new(message: String) -> Self {
33+
Self { message }
34+
}
35+
3036
fn __str__(&self) -> String {
3137
format!("FlussError: {}", self.message)
3238
}
@@ -36,4 +42,4 @@ impl FlussError {
3642
pub fn new_err(message: impl ToString) -> PyErr {
3743
PyErr::new::<FlussError, _>(message.to_string())
3844
}
39-
}
45+
}

bindings/python/src/lib.rs

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,24 +16,24 @@
1616
// under the License.
1717

1818
pub use ::fluss as fcore;
19-
use pyo3::prelude::*;
2019
use once_cell::sync::Lazy;
20+
use pyo3::prelude::*;
2121
use tokio::runtime::Runtime;
2222

23+
mod admin;
2324
mod config;
2425
mod connection;
25-
mod table;
26-
mod admin;
27-
mod types;
2826
mod error;
27+
mod metadata;
28+
mod table;
2929
mod utils;
3030

31+
pub use admin::*;
3132
pub use config::*;
3233
pub use connection::*;
33-
pub use table::*;
34-
pub use admin::*;
35-
pub use types::*;
3634
pub use error::*;
35+
pub use metadata::*;
36+
pub use table::*;
3737
pub use utils::*;
3838

3939
static TOKIO_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
@@ -44,7 +44,7 @@ static TOKIO_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
4444
});
4545

4646
#[pymodule]
47-
fn fluss_python(m: &Bound<'_, PyModule>) -> PyResult<()> {
47+
fn _fluss(m: &Bound<'_, PyModule>) -> PyResult<()> {
4848
// Register all classes
4949
m.add_class::<Config>()?;
5050
m.add_class::<FlussConnection>()?;
@@ -58,10 +58,9 @@ fn fluss_python(m: &Bound<'_, PyModule>) -> PyResult<()> {
5858
m.add_class::<LogScanner>()?;
5959
m.add_class::<LakeSnapshot>()?;
6060
m.add_class::<TableBucket>()?;
61-
61+
6262
// Register exception types
63-
// TODO: maybe implement a separate module for exceptions
64-
m.add("FlussError", m.py().get_type::<FlussError>())?;
65-
63+
m.add_class::<FlussError>()?;
64+
6665
Ok(())
6766
}

0 commit comments

Comments
 (0)