Skip to content

Commit d12d0c4

Browse files
committed
Fix sqlite watcher clippy warnings
1 parent 7b6b633 commit d12d0c4

File tree

4 files changed

+35
-17
lines changed

4 files changed

+35
-17
lines changed

PR_BODY.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
## Summary
2+
- fix sqlite sync change-state handling by treating watcher wal_frame/cursor fields as optional strings and cleaning up unused code
3+
- implement FromStr for sqlite ChangeOperation and resolve needless borrow lints in queue/server modules
4+
- keep clippy happy by applying the suggested clamp change and ensuring proto tests build
5+
6+
## Testing
7+
- cargo clippy
8+
- cargo test

sqlite-watcher/src/queue.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::fs;
22
use std::path::{Path, PathBuf};
3+
use std::str::FromStr;
34

45
use anyhow::{anyhow, Context, Result};
56
use rusqlite::{params, Connection, OptionalExtension, Row};
@@ -41,8 +42,12 @@ impl ChangeOperation {
4142
ChangeOperation::Delete => "delete",
4243
}
4344
}
45+
}
46+
47+
impl FromStr for ChangeOperation {
48+
type Err = anyhow::Error;
4449

45-
pub fn from_str(value: &str) -> Result<Self> {
50+
fn from_str(value: &str) -> Result<Self, Self::Err> {
4651
match value {
4752
"insert" => Ok(ChangeOperation::Insert),
4853
"update" => Ok(ChangeOperation::Update),
@@ -98,8 +103,8 @@ impl ChangeQueue {
98103
}
99104
let conn = Connection::open(path)
100105
.with_context(|| format!("failed to open queue database {}", path.display()))?;
101-
conn.pragma_update(None, "journal_mode", &"wal").ok();
102-
conn.pragma_update(None, "synchronous", &"normal").ok();
106+
conn.pragma_update(None, "journal_mode", "wal").ok();
107+
conn.pragma_update(None, "synchronous", "normal").ok();
103108
conn.execute_batch(SCHEMA)
104109
.context("failed to initialize change queue schema")?;
105110
Ok(Self {
@@ -132,7 +137,7 @@ impl ChangeQueue {
132137
let mut rows = stmt.query([limit as i64])?;
133138
let mut results = Vec::new();
134139
while let Some(row) = rows.next()? {
135-
results.push(row_to_change(&row)?);
140+
results.push(row_to_change(row)?);
136141
}
137142
Ok(results)
138143
}

sqlite-watcher/src/server.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ impl Watcher for WatcherService {
193193
&self,
194194
request: Request<ListChangesRequest>,
195195
) -> Result<Response<ListChangesResponse>, Status> {
196-
let limit = request.get_ref().limit.max(1).min(10_000) as usize;
196+
let limit = request.get_ref().limit.clamp(1, 10_000) as usize;
197197
let queue = self.queue().map_err(internal_err)?;
198198
let rows = queue.fetch_batch(limit).map_err(internal_err)?;
199199
let changes = rows.into_iter().map(change_to_proto).collect();

src/commands/sync_sqlite.rs

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,11 @@
1-
use std::collections::HashMap;
2-
use std::path::{Path, PathBuf};
3-
use std::time::Duration;
4-
51
use anyhow::{anyhow, bail, Context, Result};
62
use clap::ValueEnum;
73
use sqlite_watcher::watcher_proto::watcher_client::WatcherClient;
84
use sqlite_watcher::watcher_proto::{
95
AckChangesRequest, GetStateRequest, HealthCheckRequest, ListChangesRequest, SetStateRequest,
106
};
7+
use std::collections::HashMap;
8+
use std::path::{Path, PathBuf};
119
use tokio_postgres::Client;
1210
use tonic::codegen::InterceptedService;
1311
use tonic::service::Interceptor;
@@ -17,7 +15,6 @@ use tower::service_fn;
1715

1816
use crate::jsonb::writer::{delete_jsonb_rows, insert_jsonb_batch, upsert_jsonb_rows};
1917

20-
const DEFAULT_BATCH_LIMIT: u32 = 500;
2118
const GLOBAL_STATE_KEY: &str = "_global";
2219

2320
#[derive(Debug, Clone, Copy, ValueEnum, PartialEq, Eq)]
@@ -63,7 +60,7 @@ pub async fn run(opts: SyncSqliteOptions) -> Result<()> {
6360
let mut processed_any = false;
6461

6562
loop {
66-
let mut req = Request::new(ListChangesRequest {
63+
let req = Request::new(ListChangesRequest {
6764
limit: opts.batch_size.max(1),
6865
});
6966
let changes = watcher
@@ -160,8 +157,8 @@ async fn apply_changes(
160157
change.table_name.clone(),
161158
TableState {
162159
last_change_id: change.change_id,
163-
wal_frame: change.wal_frame.clone(),
164-
cursor: change.cursor.clone(),
160+
wal_frame: non_empty_string(&change.wal_frame),
161+
cursor: non_empty_string(&change.cursor),
165162
},
166163
);
167164
}
@@ -235,6 +232,14 @@ fn mode_string(mode: IncrementalMode) -> &'static str {
235232
}
236233
}
237234

235+
fn non_empty_string(value: &str) -> Option<String> {
236+
if value.is_empty() {
237+
None
238+
} else {
239+
Some(value.to_owned())
240+
}
241+
}
242+
238243
fn load_token(path: Option<&Path>) -> Result<String> {
239244
let token_path = path
240245
.map(|p| p.to_path_buf())
@@ -399,17 +404,17 @@ mod tests {
399404
op: "insert".into(),
400405
primary_key: "1".into(),
401406
payload: serde_json::to_vec(&serde_json::json!({"a":1})).unwrap(),
402-
wal_frame: None,
403-
cursor: None,
407+
wal_frame: String::new(),
408+
cursor: String::new(),
404409
},
405410
Change {
406411
change_id: 2,
407412
table_name: "foo".into(),
408413
op: "delete".into(),
409414
primary_key: "2".into(),
410415
payload: Vec::new(),
411-
wal_frame: None,
412-
cursor: None,
416+
wal_frame: String::new(),
417+
cursor: String::new(),
413418
},
414419
];
415420
let mut per_table: HashMap<String, TableBatch> = HashMap::new();

0 commit comments

Comments
 (0)