Skip to content

Commit 7b6b633

Browse files
committed
Merge origin/main into issue-83-plan-replicator-sync-sqlite
2 parents 61b67ac + 822ab37 commit 7b6b633

File tree

5 files changed

+444
-0
lines changed

5 files changed

+444
-0
lines changed

sqlite-watcher/src/change.rs

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
use serde_json::Value;
2+
3+
use crate::queue::{ChangeOperation, NewChange};
4+
5+
#[derive(Debug, Clone, PartialEq)]
6+
pub struct RowChange {
7+
pub table_name: String,
8+
pub operation: ChangeOperation,
9+
pub primary_key: String,
10+
pub payload: Option<Value>,
11+
pub wal_frame: Option<String>,
12+
pub cursor: Option<String>,
13+
}
14+
15+
impl RowChange {
16+
pub fn into_new_change(self) -> NewChange {
17+
let payload = self
18+
.payload
19+
.map(|value| serde_json::to_vec(&value).expect("row change payload serializes"));
20+
NewChange {
21+
table_name: self.table_name,
22+
operation: self.operation,
23+
primary_key: self.primary_key,
24+
payload,
25+
wal_frame: self.wal_frame,
26+
cursor: self.cursor,
27+
}
28+
}
29+
}
30+
31+
#[cfg(test)]
32+
mod tests {
33+
use super::*;
34+
35+
#[test]
36+
fn converts_to_new_change() {
37+
let row = RowChange {
38+
table_name: "prices".into(),
39+
operation: ChangeOperation::Update,
40+
primary_key: "pk1".into(),
41+
payload: Some(serde_json::json!({"foo": "bar"})),
42+
wal_frame: Some("frame-1".into()),
43+
cursor: Some("cursor".into()),
44+
};
45+
let change = row.into_new_change();
46+
assert_eq!(change.table_name, "prices");
47+
assert!(change.payload.unwrap().contains(&b'b'));
48+
}
49+
}

sqlite-watcher/src/decoder.rs

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
use crate::change::RowChange;
2+
use crate::queue::ChangeOperation;
3+
use crate::wal::WalEvent;
4+
use serde_json::json;
5+
use std::time::{SystemTime, UNIX_EPOCH};
6+
7+
/// Temporary decoder that turns WAL growth bytes into placeholder RowChange events.
8+
/// Placeholder until row-level decoding is implemented.
9+
#[derive(Debug, Default, Clone)]
10+
pub struct WalGrowthDecoder;
11+
12+
impl WalGrowthDecoder {
13+
pub fn decode(&self, event: &WalEvent) -> Vec<RowChange> {
14+
let now = SystemTime::now()
15+
.duration_since(UNIX_EPOCH)
16+
.expect("clock should be >= UNIX epoch");
17+
vec![RowChange {
18+
table_name: "__wal__".to_string(),
19+
operation: ChangeOperation::Insert,
20+
primary_key: now.as_nanos().to_string(),
21+
payload: Some(json!({
22+
"kind": "wal_growth",
23+
"bytes_added": event.bytes_added,
24+
"current_size": event.current_size,
25+
"recorded_at": now.as_secs_f64(),
26+
})),
27+
wal_frame: None,
28+
cursor: None,
29+
}]
30+
}
31+
}
32+
33+
#[cfg(test)]
34+
mod tests {
35+
use super::*;
36+
37+
#[test]
38+
fn produces_placeholder_row_change() {
39+
let decoder = WalGrowthDecoder::default();
40+
let rows = decoder.decode(&WalEvent {
41+
bytes_added: 1024,
42+
current_size: 2048,
43+
});
44+
assert_eq!(rows.len(), 1);
45+
assert_eq!(rows[0].table_name, "__wal__");
46+
assert_eq!(rows[0].operation, ChangeOperation::Insert);
47+
}
48+
}

sqlite-watcher/src/wal.rs

