Skip to content

Commit 076de67

Browse files
committed
[sqlite-watcher] add unix socket listener and tests
1 parent f079d2a commit 076de67

File tree

6 files changed

+172
-71
lines changed

6 files changed

+172
-71
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

sqlite-watcher/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,3 +28,4 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] }
2828
[dev-dependencies]
2929
tempfile = "3.8"
3030
tokio = { version = "1.35", features = ["rt", "macros"] }
31+
tower = "0.4"

sqlite-watcher/README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,11 @@ Flag summary:
3232
- `--log-level`: Tracing filter (also settable via `SQLITE_WATCHER_LOG`).
3333
- `--poll-interval-ms`: How often to check the WAL file for growth (default 500 ms). Lower values react faster but cost more syscalls.
3434
- `--min-event-bytes`: Minimum WAL byte growth before emitting an event. Use larger values to avoid spam when very small transactions occur.
35-
- `--listen` + `--token-file` now control the embedded gRPC server. Clients must send `Authorization: Bearer <token>` metadata when calling the `Watcher` service (see `proto/watcher.proto`). Unix sockets/pipes are placeholders until Ticket D is completed; TCP listens on `127.0.0.1:<port>`.
35+
- `--listen` + `--token-file` now control the embedded gRPC server. Clients must send `Authorization: Bearer <token>` metadata when calling the `Watcher` service (see `proto/watcher.proto`). TCP (`tcp:50051`) and Unix sockets (`unix:/tmp/sqlite-watcher.sock`) are available today; Windows named pipes currently fall back to TCP until native support lands.
3636

3737
## Cross-platform notes
3838

39-
- **Linux/macOS**: Default listener is a Unix domain socket at `/tmp/sqlite-watcher.sock`. Ensure the target SQLite database enables WAL journaling.
39+
- **Linux/macOS**: Default listener is a Unix domain socket at `/tmp/sqlite-watcher.sock`. The watcher cleans up stale socket files on startup; point `--listen unix:/path` elsewhere if needed.
4040
- **Windows**: Unix sockets are disabled; pass `--listen tcp:50051` or `--listen pipe:SerenWatcher`. Named pipes allow local service accounts without opening TCP ports.
4141
- All platforms expect the token file to live under `~/.seren/sqlite-watcher/token` by default; create the directory with `0700` permissions so the watcher refuses to start if the secret is world-readable.
4242
- The current WAL watcher polls the `*.sqlite-wal` file for byte growth. To keep WAL history available, configure your writers with `PRAGMA journal_mode=WAL;` and raise `wal_autocheckpoint` (or disable it) so the SQLite engine does not aggressively truncate the log.

sqlite-watcher/src/main.rs

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@ use anyhow::{anyhow, bail, Context, Result};
1010
use clap::Parser;
1111
use sqlite_watcher::decoder::WalGrowthDecoder;
1212
use sqlite_watcher::queue::ChangeQueue;
13-
use sqlite_watcher::server::TcpServerHandle;
13+
#[cfg(unix)]
14+
use sqlite_watcher::server::spawn_unix_server;
15+
use sqlite_watcher::server::{spawn_tcp_server, ServerHandle};
1416
use sqlite_watcher::wal::{start_wal_watcher, WalWatcherConfig as TailConfig};
1517
use tracing_subscriber::EnvFilter;
1618

