Skip to content

Commit f079d2a

Browse files
committed
[sqlite-watcher] add tonic-based watcher server
1 parent 21c7150 commit f079d2a

File tree

9 files changed

+670
-7
lines changed

9 files changed

+670
-7
lines changed

Cargo.lock

Lines changed: 288 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

sqlite-watcher/Cargo.toml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,24 @@ description = "SQLite WAL tailer that streams change events to database-replicat
77
license = "Apache-2.0"
88
repository = "https://github.com/serenorg/database-replicator"
99
readme = "README.md"
10+
build = "build.rs"
11+
12+
[build-dependencies]
13+
tonic-build = "0.11"
1014

1115
[dependencies]
1216
anyhow = "1.0"
1317
clap = { version = "4.4", features = ["derive", "env"] }
1418
dirs = "5.0"
19+
prost = "0.12"
1520
rusqlite = "0.30"
1621
serde_json = "1.0"
22+
tokio = { version = "1.35", features = ["macros", "rt-multi-thread", "signal", "fs"] }
23+
tonic = { version = "0.11", features = ["transport"] }
24+
tokio-stream = { version = "0.1", features = ["net"] }
1725
tracing = "0.1"
1826
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
1927

2028
[dev-dependencies]
2129
tempfile = "3.8"
30+
tokio = { version = "1.35", features = ["rt", "macros"] }

sqlite-watcher/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ Flag summary:
3232
- `--log-level`: Tracing filter (also settable via `SQLITE_WATCHER_LOG`).
3333
- `--poll-interval-ms`: How often to check the WAL file for growth (default 500 ms). Lower values react faster but cost more syscalls.
3434
- `--min-event-bytes`: Minimum WAL byte growth before emitting an event. Use larger values to avoid spam when very small transactions occur.
35+
- `--listen` + `--token-file` now control the embedded gRPC server. Clients must send `Authorization: Bearer <token>` metadata when calling the `Watcher` service (see `proto/watcher.proto`). Unix sockets/pipes are placeholders until Ticket D is completed; TCP listens on `127.0.0.1:<port>`.
3536

3637
## Cross-platform notes
3738

sqlite-watcher/build.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
fn main() -> Result<(), Box<dyn std::error::Error>> {
2+
tonic_build::configure()
3+
.build_client(true)
4+
.build_server(true)
5+
.compile(&["proto/watcher.proto"], &["proto"])?;
6+
println!("cargo:rerun-if-changed=proto/watcher.proto");
7+
Ok(())
8+
}

sqlite-watcher/proto/watcher.proto

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
syntax = "proto3";
2+
3+
package sqlitewatcher;
4+
5+
message HealthCheckRequest {}
6+
message HealthCheckResponse {
7+
string status = 1;
8+
}
9+
10+
message ListChangesRequest {
11+
uint32 limit = 1;
12+
}
13+
14+
message Change {
15+
int64 change_id = 1;
16+
string table_name = 2;
17+
string op = 3;
18+
string primary_key = 4;
19+
bytes payload = 5;
20+
string wal_frame = 6;
21+
string cursor = 7;
22+
}
23+
24+
message ListChangesResponse {
25+
repeated Change changes = 1;
26+
}
27+
28+
message AckChangesRequest {
29+
int64 up_to_change_id = 1;
30+
}
31+
32+
message AckChangesResponse {
33+
uint64 acknowledged = 1;
34+
}
35+
36+
message GetStateRequest {
37+
string table_name = 1;
38+
}
39+
40+
message GetStateResponse {
41+
bool exists = 1;
42+
int64 last_change_id = 2;
43+
string last_wal_frame = 3;
44+
string cursor = 4;
45+
}
46+
47+
message SetStateRequest {
48+
string table_name = 1;
49+
int64 last_change_id = 2;
50+
string last_wal_frame = 3;
51+
string cursor = 4;
52+
}
53+
54+
message SetStateResponse {}
55+
56+
service Watcher {
57+
rpc HealthCheck(HealthCheckRequest) returns (HealthCheckResponse);
58+
rpc ListChanges(ListChangesRequest) returns (ListChangesResponse);
59+
rpc AckChanges(AckChangesRequest) returns (AckChangesResponse);
60+
rpc GetState(GetStateRequest) returns (GetStateResponse);
61+
rpc SetState(SetStateRequest) returns (SetStateResponse);
62+
}

