Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 9 additions & 24 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 13 additions & 6 deletions sqlite-watcher/README.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
# sqlite-watcher (alpha)
# sqlite-watcher

This crate currently ships the shared queue + gRPC server used by `database-replicator sync-sqlite`. The `sqlite-watcher` binary includes:
This crate provides the building blocks for an upcoming sqlite-watcher binary. Issue #82 adds a durable queue plus a tonic-based gRPC server so other components can stream captured SQLite changes.

- `serve`: start the queue-backed gRPC API so clients can pull change batches.
- `enqueue`: helper for tests/smoke scripts to add sample changes to the queue database.
## Components

> **Note:** WAL tailing is still under active development; use the binary today to test queue + sync flows.
- `queue.rs`: stores change rows and per-table checkpoints in `~/.seren/sqlite-watcher/changes.db`.
- `proto/watcher.proto`: RPC definitions (`HealthCheck`, `ListChanges`, `AckChanges`, `GetState`, `SetState`).
- `server.rs`: tonic server wrappers exposing the queue over TCP or Unix sockets with shared-secret authentication.

See `sqlite-watcher-docs/installers.md` for per-OS service guidance and `scripts/test-sqlite-delta.sh` for the end-to-end smoke test.
## Building & Testing

```bash
cargo test -p sqlite-watcher
```

The tests cover queue durability/state behavior. Server tests will be added once the consumer wiring lands.
67 changes: 29 additions & 38 deletions sqlite-watcher/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ use tokio::runtime::Builder;
use tokio::sync::oneshot;
use tokio_stream::wrappers::TcpListenerStream;
use tonic::service::Interceptor;
use tonic::transport::Server;
use tonic::{Request, Response, Status};
use tonic::{transport::Server, Request, Response, Status};

#[cfg(unix)]
use tokio::net::UnixListener;
Expand Down Expand Up @@ -67,28 +66,26 @@ impl Drop for ServerHandle {
}

pub fn spawn_tcp(addr: SocketAddr, queue_path: PathBuf, token: String) -> Result<ServerHandle> {
let (shutdown_tx, shutdown_rx) = oneshot::channel();
let (tx, rx) = oneshot::channel();
let thread = thread::spawn(move || -> Result<()> {
let runtime = Builder::new_multi_thread().enable_all().build()?;
runtime.block_on(async move {
let rt = Builder::new_multi_thread().enable_all().build()?;
rt.block_on(async move {
let listener = tokio::net::TcpListener::bind(addr)
.await
.context("failed to bind tcp listener")?;
let queue_path = Arc::new(queue_path);
let svc = WatcherService::new(queue_path);
let service = WatcherService::new(queue_path);
let interceptor = AuthInterceptor::new(token);
Server::builder()
.add_service(WatcherServer::with_interceptor(svc, interceptor))
.add_service(WatcherServer::with_interceptor(service, interceptor))
.serve_with_incoming_shutdown(TcpListenerStream::new(listener), async move {
let _ = shutdown_rx.await;
let _ = rx.await;
})
.await
.context("grpc server exited")
})
});

Ok(ServerHandle::Tcp {
shutdown: Some(shutdown_tx),
shutdown: Some(tx),
thread: Some(thread),
})
}
Expand All @@ -97,41 +94,34 @@ pub fn spawn_tcp(addr: SocketAddr, queue_path: PathBuf, token: String) -> Result
pub fn spawn_unix(path: &Path, queue_path: PathBuf, token: String) -> Result<ServerHandle> {
if path.exists() {
std::fs::remove_file(path)
.with_context(|| format!("failed to remove stale unix socket {}", path.display()))?;
.with_context(|| format!("failed to remove stale socket {}", path.display()))?;
}
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent).with_context(|| {
format!(
"failed to create unix socket directory {}",
parent.display()
)
})?;
std::fs::create_dir_all(parent)
.with_context(|| format!("failed to create socket dir {}", parent.display()))?;
}
let path_buf = path.to_path_buf();
let listener_path = path_buf.clone();
let (shutdown_tx, shutdown_rx) = oneshot::channel();
let socket_path = path.to_path_buf();
let (tx, rx) = oneshot::channel();
let path_clone = socket_path.clone();
let thread = thread::spawn(move || -> Result<()> {
let runtime = Builder::new_multi_thread().enable_all().build()?;
runtime.block_on(async move {
let listener =
UnixListener::bind(&listener_path).context("failed to bind unix socket")?;
let queue_path = Arc::new(queue_path);
let svc = WatcherService::new(queue_path);
let rt = Builder::new_multi_thread().enable_all().build()?;
rt.block_on(async move {
let listener = UnixListener::bind(&path_clone).context("failed to bind unix socket")?;
let service = WatcherService::new(queue_path);
let interceptor = AuthInterceptor::new(token);
Server::builder()
.add_service(WatcherServer::with_interceptor(svc, interceptor))
.add_service(WatcherServer::with_interceptor(service, interceptor))
.serve_with_incoming_shutdown(UnixListenerStream::new(listener), async move {
let _ = shutdown_rx.await;
let _ = rx.await;
})
.await
.context("grpc server exited")
})
});

Ok(ServerHandle::Unix {
shutdown: Some(shutdown_tx),
shutdown: Some(tx),
thread: Some(thread),
path: path_buf,
path: socket_path,
})
}

Expand All @@ -141,8 +131,10 @@ struct WatcherService {
}

impl WatcherService {
fn new(queue_path: Arc<PathBuf>) -> Self {
Self { queue_path }
fn new(queue_path: PathBuf) -> Self {
Self {
queue_path: Arc::new(queue_path),
}
}

fn queue(&self) -> Result<ChangeQueue> {
Expand All @@ -164,14 +156,14 @@ impl AuthInterceptor {
}

impl Interceptor for AuthInterceptor {
fn call(&mut self, req: Request<()>) -> Result<Request<()>, Status> {
let provided = req
fn call(&mut self, request: Request<()>) -> Result<Request<()>, Status> {
let provided = request
.metadata()
.get("authorization")
.ok_or_else(|| Status::unauthenticated("missing authorization header"))?;
let expected = format!("Bearer {}", self.token.as_str());
if provided == expected.as_str() {
Ok(req)
Ok(request)
} else {
Err(Status::unauthenticated("invalid authorization header"))
}
Expand Down Expand Up @@ -207,7 +199,6 @@ impl Watcher for WatcherService {
let upto = request.get_ref().up_to_change_id;
let queue = self.queue().map_err(internal_err)?;
let count = queue.ack_up_to(upto).map_err(internal_err)?;
queue.purge_acked().ok();
Ok(Response::new(AckChangesResponse {
acknowledged: count,
}))
Expand Down
Loading
Loading