Skip to content

Commit a34f1c1

Browse files
zhaohaidao赵海源
andauthored
feat: introduce cpp bindings (#83)
--------- Co-authored-by: 赵海源 <[email protected]>
1 parent 3037192 commit a34f1c1

File tree

15 files changed

+2524
-2
lines changed

15 files changed

+2524
-2
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ rust-version = "1.85"
2828

2929
[workspace]
3030
resolver = "2"
31-
members = ["crates/fluss", "crates/examples", "bindings/python"]
31+
members = ["crates/fluss", "crates/examples", "bindings/python", "bindings/cpp"]
3232

3333
[workspace.dependencies]
3434
fluss = { version = "0.1.0", path = "./crates/fluss" }

bindings/cpp/.clang-format

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
---
19+
BasedOnStyle: Google
20+
ColumnLimit: 100
21+
IndentWidth: 4

bindings/cpp/.gitignore

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
build/
2+
cmake-build-*/
3+
.idea/
4+
*.o
5+
*.a
6+
*.so
7+
*.dylib

bindings/cpp/CMakeLists.txt

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
cmake_minimum_required(VERSION 3.22)
19+
20+
if (POLICY CMP0135)
21+
cmake_policy(SET CMP0135 NEW)
22+
endif()
23+
24+
project(fluss-cpp LANGUAGES CXX)
25+
26+
include(FetchContent)
27+
set(FLUSS_GOOGLETEST_VERSION 1.15.2 CACHE STRING "version of GoogleTest")
28+
set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
29+
30+
find_package(Threads REQUIRED)
31+
32+
if (NOT CMAKE_BUILD_TYPE)
33+
set(CMAKE_BUILD_TYPE Debug)
34+
endif()
35+
36+
set(CMAKE_CXX_STANDARD 17)
37+
set(CMAKE_CXX_STANDARD_REQUIRED ON)
38+
39+
option(FLUSS_ENABLE_ADDRESS_SANITIZER "Enable address sanitizer" OFF)
40+
option(FLUSS_ENABLE_TESTING "Enable building test binary for fluss" OFF)
41+
option(FLUSS_DEV "Enable dev mode" OFF)
42+
43+
if (FLUSS_DEV)
44+
set(FLUSS_ENABLE_ADDRESS_SANITIZER ON)
45+
set(FLUSS_ENABLE_TESTING ON)
46+
endif()
47+
48+
# Get cargo target dir
49+
execute_process(COMMAND cargo locate-project --workspace --message-format plain
50+
OUTPUT_VARIABLE CARGO_TARGET_DIR
51+
WORKING_DIRECTORY ${PROJECT_SOURCE_DIR})
52+
string(REGEX REPLACE "/Cargo.toml\n$" "/target" CARGO_TARGET_DIR "${CARGO_TARGET_DIR}")
53+
54+
set(CARGO_MANIFEST ${PROJECT_SOURCE_DIR}/Cargo.toml)
55+
set(RUST_SOURCE_FILE ${PROJECT_SOURCE_DIR}/src/lib.rs)
56+
set(RUST_BRIDGE_CPP ${CARGO_TARGET_DIR}/cxxbridge/fluss-cpp/src/lib.rs.cc)
57+
set(RUST_HEADER_FILE ${CARGO_TARGET_DIR}/cxxbridge/fluss-cpp/src/lib.rs.h)
58+
59+
if (CMAKE_BUILD_TYPE STREQUAL "Debug")
60+
set(RUST_LIB ${CARGO_TARGET_DIR}/debug/${CMAKE_STATIC_LIBRARY_PREFIX}fluss_cpp${CMAKE_STATIC_LIBRARY_SUFFIX})
61+
else()
62+
set(RUST_LIB ${CARGO_TARGET_DIR}/release/${CMAKE_STATIC_LIBRARY_PREFIX}fluss_cpp${CMAKE_STATIC_LIBRARY_SUFFIX})
63+
endif()
64+
65+
set(CPP_INCLUDE_DIR ${PROJECT_SOURCE_DIR}/include
66+
${PROJECT_SOURCE_DIR}/src
67+
${CARGO_TARGET_DIR}/cxxbridge
68+
${CARGO_TARGET_DIR}/cxxbridge/fluss-cpp/src)
69+
70+
file(GLOB CPP_SOURCE_FILE "src/*.cpp")
71+
file(GLOB CPP_HEADER_FILE "include/*.hpp")
72+
73+
if (NOT CMAKE_BUILD_TYPE STREQUAL "Debug")
74+
list(APPEND CARGO_BUILD_FLAGS "--release")
75+
endif()
76+
77+
add_custom_target(cargo_build
78+
COMMAND cargo build --manifest-path ${CARGO_MANIFEST} ${CARGO_BUILD_FLAGS}
79+
BYPRODUCTS ${RUST_BRIDGE_CPP} ${RUST_LIB} ${RUST_HEADER_FILE}
80+
DEPENDS ${RUST_SOURCE_FILE}
81+
USES_TERMINAL
82+
COMMENT "Running cargo..."
83+
)
84+
85+
add_library(fluss_cpp STATIC ${CPP_SOURCE_FILE} ${RUST_BRIDGE_CPP})
86+
target_sources(fluss_cpp PUBLIC ${CPP_HEADER_FILE})
87+
target_sources(fluss_cpp PRIVATE ${RUST_HEADER_FILE})
88+
target_include_directories(fluss_cpp PUBLIC ${CPP_INCLUDE_DIR})
89+
target_link_libraries(fluss_cpp PUBLIC ${RUST_LIB})
90+
target_link_libraries(fluss_cpp PRIVATE ${CMAKE_DL_LIBS} Threads::Threads)
91+
if(APPLE)
92+
target_link_libraries(fluss_cpp PUBLIC "-framework CoreFoundation" "-framework Security")
93+
endif()
94+
95+
add_executable(fluss_cpp_example examples/example.cpp)
96+
target_link_libraries(fluss_cpp_example fluss_cpp)
97+
target_include_directories(fluss_cpp_example PUBLIC ${CPP_INCLUDE_DIR})
98+
99+
set_target_properties(fluss_cpp
100+
PROPERTIES ADDITIONAL_CLEAN_FILES ${CARGO_TARGET_DIR}
101+
)
102+
add_dependencies(fluss_cpp cargo_build)
103+
104+
if (FLUSS_ENABLE_ADDRESS_SANITIZER)
105+
target_compile_options(fluss_cpp PRIVATE -fsanitize=leak,address,undefined -fno-omit-frame-pointer -fno-common -O1)
106+
target_link_options(fluss_cpp PRIVATE -fsanitize=leak,address,undefined)
107+
endif()

bindings/cpp/Cargo.toml

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
[package]
19+
name = "fluss-cpp"
20+
version = "0.1.0"
21+
edition.workspace = true
22+
rust-version.workspace = true
23+
publish = false
24+
25+
[lib]
26+
crate-type = ["staticlib"]
27+
28+
[dependencies]
29+
anyhow = "1.0"
30+
arrow = { workspace = true }
31+
cxx = "1.0"
32+
fluss = { path = "../../crates/fluss" }
33+
tokio = { version = "1.27", features = ["rt-multi-thread", "macros"] }
34+
35+
[build-dependencies]
36+
cxx-build = "1.0"

bindings/cpp/build.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
fn main() {
19+
cxx_build::bridge("src/lib.rs")
20+
.std("c++17")
21+
.compile("fluss-cpp-bridge");
22+
23+
println!("cargo:rerun-if-changed=src/lib.rs");
24+
}

bindings/cpp/examples/example.cpp

Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#include "fluss.hpp"
19+
20+
#include <iostream>
21+
#include <vector>
22+
23+
static void check(const char* step, const fluss::Result& r) {
24+
if (!r.Ok()) {
25+
std::cerr << step << " failed: code=" << r.error_code
26+
<< " msg=" << r.error_message << std::endl;
27+
std::exit(1);
28+
}
29+
}
30+
31+
int main() {
32+
// 1) Connect
33+
fluss::Connection conn;
34+
check("connect", fluss::Connection::Connect("127.0.0.1:9123", conn));
35+
36+
// 2) Admin
37+
fluss::Admin admin;
38+
check("get_admin", conn.GetAdmin(admin));
39+
40+
// 3) Schema & descriptor
41+
auto schema = fluss::Schema::NewBuilder()
42+
.AddColumn("id", fluss::DataType::Int)
43+
.AddColumn("name", fluss::DataType::String)
44+
.AddColumn("score", fluss::DataType::Float)
45+
.AddColumn("age", fluss::DataType::Int)
46+
.Build();
47+
48+
auto descriptor = fluss::TableDescriptor::NewBuilder()
49+
.SetSchema(schema)
50+
.SetBucketCount(1)
51+
.SetProperty("table.log.arrow.compression.type", "NONE")
52+
.SetComment("cpp example table")
53+
.Build();
54+
55+
fluss::TablePath table_path("fluss", "sample_table_cpp_v1");
56+
// ignore_if_exists=true to allow re-run
57+
check("create_table", admin.CreateTable(table_path, descriptor, true));
58+
59+
// 4) Get table
60+
fluss::Table table;
61+
check("get_table", conn.GetTable(table_path, table));
62+
63+
// 5) Writer
64+
fluss::AppendWriter writer;
65+
check("new_append_writer", table.NewAppendWriter(writer));
66+
67+
struct RowData {
68+
int id;
69+
const char* name;
70+
float score;
71+
int age;
72+
};
73+
74+
std::vector<RowData> rows = {
75+
{1, "Alice", 95.2f, 25},
76+
{2, "Bob", 87.2f, 30},
77+
{3, "Charlie", 92.1f, 35},
78+
};
79+
80+
for (const auto& r : rows) {
81+
fluss::GenericRow row;
82+
row.SetInt32(0, r.id);
83+
row.SetString(1, r.name);
84+
row.SetFloat32(2, r.score);
85+
row.SetInt32(3, r.age);
86+
check("append", writer.Append(row));
87+
}
88+
check("flush", writer.Flush());
89+
std::cout << "Wrote " << rows.size() << " rows" << std::endl;
90+
91+
// 6) Scan
92+
fluss::LogScanner scanner;
93+
check("new_log_scanner", table.NewLogScanner(scanner));
94+
95+
auto info = table.GetTableInfo();
96+
int buckets = info.num_buckets;
97+
for (int b = 0; b < buckets; ++b) {
98+
check("subscribe", scanner.Subscribe(b, 0));
99+
}
100+
101+
fluss::ScanRecords records;
102+
check("poll", scanner.Poll(5000, records));
103+
104+
std::cout << "Scanned records: " << records.records.size() << std::endl;
105+
for (const auto& rec : records.records) {
106+
std::cout << " offset=" << rec.offset << " id=" << rec.row.fields[0].i32_val
107+
<< " name=" << rec.row.fields[1].string_val
108+
<< " score=" << rec.row.fields[2].f32_val << " age=" << rec.row.fields[3].i32_val
109+
<< " ts=" << rec.timestamp << std::endl;
110+
}
111+
112+
// 7) Project only id (0) and name (1) columns
113+
std::vector<size_t> projected_columns = {0, 1};
114+
fluss::LogScanner projected_scanner;
115+
check("new_log_scanner_with_projection",
116+
table.NewLogScannerWithProjection(projected_columns, projected_scanner));
117+
118+
for (int b = 0; b < buckets; ++b) {
119+
check("subscribe_projected", projected_scanner.Subscribe(b, 0));
120+
}
121+
122+
fluss::ScanRecords projected_records;
123+
check("poll_projected", projected_scanner.Poll(5000, projected_records));
124+
125+
std::cout << "Projected records: " << projected_records.records.size() << std::endl;
126+
127+
bool projection_verified = true;
128+
for (size_t i = 0; i < projected_records.records.size(); ++i) {
129+
const auto& rec = projected_records.records[i];
130+
const auto& row = rec.row;
131+
132+
if (row.fields.size() != projected_columns.size()) {
133+
std::cerr << "ERROR: Record " << i << " has " << row.fields.size()
134+
<< " fields, expected " << projected_columns.size() << std::endl;
135+
projection_verified = false;
136+
continue;
137+
}
138+
139+
// Verify field types match expected columns
140+
// Column 0 (id) should be Int32, Column 1 (name) should be String
141+
if (row.fields[0].type != fluss::DatumType::Int32) {
142+
std::cerr << "ERROR: Record " << i << " field 0 type mismatch, expected Int32" << std::endl;
143+
projection_verified = false;
144+
}
145+
if (row.fields[1].type != fluss::DatumType::String) {
146+
std::cerr << "ERROR: Record " << i << " field 1 type mismatch, expected String" << std::endl;
147+
projection_verified = false;
148+
}
149+
150+
// Print projected data
151+
if (row.fields[0].type == fluss::DatumType::Int32 &&
152+
row.fields[1].type == fluss::DatumType::String) {
153+
std::cout << " Record " << i << ": id=" << row.fields[0].i32_val
154+
<< ", name=" << row.fields[1].string_val << std::endl;
155+
}
156+
}
157+
158+
if (projection_verified) {
159+
std::cout << "Column pruning verification passed!" << std::endl;
160+
} else {
161+
std::cerr << "Column pruning verification failed!" << std::endl;
162+
std::exit(1);
163+
}
164+
165+
return 0;
166+
}

0 commit comments

Comments
 (0)