Skip to content

Commit 5e0c30e

Browse files
goffrieConvex, Inc.
authored andcommitted
Always update SnapshotManager::persisted_max_repeatable_ts after persisting it (#35978)
GitOrigin-RevId: a8ef6ecebe870fb26eb4e6fe607a8c1dea44c902
1 parent cdb38ac commit 5e0c30e

File tree

2 files changed

+27
-23
lines changed

2 files changed

+27
-23
lines changed

crates/database/src/committer.rs

Lines changed: 17 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use ::metrics::{
99
StatusTimer,
1010
Timer,
1111
};
12+
use anyhow::Context as _;
1213
use common::{
1314
bootstrap_model::tables::{
1415
TableMetadata,
@@ -176,9 +177,6 @@ pub struct Committer<RT: Runtime> {
176177

177178
last_assigned_ts: Timestamp,
178179

179-
// Allows us to send signal to the app to shutdown.
180-
shutdown: ShutdownSignal,
181-
182180
persistence_writes: FuturesOrdered<BoxFuture<'static, anyhow::Result<PersistenceWrite>>>,
183181

184182
retention_validator: Arc<dyn RetentionValidator>,
@@ -205,10 +203,17 @@ impl<RT: Runtime> Committer<RT> {
205203
runtime: runtime.clone(),
206204
last_assigned_ts: Timestamp::MIN,
207205
persistence_writes: FuturesOrdered::new(),
208-
shutdown,
209206
retention_validator: retention_validator.clone(),
210207
};
211-
let handle = runtime.spawn("committer", committer.go(rx));
208+
let handle = runtime.spawn("committer", async move {
209+
if let Err(err) = committer.go(rx).await {
210+
// Committer hit a fatal error. This should only happen if a
211+
// persistence write fails or in case of unrecoverable logic
212+
// errors.
213+
shutdown.signal(err);
214+
tracing::error!("Shutting down committer");
215+
}
216+
});
212217
CommitterClient {
213218
handle: Arc::new(Mutex::new(handle)),
214219
sender: tx,
@@ -218,7 +223,7 @@ impl<RT: Runtime> Committer<RT> {
218223
}
219224
}
220225

221-
async fn go(mut self, mut rx: mpsc::Receiver<CommitterMessage>) {
226+
async fn go(mut self, mut rx: mpsc::Receiver<CommitterMessage>) -> anyhow::Result<()> {
222227
let mut last_bumped_repeatable_ts = self.runtime.monotonic_now();
223228
// Assume there were commits just before the backend restarted, so first do a
224229
// quick bump.
@@ -263,15 +268,7 @@ impl<RT: Runtime> Committer<RT> {
263268
last_bumped_repeatable_ts = self.runtime.monotonic_now();
264269
}
265270
result = self.persistence_writes.select_next_some() => {
266-
let pending_commit = match result {
267-
Ok(pending_commit) => pending_commit,
268-
Err(err) => {
269-
self.shutdown.signal(err.context("Write failed. Unsure if transaction committed to disk."));
270-
// Exit the go routine, while we are shutting down.
271-
tracing::error!("Shutting down committer");
272-
return;
273-
},
274-
};
271+
let pending_commit = result.context("Write failed. Unsure if transaction committed to disk.")?;
275272
let pending_commit_id = pending_commit.commit_id();
276273
match pending_commit {
277274
PersistenceWrite::Commit {
@@ -303,7 +300,7 @@ impl<RT: Runtime> Committer<RT> {
303300
} => {
304301
let span = committer_span.as_ref().map(|root| Span::enter_with_parent("publish_max_repeatable_ts", root)).unwrap_or_else(Span::noop);
305302
span.set_local_parent();
306-
self.publish_max_repeatable_ts(new_max_repeatable);
303+
self.publish_max_repeatable_ts(new_max_repeatable)?;
307304
next_bump_wait = Some(*MAX_REPEATABLE_TIMESTAMP_IDLE_FREQUENCY);
308305
let _ = result.send(new_max_repeatable);
309306
drop(timer);
@@ -325,7 +322,7 @@ impl<RT: Runtime> Committer<RT> {
325322
match maybe_message {
326323
None => {
327324
tracing::info!("All clients have gone away, shutting down committer...");
328-
return;
325+
return Ok(());
329326
},
330327
Some(CommitterMessage::Commit {
331328
queue_timer,
@@ -616,17 +613,18 @@ impl<RT: Runtime> Committer<RT> {
616613
);
617614
}
618615

619-
fn publish_max_repeatable_ts(&mut self, new_max_repeatable: Timestamp) {
616+
fn publish_max_repeatable_ts(&mut self, new_max_repeatable: Timestamp) -> anyhow::Result<()> {
620617
// Bump the latest snapshot in snapshot_manager so reads on this leader
621618
// can know this timestamp is repeatable.
622619
let mut snapshot_manager = self.snapshot_manager.write();
623-
if snapshot_manager.bump_persisted_max_repeatable_ts(new_max_repeatable) {
620+
if snapshot_manager.bump_persisted_max_repeatable_ts(new_max_repeatable)? {
624621
self.log.append(
625622
new_max_repeatable,
626623
WithHeapSize::default(),
627624
"publish_max_repeatable_ts".into(),
628625
);
629626
}
627+
Ok(())
630628
}
631629

632630
/// First, check that it's valid to apply this transaction in-memory. If it

crates/database/src/snapshot_manager.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -565,14 +565,20 @@ impl SnapshotManager {
565565
self.versions.push_back((ts, snapshot));
566566
}
567567

568-
pub fn bump_persisted_max_repeatable_ts(&mut self, ts: Timestamp) -> bool {
568+
pub fn bump_persisted_max_repeatable_ts(&mut self, ts: Timestamp) -> anyhow::Result<bool> {
569+
anyhow::ensure!(
570+
ts >= self.persisted_max_repeatable_ts,
571+
"persisted_max_repeatable_ts went backward from {:?} to {:?}",
572+
self.persisted_max_repeatable_ts,
573+
ts
574+
);
575+
self.persisted_max_repeatable_ts = ts;
569576
let (latest_ts, snapshot) = self.latest();
570577
if ts > *latest_ts {
571578
self.push(ts, snapshot);
572-
self.persisted_max_repeatable_ts = ts;
573-
true
579+
Ok(true)
574580
} else {
575-
false
581+
Ok(false)
576582
}
577583
}
578584
}

0 commit comments

Comments
 (0)