diff --git a/Cargo.lock b/Cargo.lock
index d7df7f7..3571e25 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3257,28 +3257,20 @@ name = "sqlite-watcher"
 version = "0.1.0"
 dependencies = [
  "anyhow",
-<<<<<<< HEAD
  "base64 0.21.7",
  "clap",
  "dirs",
  "prost",
-=======
- "dirs",
- "rand 0.8.5",
->>>>>>> origin/main
  "rusqlite",
  "serde",
  "serde_json",
  "tempfile",
  "thiserror 1.0.69",
-<<<<<<< HEAD
  "tokio",
  "tokio-stream",
  "tonic",
  "tonic-build",
  "tower",
-=======
->>>>>>> origin/main
 ]
 
 [[package]]
diff --git a/PR_BODY.md b/PR_BODY.md
new file mode 100644
index 0000000..cc21e29
--- /dev/null
+++ b/PR_BODY.md
@@ -0,0 +1,8 @@
+## Summary
+- fix sqlite sync change-state handling by treating watcher wal_frame/cursor fields as optional strings and cleaning up unused code
+- implement FromStr for sqlite ChangeOperation and resolve needless borrow lints in queue/server modules
+- keep clippy happy by applying the suggested clamp change and ensuring proto tests build
+
+## Testing
+- cargo clippy
+- cargo test
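A note on the first summary bullet: proto3 string fields cannot be null, so the watcher reports a missing `wal_frame` or `cursor` as an empty string, and the sync code maps that to `Option<String>` before persisting state (see `non_empty_string` in `sync_sqlite.rs` below). A minimal, dependency-free illustration of that mapping:

```rust
// Proto3 has no null for string fields; the watcher uses "" for "absent".
// Mirrors the non_empty_string helper introduced in sync_sqlite.rs.
fn non_empty_string(value: &str) -> Option<String> {
    if value.is_empty() {
        None
    } else {
        Some(value.to_owned())
    }
}

fn main() {
    assert_eq!(non_empty_string(""), None);
    assert_eq!(non_empty_string("frame-42"), Some("frame-42".to_string()));
}
```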
diff --git a/sqlite-watcher/src/queue.rs b/sqlite-watcher/src/queue.rs
index e0b8305..2cab073 100644
--- a/sqlite-watcher/src/queue.rs
+++ b/sqlite-watcher/src/queue.rs
@@ -1,5 +1,6 @@
 use std::fs;
 use std::path::{Path, PathBuf};
+use std::str::FromStr;
 
 use anyhow::{anyhow, Context, Result};
 use rusqlite::{params, Connection, OptionalExtension, Row};
@@ -41,8 +42,12 @@ impl ChangeOperation {
             ChangeOperation::Delete => "delete",
         }
     }
+}
+
+impl FromStr for ChangeOperation {
+    type Err = anyhow::Error;
 
-    pub fn from_str(value: &str) -> Result<Self> {
+    fn from_str(value: &str) -> Result<Self> {
         match value {
             "insert" => Ok(ChangeOperation::Insert),
             "update" => Ok(ChangeOperation::Update),
@@ -98,8 +103,8 @@ impl ChangeQueue {
     }
         let conn = Connection::open(path)
             .with_context(|| format!("failed to open queue database {}", path.display()))?;
-        conn.pragma_update(None, "journal_mode", &"wal").ok();
-        conn.pragma_update(None, "synchronous", &"normal").ok();
+        conn.pragma_update(None, "journal_mode", "wal").ok();
+        conn.pragma_update(None, "synchronous", "normal").ok();
         conn.execute_batch(SCHEMA)
             .context("failed to initialize change queue schema")?;
         Ok(Self {
@@ -132,7 +137,7 @@ impl ChangeQueue {
         let mut rows = stmt.query([limit as i64])?;
         let mut results = Vec::new();
         while let Some(row) = rows.next()? {
-            results.push(row_to_change(&row)?);
+            results.push(row_to_change(row)?);
         }
         Ok(results)
     }
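Moving `from_str` from an inherent method into a `std::str::FromStr` impl is what clippy's `should_implement_trait` lint asks for, and it makes the standard `str::parse` machinery available at call sites. A sketch of the round-trip this enables, using a stripped-down stand-in for `ChangeOperation` (with `String` in place of `anyhow::Error`) so it runs on its own:

```rust
use std::str::FromStr;

// Stand-in for sqlite_watcher::queue::ChangeOperation.
#[derive(Debug, PartialEq)]
enum ChangeOperation {
    Insert,
    Update,
    Delete,
}

impl FromStr for ChangeOperation {
    type Err = String;

    fn from_str(value: &str) -> Result<Self, Self::Err> {
        match value {
            "insert" => Ok(ChangeOperation::Insert),
            "update" => Ok(ChangeOperation::Update),
            "delete" => Ok(ChangeOperation::Delete),
            other => Err(format!("unknown change operation '{other}'")),
        }
    }
}

fn main() {
    // The trait impl is what makes `parse` work at call sites.
    let op: ChangeOperation = "insert".parse().unwrap();
    assert_eq!(op, ChangeOperation::Insert);
    assert!("truncate".parse::<ChangeOperation>().is_err());
}
```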
diff --git a/sqlite-watcher/src/server.rs b/sqlite-watcher/src/server.rs
index 782c5bf..0948b51 100644
--- a/sqlite-watcher/src/server.rs
+++ b/sqlite-watcher/src/server.rs
@@ -193,7 +193,7 @@ impl Watcher for WatcherService {
         &self,
         request: Request<ListChangesRequest>,
     ) -> Result<Response<ListChangesResponse>, Status> {
-        let limit = request.get_ref().limit.max(1).min(10_000) as usize;
+        let limit = request.get_ref().limit.clamp(1, 10_000) as usize;
         let queue = self.queue().map_err(internal_err)?;
         let rows = queue.fetch_batch(limit).map_err(internal_err)?;
         let changes = rows.into_iter().map(change_to_proto).collect();
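The `clamp` change is behavior-preserving: for `lo <= hi`, `x.max(lo).min(hi)` and `x.clamp(lo, hi)` agree at every point, which is why clippy's `manual_clamp` lint suggests it. A quick check:

```rust
fn main() {
    for limit in [0u32, 1, 500, 9_999, 10_000, 60_000] {
        // Old chain and new clamp agree when the lower bound comes first.
        assert_eq!(limit.max(1).min(10_000), limit.clamp(1, 10_000));
    }
    println!("max/min chain and clamp agree");
}
```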
diff --git a/sqlite-watcher/tests/server_tests.rs b/sqlite-watcher/tests/server_tests.rs
index 0fe5526..2515678 100644
--- a/sqlite-watcher/tests/server_tests.rs
+++ b/sqlite-watcher/tests/server_tests.rs
@@ -1,23 +1,38 @@
 use std::net::SocketAddr;
 use std::time::Duration;
 
-use sqlite_watcher::server::spawn_tcp_server;
-#[cfg(unix)]
-use sqlite_watcher::server::spawn_unix_server;
+use sqlite_watcher::queue::{ChangeOperation, ChangeQueue, NewChange};
+use sqlite_watcher::server::spawn_tcp;
 use sqlite_watcher::watcher_proto::watcher_client::WatcherClient;
-use sqlite_watcher::watcher_proto::HealthCheckRequest;
+use sqlite_watcher::watcher_proto::{AckChangesRequest, HealthCheckRequest, ListChangesRequest};
 use tempfile::tempdir;
 use tokio::time::sleep;
 use tonic::metadata::MetadataValue;
 
+fn seed_queue(path: &str) {
+    let queue = ChangeQueue::open(path).unwrap();
+    for i in 0..2 {
+        let change = NewChange {
+            table_name: "examples".into(),
+            operation: ChangeOperation::Insert,
+            primary_key: format!("row-{i}"),
+            payload: None,
+            wal_frame: None,
+            cursor: None,
+        };
+        queue.enqueue(&change).unwrap();
+    }
+}
+
 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
-async fn health_check_responds_ok() {
+async fn tcp_server_handles_health_and_list() {
     let dir = tempdir().unwrap();
     let queue_path = dir.path().join("queue.db");
-    let addr: SocketAddr = "127.0.0.1:55051".parse().unwrap();
-    let token = "secret-token".to_string();
+    seed_queue(queue_path.to_str().unwrap());
 
-    let _handle = spawn_tcp_server(addr, queue_path, token.clone()).unwrap();
+    let addr: SocketAddr = "127.0.0.1:56060".parse().unwrap();
+    let token = "secret".to_string();
+    let _handle = spawn_tcp(addr, queue_path, token.clone()).unwrap();
     sleep(Duration::from_millis(200)).await;
 
     let channel = tonic::transport::Channel::from_shared(format!("http://{}", addr))
@@ -26,39 +41,68 @@ async fn health_check_responds_ok() {
         .await
         .unwrap();
     let mut client = WatcherClient::new(channel);
-    let mut req = tonic::Request::new(HealthCheckRequest {});
+
+    let mut health_req = tonic::Request::new(HealthCheckRequest {});
     let header = MetadataValue::try_from(format!("Bearer {}", token)).unwrap();
-    req.metadata_mut().insert("authorization", header);
-    let resp = client.health_check(req).await.unwrap();
-    assert_eq!(resp.into_inner().status, "ok");
+    health_req
+        .metadata_mut()
+        .insert("authorization", header.clone());
+    client.health_check(health_req).await.unwrap();
+
+    let mut list_req = tonic::Request::new(ListChangesRequest { limit: 10 });
+    list_req.metadata_mut().insert("authorization", header);
+    let resp = client.list_changes(list_req).await.unwrap();
+    assert_eq!(resp.into_inner().changes.len(), 2);
 }
 
-#[cfg(unix)]
-#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
-async fn health_check_over_unix_socket() {
-    use tokio::net::UnixStream;
-    use tonic::transport::Endpoint;
-    use tower::service_fn;
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn unauthenticated_requests_fail() {
     let dir = tempdir().unwrap();
     let queue_path = dir.path().join("queue.db");
-    let socket_path = dir.path().join("watcher.sock");
-    let token = "secret-token".to_string();
+    let addr: SocketAddr = "127.0.0.1:56061".parse().unwrap();
+    let token = "secret".to_string();
+    let _handle = spawn_tcp(addr, queue_path, token).unwrap();
+    sleep(Duration::from_millis(200)).await;
+
+    let channel = tonic::transport::Channel::from_shared(format!("http://{}", addr))
+        .unwrap()
+        .connect()
+        .await
+        .unwrap();
+    let mut client = WatcherClient::new(channel);
+
+    let request = tonic::Request::new(ListChangesRequest { limit: 1 });
+    let err = client.list_changes(request).await.unwrap_err();
+    assert_eq!(err.code(), tonic::Code::Unauthenticated);
+}
 
-    let _handle = spawn_unix_server(&socket_path, queue_path, token.clone()).unwrap();
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn ack_changes_advances_queue() {
+    let dir = tempdir().unwrap();
+    let queue_path = dir.path().join("queue.db");
+    seed_queue(queue_path.to_str().unwrap());
+    let addr: SocketAddr = "127.0.0.1:56062".parse().unwrap();
+    let token = "secret".to_string();
+    let _handle = spawn_tcp(addr, queue_path, token.clone()).unwrap();
     sleep(Duration::from_millis(200)).await;
 
-    let endpoint = Endpoint::try_from("http://[::]:50051").unwrap();
-    let channel = endpoint
-        .connect_with_connector(service_fn(move |_: tonic::transport::Uri| {
-            let path = socket_path.clone();
-            async move { UnixStream::connect(path).await }
-        }))
+    let channel = tonic::transport::Channel::from_shared(format!("http://{}", addr))
+        .unwrap()
+        .connect()
         .await
         .unwrap();
     let mut client = WatcherClient::new(channel);
-    let mut req = tonic::Request::new(HealthCheckRequest {});
     let header = MetadataValue::try_from(format!("Bearer {}", token)).unwrap();
-    req.metadata_mut().insert("authorization", header);
-    let resp = client.health_check(req).await.unwrap();
-    assert_eq!(resp.into_inner().status, "ok");
+
+    let mut req = tonic::Request::new(ListChangesRequest { limit: 10 });
+    req.metadata_mut().insert("authorization", header.clone());
+    let resp = client.list_changes(req).await.unwrap().into_inner();
+    assert_eq!(resp.changes.len(), 2);
+    let highest = resp.changes.last().unwrap().change_id;
+
+    let mut ack_req = tonic::Request::new(AckChangesRequest {
+        up_to_change_id: highest,
+    });
+    ack_req.metadata_mut().insert("authorization", header);
+    client.ack_changes(ack_req).await.unwrap();
 }
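One thing to watch in these tests: each one spawns its server on a hard-coded port (56060-56062), which can collide with other processes or with the suite running twice in parallel. Assuming `spawn_tcp` needs a concrete `SocketAddr` up front, a common workaround is to let the OS hand out an ephemeral port first; `free_local_addr` below is a hypothetical helper, not part of this diff:

```rust
use std::net::{SocketAddr, TcpListener};

// Hypothetical helper: ask the OS for a currently free port instead of
// pinning 56060..56062. A small race window remains after drop(), but
// collisions become far less likely than with fixed ports.
fn free_local_addr() -> SocketAddr {
    let listener = TcpListener::bind("127.0.0.1:0").expect("bind ephemeral port");
    let addr = listener.local_addr().expect("read local addr");
    drop(listener);
    addr
}

fn main() {
    let a = free_local_addr();
    let b = free_local_addr();
    println!("test servers could listen on {a} and {b}");
}
```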
diff --git a/src/commands/mod.rs b/src/commands/mod.rs
index 86c34f4..b11969e 100644
--- a/src/commands/mod.rs
+++ b/src/commands/mod.rs
@@ -4,6 +4,7 @@
 pub mod init;
 pub mod status;
 pub mod sync;
+pub mod sync_sqlite;
 pub mod target;
 pub mod validate;
 pub mod verify;
diff --git a/src/commands/sync_sqlite.rs b/src/commands/sync_sqlite.rs
new file mode 100644
index 0000000..3e97836
--- /dev/null
+++ b/src/commands/sync_sqlite.rs
@@ -0,0 +1,439 @@
+use anyhow::{anyhow, bail, Context, Result};
+use clap::ValueEnum;
+use sqlite_watcher::watcher_proto::watcher_client::WatcherClient;
+use sqlite_watcher::watcher_proto::{
+    AckChangesRequest, GetStateRequest, HealthCheckRequest, ListChangesRequest, SetStateRequest,
+};
+use std::collections::HashMap;
+use std::path::{Path, PathBuf};
+use tokio_postgres::Client;
+use tonic::codegen::InterceptedService;
+use tonic::service::Interceptor;
+use tonic::transport::{Channel, Endpoint};
+use tonic::{Request, Status};
+use tower::service_fn;
+
+use crate::jsonb::writer::{delete_jsonb_rows, insert_jsonb_batch, upsert_jsonb_rows};
+
+const GLOBAL_STATE_KEY: &str = "_global";
+
+#[derive(Debug, Clone, Copy, ValueEnum, PartialEq, Eq)]
+pub enum IncrementalMode {
+    Append,
+    AppendDeduped,
+}
+
+pub struct SyncSqliteOptions {
+    pub target: String,
+    pub watcher_endpoint: String,
+    pub token_file: Option<PathBuf>,
+    pub incremental_mode: IncrementalMode,
+    pub batch_size: u32,
+}
+
+pub async fn run(opts: SyncSqliteOptions) -> Result<()> {
+    let token = load_token(opts.token_file.as_deref())?;
+    let endpoint = WatcherEndpoint::parse(&opts.watcher_endpoint)?;
+    let mut watcher = connect_watcher(endpoint, token.clone()).await?;
+
+    let client = crate::postgres::connect(&opts.target)
+        .await
+        .context("failed to connect to target PostgreSQL")?;
+    ensure_state_table(&client).await?;
+    ensure_baseline_exists(&client).await?;
+
+    tracing::info!("Connecting to sqlite-watcher...");
+    watcher
+        .health_check(Request::new(HealthCheckRequest {}))
+        .await
+        .context("watcher health check failed")?;
+    let _ = watcher
+        .get_state(Request::new(GetStateRequest {
+            table_name: GLOBAL_STATE_KEY.to_string(),
+        }))
+        .await?;
+
+    tracing::info!(
+        "Starting incremental sync (mode: {:?})",
+        opts.incremental_mode
+    );
+    let mut processed_any = false;
+
+    loop {
+        let req = Request::new(ListChangesRequest {
+            limit: opts.batch_size.max(1),
+        });
+        let changes = watcher
+            .list_changes(req)
+            .await
+            .context("failed to list changes from watcher")?
+            .into_inner()
+            .changes;
+
+        if changes.is_empty() {
+            if !processed_any {
+                tracing::info!("No pending sqlite-watcher changes");
+            }
+            break;
+        }
+
+        apply_changes(&client, &changes, opts.incremental_mode).await?;
+        processed_any = true;
+
+        let max_id = changes
+            .iter()
+            .map(|c| c.change_id)
+            .max()
+            .unwrap_or_default();
+        watcher
+            .ack_changes(Request::new(AckChangesRequest {
+                up_to_change_id: max_id,
+            }))
+            .await
+            .context("failed to ack changes")?;
+
+        let last_change = changes.last().unwrap();
+        watcher
+            .set_state(Request::new(SetStateRequest {
+                table_name: GLOBAL_STATE_KEY.to_string(),
+                last_change_id: max_id,
+                last_wal_frame: last_change.wal_frame.clone(),
+                cursor: last_change.cursor.clone(),
+            }))
+            .await
+            .context("failed to update watcher state")?;
+
+        if changes.len() < opts.batch_size as usize {
+            break;
+        }
+    }
+
+    tracing::info!("sqlite-watcher sync completed");
+    Ok(())
+}
+
+struct TableBatch {
+    upserts: Vec<(String, serde_json::Value)>,
+    deletes: Vec<String>,
+}
+
+impl TableBatch {
+    fn new() -> Self {
+        Self {
+            upserts: Vec::new(),
+            deletes: Vec::new(),
+        }
+    }
+}
+
+async fn apply_changes(
+    client: &Client,
+    changes: &[sqlite_watcher::watcher_proto::Change],
+    mode: IncrementalMode,
+) -> Result<()> {
+    let mut per_table: HashMap<String, TableBatch> = HashMap::new();
+    let mut table_state: HashMap<String, TableState> = HashMap::new();
+
+    for change in changes {
+        let entry = per_table
+            .entry(change.table_name.clone())
+            .or_insert_with(TableBatch::new);
+        match change.op.as_str() {
+            "insert" | "update" => {
+                let payload = if change.payload.is_empty() {
+                    serde_json::Value::Null
+                } else {
+                    serde_json::from_slice(&change.payload)
+                        .context("failed to parse change payload")?
+                };
+                entry.upserts.push((change.primary_key.clone(), payload));
+            }
+            "delete" => {
+                entry.deletes.push(change.primary_key.clone());
+            }
+            other => bail!("unknown change operation '{other}'"),
+        }
+        table_state.insert(
+            change.table_name.clone(),
+            TableState {
+                last_change_id: change.change_id,
+                wal_frame: non_empty_string(&change.wal_frame),
+                cursor: non_empty_string(&change.cursor),
+            },
+        );
+    }
+
+    for (table, batch) in per_table.iter() {
+        if !batch.upserts.is_empty() {
+            insert_jsonb_batch(client, table, batch.upserts.clone(), "sqlite").await?;
+            if mode == IncrementalMode::AppendDeduped {
+                let latest_table = format!("{}_latest", table);
+                ensure_latest_table(client, table, &latest_table).await?;
+                upsert_jsonb_rows(client, &latest_table, &batch.upserts, "sqlite").await?;
+            }
+        }
+        if !batch.deletes.is_empty() {
+            delete_jsonb_rows(client, table, &batch.deletes).await?;
+            if mode == IncrementalMode::AppendDeduped {
+                let latest_table = format!("{}_latest", table);
+                ensure_latest_table(client, table, &latest_table).await?;
+                delete_jsonb_rows(client, &latest_table, &batch.deletes).await?;
+            }
+        }
+    }
+
+    persist_state(client, &table_state, mode).await?;
+    Ok(())
+}
+
+async fn ensure_latest_table(
+    client: &Client,
+    source_table: &str,
+    latest_table: &str,
+) -> Result<()> {
+    crate::jsonb::validate_table_name(source_table)?;
+    crate::jsonb::validate_table_name(latest_table)?;
+    let sql = format!(
+        r#"CREATE TABLE IF NOT EXISTS "{}" (LIKE "{}" INCLUDING ALL)"#,
+        latest_table, source_table
+    );
+    client.execute(&sql, &[]).await?;
+    Ok(())
+}
+
+struct TableState {
+    last_change_id: i64,
+    wal_frame: Option<String>,
+    cursor: Option<String>,
+}
+
+async fn persist_state(
+    client: &Client,
+    updates: &HashMap<String, TableState>,
+    mode: IncrementalMode,
+) -> Result<()> {
+    for (table, state) in updates.iter() {
+        client
+            .execute(
+                "INSERT INTO sqlite_sync_state(table_name, last_change_id, last_wal_frame, cursor, snapshot_completed, incremental_mode)
+                 VALUES ($1, $2, $3, $4, TRUE, $5)
+                 ON CONFLICT(table_name) DO UPDATE SET last_change_id = EXCLUDED.last_change_id, last_wal_frame = EXCLUDED.last_wal_frame, cursor = EXCLUDED.cursor, incremental_mode = EXCLUDED.incremental_mode",
+                &[&table, &state.last_change_id, &state.wal_frame, &state.cursor, &mode_string(mode)],
+            )
+            .await?;
+    }
+    Ok(())
+}
+
+fn mode_string(mode: IncrementalMode) -> &'static str {
+    match mode {
+        IncrementalMode::Append => "append",
+        IncrementalMode::AppendDeduped => "append_deduped",
+    }
+}
+
+fn non_empty_string(value: &str) -> Option<String> {
+    if value.is_empty() {
+        None
+    } else {
+        Some(value.to_owned())
+    }
+}
+
+fn load_token(path: Option<&Path>) -> Result<String> {
+    let token_path = path
+        .map(|p| p.to_path_buf())
+        .unwrap_or(default_token_path()?);
+    let contents = std::fs::read_to_string(&token_path)
+        .with_context(|| format!("failed to read token file {}", token_path.display()))?;
+    let token = contents.trim().to_string();
+    if token.is_empty() {
+        bail!("token file {} is empty", token_path.display());
+    }
+    Ok(token)
+}
+
+fn default_token_path() -> Result<PathBuf> {
+    let home = dirs::home_dir().ok_or_else(|| anyhow!("Could not determine home directory"))?;
+    Ok(home.join(".seren/sqlite-watcher/token"))
+}
+
+async fn ensure_state_table(client: &Client) -> Result<()> {
+    client
+        .execute(
+            r#"CREATE TABLE IF NOT EXISTS sqlite_sync_state (
+                table_name TEXT PRIMARY KEY,
+                last_change_id BIGINT NOT NULL DEFAULT 0,
+                last_wal_frame TEXT,
+                cursor TEXT,
+                snapshot_completed BOOLEAN NOT NULL DEFAULT FALSE,
+                incremental_mode TEXT NOT NULL DEFAULT 'append',
+                baseline_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
+            )"#,
+            &[],
+        )
+        .await?;
+    Ok(())
+}
+
+async fn ensure_baseline_exists(client: &Client) -> Result<()> {
+    let row = client
+        .query_one(
+            "SELECT COUNT(*) FROM sqlite_sync_state WHERE snapshot_completed",
+            &[],
+        )
+        .await?;
+    let completed: i64 = row.get(0);
+    if completed == 0 {
+        bail!(
+            "No completed sqlite baseline found. Run 'database-replicator init --source sqlite://...' first"
+        );
+    }
+    Ok(())
+}
+
+enum WatcherEndpoint {
+    Tcp { host: String, port: u16 },
+    Unix(PathBuf),
+    Pipe(String),
+}
+
+impl WatcherEndpoint {
+    fn parse(value: &str) -> Result<Self> {
+        if let Some(rest) = value.strip_prefix("unix:") {
+            #[cfg(unix)]
+            {
+                if rest.is_empty() {
+                    bail!("unix endpoint requires a path");
+                }
+                return Ok(WatcherEndpoint::Unix(PathBuf::from(rest)));
+            }
+            #[cfg(not(unix))]
+            bail!("unix sockets are not supported on this platform")
+        }
+        if let Some(rest) = value.strip_prefix("tcp:") {
+            let mut parts = rest.split(':');
+            let host = parts
+                .next()
+                .ok_or_else(|| anyhow!("tcp endpoint must include host:port"))?;
+            let port = parts
+                .next()
+                .ok_or_else(|| anyhow!("tcp endpoint must include port"))?
+                .parse::<u16>()
+                .context("invalid tcp port")?;
+            return Ok(WatcherEndpoint::Tcp {
+                host: host.to_string(),
+                port,
+            });
+        }
+        if let Some(rest) = value.strip_prefix("pipe:") {
+            return Ok(WatcherEndpoint::Pipe(rest.to_string()));
+        }
+        bail!("unsupported watcher endpoint: {value}");
+    }
+}
+
+#[derive(Clone)]
+struct TokenInterceptor {
+    header: tonic::metadata::MetadataValue<tonic::metadata::Ascii>,
+}
+
+impl TokenInterceptor {
+    fn new(token: String) -> Result<Self> {
+        let value = tonic::metadata::MetadataValue::try_from(format!("Bearer {token}"))
+            .context("invalid watcher token")?;
+        Ok(Self { header: value })
+    }
+}
+
+impl Interceptor for TokenInterceptor {
+    fn call(&mut self, mut req: Request<()>) -> Result<Request<()>, Status> {
+        req.metadata_mut()
+            .insert("authorization", self.header.clone());
+        Ok(req)
+    }
+}
+
+type WatcherClientWithAuth = WatcherClient<InterceptedService<Channel, TokenInterceptor>>;
+
+async fn connect_watcher(
+    endpoint: WatcherEndpoint,
+    token: String,
+) -> Result<WatcherClientWithAuth> {
+    let interceptor = TokenInterceptor::new(token)?;
+    let channel = match endpoint {
+        WatcherEndpoint::Tcp { host, port } => {
+            let uri = format!("http://{}:{}", host, port);
+            Endpoint::try_from(uri)?.connect().await?
+        }
+        WatcherEndpoint::Unix(path) => {
+            #[cfg(unix)]
+            {
+                let path_buf = path.clone();
+                Endpoint::try_from("http://[::]:50051")?
+                    .connect_with_connector(service_fn(move |_| {
+                        let path = path_buf.clone();
+                        async move { tokio::net::UnixStream::connect(path).await }
+                    }))
+                    .await?
+            }
+            #[cfg(not(unix))]
+            {
+                bail!("unix sockets are not supported on this platform")
+            }
+        }
+        WatcherEndpoint::Pipe(name) => {
+            bail!("named pipe endpoints are not supported yet: {name}")
+        }
+    };
+
+    Ok(WatcherClient::with_interceptor(channel, interceptor))
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use sqlite_watcher::watcher_proto::Change;
+
+    #[test]
+    fn group_changes_by_table() {
+        let changes = vec![
+            Change {
+                change_id: 1,
+                table_name: "foo".into(),
+                op: "insert".into(),
+                primary_key: "1".into(),
+                payload: serde_json::to_vec(&serde_json::json!({"a":1})).unwrap(),
+                wal_frame: String::new(),
+                cursor: String::new(),
+            },
+            Change {
+                change_id: 2,
+                table_name: "foo".into(),
+                op: "delete".into(),
+                primary_key: "2".into(),
+                payload: Vec::new(),
+                wal_frame: String::new(),
+                cursor: String::new(),
+            },
+        ];
+        let mut per_table: HashMap<String, TableBatch> = HashMap::new();
+        for change in changes {
+            let entry = per_table
+                .entry(change.table_name.clone())
+                .or_insert_with(TableBatch::new);
+            match change.op.as_str() {
+                "insert" | "update" => {
+                    entry
+                        .upserts
+                        .push((change.primary_key.clone(), serde_json::Value::Null));
+                }
+                "delete" => entry.deletes.push(change.primary_key.clone()),
+                _ => {}
+            }
+        }
+        let foo = per_table.get("foo").unwrap();
+        assert_eq!(foo.upserts.len(), 1);
+        assert_eq!(foo.deletes.len(), 1);
+    }
+}
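The `--watcher-endpoint` flag uses a small scheme-prefixed grammar: `unix:/path`, `tcp:host:port`, or `pipe:name` (the last is parsed but rejected for now). A self-contained sketch of the same parsing rule as `WatcherEndpoint::parse`, minus the platform `cfg` gating and with plain `String` errors:

```rust
#[derive(Debug, PartialEq)]
enum Endpoint {
    Tcp { host: String, port: u16 },
    Unix(String),
    Pipe(String),
}

// Mirrors WatcherEndpoint::parse above, simplified for illustration.
fn parse_endpoint(value: &str) -> Result<Endpoint, String> {
    if let Some(rest) = value.strip_prefix("unix:") {
        if rest.is_empty() {
            return Err("unix endpoint requires a path".into());
        }
        return Ok(Endpoint::Unix(rest.to_string()));
    }
    if let Some(rest) = value.strip_prefix("tcp:") {
        // Like the original, this splits on the first ':', so bare IPv6
        // addresses would need bracketing or separate handling.
        let (host, port) = rest
            .split_once(':')
            .ok_or("tcp endpoint must include host:port")?;
        let port: u16 = port.parse().map_err(|_| "invalid tcp port")?;
        return Ok(Endpoint::Tcp {
            host: host.to_string(),
            port,
        });
    }
    if let Some(rest) = value.strip_prefix("pipe:") {
        return Ok(Endpoint::Pipe(rest.to_string()));
    }
    Err(format!("unsupported watcher endpoint: {value}"))
}

fn main() {
    assert_eq!(
        parse_endpoint("tcp:127.0.0.1:50051"),
        Ok(Endpoint::Tcp {
            host: "127.0.0.1".into(),
            port: 50051
        })
    );
    assert_eq!(
        parse_endpoint("unix:/tmp/sqlite-watcher.sock"),
        Ok(Endpoint::Unix("/tmp/sqlite-watcher.sock".into()))
    );
    assert!(parse_endpoint("ftp:example").is_err());
}
```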
diff --git a/src/jsonb/writer.rs b/src/jsonb/writer.rs
index eb03541..839e47a 100644
--- a/src/jsonb/writer.rs
+++ b/src/jsonb/writer.rs
@@ -2,7 +2,7 @@
 // ABOUTME: Handles table creation, single row inserts, and batch inserts
 
 use anyhow::{bail, Context, Result};
-use tokio_postgres::Client;
+use tokio_postgres::{types::ToSql, Client};
 
 /// Create a table with JSONB schema for storing non-PostgreSQL data
 ///
@@ -471,6 +471,49 @@ pub async fn insert_jsonb_batch(
     Ok(())
 }
 
+/// Delete rows from a JSONB table by primary key
+pub async fn delete_jsonb_rows(client: &Client, table_name: &str, ids: &[String]) -> Result<()> {
+    if ids.is_empty() {
+        return Ok(());
+    }
+    crate::jsonb::validate_table_name(table_name)?;
+    let sql = format!(r#"DELETE FROM "{}" WHERE id = ANY($1)"#, table_name);
+    client.execute(&sql, &[&ids]).await?;
+    Ok(())
+}
+
+/// Upsert rows into a JSONB table (used for deduped "_latest" tables)
+pub async fn upsert_jsonb_rows(
+    client: &Client,
+    table_name: &str,
+    rows: &[(String, serde_json::Value)],
+    source_type: &str,
+) -> Result<()> {
+    if rows.is_empty() {
+        return Ok(());
+    }
+    crate::jsonb::validate_table_name(table_name)?;
+
+    let mut value_placeholders = Vec::with_capacity(rows.len());
+    let mut params: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(rows.len() * 3);
+
+    for (idx, (id, data)) in rows.iter().enumerate() {
+        let base = idx * 3 + 1;
+        value_placeholders.push(format!("(${}, ${}, ${})", base, base + 1, base + 2));
+        params.push(id);
+        params.push(data);
+        params.push(&source_type);
+    }
+
+    let sql = format!(
+        r#"INSERT INTO "{}" (id, data, _source_type) VALUES {} ON CONFLICT (id) DO UPDATE SET data = EXCLUDED.data, _source_type = EXCLUDED._source_type, _migrated_at = NOW()"#,
+        table_name,
+        value_placeholders.join(", ")
+    );
+    client.execute(&sql, &params).await?;
+    Ok(())
+}
+
 #[cfg(test)]
 mod tests {
     #[test]
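`upsert_jsonb_rows` builds a single multi-row `INSERT ... ON CONFLICT` statement with three parameters per row, so the placeholder numbering (`idx * 3 + 1`) has to line up with the order values are pushed into `params`. A dependency-free sketch of just that arithmetic and the SQL shape it produces (`items_latest` is a made-up table name):

```rust
fn build_placeholders(row_count: usize) -> String {
    let mut groups = Vec::with_capacity(row_count);
    for idx in 0..row_count {
        let base = idx * 3 + 1; // Postgres parameters start at $1
        groups.push(format!("(${}, ${}, ${})", base, base + 1, base + 2));
    }
    groups.join(", ")
}

fn main() {
    // Two rows -> six consecutively numbered parameters.
    assert_eq!(build_placeholders(2), "($1, $2, $3), ($4, $5, $6)");
    let sql = format!(
        r#"INSERT INTO "items_latest" (id, data, _source_type) VALUES {} ON CONFLICT (id) DO UPDATE SET data = EXCLUDED.data"#,
        build_placeholders(2)
    );
    println!("{sql}");
}
```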
diff --git a/src/main.rs b/src/main.rs
index a6f165b..d7552d9 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -4,6 +4,7 @@
 use anyhow::Context;
 use clap::{Args, Parser, Subcommand};
 use database_replicator::commands;
+use std::path::PathBuf;
 
 #[derive(Parser)]
 #[command(name = "database-replicator")]
@@ -176,6 +177,24 @@ enum Commands {
         #[arg(long)]
         daemon_status: bool,
     },
+    /// Consume sqlite-watcher change batches and apply them to SerenDB JSONB tables
+    SyncSqlite {
+        /// Target PostgreSQL/Seren connection string
+        #[arg(long)]
+        target: String,
+        /// sqlite-watcher endpoint (unix:/path or tcp:host:port)
+        #[arg(long, default_value = "unix:/tmp/sqlite-watcher.sock")]
+        watcher_endpoint: String,
+        /// Optional shared-secret token file (defaults to ~/.seren/sqlite-watcher/token)
+        #[arg(long)]
+        token_file: Option<PathBuf>,
+        /// Incremental mode: append (raw only) or append_deduped (maintains *_latest tables)
+        #[arg(long, value_enum, default_value = "append")]
+        incremental_mode: commands::sync_sqlite::IncrementalMode,
+        /// Number of watcher rows to pull per batch
+        #[arg(long, default_value_t = 500)]
+        batch_size: u32,
+    },
     /// Check replication status and lag in real-time
     Status {
         #[arg(long)]
@@ -748,6 +767,22 @@ async fn main() -> anyhow::Result<()> {
             )?;
             commands::verify(&source, &target, Some(filter)).await
         }
+        Commands::SyncSqlite {
+            target,
+            watcher_endpoint,
+            token_file,
+            incremental_mode,
+            batch_size,
+        } => {
+            commands::sync_sqlite::run(commands::sync_sqlite::SyncSqliteOptions {
+                target,
+                watcher_endpoint,
+                token_file,
+                incremental_mode,
+                batch_size,
+            })
+            .await
+        }
         Commands::Target { args } => commands::target(args).await,
     }
 }
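For completeness: clap's derive turns the `SyncSqlite` variant into a `sync-sqlite` subcommand and kebab-cases the flags, so the invocation looks like `database-replicator sync-sqlite --target ... --watcher-endpoint tcp:127.0.0.1:50051`. A compilable sketch of how the new flags resolve, trimmed to just this subcommand (the connection string is a placeholder):

```rust
use clap::{Parser, Subcommand, ValueEnum};
use std::path::PathBuf;

#[derive(Debug, Clone, Copy, ValueEnum)]
enum IncrementalMode {
    Append,
    AppendDeduped,
}

#[derive(Parser)]
#[command(name = "database-replicator")]
struct Cli {
    #[command(subcommand)]
    command: Commands,
}

#[derive(Subcommand)]
enum Commands {
    /// Trimmed copy of the new subcommand's flags.
    SyncSqlite {
        #[arg(long)]
        target: String,
        #[arg(long, default_value = "unix:/tmp/sqlite-watcher.sock")]
        watcher_endpoint: String,
        #[arg(long)]
        token_file: Option<PathBuf>,
        #[arg(long, value_enum, default_value = "append")]
        incremental_mode: IncrementalMode,
        #[arg(long, default_value_t = 500)]
        batch_size: u32,
    },
}

fn main() {
    // clap derives the kebab-case name `sync-sqlite` from the variant.
    let cli = Cli::parse_from([
        "database-replicator",
        "sync-sqlite",
        "--target",
        "postgres://user:pass@localhost/db", // placeholder
        "--watcher-endpoint",
        "tcp:127.0.0.1:50051",
    ]);
    let Commands::SyncSqlite {
        watcher_endpoint,
        batch_size,
        ..
    } = cli.command;
    assert_eq!(watcher_endpoint, "tcp:127.0.0.1:50051");
    assert_eq!(batch_size, 500); // default applied
}
```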