Skip to content

Commit 8566627

Browse files
committed
release: 7.0.14
1 parent 5eaebbd commit 8566627

File tree

4 files changed

+115
-13
lines changed

4 files changed

+115
-13
lines changed

CHANGELOG.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,16 @@ All notable changes to this project will be documented in this file.
44

55
## [7.0.12] - 2025-12-11
66

7+
## [7.0.14] - 2025-12-11
8+
9+
### Fixed
10+
11+
- **Avoid duplicate JSONB IDs for SQLite tables without primary keys**: the ID detector now only uses real primary keys or candidate columns that are provably unique. Tables like `prices` with repeated `id` values fall back to row-number IDs, preventing immediate `prices_pkey` violations during inserts.
12+
13+
### Added
14+
15+
- **Duplicate-ID regression tests** covering both the rejection path (when duplicates exist) and the acceptance path for unique-but-not-PK `TEXT` IDs.
16+
717
## [7.0.13] - 2025-12-11
818

919
### Fixed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "database-replicator"
3-
version = "7.0.13"
3+
version = "7.0.14"
44
edition = "2021"
55
license = "Apache-2.0"
66
description = "Universal database-to-PostgreSQL replication CLI. Supports PostgreSQL, SQLite, MongoDB, and MySQL."

src/sqlite/converter.rs

Lines changed: 103 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -207,34 +207,95 @@ pub fn convert_table_to_jsonb(conn: &Connection, table: &str) -> Result<Vec<(Str
207207
/// Prefers a declared single-column primary key; otherwise checks common ID column names: "id", "rowid", "_id" (case-insensitive), accepting one only if its values are unique.
208208
/// If found, returns the column name. Otherwise (composite primary key, or no unique candidate column) returns None.
209209
fn detect_id_column(conn: &Connection, table: &str) -> Result<Option<String>> {
210-
// Get column names for the table
211-
let query = format!("PRAGMA table_info(\"{}\")", table);
210+
crate::jsonb::validate_table_name(table).context("Invalid SQLite table name")?;
211+
212+
// Get column metadata so we can detect declared primary keys
213+
let query = format!("PRAGMA table_info({})", crate::utils::quote_ident(table));
212214
let mut stmt = conn
213215
.prepare(&query)
214216
.with_context(|| format!("Failed to get table info for '{}'", table))?;
215217

216-
let columns: Vec<String> = stmt
217-
.query_map([], |row| row.get::<_, String>(1))
218-
.context("Failed to query table columns")?
219-
.collect::<Result<Vec<_>, _>>()
220-
.context("Failed to collect column names")?;
218+
let mut columns: Vec<String> = Vec::new();
219+
let mut pk_columns: Vec<(i64, String)> = Vec::new();
220+
221+
let rows = stmt
222+
.query_map([], |row| {
223+
let name: String = row.get(1)?;
224+
let pk_position: i64 = row.get(5)?;
225+
Ok((name, pk_position))
226+
})
227+
.context("Failed to query table columns")?;
228+
229+
for row in rows {
230+
let (name, pk_position) = row.context("Failed to parse table_info row")?;
231+
if pk_position > 0 {
232+
pk_columns.push((pk_position, name.clone()));
233+
}
234+
columns.push(name);
235+
}
221236

222-
// Check for common ID column names (case-insensitive)
237+
pk_columns.sort_by_key(|(pos, _)| *pos);
238+
if pk_columns.len() == 1 {
239+
let pk = pk_columns.remove(0).1;
240+
tracing::debug!(
241+
"Using primary key column '{}' as ID for table '{}'",
242+
pk,
243+
table
244+
);
245+
return Ok(Some(pk));
246+
} else if pk_columns.len() > 1 {
247+
tracing::info!(
248+
"Table '{}' has a composite primary key; falling back to row numbers",
249+
table
250+
);
251+
return Ok(None);
252+
}
253+
254+
// No declared primary key – fall back to heuristic columns, but only if unique
223255
let id_candidates = ["id", "rowid", "_id"];
224256
for candidate in &id_candidates {
225257
if let Some(col) = columns.iter().find(|c| c.to_lowercase() == *candidate) {
226-
tracing::debug!("Using column '{}' as ID for table '{}'", col, table);
227-
return Ok(Some(col.clone()));
258+
if column_is_unique(conn, table, col)? {
259+
tracing::debug!("Using unique column '{}' as ID for table '{}'", col, table);
260+
return Ok(Some(col.clone()));
261+
} else {
262+
tracing::warn!(
263+
"Column '{}' on table '{}' contains duplicate values; using row numbers instead",
264+
col,
265+
table
266+
);
267+
}
228268
}
229269
}
230270

231271
tracing::debug!(
232-
"No ID column found for table '{}', will use row number",
272+
"No unique ID column found for table '{}', will use row number",
233273
table
234274
);
235275
Ok(None)
236276
}
237277

278+
fn column_is_unique(conn: &Connection, table: &str, column: &str) -> Result<bool> {
279+
crate::jsonb::validate_table_name(column).context("Invalid column name")?;
280+
281+
let query = format!(
282+
"SELECT COUNT(*) as total_rows, COUNT(DISTINCT {}) as distinct_rows FROM {}",
283+
crate::utils::quote_ident(column),
284+
crate::utils::quote_ident(table)
285+
);
286+
287+
let (total_rows, distinct_rows): (i64, i64) = conn
288+
.query_row(&query, [], |row| Ok((row.get(0)?, row.get(1)?)))
289+
.with_context(|| {
290+
format!(
291+
"Failed to evaluate uniqueness of column '{}' on table '{}'",
292+
column, table
293+
)
294+
})?;
295+
296+
Ok(total_rows == distinct_rows)
297+
}
298+
238299
/// Convert a batch of SQLite rows to JSONB format.
239300
///
240301
/// Converts a pre-read batch of rows, extracting IDs and converting to JSON.
@@ -632,6 +693,37 @@ mod tests {
632693
assert_eq!(id_col.unwrap().to_lowercase(), "id");
633694
}
634695

696+
#[test]
697+
fn test_detect_id_column_rejects_duplicates() {
698+
let conn = Connection::open_in_memory().unwrap();
699+
700+
conn.execute("CREATE TABLE dup_ids (id TEXT, value TEXT)", [])
701+
.unwrap();
702+
conn.execute("INSERT INTO dup_ids (id, value) VALUES ('A', 'one')", [])
703+
.unwrap();
704+
conn.execute("INSERT INTO dup_ids (id, value) VALUES ('A', 'two')", [])
705+
.unwrap();
706+
707+
let id_col = detect_id_column(&conn, "dup_ids").unwrap();
708+
assert!(id_col.is_none(), "Duplicate ID column should be rejected");
709+
}
710+
711+
#[test]
712+
fn test_detect_id_column_accepts_unique_text() {
713+
let conn = Connection::open_in_memory().unwrap();
714+
715+
conn.execute("CREATE TABLE unique_ids (id TEXT, value TEXT)", [])
716+
.unwrap();
717+
conn.execute(
718+
"INSERT INTO unique_ids (id, value) VALUES ('A', 'one'), ('B', 'two')",
719+
[],
720+
)
721+
.unwrap();
722+
723+
let id_col = detect_id_column(&conn, "unique_ids").unwrap();
724+
assert_eq!(id_col.as_deref(), Some("id"));
725+
}
726+
635727
#[test]
636728
fn test_convert_empty_table() {
637729
let conn = Connection::open_in_memory().unwrap();

0 commit comments

Comments
 (0)