@@ -2,14 +2,13 @@ use std::fmt;
22use std:: path:: { Path , PathBuf } ;
33use std:: str:: FromStr ;
44use std:: sync:: mpsc;
5- use std:: time:: { Duration , SystemTime , UNIX_EPOCH } ;
5+ use std:: time:: Duration ;
66
77use anyhow:: { anyhow, bail, Context , Result } ;
88use clap:: Parser ;
9- use serde_json:: json;
10- use sqlite_watcher:: change:: RowChange ;
11- use sqlite_watcher:: queue:: { ChangeOperation , ChangeQueue } ;
12- use sqlite_watcher:: wal:: { start_wal_watcher, WalEvent , WalWatcherConfig as TailConfig } ;
9+ use sqlite_watcher:: decoder:: WalGrowthDecoder ;
10+ use sqlite_watcher:: queue:: ChangeQueue ;
11+ use sqlite_watcher:: wal:: { start_wal_watcher, WalWatcherConfig as TailConfig } ;
1312use tracing_subscriber:: EnvFilter ;
1413
1514#[ cfg( unix) ]
@@ -219,6 +218,7 @@ fn main() -> Result<()> {
219218 ) ;
220219
221220 let queue = ChangeQueue :: open ( & config. queue_path ) ?;
221+ let decoder = WalGrowthDecoder :: default ( ) ;
222222 let ( event_tx, event_rx) = mpsc:: channel ( ) ;
223223 let _wal_handle = start_wal_watcher (
224224 & config. database_path ,
@@ -230,50 +230,42 @@ fn main() -> Result<()> {
230230 ) ?;
231231
232232 for event in event_rx {
233- match persist_wal_event ( & queue, & event) {
234- Ok ( change_id ) => {
233+ match process_wal_event ( & decoder , & queue, & event) {
234+ Ok ( change_ids ) if !change_ids . is_empty ( ) => {
235235 tracing:: info!(
236236 bytes_added = event. bytes_added,
237237 wal_size = event. current_size,
238- change_id ,
238+ change_count = change_ids . len ( ) ,
239239 "queued wal growth event"
240240 ) ;
241241 }
242242 Err ( err) => {
243243 tracing:: warn!( error = %err, "failed to persist wal event to queue" ) ;
244244 }
245+ _ => { }
245246 }
246247 }
247248
248249 Ok ( ( ) )
249250}
250251
251- fn persist_wal_event ( queue : & ChangeQueue , event : & WalEvent ) -> Result < i64 > {
252- let now = SystemTime :: now ( )
253- . duration_since ( UNIX_EPOCH )
254- . map_err ( |err| anyhow ! ( "system clock drift: {err}" ) ) ?;
255- let row = RowChange {
256- table_name : "__wal__" . to_string ( ) ,
257- operation : ChangeOperation :: Insert ,
258- primary_key : now. as_nanos ( ) . to_string ( ) ,
259- payload : Some ( json ! ( {
260- "kind" : "wal_growth" ,
261- "bytes_added" : event. bytes_added,
262- "current_size" : event. current_size,
263- "recorded_at" : now. as_secs_f64( ) ,
264- } ) ) ,
265- wal_frame : None ,
266- cursor : None ,
267- } ;
268- queue. enqueue ( & row. into_new_change ( ) )
252+ fn process_wal_event (
253+ decoder : & WalGrowthDecoder ,
254+ queue : & ChangeQueue ,
255+ event : & sqlite_watcher:: wal:: WalEvent ,
256+ ) -> Result < Vec < i64 > > {
257+ let mut ids = Vec :: new ( ) ;
258+ for row in decoder. decode ( event) {
259+ ids. push ( queue. enqueue ( & row. into_new_change ( ) ) ?) ;
260+ }
261+ Ok ( ids)
269262}
270263
271264#[ cfg( test) ]
272265mod tests {
273266 use super :: * ;
274267 use clap:: Parser ;
275268 use sqlite_watcher:: queue:: ChangeQueue ;
276- use sqlite_watcher:: wal:: WalEvent ;
277269 use tempfile:: { tempdir, NamedTempFile } ;
278270
279271 #[ test]
@@ -316,15 +308,15 @@ mod tests {
316308 let dir = tempdir ( ) . unwrap ( ) ;
317309 let queue_path = dir. path ( ) . join ( "queue.db" ) ;
318310 let queue = ChangeQueue :: open ( & queue_path) . unwrap ( ) ;
311+ let decoder = WalGrowthDecoder :: default ( ) ;
319312
320- let event = WalEvent {
313+ let event = sqlite_watcher :: wal :: WalEvent {
321314 bytes_added : 2048 ,
322315 current_size : 4096 ,
323316 } ;
324- let change_id = persist_wal_event ( & queue, & event) . unwrap ( ) ;
317+ let change_ids = process_wal_event ( & decoder , & queue, & event) . unwrap ( ) ;
325318 let batch = queue. fetch_batch ( 10 ) . unwrap ( ) ;
326- assert_eq ! ( batch. len( ) , 1 ) ;
327- assert_eq ! ( batch[ 0 ] . change_id, change_id) ;
319+ assert_eq ! ( batch. len( ) , change_ids. len( ) ) ;
328320 assert_eq ! ( batch[ 0 ] . table_name, "__wal__" ) ;
329321 }
330322}
0 commit comments