Skip to content
Open
Show file tree
Hide file tree
Changes from 88 commits
Commits
Show all changes
97 commits
Select commit Hold shift + click to select a range
d628cac
work towards failovering
segfault-magnet Mar 31, 2025
fed8b0f
signers are now cloneable
segfault-magnet Mar 31, 2025
6d48af2
fix mock setup
segfault-magnet Mar 31, 2025
aa09ebc
working almost as expected
segfault-magnet Mar 31, 2025
b490ae5
make client a generic param
segfault-magnet Apr 2, 2025
806907b
refactoring
segfault-magnet Apr 2, 2025
3532968
shift to mutex
segfault-magnet Apr 2, 2025
7fff72e
track state in arc
segfault-magnet Apr 2, 2025
ca56b27
move state under one mutex
segfault-magnet Apr 2, 2025
10d81a4
implement l1 provider trait for failover client
segfault-magnet Apr 2, 2025
4ded54a
use initializer as associated method
segfault-magnet Apr 2, 2025
fc550a6
max transient errors is configurable
segfault-magnet Apr 2, 2025
c221709
websocket client factory
segfault-magnet Apr 2, 2025
d9a1819
inline websocket connection
segfault-magnet Apr 2, 2025
97cb0ff
cleanup
segfault-magnet Apr 2, 2025
0318913
extracted metrics
segfault-magnet Apr 2, 2025
f05db45
extracted factory
segfault-magnet Apr 2, 2025
c0e6e55
fix imports
segfault-magnet Apr 2, 2025
4f58a56
group imports
segfault-magnet Apr 2, 2025
bdad767
rename
segfault-magnet Apr 2, 2025
1ea96f4
health checker
segfault-magnet Apr 2, 2025
ffca714
enable logs
segfault-magnet Apr 3, 2025
8aa7b3d
propagate multiple rpcs to config
segfault-magnet Apr 3, 2025
086c06b
window for detecting too many failed transactions
segfault-magnet Apr 3, 2025
580669d
tests
segfault-magnet Apr 3, 2025
584dc7c
remove tests
segfault-magnet Apr 3, 2025
cb79a53
fix expectations on mock
segfault-magnet Apr 6, 2025
8131fe4
add tests for note_tx_failure
segfault-magnet Apr 6, 2025
cb4b54d
remove multiple pending tx test since that currently cannot happen
segfault-magnet Apr 6, 2025
871ba18
tests compiling
segfault-magnet Apr 6, 2025
7b207dd
fix metrics not being tracked on transient errors and fatal
segfault-magnet Apr 6, 2025
af370ac
fix num of expectations
segfault-magnet Apr 6, 2025
d469d78
tests for parsing rpc config
segfault-magnet Apr 6, 2025
0ad90fc
update readme with failover info
segfault-magnet Apr 6, 2025
548c1c7
make failover config externally configurable
segfault-magnet Apr 6, 2025
b020580
update readme
segfault-magnet Apr 6, 2025
bb1b69b
use url instead of string in config
segfault-magnet Apr 6, 2025
5cb9127
remove unused imports
segfault-magnet Apr 6, 2025
f81b8ca
remove unused new method
segfault-magnet Apr 6, 2025
8b9d4b7
update comments
segfault-magnet Apr 6, 2025
e43e2b5
cleanup tests for rpc config reading
segfault-magnet Apr 6, 2025
0df3e8b
remove arc from signers
segfault-magnet Apr 6, 2025
f34e149
remove comment
segfault-magnet Apr 6, 2025
76a575e
remove comments
segfault-magnet Apr 6, 2025
737640f
remove unused setters on e2e committer wrapper
segfault-magnet Apr 6, 2025
7f25e96
remove debug statements
segfault-magnet Apr 6, 2025
07cc3dd
turn off logs in e2e
segfault-magnet Apr 6, 2025
aa42d4d
remove tracing subscriber dep
segfault-magnet Apr 6, 2025
3c34e67
revert uneccessary changes
segfault-magnet Apr 6, 2025
d594afb
dedup rpc config structs
segfault-magnet Apr 6, 2025
3cbf396
move failover env variables to separate struct
segfault-magnet Apr 7, 2025
79ee807
uncomment e2e tests
segfault-magnet Apr 7, 2025
818efcc
change naming 'rpc configs' in config to 'endpoints'
segfault-magnet Apr 7, 2025
4a20fa2
change http to wss
segfault-magnet Apr 7, 2025
ab76f76
endpoints to be NonEmpty
segfault-magnet Apr 7, 2025
e56f6a6
remove is empty check
segfault-magnet Apr 7, 2025
eb7ec2d
rename RpcEndpoint to endpoint
segfault-magnet Apr 7, 2025
7d3270b
group imports
segfault-magnet Apr 7, 2025
9485ab3
make naming around endpoints consistent between fuel and eth
segfault-magnet Apr 7, 2025
661f8ac
nits
segfault-magnet Apr 7, 2025
3d3b59c
move helpers below tests
segfault-magnet Apr 7, 2025
eac6f19
dry up tests
segfault-magnet Apr 7, 2025
00073f6
fix all providers not available failover test
segfault-magnet Apr 7, 2025
2c4df07
remove unused pub methods, improve tests
segfault-magnet Apr 7, 2025
24425e0
group health related fields into struct
segfault-magnet Apr 7, 2025
6fa14ca
move config to helpers
segfault-magnet Apr 7, 2025
6d1a00f
make eth transient error configurable
segfault-magnet Apr 7, 2025
336226d
endpoints to be part of the failover config
segfault-magnet Apr 7, 2025
cca73d1
add missing config in readme
segfault-magnet Apr 7, 2025
2a67aa1
update readme
segfault-magnet Apr 7, 2025
2df2440
fix compilation
segfault-magnet Apr 7, 2025
fa4e0bd
cqrs
segfault-magnet Apr 7, 2025
2a02cca
don't test internals
segfault-magnet Apr 7, 2025
7c17a6c
rwlock on window of tx times
segfault-magnet Apr 7, 2025
3db2806
move endpoints out of the failover config
segfault-magnet Apr 8, 2025
9437370
rename config structs
segfault-magnet Apr 8, 2025
d660432
remove unneeded comments
segfault-magnet Apr 8, 2025
6f38c01
add label to metrics to track errors per endpoint
segfault-magnet Apr 8, 2025
59b4c6e
separate out into files
segfault-magnet Apr 8, 2025
33e6074
cleaning up tests
segfault-magnet Apr 8, 2025
f08dbe7
fix test
segfault-magnet Apr 8, 2025
39fb050
cleanup tests
segfault-magnet Apr 8, 2025
9c6fb1a
don't access internals in test
segfault-magnet Apr 8, 2025
19e0a6b
remove redundant once
segfault-magnet Apr 8, 2025
2229f5a
remove unused method
segfault-magnet Apr 8, 2025
080cb43
remove comment from run tests script
segfault-magnet Apr 8, 2025
8d26bcb
Merge branch 'master' into feat/failover_api
segfault-magnet Apr 8, 2025
ebad1d2
final nits
segfault-magnet Apr 8, 2025
1c7c56c
Update e2e/helpers/src/committer.rs
segfault-magnet Apr 10, 2025
70316e5
bump transient errors in e2e committer setup
segfault-magnet Apr 10, 2025
b5cf1f3
rename to mempool drop
segfault-magnet Apr 10, 2025
69df084
merge usage of crate and super
segfault-magnet Apr 10, 2025
d7b9e8c
optimize window retention in the error tracker of the failover client
segfault-magnet Apr 11, 2025
eb64834
optimize too-many-tx-drops detection
segfault-magnet Apr 11, 2025
75fa395
Update packages/adapters/eth/src/failover/client.rs
segfault-magnet Apr 12, 2025
0a4c61f
save iterator instead of whole colection
segfault-magnet Apr 12, 2025
941911a
health tracking to be moved out of the client into a separate struct
segfault-magnet Apr 12, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

