Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 0 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions PR_BODY.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
## Summary
- fix sqlite sync change-state handling by treating watcher wal_frame/cursor fields as optional strings and cleaning up unused code
- implement FromStr for sqlite ChangeOperation and resolve needless borrow lints in queue/server modules
- keep clippy happy by applying the suggested clamp change and ensuring proto tests build

## Testing
- cargo clippy
- cargo test
1 change: 1 addition & 0 deletions sqlite-watcher-docs/README-SQLite.md
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,7 @@ No. The tool uses `SQLITE_OPEN_READ_ONLY` which allows concurrent readers. Other
For issues or questions:
- **GitHub Issues**: https://github.com/serenorg/database-replicator/issues
- **Email**: [email protected]
<<<<<<< HEAD:README-SQLite.md
## Delta replication with sqlite-watcher

Once you have completed the initial snapshot (`database-replicator init --source sqlite ...`), you can switch to incremental change capture:
Expand Down
13 changes: 9 additions & 4 deletions sqlite-watcher/src/queue.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::fs;
use std::path::{Path, PathBuf};
use std::str::FromStr;

use anyhow::{anyhow, Context, Result};
use rusqlite::{params, Connection, OptionalExtension, Row};
Expand Down Expand Up @@ -41,8 +42,12 @@ impl ChangeOperation {
ChangeOperation::Delete => "delete",
}
}
}

