Skip to content

Commit 4d3cb34

Browse files
authored
Fix sqlite watcher clippy warnings (#90)
* [sync-sqlite] add watcher client and incremental command * [docs/packaging] add watcher crate, release artifacts, and smoke test * chore(docs): move sqlite docs under sqlite-watcher-docs * fix(ci): replace unmaintained deps for security audit * Fix sqlite watcher clippy warnings
1 parent 822ab37 commit 4d3cb34

File tree

10 files changed

+613
-45
lines changed

10 files changed

+613
-45
lines changed

Cargo.lock

Lines changed: 0 additions & 8 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

PR_BODY.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
## Summary
2+
- fix sqlite sync change-state handling by treating watcher wal_frame/cursor fields as optional strings and cleaning up unused code
3+
- implement FromStr for sqlite ChangeOperation and resolve needless borrow lints in queue/server modules
4+
- keep clippy happy by applying the suggested clamp change and ensuring proto tests build
5+
6+
## Testing
7+
- cargo clippy
8+
- cargo test

sqlite-watcher-docs/README-SQLite.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -596,6 +596,7 @@ No. The tool uses `SQLITE_OPEN_READ_ONLY` which allows concurrent readers. Other
596596
For issues or questions:
597597
- **GitHub Issues**: https://github.com/serenorg/database-replicator/issues
598598
- **Email**: [email protected]
599+
<<<<<<< HEAD:README-SQLite.md
599600
## Delta replication with sqlite-watcher
600601

601602
Once you have completed the initial snapshot (`database-replicator init --source sqlite ...`), you can switch to incremental change capture:

sqlite-watcher/src/queue.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::fs;
22
use std::path::{Path, PathBuf};
3+
use std::str::FromStr;
34

45
use anyhow::{anyhow, Context, Result};
56
use rusqlite::{params, Connection, OptionalExtension, Row};
@@ -41,8 +42,12 @@ impl ChangeOperation {
4142
ChangeOperation::Delete => "delete",
4243
}
4344
}
45+
}
46+
47+
impl FromStr for ChangeOperation {
48+
type Err = anyhow::Error;
4449

45-
pub fn from_str(value: &str) -> Result<Self> {
50+
fn from_str(value: &str) -> Result<Self, Self::Err> {
4651
match value {
4752
"insert" => Ok(ChangeOperation::Insert),
4853
"update" => Ok(ChangeOperation::Update),
@@ -98,8 +103,8 @@ impl ChangeQueue {
98103
}
99104
let conn = Connection::open(path)
100105
.with_context(|| format!("failed to open queue database {}", path.display()))?;
101-
conn.pragma_update(None, "journal_mode", &"wal").ok();
102-
conn.pragma_update(None, "synchronous", &"normal").ok();
106+
conn.pragma_update(None, "journal_mode", "wal").ok();
107+
conn.pragma_update(None, "synchronous", "normal").ok();
103108
conn.execute_batch(SCHEMA)
104109
.context("failed to initialize change queue schema")?;
105110
Ok(Self {
@@ -132,7 +137,7 @@ impl ChangeQueue {
132137
let mut rows = stmt.query([limit as i64])?;
133138
let mut results = Vec::new();
134139
while let Some(row) = rows.next()? {
135-
results.push(row_to_change(&row)?);
140+
results.push(row_to_change(row)?);
136141
}
137142
Ok(results)
138143
}

sqlite-watcher/src/server.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ impl Watcher for WatcherService {
193193
&self,
194194
request: Request<ListChangesRequest>,
195195
) -> Result<Response<ListChangesResponse>, Status> {
196-
let limit = request.get_ref().limit.max(1).min(10_000) as usize;
196+
let limit = request.get_ref().limit.clamp(1, 10_000) as usize;
197197
let queue = self.queue().map_err(internal_err)?;
198198
let rows = queue.fetch_batch(limit).map_err(internal_err)?;
199199
let changes = rows.into_iter().map(change_to_proto).collect();
Lines changed: 75 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,38 @@
11
use std::net::SocketAddr;
22
use std::time::Duration;
33

4-
use sqlite_watcher::server::spawn_tcp_server;
5-
#[cfg(unix)]
6-
use sqlite_watcher::server::spawn_unix_server;
4+
use sqlite_watcher::queue::{ChangeOperation, ChangeQueue, NewChange};
5+
use sqlite_watcher::server::spawn_tcp;
76
use sqlite_watcher::watcher_proto::watcher_client::WatcherClient;
8-
use sqlite_watcher::watcher_proto::HealthCheckRequest;
7+
use sqlite_watcher::watcher_proto::{AckChangesRequest, HealthCheckRequest, ListChangesRequest};
98
use tempfile::tempdir;
109
use tokio::time::sleep;
1110
use tonic::metadata::MetadataValue;
1211

12+
fn seed_queue(path: &str) {
13+
let queue = ChangeQueue::open(path).unwrap();
14+
for i in 0..2 {
15+
let change = NewChange {
16+
table_name: "examples".into(),
17+
operation: ChangeOperation::Insert,
18+
primary_key: format!("row-{i}"),
19+
payload: None,
20+
wal_frame: None,
21+
cursor: None,
22+
};
23+
queue.enqueue(&change).unwrap();
24+
}
25+
}
26+
1327
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
14-
async fn health_check_responds_ok() {
28+
async fn tcp_server_handles_health_and_list() {
1529
let dir = tempdir().unwrap();
1630
let queue_path = dir.path().join("queue.db");
17-
let addr: SocketAddr = "127.0.0.1:55051".parse().unwrap();
18-
let token = "secret-token".to_string();
31+
seed_queue(queue_path.to_str().unwrap());
1932

20-
let _handle = spawn_tcp_server(addr, queue_path, token.clone()).unwrap();
33+
let addr: SocketAddr = "127.0.0.1:56060".parse().unwrap();
34+
let token = "secret".to_string();
35+
let _handle = spawn_tcp(addr, queue_path, token.clone()).unwrap();
2136
sleep(Duration::from_millis(200)).await;
2237

2338
let channel = tonic::transport::Channel::from_shared(format!("http://{}", addr))
@@ -26,39 +41,68 @@ async fn health_check_responds_ok() {
2641
.await
2742
.unwrap();
2843
let mut client = WatcherClient::new(channel);
29-
let mut req = tonic::Request::new(HealthCheckRequest {});
44+
45+
let mut health_req = tonic::Request::new(HealthCheckRequest {});
3046
let header = MetadataValue::try_from(format!("Bearer {}", token)).unwrap();
31-
req.metadata_mut().insert("authorization", header);
32-
let resp = client.health_check(req).await.unwrap();
33-
assert_eq!(resp.into_inner().status, "ok");
47+
health_req
48+
.metadata_mut()
49+
.insert("authorization", header.clone());
50+
client.health_check(health_req).await.unwrap();
51+
52+
let mut list_req = tonic::Request::new(ListChangesRequest { limit: 10 });
53+
list_req.metadata_mut().insert("authorization", header);
54+
let resp = client.list_changes(list_req).await.unwrap();
55+
assert_eq!(resp.into_inner().changes.len(), 2);
3456
}
35-
#[cfg(unix)]
36-
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
37-
async fn health_check_over_unix_socket() {
38-
use tokio::net::UnixStream;
39-
use tonic::transport::Endpoint;
40-
use tower::service_fn;
4157

58+
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
59+
async fn unauthenticated_requests_fail() {
4260
let dir = tempdir().unwrap();
4361
let queue_path = dir.path().join("queue.db");
44-
let socket_path = dir.path().join("watcher.sock");
45-
let token = "secret-token".to_string();
62+
let addr: SocketAddr = "127.0.0.1:56061".parse().unwrap();
63+
let token = "secret".to_string();
64+
let _handle = spawn_tcp(addr, queue_path, token).unwrap();
65+
sleep(Duration::from_millis(200)).await;
66+
67+
let channel = tonic::transport::Channel::from_shared(format!("http://{}", addr))
68+
.unwrap()
69+
.connect()
70+
.await
71+
.unwrap();
72+
let mut client = WatcherClient::new(channel);
73+
74+
let request = tonic::Request::new(ListChangesRequest { limit: 1 });
75+
let err = client.list_changes(request).await.unwrap_err();
76+
assert_eq!(err.code(), tonic::Code::Unauthenticated);
77+
}
4678

47-
let _handle = spawn_unix_server(&socket_path, queue_path, token.clone()).unwrap();
79+
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
80+
async fn ack_changes_advances_queue() {
81+
let dir = tempdir().unwrap();
82+
let queue_path = dir.path().join("queue.db");
83+
seed_queue(queue_path.to_str().unwrap());
84+
let addr: SocketAddr = "127.0.0.1:56062".parse().unwrap();
85+
let token = "secret".to_string();
86+
let _handle = spawn_tcp(addr, queue_path, token.clone()).unwrap();
4887
sleep(Duration::from_millis(200)).await;
4988

50-
let endpoint = Endpoint::try_from("http://[::]:50051").unwrap();
51-
let channel = endpoint
52-
.connect_with_connector(service_fn(move |_: tonic::transport::Uri| {
53-
let path = socket_path.clone();
54-
async move { UnixStream::connect(path).await }
55-
}))
89+
let channel = tonic::transport::Channel::from_shared(format!("http://{}", addr))
90+
.unwrap()
91+
.connect()
5692
.await
5793
.unwrap();
5894
let mut client = WatcherClient::new(channel);
59-
let mut req = tonic::Request::new(HealthCheckRequest {});
6095
let header = MetadataValue::try_from(format!("Bearer {}", token)).unwrap();
61-
req.metadata_mut().insert("authorization", header);
62-
let resp = client.health_check(req).await.unwrap();
63-
assert_eq!(resp.into_inner().status, "ok");
96+
97+
let mut req = tonic::Request::new(ListChangesRequest { limit: 10 });
98+
req.metadata_mut().insert("authorization", header.clone());
99+
let resp = client.list_changes(req).await.unwrap().into_inner();
100+
assert_eq!(resp.changes.len(), 2);
101+
let highest = resp.changes.last().unwrap().change_id;
102+
103+
let mut ack_req = tonic::Request::new(AckChangesRequest {
104+
up_to_change_id: highest,
105+
});
106+
ack_req.metadata_mut().insert("authorization", header);
107+
client.ack_changes(ack_req).await.unwrap();
64108
}

src/commands/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
pub mod init;
55
pub mod status;
66
pub mod sync;
7+
pub mod sync_sqlite;
78
pub mod target;
89
pub mod validate;
910
pub mod verify;

0 commit comments

Comments
 (0)