sqlite-watcher/src/lib.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,9 @@
11
pub mod change;
22
pub mod decoder;
33
pub mod queue;
4+
pub mod server;
45
pub mod wal;
6+
7+
pub mod watcher_proto {
8+
tonic::include_proto!("sqlitewatcher");
9+
}

sqlite-watcher/src/main.rs

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
use std::fmt;
2+
use std::fs;
3+
use std::net::SocketAddr;
24
use std::path::{Path, PathBuf};
35
use std::str::FromStr;
46
use std::sync::mpsc;
@@ -8,6 +10,7 @@ use anyhow::{anyhow, bail, Context, Result};
810
use clap::Parser;
911
use sqlite_watcher::decoder::WalGrowthDecoder;
1012
use sqlite_watcher::queue::ChangeQueue;
13+
use sqlite_watcher::server::TcpServerHandle;
1114
use sqlite_watcher::wal::{start_wal_watcher, WalWatcherConfig as TailConfig};
1215
use tracing_subscriber::EnvFilter;
1316

@@ -206,6 +209,7 @@ fn main() -> Result<()> {
206209
let cli = Cli::parse();
207210
init_tracing(&cli.log_filter)?;
208211
let config = WatcherConfig::try_from(cli)?;
212+
let auth_token = read_token_file(&config.token_file)?;
209213

210214
tracing::info!(
211215
db = %config.database_path.display(),
@@ -219,6 +223,7 @@ fn main() -> Result<()> {
219223

220224
let queue = ChangeQueue::open(&config.queue_path)?;
221225
let decoder = WalGrowthDecoder::default();
226+
let server_handle = start_grpc_server(&config.listen, &config.queue_path, &auth_token)?;
222227
let (event_tx, event_rx) = mpsc::channel();
223228
let _wal_handle = start_wal_watcher(
224229
&config.database_path,
@@ -246,6 +251,7 @@ fn main() -> Result<()> {
246251
}
247252
}
248253

254+
drop(server_handle);
249255
Ok(())
250256
}
251257

@@ -261,6 +267,43 @@ fn process_wal_event(
261267
Ok(ids)
262268
}
263269

270+
fn read_token_file(path: &Path) -> Result<String> {
271+
let contents = fs::read_to_string(path)
272+
.with_context(|| format!("failed to read token file {}", path.display()))?;
273+
let token = contents.trim().to_string();
274+
if token.is_empty() {
275+
bail!("token file {} is empty", path.display());
276+
}
277+
Ok(token)
278+
}
279+
280+
fn start_grpc_server(
281+
listen: &ListenAddress,
282+
queue_path: &Path,
283+
token: &str,
284+
) -> Result<Option<TcpServerHandle>> {
285+
match listen {
286+
ListenAddress::Tcp { host, port } => {
287+
let addr: SocketAddr = format!("{}:{}", host, port)
288+
.parse()
289+
.with_context(|| format!("invalid tcp listen address {host}:{port}"))?;
290+
let handle = TcpServerHandle::spawn(addr, queue_path.to_path_buf(), token.to_string())?;
291+
Ok(Some(handle))
292+
}
293+
ListenAddress::Unix(path) => {
294+
tracing::warn!(
295+
path = %path.display(),
296+
"unix socket gRPC transport is not yet implemented"
297+
);
298+
Ok(None)
299+
}
300+
ListenAddress::Pipe(name) => {
301+
tracing::warn!(pipe = name, "named pipe transport is not yet implemented");
302+
Ok(None)
303+
}
304+
}
305+
}
306+
264307
#[cfg(test)]
265308
mod tests {
266309
use super::*;

0 commit comments

Comments
 (0)