Skip to content

Commit d8b7609

Browse files
authored
Refactor worker status calculation (#2173)
* Refactor worker status computation
* Test fixes
* Apply suggestions from code review
1 parent f8d48d4 commit d8b7609

File tree

55 files changed

+1877
-2371
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

55 files changed

+1877
-2371
lines changed

cli/golem-cli/src/model/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -463,7 +463,7 @@ pub struct WorkerMetadataView {
463463
pub env: HashMap<String, String>,
464464
pub status: golem_client::model::WorkerStatus,
465465
pub component_version: u64,
466-
pub retry_count: u64,
466+
pub retry_count: u32,
467467

468468
pub pending_invocation_count: u64,
469469
pub updates: Vec<golem_client::model::UpdateRecord>,
@@ -516,7 +516,7 @@ pub struct WorkerMetadata {
516516
pub env: HashMap<String, String>,
517517
pub status: golem_client::model::WorkerStatus,
518518
pub component_version: u64,
519-
pub retry_count: u64,
519+
pub retry_count: u32,
520520
pub pending_invocation_count: u64,
521521
pub updates: Vec<golem_client::model::UpdateRecord>,
522522
pub created_at: DateTime<Utc>,

cli/golem-cli/src/model/text/fmt.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ pub fn format_status(status: &WorkerStatus) -> String {
221221
.to_string()
222222
}
223223

224-
pub fn format_retry_count(retry_count: &u64) -> String {
224+
pub fn format_retry_count(retry_count: &u32) -> String {
225225
if *retry_count == 0 {
226226
retry_count.to_string()
227227
} else {

golem-api-grpc/proto/golem/worker/worker_metadata.proto

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ message WorkerMetadata {
1919
map<string, string> env = 4;
2020
WorkerStatus status = 5;
2121
uint64 component_version = 6;
22-
uint64 retry_count = 7;
22+
uint32 retry_count = 7;
2323
uint64 pending_invocation_count = 8;
2424
repeated UpdateRecord updates = 9;
2525
google.protobuf.Timestamp created_at = 10;

golem-common/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ pub mod retries;
5959
#[cfg(feature = "serialization")]
6060
pub mod serialization;
6161

62+
pub mod read_only_lock;
63+
6264
#[cfg(feature = "observability")]
6365
pub mod tracing;
6466

golem-common/src/model/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -625,6 +625,7 @@ pub struct WorkerStatusRecord {
625625
/// The component version at the starting point of the replay. Will be the version of the Create oplog entry
626626
/// if only automatic updates were used or the version of the latest snapshot-based update
627627
pub component_version_for_replay: ComponentVersion,
628+
pub current_retry_count: u32,
628629
}
629630

630631
impl<Context> bincode::Decode<Context> for WorkerStatusRecord {
@@ -647,6 +648,7 @@ impl<Context> bincode::Decode<Context> for WorkerStatusRecord {
647648
active_plugins: Decode::decode(decoder)?,
648649
deleted_regions: Decode::decode(decoder)?,
649650
component_version_for_replay: Decode::decode(decoder)?,
651+
current_retry_count: Decode::decode(decoder)?,
650652
})
651653
}
652654
}
@@ -672,6 +674,7 @@ impl<'de, Context> BorrowDecode<'de, Context> for WorkerStatusRecord {
672674
active_plugins: BorrowDecode::borrow_decode(decoder)?,
673675
deleted_regions: BorrowDecode::borrow_decode(decoder)?,
674676
component_version_for_replay: BorrowDecode::borrow_decode(decoder)?,
677+
current_retry_count: BorrowDecode::borrow_decode(decoder)?,
675678
})
676679
}
677680
}
@@ -696,6 +699,7 @@ impl Default for WorkerStatusRecord {
696699
active_plugins: HashSet::new(),
697700
deleted_regions: DeletedRegions::new(),
698701
component_version_for_replay: 0,
702+
current_retry_count: 0,
699703
}
700704
}
701705
}

golem-common/src/model/oplog.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ impl OplogIndexRange {
6464
}
6565
}
6666

