Skip to content

Commit cacbe5b

Browse files
goffrieConvex, Inc.
authored andcommitted
Make prev_ts non-optional in DocumentUpdateWithPrevTs (#34636)
GitOrigin-RevId: fd9ea18f01f57c65b1d8d0946a0b71dd2906e678
1 parent e3f55dc commit cacbe5b

File tree

5 files changed

+21
-53
lines changed

5 files changed

+21
-53
lines changed

crates/common/src/document.rs

Lines changed: 8 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -593,29 +593,10 @@ pub struct DocumentUpdateWithPrevTs {
593593
pub id: ResolvedDocumentId,
594594
/// The old document and its timestamp in the document log.
595595
/// The timestamp will become the update's `prev_ts`.
596-
// TODO: make the timestamp non-optional after everything has pushed
597-
pub old_document: Option<(ResolvedDocument, Option<Timestamp>)>,
596+
pub old_document: Option<(ResolvedDocument, Timestamp)>,
598597
pub new_document: Option<ResolvedDocument>,
599598
}
600599

601-
impl DocumentUpdateWithPrevTs {
602-
/// Checks if two DocumentUpdates are almost equal, ignoring the case where
603-
/// one has a missing old timestamp
604-
// TODO: remove this once the old timestamp is non-optional
605-
pub fn eq_ignoring_none_old_ts(&self, other: &DocumentUpdateWithPrevTs) -> bool {
606-
self.id == other.id
607-
&& self.old_document.as_ref().map(|(d, _)| d)
608-
== other.old_document.as_ref().map(|(d, _)| d)
609-
&& self
610-
.old_document
611-
.as_ref()
612-
.map(|(_, ts)| ts)
613-
.zip(other.old_document.as_ref().map(|(_, ts)| ts))
614-
.is_none_or(|(a, b)| a == b)
615-
&& self.new_document == other.new_document
616-
}
617-
}
618-
619600
impl HeapSize for DocumentUpdateWithPrevTs {
620601
fn heap_size(&self) -> usize {
621602
self.old_document.heap_size() + self.new_document.heap_size()
@@ -636,7 +617,7 @@ impl TryFrom<DocumentUpdateWithPrevTs> for DocumentUpdateWithPrevTsProto {
636617
Ok(Self {
637618
id: Some(id.into()),
638619
old_document: old_document.map(|d| d.try_into()).transpose()?,
639-
old_ts: old_ts.flatten().map(|ts| ts.into()),
620+
old_ts: old_ts.map(|ts| ts.into()),
640621
new_document: new_document.map(|d| d.try_into()).transpose()?,
641622
})
642623
}
@@ -659,7 +640,12 @@ impl TryFrom<DocumentUpdateWithPrevTsProto> for DocumentUpdateWithPrevTs {
659640
Ok(Self {
660641
id,
661642
old_document: old_document
662-
.map(|d| anyhow::Ok((d.try_into()?, old_ts.map(Timestamp::try_from).transpose()?)))
643+
.map(|d| {
644+
anyhow::Ok((
645+
d.try_into()?,
646+
Timestamp::try_from(old_ts.context("old_ts missing")?)?,
647+
))
648+
})
663649
.transpose()?,
664650
new_document: new_document.map(|d| d.try_into()).transpose()?,
665651
})

crates/database/src/committer.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -696,10 +696,7 @@ impl<RT: Runtime> Committer<RT> {
696696
document: document_update.new_document.clone(),
697697
},
698698
doc_in_vector_index,
699-
prev_ts: document_update
700-
.old_document
701-
.as_ref()
702-
.and_then(|&(_, ts)| ts),
699+
prev_ts: document_update.old_document.as_ref().map(|&(_, ts)| ts),
703700
});
704701
}
705702
let index_writes = index_writes

crates/database/src/transaction.rs

Lines changed: 3 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -474,7 +474,7 @@ impl<RT: Runtime> Transaction<RT> {
474474
// already written to in this transaction.
475475
if let Some(existing_update) = existing_updates.get(&id) {
476476
anyhow::ensure!(
477-
existing_update.eq_ignoring_none_old_ts(&update),
477+
*existing_update == update,
478478
"Conflicting updates for document {id}"
479479
);
480480
preserved_update_count += 1;
@@ -490,11 +490,11 @@ impl<RT: Runtime> Transaction<RT> {
490490
self.next_creation_time.increment()?;
491491
}
492492
}
493-
self.apply_validated_write_maybe_old_ts(
493+
self.apply_validated_write(
494494
id,
495495
update
496496
.old_document
497-
.map(|(d, ts)| (d, ts.map(WriteTimestamp::Committed))),
497+
.map(|(d, ts)| (d, WriteTimestamp::Committed(ts))),
498498
update.new_document,
499499
)?;
500500
}
@@ -935,21 +935,6 @@ impl<RT: Runtime> Transaction<RT> {
935935
id: ResolvedDocumentId,
936936
old_document_and_ts: Option<(ResolvedDocument, WriteTimestamp)>,
937937
new_document: Option<ResolvedDocument>,
938-
) -> anyhow::Result<()> {
939-
self.apply_validated_write_maybe_old_ts(
940-
id,
941-
old_document_and_ts.map(|(d, ts)| (d, Some(ts))),
942-
new_document,
943-
)
944-
}
945-
946-
// TODO: make WriteTimestamp non-optional and merge with
947-
// `apply_validated_write`
948-
pub(crate) fn apply_validated_write_maybe_old_ts(
949-
&mut self,
950-
id: ResolvedDocumentId,
951-
old_document_and_ts: Option<(ResolvedDocument, Option<WriteTimestamp>)>,
952-
new_document: Option<ResolvedDocument>,
953938
) -> anyhow::Result<()> {
954939
// Implement something like two-phase commit between the index and the document
955940
// store. We first guarantee that the changes are valid for the index and

crates/database/src/writes.rs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ impl Writes {
208208
is_system_document: bool,
209209
reads: &mut TransactionReadSet,
210210
document_id: ResolvedDocumentId,
211-
old_document: Option<(ResolvedDocument, Option<WriteTimestamp>)>,
211+
old_document: Option<(ResolvedDocument, WriteTimestamp)>,
212212
new_document: Option<ResolvedDocument>,
213213
) -> anyhow::Result<()> {
214214
if old_document.is_none() {
@@ -281,7 +281,7 @@ impl Writes {
281281
document's old update"
282282
);
283283
anyhow::ensure!(
284-
[None, Some(WriteTimestamp::Pending)].contains(&old_document_ts.flatten()),
284+
[None, Some(WriteTimestamp::Pending)].contains(&old_document_ts),
285285
"Inconsistent update: The new document's old update timestamp should be Pending \
286286
but is {:?}",
287287
old_document_ts
@@ -295,14 +295,13 @@ impl Writes {
295295
old_document: match old_document {
296296
Some((d, ts)) => Some((
297297
d,
298-
ts.map(|ts| match ts {
299-
WriteTimestamp::Committed(ts) => Ok(ts),
298+
match ts {
299+
WriteTimestamp::Committed(ts) => ts,
300300
WriteTimestamp::Pending => anyhow::bail!(
301301
"Old document timestamp is Pending, but there is no pending \
302302
write"
303303
),
304-
})
305-
.transpose()?,
304+
},
306305
)),
307306
None => None,
308307
},
@@ -635,7 +634,7 @@ mod tests {
635634
id,
636635
Some((
637636
old_document.clone(),
638-
Some(WriteTimestamp::Committed(Timestamp::must(123))),
637+
WriteTimestamp::Committed(Timestamp::must(123)),
639638
)),
640639
Some(new_document.clone()),
641640
)?;
@@ -649,7 +648,7 @@ mod tests {
649648
false,
650649
&mut reads,
651650
id,
652-
Some((new_document, Some(WriteTimestamp::Pending))),
651+
Some((new_document, WriteTimestamp::Pending)),
653652
Some(newer_document.clone()),
654653
)?;
655654

@@ -660,7 +659,7 @@ mod tests {
660659
id,
661660
DocumentUpdateWithPrevTs {
662661
id,
663-
old_document: Some((old_document, Some(Timestamp::must(123)))),
662+
old_document: Some((old_document, Timestamp::must(123))),
664663
new_document: Some(newer_document),
665664
}
666665
)

crates/pb/protos/common.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ message DocumentUpdate {
4848
message DocumentUpdateWithPrevTs {
4949
ResolvedDocumentId id = 3;
5050
ResolvedDocument old_document = 1;
51+
// `old_ts` should be present iff `old_document` is present
5152
optional uint64 old_ts = 4;
5253
ResolvedDocument new_document = 2;
5354
}

0 commit comments

Comments
 (0)