Skip to content

Commit 3c6bd3e

Browse files
emmaling27 authored and Convex, Inc. committed
Add delete_tablet_documents method to Persistence trait (#43255)
Adds `delete_tablet_documents` method to `Persistence` trait. Soon, we'll call this from the retention worker to delete documents in deleted tables. GitOrigin-RevId: 9d63c72e4a09d61e5c5c0200f741930d9a121e7c
1 parent 22473bd commit 3c6bd3e

File tree

9 files changed

+182
-0
lines changed

9 files changed

+182
-0
lines changed

crates/common/src/knobs.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -513,6 +513,10 @@ pub static DOCUMENT_RETENTION_RATE_LIMIT: LazyLock<NonZeroU32> = LazyLock::new(|
513513
// Lazily built from `env_config` with the key
// "DOCUMENT_RETENTION_MAX_SCANNED_DOCUMENTS" and the value 10000 — presumably
// the fallback used when the environment does not override it; confirm
// against `env_config`.
pub static DOCUMENT_RETENTION_MAX_SCANNED_DOCUMENTS: LazyLock<usize> =
514514
LazyLock::new(|| env_config("DOCUMENT_RETENTION_MAX_SCANNED_DOCUMENTS", 10000));
515515

516+
/// Chunk size for SQL queries deleting documents from tables in the
/// `Deleting` state.
///
/// Stored as a `u16`; callers widen it to `usize` before handing it to
/// `Persistence::delete_tablet_documents`. The value 128 passed to
/// `env_config` is presumably the default when the environment does not
/// override `DELETE_TABLET_CHUNK_SIZE` — confirm against `env_config`.
517+
pub static DELETE_TABLET_CHUNK_SIZE: LazyLock<u16> =
518+
LazyLock::new(|| env_config("DELETE_TABLET_CHUNK_SIZE", 128));
519+
516520
/// Size at which a search index will be queued for snapshotting.
///
/// The value handed to `env_config` is `10 * (1 << 20)`, i.e. 10 MiB
/// (presumably a byte count, and presumably the fallback when the
/// environment does not override it — confirm against `env_config`).
517521
pub static SEARCH_INDEX_SIZE_SOFT_LIMIT: LazyLock<usize> =
518522
LazyLock::new(|| env_config("SEARCH_INDEX_SIZE_SOFT_LIMIT", 10 * (1 << 20))); // 10 MiB

crates/common/src/persistence.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,12 @@ pub trait Persistence: Sync + Send + 'static {
237237
documents: Vec<(Timestamp, InternalDocumentId)>,
238238
) -> anyhow::Result<usize>;
239239

240+
/// Deletes one chunk of documents belonging to `tablet_id` and returns the
/// number of rows that were removed.
///
/// `chunk_size` bounds how much a single call deletes, so clearing an
/// entire tablet takes repeated calls (presumably until the call returns
/// 0 — confirm once the retention worker uses this).
async fn delete_tablet_documents(
241+
&self,
242+
tablet_id: TabletId,
243+
chunk_size: usize,
244+
) -> anyhow::Result<usize>;
245+
240246
// No-op by default. Persistence implementation can override.
241247
async fn shutdown(&self) -> anyhow::Result<()> {
242248
Ok(())

crates/common/src/testing/persistence_test_suite.rs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ use crate::{
5353
Interval,
5454
StartIncluded,
5555
},
56+
knobs::DELETE_TABLET_CHUNK_SIZE,
5657
persistence::{
5758
fake_retention_validator::FakeRetentionValidator,
5859
ConflictStrategy,
@@ -226,6 +227,14 @@ macro_rules! run_persistence_test_suite {
226227
.await
227228
}
228229

230+
#[tokio::test]
231+
async fn test_persistence_delete_tablet_documents() -> anyhow::Result<()> {
232+
let $db = $create_db;
233+
let p = $create_persistence;
234+
persistence_test_suite::persistence_delete_tablet_documents(::std::sync::Arc::new(p))
235+
.await
236+
}
237+
229238
#[tokio::test]
230239
async fn test_persistence_previous_revisions_of_documents() -> anyhow::Result<()> {
231240
let $db = $create_db;
@@ -1778,6 +1787,38 @@ pub async fn persistence_delete_many_documents<P: Persistence>(p: Arc<P>) -> any
17781787
Ok(())
17791788
}
17801789

1790+
pub async fn persistence_delete_tablet_documents<P: Persistence>(p: Arc<P>) -> anyhow::Result<()> {
1791+
let mut id_generator = TestIdGenerator::new();
1792+
let table: TableName = str::parse("table")?;
1793+
1794+
const NUM_DOCS: usize = 256;
1795+
let ids: Vec<_> = (0..NUM_DOCS)
1796+
.map(|_| id_generator.user_generate(&table))
1797+
.collect();
1798+
let tablet_id = ids[0].tablet_id;
1799+
let documents: Vec<_> = ids
1800+
.iter()
1801+
.enumerate()
1802+
.map(|(i, &id)| doc(id, i as i32 + 1, Some(i as i64), None).unwrap())
1803+
.collect();
1804+
1805+
p.write(&documents, &[], ConflictStrategy::Error).await?;
1806+
1807+
let num_deleted = p
1808+
.delete_tablet_documents(tablet_id, *DELETE_TABLET_CHUNK_SIZE as usize)
1809+
.await?;
1810+
assert_eq!(num_deleted, *DELETE_TABLET_CHUNK_SIZE as usize);
1811+
1812+
let reader = p.reader();
1813+
let stream = reader.load_all_documents();
1814+
let all_docs = stream.try_collect::<Vec<_>>().await?;
1815+
assert_eq!(
1816+
all_docs.len(),
1817+
NUM_DOCS - *DELETE_TABLET_CHUNK_SIZE as usize
1818+
);
1819+
Ok(())
1820+
}
1821+
17811822
pub async fn persistence_previous_revisions_of_documents<P: Persistence>(
17821823
p: Arc<P>,
17831824
) -> anyhow::Result<()> {

crates/common/src/testing/test_persistence.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,25 @@ impl Persistence for TestPersistence {
204204
}
205205
Ok(total_deleted)
206206
}
207+
208+
async fn delete_tablet_documents(
209+
&self,
210+
tablet_id: TabletId,
211+
chunk_size: usize,
212+
) -> anyhow::Result<usize> {
213+
let mut inner = self.inner.lock();
214+
let log = &mut inner.log;
215+
let log_len = log.len();
216+
let mut num_deleted = 0;
217+
log.retain(|(_ts, id), _| {
218+
if num_deleted >= chunk_size {
219+
return true;
220+
}
221+
num_deleted += 1;
222+
id.table() != tablet_id
223+
});
224+
Ok(log_len - log.len())
225+
}
207226
}
208227

209228
#[async_trait]

crates/mysql/src/lib.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -598,6 +598,32 @@ impl<RT: Runtime> Persistence for MySqlPersistence<RT> {
598598
})
599599
.await
600600
}
601+
602+
async fn delete_tablet_documents(
603+
&self,
604+
tablet_id: TabletId,
605+
chunk_size: usize,
606+
) -> anyhow::Result<usize> {
607+
let multitenant = self.multitenant;
608+
let instance_name = mysql_async::Value::from(&self.instance_name.raw);
609+
self.lease
610+
.transact(async move |tx| {
611+
let mut deleted_count = 0;
612+
let mut params =
613+
Vec::with_capacity(sql::DELETE_TABLE_COLUMN_COUNT + (multitenant as usize));
614+
let tablet_id: Vec<u8> = tablet_id.0.into();
615+
params.push(tablet_id.into());
616+
if multitenant {
617+
params.push(instance_name.clone());
618+
}
619+
params.push(chunk_size.into());
620+
deleted_count += tx
621+
.exec_iter(sql::delete_tablet_chunk(multitenant), params)
622+
.await?;
623+
Ok(deleted_count as usize)
624+
})
625+
.await
626+
}
601627
}
602628