67-
#[derive(Clone)]
67+
#[derive(Debug, Clone)]
6868
pub struct AtomicOplogIndex(Arc<AtomicU64>);
6969

7070
impl AtomicOplogIndex {

golem-common/src/read_only_lock.rs

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
// Copyright 2024-2025 Golem Cloud
2+
//
3+
// Licensed under the Golem Source License v1.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://license.golem.cloud/LICENSE
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
pub mod std {
16+
use std::sync::Arc;
17+
use std::sync::{RwLock, RwLockReadGuard};
18+
19+
/// Version of std::sync::RwLock that only allows read access to the underlying data.
20+
/// Useful if you want to limit which parts of the code are allowed to modify certain data.
21+
pub struct ReadOnlyLock<T> {
22+
inner: Arc<RwLock<T>>,
23+
}
24+
25+
impl<T> ReadOnlyLock<T> {
26+
pub fn new(underlying: Arc<RwLock<T>>) -> Self {
27+
Self {
28+
inner: underlying.clone(),
29+
}
30+
}
31+
32+
pub fn read(&self) -> RwLockReadGuard<'_, T> {
33+
self.inner.read().unwrap()
34+
}
35+
}
36+
37+
impl<T> Clone for ReadOnlyLock<T> {
38+
fn clone(&self) -> Self {
39+
Self {
40+
inner: self.inner.clone(),
41+
}
42+
}
43+
}
44+
}
45+
46+
#[cfg(feature = "tokio")]
47+
pub mod tokio {
48+
use std::sync::Arc;
49+
use tokio::sync::{RwLock, RwLockReadGuard};
50+
51+
/// Version of tokio::sync::RwLock that only allows read access to the underlying data.
52+
/// Useful if you want to limit which parts of the code are allowed to modify certain data.
53+
pub struct ReadOnlyLock<T> {
54+
inner: Arc<RwLock<T>>,
55+
}
56+
57+
impl<T> ReadOnlyLock<T> {
58+
pub fn new(underlying: Arc<RwLock<T>>) -> Self {
59+
Self {
60+
inner: underlying.clone(),
61+
}
62+
}
63+
64+
pub async fn read(&self) -> RwLockReadGuard<'_, T> {
65+
self.inner.read().await
66+
}
67+
}
68+
69+
impl<T> Clone for ReadOnlyLock<T> {
70+
fn clone(&self) -> Self {
71+
Self {
72+
inner: self.inner.clone(),
73+
}
74+
}
75+
}
76+
}

