Skip to content

Commit b6cf581

Browse files
authored
Merge pull request #2175 from golemcloud/retry-improvements
Retry logic improvements
2 parents 032f799 + e7350bb commit b6cf581

File tree

68 files changed

+1370
-746
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

68 files changed

+1370
-746
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Makefile.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -481,10 +481,11 @@ dependencies = [
481481
[tasks.integration-tests-group1]
482482
description = "Runs integration tests only"
483483
dependencies = ["build-bins"]
484-
env = { "RUST_LOG" = "info", "RUST_BACKTRACE" = "1" }
484+
env = { "RUST_LOG" = "debug", "RUST_BACKTRACE" = "1" }
485485
script = '''
486-
cargo test --package integration-tests --test integration -- --nocapture --report-time $JUNIT_OPTS
486+
cargo test --package integration-tests --test integration -- --nocapture --test-threads=1 --report-time
487487
'''
488+
# cargo test --package integration-tests --test integration -- --nocapture --report-time $JUNIT_OPTS
488489

489490
[tasks.integration-tests-group2]
490491
description = "Runs component service integration tests only"

golem-api-grpc/proto/golem/worker/public_oplog.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ message ExportedFunctionCompletedParameters {
119119
message ErrorParameters {
120120
google.protobuf.Timestamp timestamp = 1;
121121
string error = 2;
122+
uint64 retry_from = 3;
122123
}
123124

124125
message JumpParameters {

golem-common/src/base_model.rs

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -305,3 +305,40 @@ impl From<String> for TransactionId {
305305
TransactionId(value)
306306
}
307307
}
308+
309+
#[cfg(feature = "sql")]
310+
mod sql {
311+
use crate::model::TransactionId;
312+
use sqlx::encode::IsNull;
313+
use sqlx::error::BoxDynError;
314+
use sqlx::postgres::PgTypeInfo;
315+
use sqlx::{Database, Postgres, Type};
316+
use std::io::Write;
317+
318+
impl sqlx::Decode<'_, Postgres> for TransactionId {
319+
fn decode(value: <Postgres as Database>::ValueRef<'_>) -> Result<Self, BoxDynError> {
320+
let bytes = value.as_bytes()?;
321+
Ok(TransactionId(
322+
u64::from_be_bytes(bytes.try_into()?).to_string(),
323+
))
324+
}
325+
}
326+
327+
impl sqlx::Encode<'_, Postgres> for TransactionId {
328+
fn encode_by_ref(
329+
&self,
330+
buf: &mut <Postgres as Database>::ArgumentBuffer<'_>,
331+
) -> Result<IsNull, BoxDynError> {
332+
let u64 = self.0.parse::<u64>()?;
333+
let bytes = u64.to_be_bytes();
334+
buf.write_all(&bytes)?;
335+
Ok(IsNull::No)
336+
}
337+
}
338+
339+
impl Type<Postgres> for TransactionId {
340+
fn type_info() -> PgTypeInfo {
341+
PgTypeInfo::with_name("xid8")
342+
}
343+
}
344+
}

golem-common/src/model/mod.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -625,7 +625,9 @@ pub struct WorkerStatusRecord {
625625
/// The component version at the starting point of the replay. Will be the version of the Create oplog entry
626626
/// if only automatic updates were used or the version of the latest snapshot-based update
627627
pub component_version_for_replay: ComponentVersion,
628-
pub current_retry_count: u32,
628+
/// The number of encountered error entries grouped by their 'retry_from' index, calculated from
629+
/// the last invocation boundary.
630+
pub current_retry_count: HashMap<OplogIndex, u32>,
629631
}
630632

631633
impl<Context> bincode::Decode<Context> for WorkerStatusRecord {
@@ -699,7 +701,7 @@ impl Default for WorkerStatusRecord {
699701
active_plugins: HashSet::new(),
700702
deleted_regions: DeletedRegions::new(),
701703
component_version_for_replay: 0,
702-
current_retry_count: 0,
704+
current_retry_count: HashMap::new(),
703705
}
704706
}
705707
}

golem-common/src/model/oplog.rs