603629
#[derive(Clone)]

crates/mysql/src/sql.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -541,6 +541,28 @@ pub fn delete_document_chunk(chunk_size: usize, multitenant: bool) -> &'static s
541541
.unwrap()
542542
}
543543

544+
/// Number of bind parameters in the single-tenant tablet delete: `table_id`
/// and the `LIMIT`. The multitenant variant binds one more (`instance_name`).
pub const DELETE_TABLE_COLUMN_COUNT: usize = 2;

/// Both variants of the chunked tablet delete, built once and keyed by
/// whether the query is for a multitenant deployment.
static DELETE_TABLET_CHUNK_QUERIES: LazyLock<HashMap<bool, String>> = LazyLock::new(|| {
    let build_query = |multitenant: bool| {
        let where_clause = match multitenant {
            true => "(table_id = ? AND instance_name = ?)",
            false => "(table_id = ?)",
        };
        format!("DELETE FROM @db_name.documents WHERE {where_clause} LIMIT ?")
    };
    [false, true]
        .into_iter()
        .map(|multitenant| (multitenant, build_query(multitenant)))
        .collect()
});

/// Returns the `DELETE ... LIMIT ?` statement that removes one chunk of a
/// tablet's documents, with an extra `instance_name` filter when
/// `multitenant` is set.
pub fn delete_tablet_chunk(multitenant: bool) -> &'static str {
    // Both keys are inserted when the map is built, so the lookup cannot miss.
    let query: &'static String = DELETE_TABLET_CHUNK_QUERIES.get(&multitenant).unwrap();
    query.as_str()
}
565+
544566
pub const fn write_persistence_global(multitenant: bool) -> &'static str {
545567
tableify!(
546568
multitenant,

crates/postgres/src/lib.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -724,6 +724,29 @@ impl Persistence for PostgresPersistence {
724724
.await
725725
}
726726

727+
async fn delete_tablet_documents(
728+
&self,
729+
tablet_id: TabletId,
730+
chunk_size: usize,
731+
) -> anyhow::Result<usize> {
732+
let multitenant = self.multitenant;
733+
let instance_name = self.instance_name.clone();
734+
self.lease
735+
.transact(async move |tx| {
736+
let delete_doc = tx
737+
.prepare_cached(sql::delete_tablet_chunk(multitenant))
738+
.await?;
739+
let mut params = vec![Param::Bytes(tablet_id.0.into())];
740+
if multitenant {
741+
params.push(Param::Text(instance_name.to_string()));
742+
}
743+
params.push(Param::Limit(chunk_size as i64));
744+
let deleted_count = tx.execute_raw(&delete_doc, params).await?;
745+
Ok(deleted_count as usize)
746+
})
747+
.await
748+
}
749+
727750
async fn import_documents_batch(
728751
&self,
729752
mut documents: BoxStream<'_, Vec<DocumentLogEntry>>,

crates/postgres/src/sql.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -629,6 +629,26 @@ DELETE FROM @db_name.documents WHERE
629629
)
630630
}
631631

