Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
[workspace]
resolver = "2"
members = [
".",
"sqlite-watcher",
]

[package]
name = "database-replicator"
version = "7.0.14"
Expand Down
20 changes: 20 additions & 0 deletions sqlite-watcher/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
[package]
name = "sqlite-watcher"
version = "0.1.0"
edition = "2021"
authors = ["SerenAI <[email protected]>"]
description = "Utilities for monitoring SQLite databases (queue module)."
license = "Apache-2.0"
repository = "https://github.com/serenorg/database-replicator"

[dependencies]
anyhow = "1.0"
dirs = "5.0"
rusqlite = { version = "0.30", features = ["chrono"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
thiserror = "1.0"

[dev-dependencies]
tempfile = "3.8"
rand = "0.8"
5 changes: 5 additions & 0 deletions sqlite-watcher/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# sqlite-watcher

Work-in-progress tooling for monitoring SQLite databases. This issue adds the durable change queue used by the watcher service. The queue stores row-level changes plus per-table checkpoints in `~/.seren/sqlite-watcher/changes.db` so restarts can resume from the last acknowledged WAL frame.

Run `cargo test -p sqlite-watcher` to execute the queue integration tests.
1 change: 1 addition & 0 deletions sqlite-watcher/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod queue;
225 changes: 225 additions & 0 deletions sqlite-watcher/src/queue.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
use std::fs;
use std::path::{Path, PathBuf};

use anyhow::{anyhow, Context, Result};
use rusqlite::{params, Connection, OptionalExtension, Row};
use serde::Serialize;

const SCHEMA: &str = r#"
CREATE TABLE IF NOT EXISTS changes (
change_id INTEGER PRIMARY KEY AUTOINCREMENT,
table_name TEXT NOT NULL,
op TEXT NOT NULL,
id TEXT NOT NULL,
payload BLOB,
wal_frame TEXT,
cursor TEXT,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
acked INTEGER NOT NULL DEFAULT 0
);

CREATE TABLE IF NOT EXISTS state (
table_name TEXT PRIMARY KEY,
last_change_id INTEGER NOT NULL DEFAULT 0,
last_wal_frame TEXT,
cursor TEXT,
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
"#;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
pub enum ChangeOperation {
Insert,
Update,
Delete,
}

impl ChangeOperation {
pub fn as_str(&self) -> &'static str {
match self {
ChangeOperation::Insert => "insert",
ChangeOperation::Update => "update",
ChangeOperation::Delete => "delete",
}
}

fn from_str(value: &str) -> Result<Self> {
match value {
"insert" => Ok(ChangeOperation::Insert),
"update" => Ok(ChangeOperation::Update),
"delete" => Ok(ChangeOperation::Delete),
other => Err(anyhow!("unknown change operation '{other}'")),
}
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NewChange {
pub table_name: String,
pub operation: ChangeOperation,
pub primary_key: String,
pub payload: Option<Vec<u8>>,
pub wal_frame: Option<String>,
pub cursor: Option<String>,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChangeRecord {
pub change_id: i64,
pub table_name: String,
pub operation: ChangeOperation,
pub primary_key: String,
pub payload: Option<Vec<u8>>,
pub wal_frame: Option<String>,
pub cursor: Option<String>,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct QueueState {
pub table_name: String,
pub last_change_id: i64,
pub last_wal_frame: Option<String>,
pub cursor: Option<String>,
}

pub struct ChangeQueue {
path: PathBuf,
conn: Connection,
}

impl ChangeQueue {
pub fn open(path: impl AsRef<Path>) -> Result<Self> {
let path = path.as_ref();
if let Some(parent) = path.parent() {
fs::create_dir_all(parent).with_context(|| {
format!("failed to create queue directory {}", parent.display())
})?;
#[cfg(unix)]
enforce_dir_perms(parent)?;
}
let conn = Connection::open(path)
.with_context(|| format!("failed to open queue database {}", path.display()))?;
conn.pragma_update(None, "journal_mode", &"wal").ok();
conn.pragma_update(None, "synchronous", &"normal").ok();
conn.execute_batch(SCHEMA)
.context("failed to initialize change queue schema")?;
Ok(Self {
path: path.to_path_buf(),
conn,
})
}

pub fn path(&self) -> &Path {
&self.path
}

pub fn enqueue(&self, change: &NewChange) -> Result<i64> {
self.conn.execute(
"INSERT INTO changes(table_name, op, id, payload, wal_frame, cursor)
VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
params![
change.table_name,
change.operation.as_str(),
change.primary_key,
change.payload,
change.wal_frame,
change.cursor,
],
)?;
Ok(self.conn.last_insert_rowid())
}

pub fn fetch_batch(&self, limit: usize) -> Result<Vec<ChangeRecord>> {
let mut stmt = self.conn.prepare(
"SELECT change_id, table_name, op, id, payload, wal_frame, cursor
FROM changes WHERE acked = 0 ORDER BY change_id ASC LIMIT ?1",
)?;
let mut rows = stmt.query([limit as i64])?;
let mut results = Vec::new();
while let Some(row) = rows.next()? {
results.push(row_to_change(&row)?);
}
Ok(results)
}

pub fn ack_up_to(&self, change_id: i64) -> Result<u64> {
let updated = self.conn.execute(
"UPDATE changes SET acked = 1 WHERE change_id <= ?1",
[change_id],
)?;
Ok(updated as u64)
}

pub fn purge_acked(&self) -> Result<u64> {
let deleted = self
.conn
.execute("DELETE FROM changes WHERE acked = 1", [])?;
Ok(deleted as u64)
}

pub fn get_state(&self, table: &str) -> Result<Option<QueueState>> {
self.conn
.prepare(
"SELECT table_name, last_change_id, last_wal_frame, cursor
FROM state WHERE table_name = ?1",
)?
.query_row([table], |row| {
Ok(QueueState {
table_name: row.get(0)?,
last_change_id: row.get(1)?,
last_wal_frame: row.get(2)?,
cursor: row.get(3)?,
})
})
.optional()
.map_err(Into::into)
}

pub fn set_state(&self, state: &QueueState) -> Result<()> {
self.conn.execute(
"INSERT INTO state(table_name, last_change_id, last_wal_frame, cursor, updated_at)
VALUES (?1, ?2, ?3, ?4, CURRENT_TIMESTAMP)
ON CONFLICT(table_name) DO UPDATE SET
last_change_id = excluded.last_change_id,
last_wal_frame = excluded.last_wal_frame,
cursor = excluded.cursor,
updated_at = CURRENT_TIMESTAMP",
params![
state.table_name,
state.last_change_id,
state.last_wal_frame,
state.cursor,
],
)?;
Ok(())
}
}

fn row_to_change(row: &Row<'_>) -> Result<ChangeRecord> {
let op_str: String = row.get(2)?;
Ok(ChangeRecord {
change_id: row.get(0)?,
table_name: row.get(1)?,
operation: ChangeOperation::from_str(&op_str)?,
primary_key: row.get(3)?,
payload: row.get(4)?,
wal_frame: row.get(5)?,
cursor: row.get(6)?,
})
}

#[cfg(unix)]
fn enforce_dir_perms(path: &Path) -> Result<()> {
use std::os::unix::fs::PermissionsExt;

let metadata = fs::metadata(path)?;
let mut perms = metadata.permissions();
perms.set_mode(0o700);
fs::set_permissions(path, perms)?;
Ok(())
}

#[cfg(not(unix))]
fn enforce_dir_perms(_path: &Path) -> Result<()> {
Ok(())
}
59 changes: 59 additions & 0 deletions sqlite-watcher/tests/queue_tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
use sqlite_watcher::queue::{ChangeOperation, ChangeQueue, NewChange, QueueState};
use tempfile::tempdir;

fn new_change(table: &str, id: &str, op: ChangeOperation) -> NewChange {
NewChange {
table_name: table.to_string(),
operation: op,
primary_key: id.to_string(),
payload: Some(format!("{{\"id\":\"{id}\"}}").into_bytes()),
wal_frame: Some("frame1".to_string()),
cursor: None,
}
}

#[test]
fn durable_enqueue_and_ack_flow() {
let dir = tempdir().unwrap();
let queue_path = dir.path().join("changes.db");
let queue = ChangeQueue::open(&queue_path).unwrap();

let mut ids = Vec::new();
for i in 0..3 {
let change = new_change("vaults", &format!("pk-{i}"), ChangeOperation::Insert);
ids.push(queue.enqueue(&change).unwrap());
}

let batch = queue.fetch_batch(10).unwrap();
assert_eq!(batch.len(), 3);

queue.ack_up_to(ids[1]).unwrap();
queue.purge_acked().unwrap();

drop(queue);

let reopened = ChangeQueue::open(&queue_path).unwrap();
let remaining = reopened.fetch_batch(10).unwrap();
assert_eq!(remaining.len(), 1);
assert_eq!(remaining[0].change_id, ids[2]);
}

#[test]
fn state_round_trip() {
let dir = tempdir().unwrap();
let queue_path = dir.path().join("state.db");
let queue = ChangeQueue::open(&queue_path).unwrap();

assert!(queue.get_state("prices").unwrap().is_none());

let state = QueueState {
table_name: "prices".into(),
last_change_id: 42,
last_wal_frame: Some("frame-42".into()),
cursor: Some("cursor-data".into()),
};
queue.set_state(&state).unwrap();

let fetched = queue.get_state("prices").unwrap().unwrap();
assert_eq!(fetched, state);
}
Loading