diff --git a/Cargo.lock b/Cargo.lock index eed6140..a654f04 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2648,6 +2648,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a78046161564f5e7cd9008aff3b2990b3850dc8e0349119b98e8f251e099f24d" dependencies = [ "bitflags 2.10.0", + "chrono", "fallible-iterator 0.3.0", "fallible-streaming-iterator", "hashlink", @@ -3055,6 +3056,20 @@ dependencies = [ "windows-sys 0.60.2", ] +[[package]] +name = "sqlite-watcher" +version = "0.1.0" +dependencies = [ + "anyhow", + "dirs", + "rand 0.8.5", + "rusqlite", + "serde", + "serde_json", + "tempfile", + "thiserror 1.0.69", +] + [[package]] name = "stable_deref_trait" version = "1.2.1" diff --git a/Cargo.toml b/Cargo.toml index 72b4e75..ab7988c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,3 +1,10 @@ +[workspace] +resolver = "2" +members = [ + ".", + "sqlite-watcher", +] + [package] name = "database-replicator" version = "7.0.14" diff --git a/sqlite-watcher/Cargo.toml b/sqlite-watcher/Cargo.toml new file mode 100644 index 0000000..5028dcc --- /dev/null +++ b/sqlite-watcher/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "sqlite-watcher" +version = "0.1.0" +edition = "2021" +authors = ["SerenAI "] +description = "Utilities for monitoring SQLite databases (queue module)." +license = "Apache-2.0" +repository = "https://github.com/serenorg/database-replicator" + +[dependencies] +anyhow = "1.0" +dirs = "5.0" +rusqlite = { version = "0.30", features = ["chrono"] } +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +thiserror = "1.0" + +[dev-dependencies] +tempfile = "3.8" +rand = "0.8" diff --git a/sqlite-watcher/README.md b/sqlite-watcher/README.md new file mode 100644 index 0000000..6f798d3 --- /dev/null +++ b/sqlite-watcher/README.md @@ -0,0 +1,5 @@ +# sqlite-watcher + +Work-in-progress tooling for monitoring SQLite databases. This issue adds the durable change queue used by the watcher service. 
The queue stores row-level changes plus per-table checkpoints in `~/.seren/sqlite-watcher/changes.db` so restarts can resume from the last acknowledged WAL frame. + +Run `cargo test -p sqlite-watcher` to execute the queue integration tests. diff --git a/sqlite-watcher/src/lib.rs b/sqlite-watcher/src/lib.rs new file mode 100644 index 0000000..e8ae652 --- /dev/null +++ b/sqlite-watcher/src/lib.rs @@ -0,0 +1 @@ +pub mod queue; diff --git a/sqlite-watcher/src/queue.rs b/sqlite-watcher/src/queue.rs new file mode 100644 index 0000000..8a9b0b7 --- /dev/null +++ b/sqlite-watcher/src/queue.rs @@ -0,0 +1,225 @@ +use std::fs; +use std::path::{Path, PathBuf}; + +use anyhow::{anyhow, Context, Result}; +use rusqlite::{params, Connection, OptionalExtension, Row}; +use serde::Serialize; + +const SCHEMA: &str = r#" +CREATE TABLE IF NOT EXISTS changes ( + change_id INTEGER PRIMARY KEY AUTOINCREMENT, + table_name TEXT NOT NULL, + op TEXT NOT NULL, + id TEXT NOT NULL, + payload BLOB, + wal_frame TEXT, + cursor TEXT, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + acked INTEGER NOT NULL DEFAULT 0 +); + +CREATE TABLE IF NOT EXISTS state ( + table_name TEXT PRIMARY KEY, + last_change_id INTEGER NOT NULL DEFAULT 0, + last_wal_frame TEXT, + cursor TEXT, + updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP +); +"#; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)] +pub enum ChangeOperation { + Insert, + Update, + Delete, +} + +impl ChangeOperation { + pub fn as_str(&self) -> &'static str { + match self { + ChangeOperation::Insert => "insert", + ChangeOperation::Update => "update", + ChangeOperation::Delete => "delete", + } + } + + fn from_str(value: &str) -> Result<Self> { + match value { + "insert" => Ok(ChangeOperation::Insert), + "update" => Ok(ChangeOperation::Update), + "delete" => Ok(ChangeOperation::Delete), + other => Err(anyhow!("unknown change operation '{other}'")), + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct NewChange { + pub table_name: 
String, + pub operation: ChangeOperation, + pub primary_key: String, + pub payload: Option<Vec<u8>>, + pub wal_frame: Option<String>, + pub cursor: Option<String>, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ChangeRecord { + pub change_id: i64, + pub table_name: String, + pub operation: ChangeOperation, + pub primary_key: String, + pub payload: Option<Vec<u8>>, + pub wal_frame: Option<String>, + pub cursor: Option<String>, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct QueueState { + pub table_name: String, + pub last_change_id: i64, + pub last_wal_frame: Option<String>, + pub cursor: Option<String>, +} + +pub struct ChangeQueue { + path: PathBuf, + conn: Connection, +} + +impl ChangeQueue { + pub fn open(path: impl AsRef<Path>) -> Result<Self> { + let path = path.as_ref(); + if let Some(parent) = path.parent() { + fs::create_dir_all(parent).with_context(|| { + format!("failed to create queue directory {}", parent.display()) + })?; + #[cfg(unix)] + enforce_dir_perms(parent)?; + } + let conn = Connection::open(path) + .with_context(|| format!("failed to open queue database {}", path.display()))?; + conn.pragma_update(None, "journal_mode", &"wal").ok(); + conn.pragma_update(None, "synchronous", &"normal").ok(); + conn.execute_batch(SCHEMA) + .context("failed to initialize change queue schema")?; + Ok(Self { + path: path.to_path_buf(), + conn, + }) + } + + pub fn path(&self) -> &Path { + &self.path + } + + pub fn enqueue(&self, change: &NewChange) -> Result<i64> { + self.conn.execute( + "INSERT INTO changes(table_name, op, id, payload, wal_frame, cursor) + VALUES (?1, ?2, ?3, ?4, ?5, ?6)", + params![ + change.table_name, + change.operation.as_str(), + change.primary_key, + change.payload, + change.wal_frame, + change.cursor, + ], + )?; + Ok(self.conn.last_insert_rowid()) + } + + pub fn fetch_batch(&self, limit: usize) -> Result<Vec<ChangeRecord>> { + let mut stmt = self.conn.prepare( + "SELECT change_id, table_name, op, id, payload, wal_frame, cursor + FROM changes WHERE acked = 0 ORDER BY change_id ASC LIMIT ?1", + )?; + let mut rows = 
stmt.query([limit as i64])?; + let mut results = Vec::new(); + while let Some(row) = rows.next()? { + results.push(row_to_change(&row)?); + } + Ok(results) + } + + pub fn ack_up_to(&self, change_id: i64) -> Result<u64> { + let updated = self.conn.execute( + "UPDATE changes SET acked = 1 WHERE change_id <= ?1", + [change_id], + )?; + Ok(updated as u64) + } + + pub fn purge_acked(&self) -> Result<u64> { + let deleted = self + .conn + .execute("DELETE FROM changes WHERE acked = 1", [])?; + Ok(deleted as u64) + } + + pub fn get_state(&self, table: &str) -> Result<Option<QueueState>> { + self.conn + .prepare( + "SELECT table_name, last_change_id, last_wal_frame, cursor + FROM state WHERE table_name = ?1", + )? + .query_row([table], |row| { + Ok(QueueState { + table_name: row.get(0)?, + last_change_id: row.get(1)?, + last_wal_frame: row.get(2)?, + cursor: row.get(3)?, + }) + }) + .optional() + .map_err(Into::into) + } + + pub fn set_state(&self, state: &QueueState) -> Result<()> { + self.conn.execute( + "INSERT INTO state(table_name, last_change_id, last_wal_frame, cursor, updated_at) + VALUES (?1, ?2, ?3, ?4, CURRENT_TIMESTAMP) + ON CONFLICT(table_name) DO UPDATE SET + last_change_id = excluded.last_change_id, + last_wal_frame = excluded.last_wal_frame, + cursor = excluded.cursor, + updated_at = CURRENT_TIMESTAMP", + params![ + state.table_name, + state.last_change_id, + state.last_wal_frame, + state.cursor, + ], + )?; + Ok(()) + } +} + +fn row_to_change(row: &Row<'_>) -> Result<ChangeRecord> { + let op_str: String = row.get(2)?; + Ok(ChangeRecord { + change_id: row.get(0)?, + table_name: row.get(1)?, + operation: ChangeOperation::from_str(&op_str)?, + primary_key: row.get(3)?, + payload: row.get(4)?, + wal_frame: row.get(5)?, + cursor: row.get(6)?, + }) +} + +#[cfg(unix)] +fn enforce_dir_perms(path: &Path) -> Result<()> { + use std::os::unix::fs::PermissionsExt; + + let metadata = fs::metadata(path)?; + let mut perms = metadata.permissions(); + perms.set_mode(0o700); + fs::set_permissions(path, perms)?; + 
Ok(()) +} + +#[cfg(not(unix))] +fn enforce_dir_perms(_path: &Path) -> Result<()> { + Ok(()) +} diff --git a/sqlite-watcher/tests/queue_tests.rs b/sqlite-watcher/tests/queue_tests.rs new file mode 100644 index 0000000..ae1d9d6 --- /dev/null +++ b/sqlite-watcher/tests/queue_tests.rs @@ -0,0 +1,59 @@ +use sqlite_watcher::queue::{ChangeOperation, ChangeQueue, NewChange, QueueState}; +use tempfile::tempdir; + +fn new_change(table: &str, id: &str, op: ChangeOperation) -> NewChange { + NewChange { + table_name: table.to_string(), + operation: op, + primary_key: id.to_string(), + payload: Some(format!("{{\"id\":\"{id}\"}}").into_bytes()), + wal_frame: Some("frame1".to_string()), + cursor: None, + } +} + +#[test] +fn durable_enqueue_and_ack_flow() { + let dir = tempdir().unwrap(); + let queue_path = dir.path().join("changes.db"); + let queue = ChangeQueue::open(&queue_path).unwrap(); + + let mut ids = Vec::new(); + for i in 0..3 { + let change = new_change("vaults", &format!("pk-{i}"), ChangeOperation::Insert); + ids.push(queue.enqueue(&change).unwrap()); + } + + let batch = queue.fetch_batch(10).unwrap(); + assert_eq!(batch.len(), 3); + + queue.ack_up_to(ids[1]).unwrap(); + queue.purge_acked().unwrap(); + + drop(queue); + + let reopened = ChangeQueue::open(&queue_path).unwrap(); + let remaining = reopened.fetch_batch(10).unwrap(); + assert_eq!(remaining.len(), 1); + assert_eq!(remaining[0].change_id, ids[2]); +} + +#[test] +fn state_round_trip() { + let dir = tempdir().unwrap(); + let queue_path = dir.path().join("state.db"); + let queue = ChangeQueue::open(&queue_path).unwrap(); + + assert!(queue.get_state("prices").unwrap().is_none()); + + let state = QueueState { + table_name: "prices".into(), + last_change_id: 42, + last_wal_frame: Some("frame-42".into()), + cursor: Some("cursor-data".into()), + }; + queue.set_state(&state).unwrap(); + + let fetched = queue.get_state("prices").unwrap().unwrap(); + assert_eq!(fetched, state); +}