632+
/// Returns the SQL statement that deletes one chunk of a tablet's documents.
///
/// Placeholders: `$1` is the table id; when `multitenant` is true `$2` is the
/// instance name and the limit is `$3`, otherwise the limit is `$2`.
///
/// The limit is applied inside an `id IN (SELECT ... LIMIT n)` subquery
/// (presumably because PostgreSQL's `DELETE` has no `LIMIT` clause of its
/// own). The outer `DELETE` then removes every row of that table whose `id`
/// appears in the selected set, so if several rows share an `id` the number
/// of deleted rows can differ from the limit — NOTE(review): confirm whether
/// callers rely on an exact count.
pub const fn delete_tablet_chunk(multitenant: bool) -> &'static str {
633+
tableify!(multitenant, {
634+
const LIMIT_PARAM: i32 = if multitenant { 3 } else { 2 };
635+
formatcp!(
636+
r#"DELETE FROM @db_name.documents WHERE table_id = $1 AND id IN (
637+
SELECT id FROM @db_name.documents
638+
WHERE table_id = $1{instance_clause}
639+
LIMIT ${limit_param}
640+
){instance_clause}
641+
"#,
642+
instance_clause = if multitenant {
643+
" AND instance_name = $2"
644+
} else {
645+
""
646+
},
647+
limit_param = LIMIT_PARAM,
648+
)
649+
})
650+
}
651+
632652
pub const fn write_persistence_global(multitenant: bool) -> &'static str {
633653
tableify!(
634654
multitenant,

crates/sqlite/src/lib.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -421,6 +421,24 @@ impl Persistence for SqlitePersistence {
421421
tx.commit()?;
422422
Ok(count_deleted)
423423
}
424+
425+
async fn delete_tablet_documents(
426+
&self,
427+
tablet_id: TabletId,
428+
chunk_size: usize,
429+
) -> anyhow::Result<usize> {
430+
let mut inner = self.inner.lock();
431+
let tx = inner.connection.transaction()?;
432+
let mut delete_table_documents_query = tx.prepare_cached(DELETE_TABLE_DOCUMENTS)?;
433+
let count_deleted = delete_table_documents_query.execute(params![
434+
&tablet_id.0[..],
435+
&tablet_id.0[..],
436+
chunk_size,
437+
])?;
438+
drop(delete_table_documents_query);
439+
tx.commit()?;
440+
Ok(count_deleted)
441+
}
424442
}
425443

426444
#[async_trait]
@@ -682,6 +700,9 @@ const DELETE_INDEX: &str = "DELETE FROM indexes WHERE index_id = ? AND ts <= ? A
682700

683701
// Deletes rows of `documents` matching a table id and document id whose `ts`
// is at or below the bound timestamp. Binds, in order: table_id, id, ts.
const DELETE_DOCUMENT: &str = "DELETE FROM documents WHERE table_id = ? AND id = ? AND ts <= ?";
684702

703+
// Deletes one chunk of a table's documents. Binds, in order: table_id (outer
// filter), table_id (subquery filter), and the subquery's LIMIT. The outer
// DELETE removes every row of that table whose `id` is in the selected set.
const DELETE_TABLE_DOCUMENTS: &str = "DELETE FROM documents WHERE table_id = ? AND id IN (SELECT \
704+
id FROM documents WHERE table_id = ? LIMIT ?)";
705+
685706
const PREV_REV_QUERY: &str = r#"
686707
SELECT id, ts, table_id, json_value, deleted, prev_ts
687708
FROM documents

0 commit comments

Comments
 (0)