Skip to content

Commit 65594dc

Browse files
committed
[sqlite-watcher] scaffold crate with wal watcher + queue
1 parent 8566627 commit 65594dc

File tree

9 files changed

+873
-0
lines changed

9 files changed

+873
-0
lines changed

Cargo.lock

Lines changed: 13 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,9 @@
1+
[workspace]
2+
resolver = "2"
3+
members = [
4+
"sqlite-watcher",
5+
]
6+
17
[package]
28
name = "database-replicator"
39
version = "7.0.14"

sqlite-watcher/Cargo.toml

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
[package]
2+
name = "sqlite-watcher"
3+
version = "0.1.0"
4+
edition = "2021"
5+
authors = ["SerenAI <[email protected]>"]
6+
description = "SQLite WAL tailer that streams change events to database-replicator."
7+
license = "Apache-2.0"
8+
repository = "https://github.com/serenorg/database-replicator"
9+
readme = "README.md"
10+
11+
[dependencies]
12+
anyhow = "1.0"
13+
clap = { version = "4.4", features = ["derive", "env"] }
14+
dirs = "5.0"
15+
rusqlite = "0.30"
16+
tracing = "0.1"
17+
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
18+
19+
[dev-dependencies]
20+
tempfile = "3.8"

sqlite-watcher/README.md

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
# sqlite-watcher
2+
3+
`sqlite-watcher` tails SQLite WAL files and exposes change streams that `database-replicator` can consume for incremental syncs. The current milestone focuses on a CLI skeleton plus a WAL-growth watcher loop so we can exercise configuration, logging, and packaging before wiring in the change queue + gRPC service described in `docs/plans/sqlite-watcher-plan.md`.
4+
5+
## Building
6+
7+
```bash
8+
cargo build -p sqlite-watcher
9+
```
10+
11+
This crate participates in the main workspace, so `cargo build --workspace` or `cargo test --workspace` will also compile it.
12+
13+
## CLI usage
14+
15+
```bash
16+
sqlite-watcher \
17+
--db /path/to/database.db \
18+
--listen unix:/tmp/sqlite-watcher.sock \
19+
--token-file ~/.seren/sqlite-watcher/token \
20+
--log-level info \
21+
--poll-interval-ms 250 \
22+
--min-event-bytes 4096
23+
```
24+
25+
Flag summary:
26+
27+
- `--db` (required): SQLite file to monitor; must exist and be accessible in WAL mode.
28+
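- `--listen`: Listener endpoint; accepts `unix:/path`, `tcp:<port>` (bound to `127.0.0.1`), or `pipe:<name>`. Defaults to `unix:/tmp/sqlite-watcher.sock` on Unix and `tcp:50051` elsewhere.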
29+
- `--token-file`: Shared-secret used to authenticate gRPC clients (defaults to `~/.seren/sqlite-watcher/token`).
30+
- `--log-level`: Tracing filter (also settable via `SQLITE_WATCHER_LOG`).
31+
- `--poll-interval-ms`: How often to check the WAL file for growth (default 500 ms). Lower values react faster but cost more syscalls.
32+
- `--min-event-bytes`: Minimum WAL byte growth before emitting an event (default 1). Use larger values to avoid spam when very small transactions occur.
33+
34+
## Cross-platform notes
35+
36+
- **Linux/macOS**: Default listener is a Unix domain socket at `/tmp/sqlite-watcher.sock`. Ensure the target SQLite database enables WAL journaling.
37+
- **Windows**: Unix sockets are disabled; pass `--listen tcp:50051` or `--listen pipe:SerenWatcher`. Named pipes allow local service accounts without opening TCP ports.
38+
- All platforms expect the token file to live under `~/.seren/sqlite-watcher/token` by default. Create the directory with `0700` permissions, because the watcher refuses to start if the secret is world-readable.
39+
- The current WAL watcher polls the database's WAL file (the database filename with `-wal` appended, e.g. `database.db-wal`) for byte growth. To keep WAL history available, configure your writers with `PRAGMA journal_mode=WAL;` and raise `wal_autocheckpoint` (or disable it) so the SQLite engine does not aggressively truncate the log.
40+
41+
Additional design constraints and follow-up work items live in `docs/plans/sqlite-watcher-plan.md` and `docs/plans/sqlite-watcher-tickets.md`.

sqlite-watcher/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
pub mod queue;
2+
pub mod wal;

sqlite-watcher/src/main.rs

Lines changed: 261 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,261 @@
1+
use std::fmt;
2+
use std::path::{Path, PathBuf};
3+
use std::str::FromStr;
4+
use std::sync::mpsc;
5+
use std::time::Duration;
6+
7+
use anyhow::{anyhow, bail, Context, Result};
8+
use clap::Parser;
9+
use sqlite_watcher::wal::{start_wal_watcher, WalWatcherConfig as TailConfig};
10+
use tracing_subscriber::EnvFilter;
11+
12+
// Default value for the `--listen` flag on Unix targets: a Unix domain socket.
#[cfg(unix)]
13+
const DEFAULT_LISTEN: &str = "unix:/tmp/sqlite-watcher.sock";
14+
// Default value for the `--listen` flag on every non-Unix target: a TCP port.
// (`ListenAddress::from_str` rejects the `unix:` scheme on Windows.)
#[cfg(not(unix))]
15+
const DEFAULT_LISTEN: &str = "tcp:50051";
16+
17+
// Raw command-line arguments as parsed by clap. Validation and normalization
// (path checks, endpoint parsing, `~` expansion) happen afterwards in
// `WatcherConfig::try_from`.
//
// NOTE: clap's derive turns the `///` comments on the fields below into
// `--help` text, so reviewer notes in this struct are written as `//`.
/// Command-line interface definition for sqlite-watcher.
18+
#[derive(Debug, Clone, Parser)]
19+
#[command(
20+
name = "sqlite-watcher",
21+
version,
22+
about = "Tails SQLite WAL files and exposes change streams.",
23+
long_about = None
24+
)]
25+
struct Cli {
26+
// Required (no default); checked for existence by `ensure_sqlite_file`.
/// Path to the SQLite database the watcher should monitor.
27+
#[arg(long = "db", value_name = "PATH")]
28+
db_path: PathBuf,
29+
30+
// Kept as a raw string here; parsed into `ListenAddress` in
// `WatcherConfig::try_from`. Defaults to the platform's `DEFAULT_LISTEN`.
/// Listener binding. Accepts unix:/path, tcp:<port>, or pipe:<name>.
31+
#[arg(long, value_name = "ENDPOINT", default_value = DEFAULT_LISTEN)]
32+
listen: String,
33+
34+
// Optional; when omitted, `WatcherConfig::try_from` uses `default_token_path()`.
/// Shared-secret token file used to authenticate RPC clients.
35+
#[arg(long = "token-file", value_name = "PATH")]
36+
token_file: Option<PathBuf>,
37+
38+
// Passed to `init_tracing`; defaults to "info" and can come from the
// `SQLITE_WATCHER_LOG` environment variable.
/// Tracing filter (info,warn,debug,trace). Can also be provided via SQLITE_WATCHER_LOG.
39+
#[arg(
40+
long = "log-level",
41+
value_name = "FILTER",
42+
default_value = "info",
43+
env = "SQLITE_WATCHER_LOG"
44+
)]
45+
log_filter: String,
46+
47+
// Default 500; clap rejects values outside 50..=60_000.
/// Interval in milliseconds between WAL file polls.
48+
#[arg(
49+
long = "poll-interval-ms",
50+
default_value_t = 500,
51+
value_parser = clap::value_parser!(u64).range(50..=60_000)
52+
)]
53+
poll_interval_ms: u64,
54+
55+
// Default 1; clap rejects values outside 1..=10_000_000.
/// Minimum WAL byte growth required before emitting an event.
56+
#[arg(
57+
long = "min-event-bytes",
58+
default_value_t = 1,
59+
value_parser = clap::value_parser!(u64).range(1..=10_000_000)
60+
)]
61+
min_event_bytes: u64,
62+
}
63+
64+
/// Parsed form of the `--listen` endpoint.
#[derive(Debug, Clone, PartialEq, Eq)]
enum ListenAddress {
    /// Unix domain socket at the given filesystem path.
    Unix(PathBuf),
    /// TCP socket identified by a host string and a port number.
    Tcp { host: String, port: u16 },
    /// Named pipe identified by its name.
    Pipe(String),
}

/// Renders the address as `unix:<path>`, `tcp:<host>:<port>`, or `pipe:<name>`.
impl fmt::Display for ListenAddress {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Unix(socket_path) => write!(f, "unix:{}", socket_path.display()),
            Self::Tcp { host, port } => write!(f, "tcp:{host}:{port}"),
            Self::Pipe(pipe_name) => write!(f, "pipe:{pipe_name}"),
        }
    }
}
80+
81+
impl FromStr for ListenAddress {
82+
type Err = anyhow::Error;
83+
84+
fn from_str(value: &str) -> Result<Self, Self::Err> {
85+
if let Some(path) = value.strip_prefix("unix:") {
86+
if cfg!(windows) {
87+
bail!("unix sockets are not supported on Windows");
88+
}
89+
if path.is_empty() {
90+
bail!("unix listen path cannot be empty");
91+
}
92+
return Ok(ListenAddress::Unix(PathBuf::from(path)));
93+
}
94+
95+
if let Some(port) = value.strip_prefix("tcp:") {
96+
let port: u16 = port
97+
.parse()
98+
.map_err(|_| anyhow!("tcp listener must specify a numeric port"))?;
99+
return Ok(ListenAddress::Tcp {
100+
host: "127.0.0.1".to_string(),
101+
port,
102+
});
103+
}
104+
105+
if let Some(name) = value.strip_prefix("pipe:") {
106+
if cfg!(not(windows)) {
107+
bail!("named pipes are only valid on Windows");
108+
}
109+
if name.is_empty() {
110+
bail!("pipe name cannot be empty");
111+
}
112+
return Ok(ListenAddress::Pipe(name.to_string()));
113+
}
114+
115+
bail!("listen endpoint must start with unix:/, tcp:, or pipe:");
116+
}
117+
}
118+
119+
#[derive(Debug, Clone)]
120+
struct WatcherConfig {
121+
database_path: PathBuf,
122+
listen: ListenAddress,
123+
token_file: PathBuf,
124+
poll_interval: Duration,
125+
min_event_bytes: u64,
126+
}
127+
128+
impl TryFrom<Cli> for WatcherConfig {
129+
type Error = anyhow::Error;
130+
131+
fn try_from(args: Cli) -> Result<Self> {
132+
let database_path = ensure_sqlite_file(&args.db_path)?;
133+
let listen = ListenAddress::from_str(args.listen.trim())?;
134+
let token_file = match args.token_file {
135+
Some(path) => expand_home(path)?,
136+
None => default_token_path()?,
137+
};
138+
139+
Ok(Self {
140+
database_path,
141+
listen,
142+
token_file,
143+
poll_interval: Duration::from_millis(args.poll_interval_ms),
144+
min_event_bytes: args.min_event_bytes,
145+
})
146+
}
147+
}
148+
149+
fn ensure_sqlite_file(path: &Path) -> Result<PathBuf> {
150+
if !path.exists() {
151+
bail!("database path {} does not exist", path.display());
152+
}
153+
if !path.is_file() {
154+
bail!("database path {} is not a file", path.display());
155+
}
156+
Ok(path
157+
.canonicalize()
158+
.with_context(|| format!("failed to canonicalize {}", path.display()))?)
159+
}
160+
161+
/// Returns the token file location used when `--token-file` is not given:
/// `.seren/sqlite-watcher/token` under the current user's home directory.
///
/// # Errors
///
/// Fails when the home directory cannot be determined.
fn default_token_path() -> Result<PathBuf> {
    dirs::home_dir()
        .map(|home_dir| home_dir.join(".seren/sqlite-watcher/token"))
        .ok_or_else(|| anyhow!("unable to determine home directory"))
}
165+
166+
fn expand_home(path: PathBuf) -> Result<PathBuf> {
167+
let as_str = path.to_string_lossy();
168+
if let Some(stripped) = as_str.strip_prefix("~/") {
169+
let home = dirs::home_dir().ok_or_else(|| anyhow!("unable to determine home directory"))?;
170+
return Ok(home.join(stripped));
171+
}
172+
if as_str == "~" {
173+
let home = dirs::home_dir().ok_or_else(|| anyhow!("unable to determine home directory"))?;
174+
return Ok(home);
175+
}
176+
Ok(path)
177+
}
178+
179+
fn init_tracing(filter: &str) -> Result<()> {
180+
let env_filter = EnvFilter::try_new(filter).or_else(|_| EnvFilter::try_new("info"))?;
181+
tracing_subscriber::fmt()
182+
.with_env_filter(env_filter)
183+
.with_target(false)
184+
.try_init()
185+
.map_err(|err| anyhow!("failed to init tracing subscriber: {err}"))
186+
}
187+
188+
fn main() -> Result<()> {
189+
let cli = Cli::parse();
190+
init_tracing(&cli.log_filter)?;
191+
let config = WatcherConfig::try_from(cli)?;
192+
193+
tracing::info!(
194+
db = %config.database_path.display(),
195+
listen = %config.listen,
196+
token = %config.token_file.display(),
197+
poll_ms = config.poll_interval.as_millis(),
198+
min_event_bytes = config.min_event_bytes,
199+
"sqlite-watcher starting"
200+
);
201+
202+
let (event_tx, event_rx) = mpsc::channel();
203+
let _wal_handle = start_wal_watcher(
204+
&config.database_path,
205+
TailConfig {
206+
poll_interval: config.poll_interval,
207+
min_event_bytes: config.min_event_bytes,
208+
},
209+
event_tx,
210+
)?;
211+
212+
for event in event_rx {
213+
tracing::info!(
214+
bytes_added = event.bytes_added,
215+
wal_size = event.current_size,
216+
"wal file grew"
217+
);
218+
}
219+
220+
Ok(())
221+
}
222+
223+
#[cfg(test)]
mod tests {
    use super::*;
    use clap::Parser;

    /// `--listen tcp:<port>` resolves to the loopback host with the given port,
    /// and a relative `--token-file` keeps its final component.
    #[test]
    fn parses_tcp_listener() {
        let db_file = tempfile::NamedTempFile::new().unwrap();
        let db_arg = db_file.path().to_str().unwrap();
        let argv = [
            "sqlite-watcher",
            "--db",
            db_arg,
            "--listen",
            "tcp:6000",
            "--token-file",
            "./token",
            "--log-level",
            "debug",
        ];
        let cli = Cli::try_parse_from(argv).expect("cli parsing failed");

        let config = WatcherConfig::try_from(cli).expect("config conversion failed");
        let expected = ListenAddress::Tcp {
            host: "127.0.0.1".to_string(),
            port: 6000,
        };
        assert_eq!(config.listen, expected);
        assert!(config.token_file.ends_with("token"));
    }

    /// Without `--listen`, Unix targets default to a Unix socket endpoint.
    #[test]
    #[cfg(unix)]
    fn parses_unix_listener_default() {
        let db_file = tempfile::NamedTempFile::new().unwrap();
        let db_arg = db_file.path().to_str().unwrap();
        let cli = Cli::try_parse_from(["sqlite-watcher", "--db", db_arg]).unwrap();
        let config = WatcherConfig::try_from(cli).unwrap();
        assert!(matches!(config.listen, ListenAddress::Unix(_)));
    }
}

0 commit comments

Comments
 (0)