Skip to content

Commit 9fcea91

Browse files
authored
Issue 79: sqlite-watcher bootstrap (#87)
* [sqlite-watcher] scaffold crate with wal watcher + queue * [sqlite-watcher] add RowChange abstraction and queue wiring * [sqlite-watcher] introduce wal growth decoder * [sqlite-watcher] add tonic-based watcher server * [sqlite-watcher] add unix socket listener and tests
1 parent 50bbbbc commit 9fcea91

File tree

8 files changed

+1120
-0
lines changed

8 files changed

+1120
-0
lines changed

sqlite-watcher/build.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
fn main() -> Result<(), Box<dyn std::error::Error>> {
2+
tonic_build::configure()
3+
.build_client(true)
4+
.build_server(true)
5+
.compile(&["proto/watcher.proto"], &["proto"])?;
6+
println!("cargo:rerun-if-changed=proto/watcher.proto");
7+
Ok(())
8+
}

sqlite-watcher/proto/watcher.proto

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
syntax = "proto3";
2+
3+
package sqlitewatcher;
4+
5+
message HealthCheckRequest {}
6+
message HealthCheckResponse {
7+
string status = 1;
8+
}
9+
10+
message ListChangesRequest {
11+
uint32 limit = 1;
12+
}
13+
14+
message Change {
15+
int64 change_id = 1;
16+
string table_name = 2;
17+
string op = 3;
18+
string primary_key = 4;
19+
bytes payload = 5;
20+
string wal_frame = 6;
21+
string cursor = 7;
22+
}
23+
24+
message ListChangesResponse {
25+
repeated Change changes = 1;
26+
}
27+
28+
message AckChangesRequest {
29+
int64 up_to_change_id = 1;
30+
}
31+
32+
message AckChangesResponse {
33+
uint64 acknowledged = 1;
34+
}
35+
36+
message GetStateRequest {
37+
string table_name = 1;
38+
}
39+
40+
message GetStateResponse {
41+
bool exists = 1;
42+
int64 last_change_id = 2;
43+
string last_wal_frame = 3;
44+
string cursor = 4;
45+
}
46+
47+
message SetStateRequest {
48+
string table_name = 1;
49+
int64 last_change_id = 2;
50+
string last_wal_frame = 3;
51+
string cursor = 4;
52+
}
53+
54+
message SetStateResponse {}
55+
56+
service Watcher {
57+
rpc HealthCheck(HealthCheckRequest) returns (HealthCheckResponse);
58+
rpc ListChanges(ListChangesRequest) returns (ListChangesResponse);
59+
rpc AckChanges(AckChangesRequest) returns (AckChangesResponse);
60+
rpc GetState(GetStateRequest) returns (GetStateResponse);
61+
rpc SetState(SetStateRequest) returns (SetStateResponse);
62+
}

sqlite-watcher/src/change.rs

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
use serde_json::Value;
2+
3+
use crate::queue::{ChangeOperation, NewChange};
4+
5+
#[derive(Debug, Clone, PartialEq)]
6+
pub struct RowChange {
7+
pub table_name: String,
8+
pub operation: ChangeOperation,
9+
pub primary_key: String,
10+
pub payload: Option<Value>,
11+
pub wal_frame: Option<String>,
12+
pub cursor: Option<String>,
13+
}
14+
15+
impl RowChange {
16+
pub fn into_new_change(self) -> NewChange {
17+
let payload = self
18+
.payload
19+
.map(|value| serde_json::to_vec(&value).expect("row change payload serializes"));
20+
NewChange {
21+
table_name: self.table_name,
22+
operation: self.operation,
23+
primary_key: self.primary_key,
24+
payload,
25+
wal_frame: self.wal_frame,
26+
cursor: self.cursor,
27+
}
28+
}
29+
}
30+
31+
#[cfg(test)]
32+
mod tests {
33+
use super::*;
34+
35+
#[test]
36+
fn converts_to_new_change() {
37+
let row = RowChange {
38+
table_name: "prices".into(),
39+
operation: ChangeOperation::Update,
40+
primary_key: "pk1".into(),
41+
payload: Some(serde_json::json!({"foo": "bar"})),
42+
wal_frame: Some("frame-1".into()),
43+
cursor: Some("cursor".into()),
44+
};
45+
let change = row.into_new_change();
46+
assert_eq!(change.table_name, "prices");
47+
assert!(change.payload.unwrap().contains(&b'b'));
48+
}
49+
}

sqlite-watcher/src/decoder.rs

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
use crate::change::RowChange;
2+
use crate::queue::ChangeOperation;
3+
use crate::wal::WalEvent;
4+
use serde_json::json;
5+
use std::time::{SystemTime, UNIX_EPOCH};
6+
7+
/// Temporary decoder that turns WAL growth bytes into placeholder RowChange events.
8+
/// Placeholder until row-level decoding is implemented.
9+
#[derive(Debug, Default, Clone)]
10+
pub struct WalGrowthDecoder;
11+
12+
impl WalGrowthDecoder {
13+
pub fn decode(&self, event: &WalEvent) -> Vec<RowChange> {
14+
let now = SystemTime::now()
15+
.duration_since(UNIX_EPOCH)
16+
.expect("clock should be >= UNIX epoch");
17+
vec![RowChange {
18+
table_name: "__wal__".to_string(),
19+
operation: ChangeOperation::Insert,
20+
primary_key: now.as_nanos().to_string(),
21+
payload: Some(json!({
22+
"kind": "wal_growth",
23+
"bytes_added": event.bytes_added,
24+
"current_size": event.current_size,
25+
"recorded_at": now.as_secs_f64(),
26+
})),
27+
wal_frame: None,
28+
cursor: None,
29+
}]
30+
}
31+
}
32+
33+
#[cfg(test)]
34+
mod tests {
35+
use super::*;
36+
37+
#[test]
38+
fn produces_placeholder_row_change() {
39+
let decoder = WalGrowthDecoder::default();
40+
let rows = decoder.decode(&WalEvent {
41+
bytes_added: 1024,
42+
current_size: 2048,
43+
});
44+
assert_eq!(rows.len(), 1);
45+
assert_eq!(rows[0].table_name, "__wal__");
46+
assert_eq!(rows[0].operation, ChangeOperation::Insert);
47+
}
48+
}

0 commit comments

Comments
 (0)