impl FromStr for ChangeOperation {
type Err = anyhow::Error;

pub fn from_str(value: &str) -> Result<Self> {
fn from_str(value: &str) -> Result<Self, Self::Err> {
match value {
"insert" => Ok(ChangeOperation::Insert),
"update" => Ok(ChangeOperation::Update),
Expand Down Expand Up @@ -98,8 +103,8 @@ impl ChangeQueue {
}
let conn = Connection::open(path)
.with_context(|| format!("failed to open queue database {}", path.display()))?;
conn.pragma_update(None, "journal_mode", &"wal").ok();
conn.pragma_update(None, "synchronous", &"normal").ok();
conn.pragma_update(None, "journal_mode", "wal").ok();
conn.pragma_update(None, "synchronous", "normal").ok();
conn.execute_batch(SCHEMA)
.context("failed to initialize change queue schema")?;
Ok(Self {
Expand Down Expand Up @@ -132,7 +137,7 @@ impl ChangeQueue {
let mut rows = stmt.query([limit as i64])?;
let mut results = Vec::new();
while let Some(row) = rows.next()? {
results.push(row_to_change(&row)?);
results.push(row_to_change(row)?);
}
Ok(results)
}
Expand Down
2 changes: 1 addition & 1 deletion sqlite-watcher/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ impl Watcher for WatcherService {
&self,
request: Request<ListChangesRequest>,
) -> Result<Response<ListChangesResponse>, Status> {
let limit = request.get_ref().limit.max(1).min(10_000) as usize;
let limit = request.get_ref().limit.clamp(1, 10_000) as usize;
let queue = self.queue().map_err(internal_err)?;
let rows = queue.fetch_batch(limit).map_err(internal_err)?;
let changes = rows.into_iter().map(change_to_proto).collect();
Expand Down
106 changes: 75 additions & 31 deletions sqlite-watcher/tests/server_tests.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,38 @@
use std::net::SocketAddr;
use std::time::Duration;

use sqlite_watcher::server::spawn_tcp_server;
#[cfg(unix)]
use sqlite_watcher::server::spawn_unix_server;
use sqlite_watcher::queue::{ChangeOperation, ChangeQueue, NewChange};
use sqlite_watcher::server::spawn_tcp;
use sqlite_watcher::watcher_proto::watcher_client::WatcherClient;
use sqlite_watcher::watcher_proto::HealthCheckRequest;
use sqlite_watcher::watcher_proto::{AckChangesRequest, HealthCheckRequest, ListChangesRequest};
use tempfile::tempdir;
use tokio::time::sleep;
use tonic::metadata::MetadataValue;

fn seed_queue(path: &str) {
let queue = ChangeQueue::open(path).unwrap();
for i in 0..2 {
let change = NewChange {
table_name: "examples".into(),
operation: ChangeOperation::Insert,
primary_key: format!("row-{i}"),
payload: None,
wal_frame: None,
cursor: None,
};
queue.enqueue(&change).unwrap();
}
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn health_check_responds_ok() {
async fn tcp_server_handles_health_and_list() {
let dir = tempdir().unwrap();
let queue_path = dir.path().join("queue.db");
let addr: SocketAddr = "127.0.0.1:55051".parse().unwrap();
let token = "secret-token".to_string();
seed_queue(queue_path.to_str().unwrap());

let _handle = spawn_tcp_server(addr, queue_path, token.clone()).unwrap();
let addr: SocketAddr = "127.0.0.1:56060".parse().unwrap();
let token = "secret".to_string();
let _handle = spawn_tcp(addr, queue_path, token.clone()).unwrap();
sleep(Duration::from_millis(200)).await;

let channel = tonic::transport::Channel::from_shared(format!("http://{}", addr))
Expand All @@ -26,39 +41,68 @@ async fn health_check_responds_ok() {
.await
.unwrap();
let mut client = WatcherClient::new(channel);
let mut req = tonic::Request::new(HealthCheckRequest {});

let mut health_req = tonic::Request::new(HealthCheckRequest {});
let header = MetadataValue::try_from(format!("Bearer {}", token)).unwrap();
req.metadata_mut().insert("authorization", header);
let resp = client.health_check(req).await.unwrap();
assert_eq!(resp.into_inner().status, "ok");
health_req
.metadata_mut()
.insert("authorization", header.clone());
client.health_check(health_req).await.unwrap();

let mut list_req = tonic::Request::new(ListChangesRequest { limit: 10 });
list_req.metadata_mut().insert("authorization", header);
let resp = client.list_changes(list_req).await.unwrap();
assert_eq!(resp.into_inner().changes.len(), 2);
}
#[cfg(unix)]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn health_check_over_unix_socket() {
use tokio::net::UnixStream;
use tonic::transport::Endpoint;
use tower::service_fn;

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn unauthenticated_requests_fail() {
let dir = tempdir().unwrap();
let queue_path = dir.path().join("queue.db");
let socket_path = dir.path().join("watcher.sock");
let token = "secret-token".to_string();
let addr: SocketAddr = "127.0.0.1:56061".parse().unwrap();
let token = "secret".to_string();
let _handle = spawn_tcp(addr, queue_path, token).unwrap();
sleep(Duration::from_millis(200)).await;

let channel = tonic::transport::Channel::from_shared(format!("http://{}", addr))
.unwrap()
.connect()
.await
.unwrap();
let mut client = WatcherClient::new(channel);

let request = tonic::Request::new(ListChangesRequest { limit: 1 });
let err = client.list_changes(request).await.unwrap_err();
assert_eq!(err.code(), tonic::Code::Unauthenticated);
}

let _handle = spawn_unix_server(&socket_path, queue_path, token.clone()).unwrap();
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn ack_changes_advances_queue() {
let dir = tempdir().unwrap();
let queue_path = dir.path().join("queue.db");
seed_queue(queue_path.to_str().unwrap());
let addr: SocketAddr = "127.0.0.1:56062".parse().unwrap();
let token = "secret".to_string();
let _handle = spawn_tcp(addr, queue_path, token.clone()).unwrap();
sleep(Duration::from_millis(200)).await;

let endpoint = Endpoint::try_from("http://[::]:50051").unwrap();
let channel = endpoint
.connect_with_connector(service_fn(move |_: tonic::transport::Uri| {
let path = socket_path.clone();
async move { UnixStream::connect(path).await }
}))
let channel = tonic::transport::Channel::from_shared(format!("http://{}", addr))
.unwrap()
.connect()
.await
.unwrap();
let mut client = WatcherClient::new(channel);
let mut req = tonic::Request::new(HealthCheckRequest {});
let header = MetadataValue::try_from(format!("Bearer {}", token)).unwrap();
req.metadata_mut().insert("authorization", header);
let resp = client.health_check(req).await.unwrap();
assert_eq!(resp.into_inner().status, "ok");

let mut req = tonic::Request::new(ListChangesRequest { limit: 10 });
req.metadata_mut().insert("authorization", header.clone());
let resp = client.list_changes(req).await.unwrap().into_inner();
assert_eq!(resp.changes.len(), 2);
let highest = resp.changes.last().unwrap().change_id;

let mut ack_req = tonic::Request::new(AckChangesRequest {
up_to_change_id: highest,
});
ack_req.metadata_mut().insert("authorization", header);
client.ack_changes(ack_req).await.unwrap();
}
1 change: 1 addition & 0 deletions src/commands/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
pub mod init;
pub mod status;
pub mod sync;
pub mod sync_sqlite;
pub mod target;
pub mod validate;
pub mod verify;
Expand Down
Loading
Loading