Lines changed: 84 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,11 @@ pub enum OplogEntry {
287287
Error {
288288
timestamp: Timestamp,
289289
error: WorkerError,
290+
/// Points to the oplog index where the retry should start from. Normally this can be just the
291+
/// current oplog index (after the last persisted side-effect). When failing in an atomic region
292+
/// or batched remote writes, this should point to the start of the region.
293+
/// When counting the number of retries for a specific error, the error entries are grouped by this index.
294+
retry_from: OplogIndex,
290295
},
291296
/// Marker entry added when get-oplog-index is called from the worker, to make the jumping behavior
292297
/// more predictable.
@@ -455,22 +460,31 @@ pub enum OplogEntry {
455460
timestamp: Timestamp,
456461
level: PersistenceLevel,
457462
},
463+
/// Marks the beginning of a remote transaction
458464
BeginRemoteTransaction {
459465
timestamp: Timestamp,
460466
transaction_id: TransactionId,
467+
/// BeginRemoteTransaction entries need to be repeated on retries, because they may need a new
468+
/// transaction_id. The `original_begin_index` field always points to the original, first entry. This makes
469+
/// error grouping work. When None, this is the original begin entry.
470+
original_begin_index: Option<OplogIndex>,
461471
},
472+
/// Marks the point before a remote transaction is committed
462473
PreCommitRemoteTransaction {
463474
timestamp: Timestamp,
464475
begin_index: OplogIndex,
465476
},
477+
/// Marks the point before a remote transaction is rolled back
466478
PreRollbackRemoteTransaction {
467479
timestamp: Timestamp,
468480
begin_index: OplogIndex,
469481
},
482+
/// Marks the point after a remote transaction is committed
470483
CommittedRemoteTransaction {
471484
timestamp: Timestamp,
472485
begin_index: OplogIndex,
473486
},
487+
/// Marks the point after a remote transaction is rolled back
474488
RolledBackRemoteTransaction {
475489
timestamp: Timestamp,
476490
begin_index: OplogIndex,
@@ -526,10 +540,11 @@ impl OplogEntry {
526540
}
527541
}
528542

529-
pub fn error(error: WorkerError) -> OplogEntry {
543+
pub fn error(error: WorkerError, retry_from: OplogIndex) -> OplogEntry {
530544
OplogEntry::Error {
531545
timestamp: Timestamp::now_utc(),
532546
error,
547+
retry_from,
533548
}
534549
}
535550

@@ -718,10 +733,14 @@ impl OplogEntry {
718733
}
719734
}
720735

721-
pub fn begin_remote_transaction(transaction_id: TransactionId) -> OplogEntry {
736+
pub fn begin_remote_transaction(
737+
transaction_id: TransactionId,
738+
original_begin_index: Option<OplogIndex>,
739+
) -> OplogEntry {
722740
OplogEntry::BeginRemoteTransaction {
723741
timestamp: Timestamp::now_utc(),
724742
transaction_id,
743+
original_begin_index,
725744
}
726745
}
727746

@@ -761,6 +780,10 @@ impl OplogEntry {
761780
matches!(self, OplogEntry::EndRemoteWrite { begin_index, .. } if *begin_index == idx)
762781
}
763782

783+
pub fn is_end_remote_write_s<S>(&self, idx: OplogIndex, _: &S) -> bool {
784+
matches!(self, OplogEntry::EndRemoteWrite { begin_index, .. } if *begin_index == idx)
785+
}
786+
764787
pub fn is_pre_commit_remote_transaction(&self, idx: OplogIndex) -> bool {
765788
matches!(self, OplogEntry::PreCommitRemoteTransaction { begin_index, .. } if *begin_index == idx)
766789
}
@@ -773,43 +796,79 @@ impl OplogEntry {
773796
self.is_pre_commit_remote_transaction(idx) || self.is_pre_rollback_remote_transaction(idx)
774797
}
775798

799+
pub fn is_pre_remote_transaction_s<S>(&self, idx: OplogIndex, _: &S) -> bool {
800+
self.is_pre_commit_remote_transaction(idx) || self.is_pre_rollback_remote_transaction(idx)
801+
}
802+
776803
pub fn is_committed_remote_transaction(&self, idx: OplogIndex) -> bool {
777804
matches!(self, OplogEntry::CommittedRemoteTransaction { begin_index, .. } if *begin_index == idx)
778805
}
779806

807+
pub fn is_committed_remote_transaction_s<S>(&self, idx: OplogIndex, _: &S) -> bool {
808+
matches!(self, OplogEntry::CommittedRemoteTransaction { begin_index, .. } if *begin_index == idx)
809+
}
810+
780811
pub fn is_rolled_back_remote_transaction(&self, idx: OplogIndex) -> bool {
781812
matches!(self, OplogEntry::RolledBackRemoteTransaction { begin_index, .. } if *begin_index == idx)
782813
}
783814

815+
pub fn is_rolled_back_remote_transaction_s<S>(&self, idx: OplogIndex, _: &S) -> bool {
816+
matches!(self, OplogEntry::RolledBackRemoteTransaction { begin_index, .. } if *begin_index == idx)
817+
}
818+
784819
pub fn is_end_remote_transaction(&self, idx: OplogIndex) -> bool {
785820
self.is_committed_remote_transaction(idx) || self.is_rolled_back_remote_transaction(idx)
786821
}
787822

823+
pub fn is_end_remote_transaction_s<S>(&self, idx: OplogIndex, s: &S) -> bool {
824+
self.is_committed_remote_transaction_s(idx, s)
825+
|| self.is_rolled_back_remote_transaction_s(idx, s)
826+
}
827+
788828
/// Checks that an "intermediate oplog entry" between a `BeginRemoteWrite` and an `EndRemoteWrite`
789829
/// is not a RemoteWrite entry which does not belong to the batched remote write started at `idx`.
790-
pub fn no_concurrent_side_effect(&self, idx: OplogIndex) -> bool {
791-
match self {
792-
OplogEntry::ImportedFunctionInvoked {
793-
durable_function_type,
794-
..
795-
} => match durable_function_type {
796-
DurableFunctionType::WriteRemoteBatched(Some(begin_index))
797-
if *begin_index == idx =>
798-
{
799-
true
800-
}
801-
DurableFunctionType::WriteRemoteTransaction(Some(begin_index))
802-
if *begin_index == idx =>
803-
{
804-
true
805-
}
806-
DurableFunctionType::ReadLocal => true,
807-
DurableFunctionType::WriteLocal => true,
808-
DurableFunctionType::ReadRemote => true,
809-
_ => false,
810-
},
811-
OplogEntry::ExportedFunctionCompleted { .. } => false,
812-
_ => true,
830+
/// Side effects in a PersistenceLevel::PersistNothing region are ignored.
831+
pub fn no_concurrent_side_effect(
832+
&self,
833+
idx: OplogIndex,
834+
persistence_level: &PersistenceLevel,
835+
) -> bool {
836+
if persistence_level == &PersistenceLevel::PersistNothing {
837+
true
838+
} else {
839+
match self {
840+
OplogEntry::ImportedFunctionInvoked {
841+
durable_function_type,
842+
..
843+
} => match durable_function_type {
844+
DurableFunctionType::WriteRemoteBatched(Some(begin_index))
845+
if *begin_index == idx =>
846+
{
847+
true
848+
}
849+
DurableFunctionType::WriteRemoteTransaction(Some(begin_index))
850+
if *begin_index == idx =>
851+
{
852+
true
853+
}
854+
DurableFunctionType::ReadLocal => true,
855+
DurableFunctionType::WriteLocal => true,
856+
DurableFunctionType::ReadRemote => true,
857+
_ => false,
858+
},
859+
OplogEntry::ExportedFunctionCompleted { .. } => false,
860+
_ => true,
861+
}
862+
}
863+
}
864+
865+
pub fn track_persistence_level(
866+
&self,
867+
_idx: OplogIndex,
868+
persistence_level: &mut PersistenceLevel,
869+
) {
870+
if let OplogEntry::ChangePersistenceLevel { level, .. } = self {
871+
*persistence_level = *level
813872
}
814873
}
815874

golem-common/src/model/public_oplog/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -390,6 +390,8 @@ pub struct TimestampParameter {
390390
pub struct ErrorParameters {
391391
pub timestamp: Timestamp,
392392
pub error: String,
393+
#[wit_field(skip)]
394+
pub retry_from: OplogIndex,
393395
}
394396

395397
#[derive(Clone, Debug, Serialize, PartialEq, Deserialize)]

golem-common/src/model/public_oplog/protobuf.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,7 @@ impl TryFrom<golem_api_grpc::proto::golem::worker::OplogEntry> for PublicOplogEn
226226
oplog_entry::Entry::Error(error) => Ok(PublicOplogEntry::Error(ErrorParameters {
227227
timestamp: error.timestamp.ok_or("Missing timestamp field")?.into(),
228228
error: error.error,
229+
retry_from: OplogIndex::from_u64(error.retry_from),
229230
})),
230231
oplog_entry::Entry::NoOp(no_op) => Ok(PublicOplogEntry::NoOp(TimestampParameter {
231232
timestamp: no_op.timestamp.ok_or("Missing timestamp field")?.into(),
@@ -602,6 +603,7 @@ impl TryFrom<PublicOplogEntry> for golem_api_grpc::proto::golem::worker::OplogEn
602603
golem_api_grpc::proto::golem::worker::ErrorParameters {
603604
timestamp: Some(error.timestamp.into()),
604605
error: error.error,
606+
retry_from: error.retry_from.0,
605607
},
606608
)),
607609
},

golem-common/src/model/public_oplog/tests.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,7 @@ fn error_serialization_poem_serde_equivalence() {
180180
let entry = PublicOplogEntry::Error(ErrorParameters {
181181
timestamp: rounded_ts(Timestamp::now_utc()),
182182
error: "test".to_string(),
183+
retry_from: OplogIndex::INITIAL,
183184
});
184185
let serialized = entry.to_json_string();
185186
let deserialized: PublicOplogEntry = serde_json::from_str(&serialized).unwrap();

golem-debugging-service/src/debug_context.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use crate::additional_deps::AdditionalDeps;
1616
use anyhow::Error;
1717
use async_trait::async_trait;
18+
use golem_common::base_model::OplogIndex;
1819
use golem_common::model::agent::AgentId;
1920
use golem_common::model::invocation_context::{
2021
self, AttributeValue, InvocationContextStack, SpanId,
@@ -237,6 +238,10 @@ impl InvocationHooks for DebugContext {
237238
.on_invocation_success(full_function_name, function_input, consumed_fuel, output)
238239
.await
239240
}
241+
242+
async fn get_current_retry_point(&self) -> OplogIndex {
243+
self.durable_ctx.get_current_retry_point().await
244+
}
240245
}
241246

242247
#[async_trait]

0 commit comments

Comments
 (0)