Skip to content

Commit d182164

Browse files
authored
[sqlite-watcher] add queue + grpc server scaffold (#89)
1 parent 4d3cb34 commit d182164

File tree

3 files changed

+51
-60
lines changed

3 files changed

+51
-60
lines changed

Cargo.lock

Lines changed: 9 additions & 16 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

sqlite-watcher/README.md

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,17 @@
1-
# sqlite-watcher (alpha)
1+
# sqlite-watcher
22

3-
This crate currently ships the shared queue + gRPC server used by `database-replicator sync-sqlite`. The `sqlite-watcher` binary provides the following subcommands:
3+
This crate provides the building blocks for an upcoming sqlite-watcher binary. Issue #82 adds a durable queue plus a tonic-based gRPC server so other components can stream captured SQLite changes.
44

5-
- `serve`: start the queue-backed gRPC API so clients can pull change batches.
6-
- `enqueue`: helper for tests/smoke scripts to add sample changes to the queue database.
5+
## Components
76

8-
> **Note:** WAL tailing is still under active development; use the binary today to test queue + sync flows.
7+
- `queue.rs`: stores change rows and per-table checkpoints in `~/.seren/sqlite-watcher/changes.db`.
8+
- `proto/watcher.proto`: RPC definitions (`HealthCheck`, `ListChanges`, `AckChanges`, `GetState`, `SetState`).
9+
- `server.rs`: tonic server wrappers exposing the queue over TCP or Unix sockets with shared-secret authentication.
910

10-
See `sqlite-watcher-docs/installers.md` for per-OS service guidance and `scripts/test-sqlite-delta.sh` for the end-to-end smoke test.
11+
## Building & Testing
12+
13+
```bash
14+
cargo test -p sqlite-watcher
15+
```
16+
17+
The tests cover queue durability and state-tracking behavior. Server tests will be added once the consumer wiring lands.

sqlite-watcher/src/server.rs

Lines changed: 29 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,7 @@ use tokio::runtime::Builder;
88
use tokio::sync::oneshot;
99
use tokio_stream::wrappers::TcpListenerStream;
1010
use tonic::service::Interceptor;
11-
use tonic::transport::Server;
12-
use tonic::{Request, Response, Status};
11+
use tonic::{transport::Server, Request, Response, Status};
1312

1413
#[cfg(unix)]
1514
use tokio::net::UnixListener;
@@ -67,28 +66,26 @@ impl Drop for ServerHandle {
6766
}
6867

6968
pub fn spawn_tcp(addr: SocketAddr, queue_path: PathBuf, token: String) -> Result<ServerHandle> {
70-
let (shutdown_tx, shutdown_rx) = oneshot::channel();
69+
let (tx, rx) = oneshot::channel();
7170
let thread = thread::spawn(move || -> Result<()> {
72-
let runtime = Builder::new_multi_thread().enable_all().build()?;
73-
runtime.block_on(async move {
71+
let rt = Builder::new_multi_thread().enable_all().build()?;
72+
rt.block_on(async move {
7473
let listener = tokio::net::TcpListener::bind(addr)
7574
.await
7675
.context("failed to bind tcp listener")?;
77-
let queue_path = Arc::new(queue_path);
78-
let svc = WatcherService::new(queue_path);
76+
let service = WatcherService::new(queue_path);
7977
let interceptor = AuthInterceptor::new(token);
8078
Server::builder()
81-
.add_service(WatcherServer::with_interceptor(svc, interceptor))
79+
.add_service(WatcherServer::with_interceptor(service, interceptor))
8280
.serve_with_incoming_shutdown(TcpListenerStream::new(listener), async move {
83-
let _ = shutdown_rx.await;
81+
let _ = rx.await;
8482
})
8583
.await
8684
.context("grpc server exited")
8785
})
8886
});
89-
9087
Ok(ServerHandle::Tcp {
91-
shutdown: Some(shutdown_tx),
88+
shutdown: Some(tx),
9289
thread: Some(thread),
9390
})
9491
}
@@ -97,41 +94,34 @@ pub fn spawn_tcp(addr: SocketAddr, queue_path: PathBuf, token: String) -> Result
9794
pub fn spawn_unix(path: &Path, queue_path: PathBuf, token: String) -> Result<ServerHandle> {
9895
if path.exists() {
9996
std::fs::remove_file(path)
100-
.with_context(|| format!("failed to remove stale unix socket {}", path.display()))?;
97+
.with_context(|| format!("failed to remove stale socket {}", path.display()))?;
10198
}
10299
if let Some(parent) = path.parent() {
103-
std::fs::create_dir_all(parent).with_context(|| {
104-
format!(
105-
"failed to create unix socket directory {}",
106-
parent.display()
107-
)
108-
})?;
100+
std::fs::create_dir_all(parent)
101+
.with_context(|| format!("failed to create socket dir {}", parent.display()))?;
109102
}
110-
let path_buf = path.to_path_buf();
111-
let listener_path = path_buf.clone();
112-
let (shutdown_tx, shutdown_rx) = oneshot::channel();
103+
let socket_path = path.to_path_buf();
104+
let (tx, rx) = oneshot::channel();
105+
let path_clone = socket_path.clone();
113106
let thread = thread::spawn(move || -> Result<()> {
114-
let runtime = Builder::new_multi_thread().enable_all().build()?;
115-
runtime.block_on(async move {
116-
let listener =
117-
UnixListener::bind(&listener_path).context("failed to bind unix socket")?;
118-
let queue_path = Arc::new(queue_path);
119-
let svc = WatcherService::new(queue_path);
107+
let rt = Builder::new_multi_thread().enable_all().build()?;
108+
rt.block_on(async move {
109+
let listener = UnixListener::bind(&path_clone).context("failed to bind unix socket")?;
110+
let service = WatcherService::new(queue_path);
120111
let interceptor = AuthInterceptor::new(token);
121112
Server::builder()
122-
.add_service(WatcherServer::with_interceptor(svc, interceptor))
113+
.add_service(WatcherServer::with_interceptor(service, interceptor))
123114
.serve_with_incoming_shutdown(UnixListenerStream::new(listener), async move {
124-
let _ = shutdown_rx.await;
115+
let _ = rx.await;
125116
})
126117
.await
127118
.context("grpc server exited")
128119
})
129120
});
130-
131121
Ok(ServerHandle::Unix {
132-
shutdown: Some(shutdown_tx),
122+
shutdown: Some(tx),
133123
thread: Some(thread),
134-
path: path_buf,
124+
path: socket_path,
135125
})
136126
}
137127

@@ -141,8 +131,10 @@ struct WatcherService {
141131
}
142132

143133
impl WatcherService {
144-
fn new(queue_path: Arc<PathBuf>) -> Self {
145-
Self { queue_path }
134+
fn new(queue_path: PathBuf) -> Self {
135+
Self {
136+
queue_path: Arc::new(queue_path),
137+
}
146138
}
147139

148140
fn queue(&self) -> Result<ChangeQueue> {
@@ -164,14 +156,14 @@ impl AuthInterceptor {
164156
}
165157

166158
impl Interceptor for AuthInterceptor {
167-
fn call(&mut self, req: Request<()>) -> Result<Request<()>, Status> {
168-
let provided = req
159+
fn call(&mut self, request: Request<()>) -> Result<Request<()>, Status> {
160+
let provided = request
169161
.metadata()
170162
.get("authorization")
171163
.ok_or_else(|| Status::unauthenticated("missing authorization header"))?;
172164
let expected = format!("Bearer {}", self.token.as_str());
173165
if provided == expected.as_str() {
174-
Ok(req)
166+
Ok(request)
175167
} else {
176168
Err(Status::unauthenticated("invalid authorization header"))
177169
}
@@ -207,7 +199,6 @@ impl Watcher for WatcherService {
207199
let upto = request.get_ref().up_to_change_id;
208200
let queue = self.queue().map_err(internal_err)?;
209201
let count = queue.ack_up_to(upto).map_err(internal_err)?;
210-
queue.purge_acked().ok();
211202
Ok(Response::new(AckChangesResponse {
212203
acknowledged: count,
213204
}))

0 commit comments

Comments
 (0)