@@ -281,21 +283,25 @@ fn start_grpc_server(
281283
listen: &ListenAddress,
282284
queue_path: &Path,
283285
token: &str,
284-
) -> Result<Option<TcpServerHandle>> {
286+
) -> Result<Option<ServerHandle>> {
285287
match listen {
286288
ListenAddress::Tcp { host, port } => {
287289
let addr: SocketAddr = format!("{}:{}", host, port)
288290
.parse()
289291
.with_context(|| format!("invalid tcp listen address {host}:{port}"))?;
290-
let handle = TcpServerHandle::spawn(addr, queue_path.to_path_buf(), token.to_string())?;
292+
let handle = spawn_tcp_server(addr, queue_path.to_path_buf(), token.to_string())?;
291293
Ok(Some(handle))
292294
}
293295
ListenAddress::Unix(path) => {
294-
tracing::warn!(
295-
path = %path.display(),
296-
"unix socket gRPC transport is not yet implemented"
297-
);
298-
Ok(None)
296+
#[cfg(unix)]
297+
{
298+
let handle = spawn_unix_server(path, queue_path.to_path_buf(), token.to_string())?;
299+
Ok(Some(handle))
300+
}
301+
#[cfg(not(unix))]
302+
{
303+
bail!("unix sockets are not supported on this platform")
304+
}
299305
}
300306
ListenAddress::Pipe(name) => {
301307
tracing::warn!(pipe = name, "named pipe transport is not yet implemented");

sqlite-watcher/src/server.rs

Lines changed: 119 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use std::net::SocketAddr;
2-
use std::path::PathBuf;
2+
use std::path::{Path, PathBuf};
33
use std::sync::Arc;
44
use std::thread::{self, JoinHandle};
55

@@ -18,55 +18,115 @@ use crate::watcher_proto::{
1818
SetStateRequest, SetStateResponse,
1919
};
2020

21-
pub struct TcpServerHandle {
21+
#[cfg(unix)]
22+
use tokio::net::UnixListener;
23+
#[cfg(unix)]
24+
use tokio_stream::wrappers::UnixListenerStream;
25+
26+
pub struct ServerHandle {
2227
shutdown: Option<oneshot::Sender<()>>,
2328
thread: Option<JoinHandle<Result<()>>>,
29+
#[cfg(unix)]
30+
unix_path: Option<PathBuf>,
2431
}
2532

26-
impl Drop for TcpServerHandle {
33+
impl Drop for ServerHandle {
2734
fn drop(&mut self) {
2835
if let Some(tx) = self.shutdown.take() {
2936
let _ = tx.send(());
3037
}
3138
if let Some(handle) = self.thread.take() {
3239
let _ = handle.join();
3340
}
41+
#[cfg(unix)]
42+
if let Some(path) = self.unix_path.take() {
43+
let _ = std::fs::remove_file(path);
44+
}
3445
}
3546
}
3647

37-
impl TcpServerHandle {
38-
pub fn spawn(addr: SocketAddr, queue_path: PathBuf, token: String) -> Result<Self> {
39-
let (shutdown_tx, shutdown_rx) = oneshot::channel();
40-
let thread = thread::spawn(move || -> Result<()> {
41-
let runtime = Builder::new_multi_thread()
42-
.enable_all()
43-
.build()
44-
.context("failed to build tokio runtime")?;
45-
runtime.block_on(async move {
46-
let listener = tokio::net::TcpListener::bind(addr)
47-
.await
48-
.context("failed to bind tcp listener")?;
49-
let queue_path = Arc::new(queue_path);
50-
let svc = WatcherService::new(queue_path.clone());
51-
let interceptor = AuthInterceptor {
52-
token: Arc::new(token),
53-
};
54-
Server::builder()
55-
.add_service(WatcherServer::with_interceptor(svc, interceptor))
56-
.serve_with_incoming_shutdown(TcpListenerStream::new(listener), async move {
57-
let _ = shutdown_rx.await;
58-
})
59-
.await
60-
.context("grpc server exited with error")?;
61-
Ok(())
62-
})
63-
});
64-
65-
Ok(Self {
66-
shutdown: Some(shutdown_tx),
67-
thread: Some(thread),
68-
})
48+
pub fn spawn_tcp_server(
49+
addr: SocketAddr,
50+
queue_path: PathBuf,
51+
token: String,
52+
) -> Result<ServerHandle> {
53+
let (shutdown_tx, shutdown_rx) = oneshot::channel();
54+
let thread = thread::spawn(move || -> Result<()> {
55+
let runtime = Builder::new_multi_thread()
56+
.enable_all()
57+
.build()
58+
.context("failed to build tokio runtime")?;
59+
runtime.block_on(async move {
60+
let listener = tokio::net::TcpListener::bind(addr)
61+
.await
62+
.context("failed to bind tcp listener")?;
63+
let queue_path = Arc::new(queue_path);
64+
let svc = WatcherService::new(queue_path);
65+
let interceptor = AuthInterceptor {
66+
token: Arc::new(token),
67+
};
68+
Server::builder()
69+
.add_service(WatcherServer::with_interceptor(svc, interceptor))
70+
.serve_with_incoming_shutdown(TcpListenerStream::new(listener), async move {
71+
let _ = shutdown_rx.await;
72+
})
73+
.await
74+
.context("grpc server exited with error")?;
75+
Ok::<(), anyhow::Error>(())
76+
})?;
77+
Ok(())
78+
});
79+
80+
Ok(ServerHandle {
81+
shutdown: Some(shutdown_tx),
82+
thread: Some(thread),
83+
#[cfg(unix)]
84+
unix_path: None,
85+
})
86+
}
87+
88+
#[cfg(unix)]
89+
pub fn spawn_unix_server(path: &Path, queue_path: PathBuf, token: String) -> Result<ServerHandle> {
90+
if path.exists() {
91+
std::fs::remove_file(path)
92+
.with_context(|| format!("failed to remove stale unix socket {}", path.display()))?;
93+
}
94+
if let Some(parent) = path.parent() {
95+
std::fs::create_dir_all(parent)
96+
.with_context(|| format!("failed to create unix socket dir {}", parent.display()))?;
6997
}
98+
let path_buf = path.to_path_buf();
99+
let (shutdown_tx, shutdown_rx) = oneshot::channel();
100+
let path_for_drop = path_buf.clone();
101+
let thread = thread::spawn(move || -> Result<()> {
102+
let runtime = Builder::new_multi_thread()
103+
.enable_all()
104+
.build()
105+
.context("failed to build tokio runtime")?;
106+
runtime.block_on(async move {
107+
let listener = UnixListener::bind(&path_buf).context("failed to bind unix socket")?;
108+
let queue_path = Arc::new(queue_path);
109+
let svc = WatcherService::new(queue_path);
110+
let interceptor = AuthInterceptor {
111+
token: Arc::new(token),
112+
};
113+
Server::builder()
114+
.add_service(WatcherServer::with_interceptor(svc, interceptor))
115+
.serve_with_incoming_shutdown(UnixListenerStream::new(listener), async move {
116+
let _ = shutdown_rx.await;
117+
})
118+
.await
119+
.context("grpc server exited with error")?;
120+
Ok::<(), anyhow::Error>(())
121+
})?;
122+
Ok(())
123+
});
124+
125+
Ok(ServerHandle {
126+
shutdown: Some(shutdown_tx),
127+
thread: Some(thread),
128+
unix_path: Some(path_for_drop),
129+
})
70130
}
71131

72132
struct WatcherService {
@@ -83,6 +143,30 @@ impl WatcherService {
83143
}
84144
}
85145

146+
#[derive(Clone)]
147+
struct AuthInterceptor {
148+
token: Arc<String>,
149+
}
150+
151+
impl Interceptor for AuthInterceptor {
152+
fn call(&mut self, request: Request<()>) -> Result<Request<()>, Status> {
153+
let header = request
154+
.metadata()
155+
.get("authorization")
156+
.ok_or_else(|| Status::unauthenticated("missing authorization header"))?;
157+
let expected = format!("Bearer {}", self.token.as_ref());
158+
if header
159+
.to_str()
160+
.map(|value| value == expected)
161+
.unwrap_or(false)
162+
{
163+
Ok(request)
164+
} else {
165+
Err(Status::unauthenticated("invalid authorization header"))
166+
}
167+
}
168+
}
169+
86170
#[tonic::async_trait]
87171
impl Watcher for WatcherService {
88172
async fn health_check(
@@ -196,27 +280,3 @@ fn change_to_proto(row: crate::queue::ChangeRecord) -> Change {
196280
cursor: row.cursor.unwrap_or_default(),
197281
}
198282
}
199-
200-
#[derive(Clone)]
201-
struct AuthInterceptor {
202-
token: Arc<String>,
203-
}
204-
205-
impl Interceptor for AuthInterceptor {
206-
fn call(&mut self, request: Request<()>) -> Result<Request<()>, Status> {
207-
let header = request
208-
.metadata()
209-
.get("authorization")
210-
.ok_or_else(|| Status::unauthenticated("missing authorization header"))?;
211-
let expected = format!("Bearer {}", self.token.as_ref());
212-
if header
213-
.to_str()
214-
.map(|value| value == expected)
215-
.unwrap_or(false)
216-
{
217-
Ok(request)
218-
} else {
219-
Err(Status::unauthenticated("invalid authorization header"))
220-
}
221-
}
222-
}

sqlite-watcher/tests/server_tests.rs

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
use std::net::SocketAddr;
22
use std::time::Duration;
33

4-
use sqlite_watcher::server::TcpServerHandle;
4+
use sqlite_watcher::server::spawn_tcp_server;
5+
#[cfg(unix)]
6+
use sqlite_watcher::server::spawn_unix_server;
57
use sqlite_watcher::watcher_proto::watcher_client::WatcherClient;
68
use sqlite_watcher::watcher_proto::HealthCheckRequest;
79
use tempfile::tempdir;
@@ -15,7 +17,7 @@ async fn health_check_responds_ok() {
1517
let addr: SocketAddr = "127.0.0.1:55051".parse().unwrap();
1618
let token = "secret-token".to_string();
1719

18-
let _handle = TcpServerHandle::spawn(addr, queue_path, token.clone()).unwrap();
20+
let _handle = spawn_tcp_server(addr, queue_path, token.clone()).unwrap();
1921
sleep(Duration::from_millis(200)).await;
2022

2123
let channel = tonic::transport::Channel::from_shared(format!("http://{}", addr))
@@ -30,3 +32,33 @@ async fn health_check_responds_ok() {
3032
let resp = client.health_check(req).await.unwrap();
3133
assert_eq!(resp.into_inner().status, "ok");
3234
}
35+
#[cfg(unix)]
36+
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
37+
async fn health_check_over_unix_socket() {
38+
use tokio::net::UnixStream;
39+
use tonic::transport::Endpoint;
40+
use tower::service_fn;
41+
42+
let dir = tempdir().unwrap();
43+
let queue_path = dir.path().join("queue.db");
44+
let socket_path = dir.path().join("watcher.sock");
45+
let token = "secret-token".to_string();
46+
47+
let _handle = spawn_unix_server(&socket_path, queue_path, token.clone()).unwrap();
48+
sleep(Duration::from_millis(200)).await;
49+
50+
let endpoint = Endpoint::try_from("http://[::]:50051").unwrap();
51+
let channel = endpoint
52+
.connect_with_connector(service_fn(move |_: tonic::transport::Uri| {
53+
let path = socket_path.clone();
54+
async move { UnixStream::connect(path).await }
55+
}))
56+
.await
57+
.unwrap();
58+
let mut client = WatcherClient::new(channel);
59+
let mut req = tonic::Request::new(HealthCheckRequest {});
60+
let header = MetadataValue::try_from(format!("Bearer {}", token)).unwrap();
61+
req.metadata_mut().insert("authorization", header);
62+
let resp = client.health_check(req).await.unwrap();
63+
assert_eq!(resp.into_inner().status, "ok");
64+
}

0 commit comments

Comments
 (0)