Lines changed: 236 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,236 @@
1+
use std::ffi::OsString;
2+
use std::path::{Path, PathBuf};
3+
use std::sync::atomic::{AtomicBool, Ordering};
4+
use std::sync::mpsc::Sender;
5+
use std::sync::Arc;
6+
use std::thread::{self, JoinHandle};
7+
use std::time::Duration;
8+
9+
use anyhow::{Context, Result};
10+
use tracing::{debug, warn};
11+
12+
#[derive(Debug, Clone, Copy)]
13+
pub struct WalWatcherConfig {
14+
pub poll_interval: Duration,
15+
pub min_event_bytes: u64,
16+
}
17+
18+
impl Default for WalWatcherConfig {
19+
fn default() -> Self {
20+
Self {
21+
poll_interval: Duration::from_millis(500),
22+
min_event_bytes: 0,
23+
}
24+
}
25+
}
26+
27+
#[derive(Debug, Clone, PartialEq, Eq)]
28+
pub struct WalEvent {
29+
pub bytes_added: u64,
30+
pub current_size: u64,
31+
}
32+
33+
pub struct WalWatcherHandle {
34+
stop: Arc<AtomicBool>,
35+
thread: Option<JoinHandle<()>>,
36+
}
37+
38+
impl Drop for WalWatcherHandle {
39+
fn drop(&mut self) {
40+
self.stop.store(true, Ordering::SeqCst);
41+
if let Some(handle) = self.thread.take() {
42+
let _ = handle.join();
43+
}
44+
}
45+
}
46+
47+
pub fn start_wal_watcher<P: AsRef<Path>>(
48+
db_path: P,
49+
options: WalWatcherConfig,
50+
sender: Sender<WalEvent>,
51+
) -> Result<WalWatcherHandle> {
52+
let db_path = db_path.as_ref().canonicalize().with_context(|| {
53+
format!(
54+
"failed to canonicalize database path {}",
55+
db_path.as_ref().display()
56+
)
57+
})?;
58+
if !db_path.is_file() {
59+
anyhow::bail!("database path {} is not a file", db_path.display());
60+
}
61+
62+
let wal_path = wal_file_path(&db_path);
63+
let poll_interval = options.poll_interval;
64+
let min_event_bytes = options.min_event_bytes;
65+
let stop_flag = Arc::new(AtomicBool::new(false));
66+
let thread_stop = Arc::clone(&stop_flag);
67+
68+
let handle = thread::spawn(move || {
69+
let mut last_len = wal_file_size(&wal_path).unwrap_or(0);
70+
debug!(
71+
wal = %wal_path.display(),
72+
last_len,
73+
"wal watcher started"
74+
);
75+
while !thread_stop.load(Ordering::SeqCst) {
76+
match wal_file_size(&wal_path) {
77+
Ok(len) => {
78+
if len < last_len {
79+
debug!(
80+
wal = %wal_path.display(),
81+
prev = last_len,
82+
current = len,
83+
"wal truncated; resetting baseline"
84+
);
85+
last_len = len;
86+
} else if len > last_len {
87+
let delta = len - last_len;
88+
last_len = len;
89+
if delta >= min_event_bytes {
90+
let event = WalEvent {
91+
bytes_added: delta,
92+
current_size: len,
93+
};
94+
if sender.send(event).is_err() {
95+
debug!("wal watcher stopping because receiver closed");
96+
break;
97+
}
98+
}
99+
}
100+
}
101+
Err(err) => {
102+
if err.kind() == std::io::ErrorKind::NotFound {
103+
last_len = 0;
104+
} else {
105+
warn!(
106+
wal = %wal_path.display(),
107+
error = %err,
108+
"failed to read wal metadata"
109+
);
110+
}
111+
}
112+
}
113+
114+
thread::sleep(poll_interval);
115+
}
116+
117+
debug!("wal watcher exiting");
118+
});
119+
120+
Ok(WalWatcherHandle {
121+
stop: stop_flag,
122+
thread: Some(handle),
123+
})
124+
}
125+
126+
fn wal_file_path(db_path: &Path) -> PathBuf {
127+
let mut os_string = OsString::from(db_path.as_os_str());
128+
os_string.push("-wal");
129+
PathBuf::from(os_string)
130+
}
131+
132+
fn wal_file_size(path: &Path) -> std::io::Result<u64> {
133+
std::fs::metadata(path).map(|m| m.len())
134+
}
135+
136+
#[cfg(test)]
137+
mod tests {
138+
use super::*;
139+
use rusqlite::Connection;
140+
use std::sync::mpsc::channel;
141+
use std::time::{Duration, Instant};
142+
use tempfile::tempdir;
143+
144+
#[test]
145+
fn emits_event_when_wal_grows() {
146+
let dir = tempdir().unwrap();
147+
let db_path = dir.path().join("watch.sqlite");
148+
let writer = Connection::open(&db_path).unwrap();
149+
writer.pragma_update(None, "journal_mode", &"wal").unwrap();
150+
writer
151+
.pragma_update(None, "wal_autocheckpoint", &0i64)
152+
.unwrap();
153+
writer
154+
.execute(
155+
"CREATE TABLE changes(id INTEGER PRIMARY KEY, value TEXT)",
156+
[],
157+
)
158+
.unwrap();
159+
160+
let (tx, rx) = channel();
161+
let handle = start_wal_watcher(
162+
&db_path,
163+
WalWatcherConfig {
164+
poll_interval: Duration::from_millis(50),
165+
min_event_bytes: 1,
166+
},
167+
tx,
168+
)
169+
.unwrap();
170+
171+
for i in 0..50 {
172+
writer
173+
.execute(
174+
"INSERT INTO changes(value) VALUES (?1)",
175+
[format!("value-{i}")],
176+
)
177+
.unwrap();
178+
}
179+
180+
let event = rx.recv_timeout(Duration::from_secs(5)).unwrap();
181+
assert!(event.bytes_added > 0);
182+
assert!(event.current_size >= event.bytes_added);
183+
184+
drop(handle);
185+
}
186+
187+
#[test]
188+
fn handles_wal_truncation() {
189+
let dir = tempdir().unwrap();
190+
let db_path = dir.path().join("truncate.sqlite");
191+
let writer = Connection::open(&db_path).unwrap();
192+
writer.pragma_update(None, "journal_mode", &"wal").unwrap();
193+
writer
194+
.pragma_update(None, "wal_autocheckpoint", &0i64)
195+
.unwrap();
196+
writer
197+
.execute("CREATE TABLE stuff(id INTEGER PRIMARY KEY, value TEXT)", [])
198+
.unwrap();
199+
200+
let (tx, rx) = channel();
201+
let handle = start_wal_watcher(
202+
&db_path,
203+
WalWatcherConfig {
204+
poll_interval: Duration::from_millis(25),
205+
min_event_bytes: 1,
206+
},
207+
tx,
208+
)
209+
.unwrap();
210+
211+
for i in 0..10 {
212+
writer
213+
.execute("INSERT INTO stuff(value) VALUES (?1)", [format!("row-{i}")])
214+
.unwrap();
215+
}
216+
217+
rx.recv_timeout(Duration::from_secs(5)).unwrap();
218+
219+
writer
220+
.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);")
221+
.unwrap();
222+
223+
// Ensure watcher does not send negative deltas (would panic or overflow)
224+
let start = Instant::now();
225+
loop {
226+
if rx.recv_timeout(Duration::from_millis(100)).is_ok() {
227+
break;
228+
}
229+
if start.elapsed() > Duration::from_millis(500) {
230+
break;
231+
}
232+
}
233+
234+
drop(handle);
235+
}
236+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
use sqlite_watcher::queue::{ChangeOperation, ChangeQueue, NewChange, QueueState};
2+
use tempfile::tempdir;
3+
4+
fn new_change(table: &str, id: &str, op: ChangeOperation) -> NewChange {
5+
NewChange {
6+
table_name: table.to_string(),
7+
operation: op,
8+
primary_key: id.to_string(),
9+
payload: Some(format!("{{\"id\":\"{id}\"}}").into_bytes()),
10+
wal_frame: Some("frame1".to_string()),
11+
cursor: None,
12+
}
13+
}
14+
15+
#[test]
16+
fn durable_enqueue_and_ack_flow() {
17+
let dir = tempdir().unwrap();
18+
let queue_path = dir.path().join("changes.db");
19+
let queue = ChangeQueue::open(&queue_path).unwrap();
20+
21+
let mut ids = Vec::new();
22+
for i in 0..3 {
23+
let change = new_change("vaults", &format!("pk-{i}"), ChangeOperation::Insert);
24+
ids.push(queue.enqueue(&change).unwrap());
25+
}
26+
27+
let batch = queue.fetch_batch(10).unwrap();
28+
assert_eq!(batch.len(), 3);
29+
30+
queue.ack_up_to(ids[1]).unwrap();
31+
queue.purge_acked().unwrap();
32+
33+
drop(queue);
34+
35+
let reopened = ChangeQueue::open(&queue_path).unwrap();
36+
let remaining = reopened.fetch_batch(10).unwrap();
37+
assert_eq!(remaining.len(), 1);
38+
assert_eq!(remaining[0].change_id, ids[2]);
39+
}
40+
41+
#[test]
42+
fn state_round_trip() {
43+
let dir = tempdir().unwrap();
44+
let queue_path = dir.path().join("state.db");
45+
let queue = ChangeQueue::open(&queue_path).unwrap();
46+
47+
assert!(queue.get_state("prices").unwrap().is_none());
48+
49+
let state = QueueState {
50+
table_name: "prices".into(),
51+
last_change_id: 42,
52+
last_wal_frame: Some("frame-42".into()),
53+
cursor: Some("cursor-data".into()),
54+
};
55+
queue.set_state(&state).unwrap();
56+
57+
let fetched = queue.get_state("prices").unwrap().unwrap();
58+
assert_eq!(fetched, state);
59+
}

0 commit comments

Comments
 (0)