golem-common/src/retries.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,16 +26,16 @@ use tracing::{error, info, warn, Level};
2626
/// Returns the delay to be waited before the next retry attempt.
2727
/// To be called after a failed attempt, with the number of attempts so far.
2828
/// Returns None if the maximum number of attempts has been reached.
29-
pub fn get_delay(config: &RetryConfig, attempts: u64) -> Option<Duration> {
29+
pub fn get_delay(config: &RetryConfig, attempts: u32) -> Option<Duration> {
3030
// Exponential backoff algorithm inspired by fred::pool::ReconnectPolicy::Exponential
3131
// Unlike fred, max jitter is not a static value, rather proportional to the current calculated delay
32-
if attempts >= (config.max_attempts as u64) {
32+
if attempts >= config.max_attempts {
3333
return None;
3434
}
3535

3636
let delay_with_opt_jitter = {
3737
let base_delay = (config.multiplier as u64)
38-
.saturating_pow(attempts.saturating_sub(1).try_into().unwrap_or(0))
38+
.saturating_pow(attempts.saturating_sub(1))
3939
.saturating_mul(config.min_delay.as_millis() as u64);
4040

4141
match config.max_jitter_factor {
@@ -61,7 +61,7 @@ pub fn get_delay(config: &RetryConfig, attempts: u64) -> Option<Duration> {
6161
/// Before attempting to perform the retriable action, call `start_attempt`. If it fails,
6262
/// call `failed_attempt` and if that returns true, start a new attempt immediately.
6363
pub struct RetryState<'a> {
64-
attempts: u64,
64+
attempts: u32,
6565
retry_config: &'a RetryConfig,
6666
}
6767

@@ -309,7 +309,7 @@ mod tests {
309309
}
310310
}
311311

312-
fn capture_delays(config: &RetryConfig, attempts: &mut u64, delays: &mut Vec<Duration>) {
312+
fn capture_delays(config: &RetryConfig, attempts: &mut u32, delays: &mut Vec<Duration>) {
313313
loop {
314314
*attempts += 1;
315315
let delay = super::get_delay(config, *attempts);

golem-debugging-service/src/debug_context.rs

Lines changed: 7 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,10 @@ use golem_common::model::agent::AgentId;
1919
use golem_common::model::invocation_context::{
2020
self, AttributeValue, InvocationContextStack, SpanId,
2121
};
22-
use golem_common::model::oplog::UpdateDescription;
22+
use golem_common::model::oplog::{TimestampedUpdateDescription, UpdateDescription};
2323
use golem_common::model::{
2424
AccountId, ComponentFilePath, ComponentVersion, GetFileSystemNodeResult, IdempotencyKey,
25-
OwnedWorkerId, PluginInstallationId, ProjectId, WorkerId, WorkerMetadata, WorkerStatus,
26-
WorkerStatusRecord,
25+
OwnedWorkerId, PluginInstallationId, ProjectId, WorkerId, WorkerStatusRecord,
2726
};
2827
use golem_service_base::error::worker_executor::{InterruptKind, WorkerExecutorError};
2928
use golem_wasm_rpc::golem_rpc_0_2_x::types::{
@@ -58,7 +57,7 @@ use golem_worker_executor::services::worker::WorkerService;
5857
use golem_worker_executor::services::worker_event::WorkerEventService;
5958
use golem_worker_executor::services::worker_fork::WorkerForkService;
6059
use golem_worker_executor::services::worker_proxy::WorkerProxy;
61-
use golem_worker_executor::services::{worker_enumeration, HasAll, HasConfig, HasOplogService};
60+
use golem_worker_executor::services::{worker_enumeration, HasAll};
6261
use golem_worker_executor::worker::{RetryDecision, Worker};
6362
use golem_worker_executor::workerctx::{
6463
DynamicLinking, ExternalOperations, FileSystemReading, FuelManagement, HasWasiConfigVars,
@@ -121,14 +120,6 @@ impl ExternalOperations<Self> for DebugContext {
121120
.await
122121
}
123122

124-
async fn compute_latest_worker_status<This: HasOplogService + HasConfig + Send + Sync>(
125-
this: &This,
126-
worker_id: &OwnedWorkerId,
127-
metadata: &Option<WorkerMetadata>,
128-
) -> Result<WorkerStatusRecord, WorkerExecutorError> {
129-
DurableWorkerCtx::<Self>::compute_latest_worker_status(this, worker_id, metadata).await
130-
}
131-
132123
async fn resume_replay(
133124
store: &mut (impl AsContextMut<Data = Self> + Send),
134125
instance: &Instance,
@@ -166,23 +157,6 @@ impl ExternalOperations<Self> for DebugContext {
166157
) -> Result<(), Error> {
167158
DurableWorkerCtx::<Self>::on_shard_assignment_changed(this).await
168159
}
169-
170-
async fn on_worker_update_failed_to_start<T: HasAll<Self> + Send + Sync>(
171-
this: &T,
172-
account_id: &AccountId,
173-
owned_worker_id: &OwnedWorkerId,
174-
target_version: ComponentVersion,
175-
details: Option<String>,
176-
) -> Result<(), WorkerExecutorError> {
177-
DurableWorkerCtx::<Self>::on_worker_update_failed_to_start(
178-
this,
179-
account_id,
180-
owned_worker_id,
181-
target_version,
182-
details,
183-
)
184-
.await
185-
}
186160
}
187161

188162
#[async_trait]
@@ -227,29 +201,13 @@ impl StatusManagement for DebugContext {
227201
}
228202
}
229203

230-
async fn set_suspended(&self) -> Result<(), WorkerExecutorError> {
231-
self.durable_ctx.set_suspended().await
204+
fn set_suspended(&self) {
205+
self.durable_ctx.set_suspended()
232206
}
233207

234208
fn set_running(&self) {
235209
self.durable_ctx.set_running()
236210
}
237-
238-
async fn get_worker_status(&self) -> WorkerStatus {
239-
self.durable_ctx.get_worker_status().await
240-
}
241-
242-
async fn store_worker_status(&self, status: WorkerStatus) {
243-
self.durable_ctx.store_worker_status(status).await
244-
}
245-
246-
async fn update_pending_invocations(&self) {
247-
self.durable_ctx.update_pending_invocations().await
248-
}
249-
250-
async fn update_pending_updates(&self) {
251-
self.durable_ctx.update_pending_updates().await
252-
}
253211
}
254212

255213
#[async_trait]
@@ -588,6 +546,7 @@ impl WorkerCtx for DebugContext {
588546
project_service: Arc<dyn ProjectService>,
589547
agent_types_service: Arc<dyn AgentTypesService>,
590548
shard_service: Arc<dyn ShardService>,
549+
pending_update: Option<TimestampedUpdateDescription>,
591550
) -> Result<Self, WorkerExecutorError> {
592551
let golem_ctx = DurableWorkerCtx::create(
593552
owned_worker_id,
@@ -615,6 +574,7 @@ impl WorkerCtx for DebugContext {
615574
project_service,
616575
agent_types_service,
617576
shard_service,
577+
pending_update,
618578
)
619579
.await?;
620580
Ok(Self {

golem-debugging-service/src/oplog/debug_oplog.rs

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,8 @@ use crate::debug_session::{DebugSessionId, DebugSessions};
1616
use async_trait::async_trait;
1717
use bytes::Bytes;
1818
use golem_common::model::oplog::{OplogEntry, OplogIndex, OplogPayload};
19-
use golem_common::model::WorkerMetadata;
20-
use golem_worker_executor::model::ExecutionStatus;
2119
use golem_worker_executor::services::oplog::{CommitLevel, Oplog};
22-
use std::collections::HashMap;
20+
use std::collections::{BTreeMap, HashMap};
2321
use std::fmt::Debug;
2422
use std::sync::Arc;
2523
use std::time::Duration;
@@ -34,14 +32,10 @@ impl DebugOplog {
3432
inner: Arc<dyn Oplog>,
3533
debug_session_id: DebugSessionId,
3634
debug_session: Arc<dyn DebugSessions>,
37-
execution_status: Arc<std::sync::RwLock<ExecutionStatus>>,
38-
initial_worker_metadata: WorkerMetadata,
3935
) -> Self {
4036
let oplog_state = DebugOplogState {
4137
debug_session_id,
4238
debug_session,
43-
_execution_status: execution_status,
44-
_initial_worker_metadata: initial_worker_metadata,
4539
};
4640

4741
Self { inner, oplog_state }
@@ -69,8 +63,6 @@ impl Debug for DebugOplog {
6963
pub struct DebugOplogState {
7064
debug_session_id: DebugSessionId,
7165
debug_session: Arc<dyn DebugSessions + Send + Sync>,
72-
_execution_status: Arc<std::sync::RwLock<ExecutionStatus>>,
73-
_initial_worker_metadata: WorkerMetadata,
7466
}
7567

7668
#[async_trait]
@@ -82,7 +74,9 @@ impl Oplog for DebugOplog {
8274
async fn drop_prefix(&self, _last_dropped_id: OplogIndex) {}
8375

8476
// There is no need to commit anything to the indexed storage
85-
async fn commit(&self, _level: CommitLevel) {}
77+
async fn commit(&self, _level: CommitLevel) -> BTreeMap<OplogIndex, OplogEntry> {
78+
BTreeMap::new()
79+
}
8680

8781
// Current Oplog Index acts as the Replay Target
8882
// In a new worker, ReplayState begins with last_replayed_index

0 commit comments

Comments
 (0)