Skip to content

Commit 5188e44

Browse files
committed
[sqlite-watcher] add RowChange abstraction and queue wiring
1 parent 65594dc commit 5188e44

File tree

6 files changed

+137
-9
lines changed

6 files changed

+137
-9
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

sqlite-watcher/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ anyhow = "1.0"
1313
clap = { version = "4.4", features = ["derive", "env"] }
1414
dirs = "5.0"
1515
rusqlite = "0.30"
16+
serde_json = "1.0"
1617
tracing = "0.1"
1718
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
1819

sqlite-watcher/README.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ sqlite-watcher \
1818
--listen unix:/tmp/sqlite-watcher.sock \
1919
--token-file ~/.seren/sqlite-watcher/token \
2020
--log-level info \
21+
--queue-db ~/.seren/sqlite-watcher/changes.db \
2122
--poll-interval-ms 250 \
2223
--min-event-bytes 4096
2324
```
@@ -27,6 +28,7 @@ Flag summary:
2728
- `--db` (required): SQLite file to monitor; must exist and be accessible in WAL mode.
2829
- `--listen`: Listener endpoint; accepts `unix:/path`, `tcp:<port>`, or `pipe:<name>`.
2930
- `--token-file`: Shared-secret used to authenticate gRPC clients (defaults to `~/.seren/sqlite-watcher/token`).
31+
- `--queue-db`: SQLite file used to persist change events + checkpoints (defaults to `~/.seren/sqlite-watcher/changes.db`).
3032
- `--log-level`: Tracing filter (also settable via `SQLITE_WATCHER_LOG`).
3133
- `--poll-interval-ms`: How often to check the WAL file for growth (default 500 ms). Lower values react faster but cost more syscalls.
3234
- `--min-event-bytes`: Minimum WAL byte growth before emitting an event. Use larger values to avoid spam when very small transactions occur.
@@ -37,5 +39,10 @@ Flag summary:
3739
- **Windows**: Unix sockets are disabled; pass `--listen tcp:50051` or `--listen pipe:SerenWatcher`. Named pipes allow local service accounts without opening TCP ports.
3840
- All platforms expect the token file to live under `~/.seren/sqlite-watcher/token` by default; create the directory with `0700` permissions so the watcher refuses to start if the secret is world-readable.
3941
- The current WAL watcher polls the `*.sqlite-wal` file for byte growth. To keep WAL history available, configure your writers with `PRAGMA journal_mode=WAL;` and raise `wal_autocheckpoint` (or disable it) so the SQLite engine does not aggressively truncate the log.
42+
- Change queue data is stored under `~/.seren/sqlite-watcher/changes.db`. The binary enforces owner-only permissions on that directory to keep tokens + change data private.
43+
44+
### Placeholder change format
45+
46+
Until the WAL decoder lands, each growth event is recorded as a `RowChange` with `table_name="__wal__"`, `operation=insert`, and a JSON payload describing the byte delta + timestamp. Downstream components can treat these as heartbeats while we finish Tickets B–D.
4047

4148
Additional design constraints and follow-up work items live in `docs/plans/sqlite-watcher-plan.md` and `docs/plans/sqlite-watcher-tickets.md`.

sqlite-watcher/src/change.rs

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
use serde_json::Value;
2+
3+
use crate::queue::{ChangeOperation, NewChange};
4+
5+
#[derive(Debug, Clone, PartialEq)]
6+
pub struct RowChange {
7+
pub table_name: String,
8+
pub operation: ChangeOperation,
9+
pub primary_key: String,
10+
pub payload: Option<Value>,
11+
pub wal_frame: Option<String>,
12+
pub cursor: Option<String>,
13+
}
14+
15+
impl RowChange {
16+
pub fn into_new_change(self) -> NewChange {
17+
let payload = self
18+
.payload
19+
.map(|value| serde_json::to_vec(&value).expect("row change payload serializes"));
20+
NewChange {
21+
table_name: self.table_name,
22+
operation: self.operation,
23+
primary_key: self.primary_key,
24+
payload,
25+
wal_frame: self.wal_frame,
26+
cursor: self.cursor,
27+
}
28+
}
29+
}
30+
31+
#[cfg(test)]
32+
mod tests {
33+
use super::*;
34+
35+
#[test]
36+
fn converts_to_new_change() {
37+
let row = RowChange {
38+
table_name: "prices".into(),
39+
operation: ChangeOperation::Update,
40+
primary_key: "pk1".into(),
41+
payload: Some(serde_json::json!({"foo": "bar"})),
42+
wal_frame: Some("frame-1".into()),
43+
cursor: Some("cursor".into()),
44+
};
45+
let change = row.into_new_change();
46+
assert_eq!(change.table_name, "prices");
47+
assert!(change.payload.unwrap().contains(&b'b'));
48+
}
49+
}

sqlite-watcher/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
1+
pub mod change;
12
pub mod queue;
23
pub mod wal;

sqlite-watcher/src/main.rs

Lines changed: 78 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,14 @@ use std::fmt;
22
use std::path::{Path, PathBuf};
33
use std::str::FromStr;
44
use std::sync::mpsc;
5-
use std::time::Duration;
5+
use std::time::{Duration, SystemTime, UNIX_EPOCH};
66

77
use anyhow::{anyhow, bail, Context, Result};
88
use clap::Parser;
9-
use sqlite_watcher::wal::{start_wal_watcher, WalWatcherConfig as TailConfig};
9+
use serde_json::json;
10+
use sqlite_watcher::change::RowChange;
11+
use sqlite_watcher::queue::{ChangeOperation, ChangeQueue};
12+
use sqlite_watcher::wal::{start_wal_watcher, WalEvent, WalWatcherConfig as TailConfig};
1013
use tracing_subscriber::EnvFilter;
1114

1215
#[cfg(unix)]
@@ -35,6 +38,10 @@ struct Cli {
3538
#[arg(long = "token-file", value_name = "PATH")]
3639
token_file: Option<PathBuf>,
3740

41+
/// Path to the durable change queue database.
42+
#[arg(long = "queue-db", value_name = "PATH")]
43+
queue_db: Option<PathBuf>,
44+
3845
/// Tracing filter (info,warn,debug,trace). Can also be provided via SQLITE_WATCHER_LOG.
3946
#[arg(
4047
long = "log-level",
@@ -121,6 +128,7 @@ struct WatcherConfig {
121128
database_path: PathBuf,
122129
listen: ListenAddress,
123130
token_file: PathBuf,
131+
queue_path: PathBuf,
124132
poll_interval: Duration,
125133
min_event_bytes: u64,
126134
}
@@ -135,11 +143,16 @@ impl TryFrom<Cli> for WatcherConfig {
135143
Some(path) => expand_home(path)?,
136144
None => default_token_path()?,
137145
};
146+
let queue_path = match args.queue_db {
147+
Some(path) => expand_home(path)?,
148+
None => default_queue_path()?,
149+
};
138150

139151
Ok(Self {
140152
database_path,
141153
listen,
142154
token_file,
155+
queue_path,
143156
poll_interval: Duration::from_millis(args.poll_interval_ms),
144157
min_event_bytes: args.min_event_bytes,
145158
})
@@ -163,6 +176,11 @@ fn default_token_path() -> Result<PathBuf> {
163176
Ok(home.join(".seren/sqlite-watcher/token"))
164177
}
165178

179+
fn default_queue_path() -> Result<PathBuf> {
180+
let home = dirs::home_dir().ok_or_else(|| anyhow!("unable to determine home directory"))?;
181+
Ok(home.join(".seren/sqlite-watcher/changes.db"))
182+
}
183+
166184
fn expand_home(path: PathBuf) -> Result<PathBuf> {
167185
let as_str = path.to_string_lossy();
168186
if let Some(stripped) = as_str.strip_prefix("~/") {
@@ -194,11 +212,13 @@ fn main() -> Result<()> {
194212
db = %config.database_path.display(),
195213
listen = %config.listen,
196214
token = %config.token_file.display(),
215+
queue = %config.queue_path.display(),
197216
poll_ms = config.poll_interval.as_millis(),
198217
min_event_bytes = config.min_event_bytes,
199218
"sqlite-watcher starting"
200219
);
201220

221+
let queue = ChangeQueue::open(&config.queue_path)?;
202222
let (event_tx, event_rx) = mpsc::channel();
203223
let _wal_handle = start_wal_watcher(
204224
&config.database_path,
@@ -210,24 +230,55 @@ fn main() -> Result<()> {
210230
)?;
211231

212232
for event in event_rx {
213-
tracing::info!(
214-
bytes_added = event.bytes_added,
215-
wal_size = event.current_size,
216-
"wal file grew"
217-
);
233+
match persist_wal_event(&queue, &event) {
234+
Ok(change_id) => {
235+
tracing::info!(
236+
bytes_added = event.bytes_added,
237+
wal_size = event.current_size,
238+
change_id,
239+
"queued wal growth event"
240+
);
241+
}
242+
Err(err) => {
243+
tracing::warn!(error = %err, "failed to persist wal event to queue");
244+
}
245+
}
218246
}
219247

220248
Ok(())
221249
}
222250

251+
fn persist_wal_event(queue: &ChangeQueue, event: &WalEvent) -> Result<i64> {
252+
let now = SystemTime::now()
253+
.duration_since(UNIX_EPOCH)
254+
.map_err(|err| anyhow!("system clock drift: {err}"))?;
255+
let row = RowChange {
256+
table_name: "__wal__".to_string(),
257+
operation: ChangeOperation::Insert,
258+
primary_key: now.as_nanos().to_string(),
259+
payload: Some(json!({
260+
"kind": "wal_growth",
261+
"bytes_added": event.bytes_added,
262+
"current_size": event.current_size,
263+
"recorded_at": now.as_secs_f64(),
264+
})),
265+
wal_frame: None,
266+
cursor: None,
267+
};
268+
queue.enqueue(&row.into_new_change())
269+
}
270+
223271
#[cfg(test)]
224272
mod tests {
225273
use super::*;
226274
use clap::Parser;
275+
use sqlite_watcher::queue::ChangeQueue;
276+
use sqlite_watcher::wal::WalEvent;
277+
use tempfile::{tempdir, NamedTempFile};
227278

228279
#[test]
229280
fn parses_tcp_listener() {
230-
let tmp = tempfile::NamedTempFile::new().unwrap();
281+
let tmp = NamedTempFile::new().unwrap();
231282
let cli = Cli::try_parse_from([
232283
"sqlite-watcher",
233284
"--db",
@@ -247,15 +298,33 @@ mod tests {
247298
ListenAddress::Tcp { host, port } if host == "127.0.0.1" && port == 6000
248299
));
249300
assert!(config.token_file.ends_with("token"));
301+
assert!(config.queue_path.ends_with("changes.db"));
250302
}
251303

252304
#[test]
253305
#[cfg(unix)]
254306
fn parses_unix_listener_default() {
255-
let tmp = tempfile::NamedTempFile::new().unwrap();
307+
let tmp = NamedTempFile::new().unwrap();
256308
let cli =
257309
Cli::try_parse_from(["sqlite-watcher", "--db", tmp.path().to_str().unwrap()]).unwrap();
258310
let config = WatcherConfig::try_from(cli).unwrap();
259311
assert!(matches!(config.listen, ListenAddress::Unix(_)));
260312
}
313+
314+
#[test]
315+
fn persist_wal_events_into_queue() {
316+
let dir = tempdir().unwrap();
317+
let queue_path = dir.path().join("queue.db");
318+
let queue = ChangeQueue::open(&queue_path).unwrap();
319+
320+
let event = WalEvent {
321+
bytes_added: 2048,
322+
current_size: 4096,
323+
};
324+
let change_id = persist_wal_event(&queue, &event).unwrap();
325+
let batch = queue.fetch_batch(10).unwrap();
326+
assert_eq!(batch.len(), 1);
327+
assert_eq!(batch[0].change_id, change_id);
328+
assert_eq!(batch[0].table_name, "__wal__");
329+
}
261330
}

0 commit comments

Comments
 (0)