45 changes: 40 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ The Fuel Block Committer is a standalone service dedicated to uploading Fuel Blo
- [Bundle Configuration](#bundle-configuration)
- [Configuration Validation](#configuration-validation)
- [Running the Fee Algo Simulator](#running-the-fee-algo-simulator)
- [RPC Failover](#rpc-failover)

## Building

Expand Down Expand Up @@ -77,20 +78,38 @@ The Fuel Block Committer is configured primarily through environment variables.
- **Format:** `Kms(<KEY_ARN>)` or `Private(<PRIVATE_KEY>)`
- **Example:** `Kms(arn:aws:kms:us-east-1:123456789012:key/efgh-5678)`

- **`COMMITTER__ETH__RPC`**
- **`COMMITTER__ETH__ENDPOINTS`**

- **Description:** URL to the Ethereum RPC endpoint.
- **Example:** `https://mainnet.infura.io/v3/YOUR_INFURA_PROJECT_ID`
- **Description:** Multiple Ethereum WebSocket RPC endpoints specified as a JSON array.
- **Format:** JSON array of objects with `name` and `url` fields
- **Example:** `'[{"name":"main","url":"wss://eth.example.com/ws"},{"name":"backup","url":"wss://backup.example.com/ws"}]'`

- **`COMMITTER__ETH__STATE_CONTRACT_ADDRESS`**

- **Description:** Ethereum address of the Fuel chain state contract.
- **Example:** `0xYourStateContractAddress`

- **`COMMITTER__ETH__FAILOVER__TX_FAILURE_THRESHOLD`**

- **Description:** Maximum number of transaction failures within the specified time window before marking a provider as unhealthy.
- **Type:** Positive integer
- **Example:** `10`

- **`COMMITTER__ETH__FAILOVER__TX_FAILURE_TIME_WINDOW`**
- **Description:** Time window to track transaction failures in.
- **Format:** Human-readable duration
- **Example:** `30m`

- **`COMMITTER__ETH__FAILOVER__TRANSIENT_ERROR_THRESHOLD`**
- **Description:** Maximum number of transient errors before a provider is considered unhealthy.
- **Type:** Positive integer
- **Example:** `3`

#### Fuel Configuration

- **`COMMITTER__FUEL__GRAPHQL_ENDPOINT`**
- **`COMMITTER__FUEL__ENDPOINT`**

- **Description:** URL to a Fuel Core GraphQL endpoint.
- **Description:** URL to a Fuel Core endpoint.
- **Example:** `http://localhost:4000/graphql`

- **`COMMITTER__FUEL__NUM_BUFFERED_REQUESTS`**
Expand Down Expand Up @@ -345,3 +364,19 @@ To run the Fee Algo Simulator, execute the following command:
```shell
cargo run --release --bin fee_algo_simulation
```

## RPC Failover

The fuel-block-committer supports automatic failover between multiple Ethereum WebSocket RPC endpoints. The failover mechanism is controlled by the following configuration parameters:

1. **Transient Error Threshold** (`COMMITTER__ETH__FAILOVER__TRANSIENT_ERROR_THRESHOLD`): Maximum number of transient errors (such as temporary network issues or rate limiting) before a provider is considered unhealthy. Unlike fatal errors that immediately trigger failover, transient errors accumulate until this threshold is reached.

2. **Transaction Failure Threshold** (`COMMITTER__ETH__FAILOVER__TX_FAILURE_THRESHOLD`): Maximum number of transaction failures within a configurable time window that will mark a provider as unhealthy. Transaction failures are specifically tracked when transactions disappear from the mempool without being mined (i.e., they are "squeezed out"), not when transactions are mined but fail execution.

3. **Transaction Failure Time Window** (`COMMITTER__ETH__FAILOVER__TX_FAILURE_TIME_WINDOW`): Time window in which transaction failures are counted. If the number of transaction failures within this window exceeds the transaction failure threshold, the provider is marked as unhealthy.

When a provider is marked as unhealthy, the system automatically switches to the next available provider in the list. However, once all providers in the list become unhealthy, the system will remain in an unhealthy state and requires a restart to recover. There is no automatic periodic checking of previously unhealthy providers.

The service will signal its unhealthy state to external monitoring systems (via health checks), which can then trigger an automatic restart of the service.

To configure RPC failover, specify multiple WebSocket RPC endpoints using the `COMMITTER__ETH__ENDPOINTS` environment variable (using `wss://` URLs) and set the appropriate failover configuration parameters.
2 changes: 1 addition & 1 deletion committer/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub async fn launch_api_server(

#[get("/health")]
async fn health(data: web::Data<Arc<HealthReporter>>) -> impl Responder {
let report = data.report();
let report = data.report().await;

let mut response = if report.healthy() {
HttpResponse::Ok()
Expand Down
172 changes: 162 additions & 10 deletions committer/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@ use std::{

use byte_unit::Byte;
use clap::{Parser, command};
use eth::{Address, L1Keys};
use eth::{Address, Endpoint, L1Keys};
use fuel_block_committer_encoding::bundle::CompressionLevel;
use serde::Deserialize;
use services::state_committer::{AlgoConfig, FeeMultiplierRange, FeeThresholds, SmaPeriods};
use services::{
state_committer::{AlgoConfig, FeeMultiplierRange, FeeThresholds, SmaPeriods},
types::NonEmpty,
};
use storage::DbConfig;
use url::Url;

Expand Down Expand Up @@ -79,13 +82,33 @@ impl Config {
};
Ok(algo_config)
}

pub fn eth_provider_health_thresholds(&self) -> eth::ProviderHealthThresholds {
eth::ProviderHealthThresholds {
transient_error_threshold: self.eth.failover.transient_error_threshold,
tx_failure_threshold: self.eth.failover.tx_failure_threshold,
tx_failure_time_window: self.eth.failover.tx_failure_time_window,
}
}

pub fn eth_tx_config(&self) -> eth::TxConfig {
eth::TxConfig {
tx_max_fee: u128::from(self.app.tx_fees.max),
send_tx_request_timeout: self.app.send_tx_request_timeout,
acceptable_priority_fee_percentage: eth::AcceptablePriorityFeePercentages::new(
self.app.tx_fees.min_reward_perc,
self.app.tx_fees.max_reward_perc,
)
.expect("already validated via `validate` in main"),
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Opinion: Ideally we'd leverage the type system to guarantee this property, as validation logic is prone to change over time. While I think this is fine for now, in similar situations I'd consider defining a ValidatedConfig struct that holds the converted types directly and have validate convert to this struct.

}
}
}

#[derive(Debug, Clone, Deserialize)]
pub struct Fuel {
/// Fuel-core GraphQL endpoint URL.
/// Fuel-core endpoint URL.
#[serde(deserialize_with = "parse_url")]
pub graphql_endpoint: Url,
pub endpoint: Url,
/// Number of concurrent requests.
pub num_buffered_requests: NonZeroU32,
}
Expand All @@ -94,11 +117,51 @@ pub struct Fuel {
pub struct Eth {
/// L1 keys for state contract calls and postings.
pub l1_keys: L1Keys,
/// Ethereum RPC endpoint URL.
#[serde(deserialize_with = "parse_url")]
pub rpc: Url,
/// Multiple Ethereum RPC endpoints as a JSON array.
/// Format: '[{"name":"main","url":"wss://ethereum.example.com"}, {"name":"backup","url":"wss://backup.example.com"}]'
#[serde(deserialize_with = "parse_endpoints")]
pub endpoints: NonEmpty<Endpoint>,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice use of NonEmpty 🤩

/// Ethereum address of the fuel chain state contract.
pub state_contract_address: Address,
/// Configuration for RPC failover behavior
pub failover: FailoverConfig,
}

/// Configuration for managing RPC failover behavior
#[derive(Debug, Clone, Deserialize)]
pub struct FailoverConfig {
/// Maximum number of transient errors before considering a provider unhealthy
pub transient_error_threshold: usize,
/// Maximum number of transaction failures within the specified time window before marking a provider as unhealthy.
pub tx_failure_threshold: usize,
/// Time window to track transaction failures in.
/// Format: Human-readable duration (e.g., `5m`, `30m`)
#[serde(deserialize_with = "human_readable_duration")]
pub tx_failure_time_window: Duration,
}

fn parse_endpoints<'de, D>(deserializer: D) -> Result<NonEmpty<Endpoint>, D::Error>
where
D: serde::Deserializer<'de>,
{
let value: String = Deserialize::deserialize(deserializer)?;
if value.is_empty() {
return Err(serde::de::Error::custom(
"Ethereum endpoints cannot be empty",
));
}

let configs: Vec<Endpoint> = serde_json::from_str(&value).map_err(|err| {
serde::de::Error::custom(format!("Invalid JSON format for Ethereum endpoints: {err}",))
})?;

let Some(configs) = NonEmpty::from_vec(configs) else {
return Err(serde::de::Error::custom(
"At least one Ethereum endpoint must be configured",
));
};

Ok(configs)
}

fn parse_url<'de, D>(deserializer: D) -> Result<Url, D::Error>
Expand Down Expand Up @@ -187,7 +250,7 @@ pub struct BundleConfig {
/// Time to wait for additional blocks before starting bundling.
///
/// This timeout starts from the last time a bundle was created or from app startup.
/// Bundling will occur when this timeout expires, even if byte or block thresholds arent met.
/// Bundling will occur when this timeout expires, even if byte or block thresholds aren't met.
#[serde(deserialize_with = "human_readable_duration")]
pub accumulation_timeout: Duration,

Expand Down Expand Up @@ -289,7 +352,6 @@ where
#[derive(Debug, Clone)]
pub struct Internal {
pub fuel_errors_before_unhealthy: usize,
pub eth_errors_before_unhealthy: usize,
pub balance_update_interval: Duration,
pub cost_request_limit: usize,
pub l1_blocks_cached_for_fee_metrics_tracker: usize,
Expand Down Expand Up @@ -319,7 +381,6 @@ impl Default for Internal {
const ETH_BLOCKS_PER_DAY: usize = 24 * 3600 / ETH_BLOCK_TIME;
Self {
fuel_errors_before_unhealthy: 3,
eth_errors_before_unhealthy: 3,
balance_update_interval: Duration::from_secs(10),
cost_request_limit: 1000,
l1_blocks_cached_for_fee_metrics_tracker: ETH_BLOCKS_PER_DAY,
Expand Down Expand Up @@ -352,3 +413,94 @@ pub fn parse() -> crate::errors::Result<Config> {

Ok(config.try_deserialize()?)
}

#[cfg(test)]
mod tests {
use serde::de::value::{Error as SerdeError, StringDeserializer};
use serde_json::json;
use services::types::nonempty;

use super::*;

#[test]
fn test_parse_endpoints() {
// given
let valid_configs = json!([
{"name": "main", "url": "https://ethereum.example.com"},
{"name": "backup", "url": "https://backup.example.com"}
])
.to_string();

let expected_configs = nonempty![
Endpoint {
name: "main".to_string(),
url: Url::parse("https://ethereum.example.com").unwrap(),
},
Endpoint {
name: "backup".to_string(),
url: Url::parse("https://backup.example.com").unwrap(),
},
];

let deserializer = StringDeserializer::<SerdeError>::new(valid_configs);

// when
let result = parse_endpoints(deserializer);

// then
let configs = result.unwrap();
assert_eq!(configs, expected_configs);
}

#[test]
fn test_parse_endpoints_invalid_json() {
// given
let invalid_json = "not a valid json";

let deserializer = StringDeserializer::<SerdeError>::new(invalid_json.to_string());

// when
let result = parse_endpoints(deserializer);

// then
let err_msg = result
.expect_err("should have failed since the json is invalid")
.to_string();
assert!(err_msg.contains("Invalid JSON format"));
}

#[test]
fn test_parse_endpoints_empty_array() {
// given
let empty_array = json!([]).to_string();

let deserializer = StringDeserializer::<SerdeError>::new(empty_array);

// when
let result = parse_endpoints(deserializer);

// then
let err_msg = result
.expect_err("should have failed since the array is empty")
.to_string();
assert!(err_msg.contains("At least one Ethereum endpoint must be configured"));
}

#[test]
fn test_parse_endpoints_invalid_url() {
// given
let invalid_url = json!([
{"name": "main", "url": "not a valid url"}
])
.to_string();

let deserializer = StringDeserializer::<SerdeError>::new(invalid_url);

// when
let result = parse_endpoints(deserializer);

// then
let err_msg = result.expect_err("because url was not valid").to_string();
assert!(err_msg.contains("Invalid JSON format"));
}
}
14 changes: 14 additions & 0 deletions committer/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,20 @@ impl From<services::Error> for Error {
}
}

impl From<eth::Error> for Error {
fn from(value: eth::Error) -> Self {
match value {
eth::Error::Network { msg, recoverable } => {
Self::Network(format!("{msg}: recoverable={recoverable}"))
}
eth::Error::TxExecution(msg) => {
Self::Other(format!("Transaction execution error: {msg}"))
}
eth::Error::Other(msg) => Self::Other(msg),
}
}
}

impl From<config::ConfigError> for Error {
fn from(error: config::ConfigError) -> Self {
Self::Other(error.to_string())
Expand Down
11 changes: 5 additions & 6 deletions committer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use tokio_util::sync::CancellationToken;

use crate::setup::shut_down;

pub type L1 = eth::WebsocketClient;
pub type L1 = eth::FailoverClient<eth::WebsocketClientFactory>;
pub type AwsClient = eth::AwsClient;
pub type Database = storage::Postgres;
pub type FuelApi = fuel::HttpClient;
Expand All @@ -40,10 +40,9 @@ async fn main() -> Result<()> {
let (fuel_adapter, fuel_health_check) =
setup::fuel_adapter(&config, &internal_config, &metrics_registry);

let (ethereum_rpc, eth_health_check) =
setup::l1_adapter(&config, &internal_config, &metrics_registry)
.await
.with_context(|| "could not setup l1 adapter")?;
let (ethereum_rpc, eth_health_check) = setup::l1_adapter(&config, &metrics_registry)
.await
.with_context(|| "could not setup l1 adapter")?;

let wallet_balance_tracker_handle = setup::wallet_balance_tracker(
&internal_config,
Expand Down Expand Up @@ -83,7 +82,7 @@ async fn main() -> Result<()> {
cancel_token.clone(),
&config,
&metrics_registry,
)?;
);

let state_committer_handle = setup::state_committer(
fuel_adapter.clone(),
Expand Down
Loading
Loading