Skip to content

Commit 50bbbbc

Browse files
authored
[sqlite-watcher] add durable change queue (#86)
1 parent 8566627 commit 50bbbbc

File tree

7 files changed

+332
-0
lines changed

7 files changed

+332
-0
lines changed

Cargo.lock

Lines changed: 15 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,10 @@
1+
[workspace]
2+
resolver = "2"
3+
members = [
4+
".",
5+
"sqlite-watcher",
6+
]
7+
18
[package]
29
name = "database-replicator"
310
version = "7.0.14"

sqlite-watcher/Cargo.toml

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
[package]
2+
name = "sqlite-watcher"
3+
version = "0.1.0"
4+
edition = "2021"
5+
authors = ["SerenAI <[email protected]>"]
6+
description = "Utilities for monitoring SQLite databases (queue module)."
7+
license = "Apache-2.0"
8+
repository = "https://github.com/serenorg/database-replicator"
9+
10+
[dependencies]
11+
anyhow = "1.0"
12+
dirs = "5.0"
13+
rusqlite = { version = "0.30", features = ["chrono"] }
14+
serde = { version = "1.0", features = ["derive"] }
15+
serde_json = "1.0"
16+
thiserror = "1.0"
17+
18+
[dev-dependencies]
19+
tempfile = "3.8"
20+
rand = "0.8"

sqlite-watcher/README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
# sqlite-watcher
2+
3+
Work-in-progress tooling for monitoring SQLite databases. The crate currently provides the durable change queue used by the watcher service. The queue stores row-level changes plus per-table checkpoints in `~/.seren/sqlite-watcher/changes.db` so that, after a restart, the watcher can resume from the last acknowledged WAL frame.
4+
5+
Run `cargo test -p sqlite-watcher` to execute the queue integration tests.

sqlite-watcher/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
pub mod queue;

sqlite-watcher/src/queue.rs

Lines changed: 225 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,225 @@
1+
use std::fs;
2+
use std::path::{Path, PathBuf};
3+
4+
use anyhow::{anyhow, Context, Result};
5+
use rusqlite::{params, Connection, OptionalExtension, Row};
6+
use serde::Serialize;
7+
8+
/// DDL batch that creates the queue's two tables when they do not exist yet.
///
/// * `changes` holds one row per captured change; `acked` is `0` until the
///   change has been acknowledged.
/// * `state` holds one checkpoint row per `table_name`.
const SCHEMA: &str = r#"
CREATE TABLE IF NOT EXISTS changes (
change_id INTEGER PRIMARY KEY AUTOINCREMENT,
table_name TEXT NOT NULL,
op TEXT NOT NULL,
id TEXT NOT NULL,
payload BLOB,
wal_frame TEXT,
cursor TEXT,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
acked INTEGER NOT NULL DEFAULT 0
);

CREATE TABLE IF NOT EXISTS state (
table_name TEXT PRIMARY KEY,
last_change_id INTEGER NOT NULL DEFAULT 0,
last_wal_frame TEXT,
cursor TEXT,
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
"#;
29+
30+
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
31+
pub enum ChangeOperation {
32+
Insert,
33+
Update,
34+
Delete,
35+
}
36+
37+
impl ChangeOperation {
38+
pub fn as_str(&self) -> &'static str {
39+
match self {
40+
ChangeOperation::Insert => "insert",
41+
ChangeOperation::Update => "update",
42+
ChangeOperation::Delete => "delete",
43+
}
44+
}
45+
46+
fn from_str(value: &str) -> Result<Self> {
47+
match value {
48+
"insert" => Ok(ChangeOperation::Insert),
49+
"update" => Ok(ChangeOperation::Update),
50+
"delete" => Ok(ChangeOperation::Delete),
51+
other => Err(anyhow!("unknown change operation '{other}'")),
52+
}
53+
}
54+
}
55+
56+
#[derive(Debug, Clone, PartialEq, Eq)]
57+
pub struct NewChange {
58+
pub table_name: String,
59+
pub operation: ChangeOperation,
60+
pub primary_key: String,
61+
pub payload: Option<Vec<u8>>,
62+
pub wal_frame: Option<String>,
63+
pub cursor: Option<String>,
64+
}
65+
66+
#[derive(Debug, Clone, PartialEq, Eq)]
67+
pub struct ChangeRecord {
68+
pub change_id: i64,
69+
pub table_name: String,
70+
pub operation: ChangeOperation,
71+
pub primary_key: String,
72+
pub payload: Option<Vec<u8>>,
73+
pub wal_frame: Option<String>,
74+
pub cursor: Option<String>,
75+
}
76+
77+
/// Per-table checkpoint stored alongside the queued changes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct QueueState {
    /// Table this checkpoint belongs to; at most one checkpoint exists per table.
    pub table_name: String,
    /// Identifier of the last change recorded for the table.
    pub last_change_id: i64,
    /// Optional WAL frame marker recorded with the checkpoint.
    pub last_wal_frame: Option<String>,
    /// Optional cursor value recorded with the checkpoint.
    pub cursor: Option<String>,
}
84+
85+
pub struct ChangeQueue {
86+
path: PathBuf,
87+
conn: Connection,
88+
}
89+
90+
impl ChangeQueue {
91+
pub fn open(path: impl AsRef<Path>) -> Result<Self> {
92+
let path = path.as_ref();
93+
if let Some(parent) = path.parent() {
94+
fs::create_dir_all(parent).with_context(|| {
95+
format!("failed to create queue directory {}", parent.display())
96+
})?;
97+
#[cfg(unix)]
98+
enforce_dir_perms(parent)?;
99+
}
100+
let conn = Connection::open(path)
101+
.with_context(|| format!("failed to open queue database {}", path.display()))?;
102+
conn.pragma_update(None, "journal_mode", &"wal").ok();
103+
conn.pragma_update(None, "synchronous", &"normal").ok();
104+
conn.execute_batch(SCHEMA)
105+
.context("failed to initialize change queue schema")?;
106+
Ok(Self {
107+
path: path.to_path_buf(),
108+
conn,
109+
})
110+
}
111+
112+
pub fn path(&self) -> &Path {
113+
&self.path
114+
}
115+
116+
pub fn enqueue(&self, change: &NewChange) -> Result<i64> {
117+
self.conn.execute(
118+
"INSERT INTO changes(table_name, op, id, payload, wal_frame, cursor)
119+
VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
120+
params![
121+
change.table_name,
122+
change.operation.as_str(),
123+
change.primary_key,
124+
change.payload,
125+
change.wal_frame,
126+
change.cursor,
127+
],
128+
)?;
129+
Ok(self.conn.last_insert_rowid())
130+
}
131+
132+
pub fn fetch_batch(&self, limit: usize) -> Result<Vec<ChangeRecord>> {
133+
let mut stmt = self.conn.prepare(
134+
"SELECT change_id, table_name, op, id, payload, wal_frame, cursor
135+
FROM changes WHERE acked = 0 ORDER BY change_id ASC LIMIT ?1",
136+
)?;
137+
let mut rows = stmt.query([limit as i64])?;
138+
let mut results = Vec::new();
139+
while let Some(row) = rows.next()? {
140+
results.push(row_to_change(&row)?);
141+
}
142+
Ok(results)
143+
}
144+
145+
pub fn ack_up_to(&self, change_id: i64) -> Result<u64> {
146+
let updated = self.conn.execute(
147+
"UPDATE changes SET acked = 1 WHERE change_id <= ?1",
148+
[change_id],
149+
)?;
150+
Ok(updated as u64)
151+
}
152+
153+
pub fn purge_acked(&self) -> Result<u64> {
154+
let deleted = self
155+
.conn
156+
.execute("DELETE FROM changes WHERE acked = 1", [])?;
157+
Ok(deleted as u64)
158+
}
159+
160+
pub fn get_state(&self, table: &str) -> Result<Option<QueueState>> {
161+
self.conn
162+
.prepare(
163+
"SELECT table_name, last_change_id, last_wal_frame, cursor
164+
FROM state WHERE table_name = ?1",
165+
)?
166+
.query_row([table], |row| {
167+
Ok(QueueState {
168+
table_name: row.get(0)?,
169+
last_change_id: row.get(1)?,
170+
last_wal_frame: row.get(2)?,
171+
cursor: row.get(3)?,
172+
})
173+
})
174+
.optional()
175+
.map_err(Into::into)
176+
}
177+
178+
pub fn set_state(&self, state: &QueueState) -> Result<()> {
179+
self.conn.execute(
180+
"INSERT INTO state(table_name, last_change_id, last_wal_frame, cursor, updated_at)
181+
VALUES (?1, ?2, ?3, ?4, CURRENT_TIMESTAMP)
182+
ON CONFLICT(table_name) DO UPDATE SET
183+
last_change_id = excluded.last_change_id,
184+
last_wal_frame = excluded.last_wal_frame,
185+
cursor = excluded.cursor,
186+
updated_at = CURRENT_TIMESTAMP",
187+
params![
188+
state.table_name,
189+
state.last_change_id,
190+
state.last_wal_frame,
191+
state.cursor,
192+
],
193+
)?;
194+
Ok(())
195+
}
196+
}
197+
198+
fn row_to_change(row: &Row<'_>) -> Result<ChangeRecord> {
199+
let op_str: String = row.get(2)?;
200+
Ok(ChangeRecord {
201+
change_id: row.get(0)?,
202+
table_name: row.get(1)?,
203+
operation: ChangeOperation::from_str(&op_str)?,
204+
primary_key: row.get(3)?,
205+
payload: row.get(4)?,
206+
wal_frame: row.get(5)?,
207+
cursor: row.get(6)?,
208+
})
209+
}
210+
211+
#[cfg(unix)]
212+
fn enforce_dir_perms(path: &Path) -> Result<()> {
213+
use std::os::unix::fs::PermissionsExt;
214+
215+
let metadata = fs::metadata(path)?;
216+
let mut perms = metadata.permissions();
217+
perms.set_mode(0o700);
218+
fs::set_permissions(path, perms)?;
219+
Ok(())
220+
}
221+
222+
#[cfg(not(unix))]
223+
fn enforce_dir_perms(_path: &Path) -> Result<()> {
224+
Ok(())
225+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
use sqlite_watcher::queue::{ChangeOperation, ChangeQueue, NewChange, QueueState};
2+
use tempfile::tempdir;
3+
4+
fn new_change(table: &str, id: &str, op: ChangeOperation) -> NewChange {
5+
NewChange {
6+
table_name: table.to_string(),
7+
operation: op,
8+
primary_key: id.to_string(),
9+
payload: Some(format!("{{\"id\":\"{id}\"}}").into_bytes()),
10+
wal_frame: Some("frame1".to_string()),
11+
cursor: None,
12+
}
13+
}
14+
15+
/// Enqueues three changes, acknowledges and purges the first two, then
/// reopens the database and checks that only the third change is still pending.
#[test]
fn durable_enqueue_and_ack_flow() {
    let dir = tempdir().unwrap();
    let queue_path = dir.path().join("changes.db");
    let queue = ChangeQueue::open(&queue_path).unwrap();

    let ids: Vec<i64> = (0..3)
        .map(|i| {
            let change = new_change("vaults", &format!("pk-{i}"), ChangeOperation::Insert);
            queue.enqueue(&change).unwrap()
        })
        .collect();

    let pending = queue.fetch_batch(10).unwrap();
    assert_eq!(pending.len(), 3);

    queue.ack_up_to(ids[1]).unwrap();
    queue.purge_acked().unwrap();

    // Close the first handle before reopening the same file.
    drop(queue);

    let reopened = ChangeQueue::open(&queue_path).unwrap();
    let remaining = reopened.fetch_batch(10).unwrap();
    assert_eq!(remaining.len(), 1);
    assert_eq!(remaining[0].change_id, ids[2]);
}
40+
41+
/// Stores a checkpoint for a table that has none and reads it back unchanged.
#[test]
fn state_round_trip() {
    let dir = tempdir().unwrap();
    let queue_path = dir.path().join("state.db");
    let queue = ChangeQueue::open(&queue_path).unwrap();

    // Nothing has been stored for this table yet.
    let before = queue.get_state("prices").unwrap();
    assert!(before.is_none());

    let state = QueueState {
        table_name: String::from("prices"),
        last_change_id: 42,
        last_wal_frame: Some(String::from("frame-42")),
        cursor: Some(String::from("cursor-data")),
    };
    queue.set_state(&state).unwrap();

    let fetched = queue.get_state("prices").unwrap().unwrap();
    assert_eq!(fetched, state);
}

0 commit comments

Comments
 (0)