-
Notifications
You must be signed in to change notification settings - Fork 26
feat: failover for eth rpcs #217
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
segfault-magnet
wants to merge
97
commits into
master
Choose a base branch
from
feat/failover_api
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
97 commits
Select commit
Hold shift + click to select a range
d628cac
work towards failovering
segfault-magnet fed8b0f
signers are now cloneable
segfault-magnet 6d48af2
fix mock setup
segfault-magnet aa09ebc
working almost as expected
segfault-magnet b490ae5
make client a generic param
segfault-magnet 806907b
refactoring
segfault-magnet 3532968
shift to mutex
segfault-magnet 7fff72e
track state in arc
segfault-magnet ca56b27
move state under one mutex
segfault-magnet 10d81a4
implement l1 provider trait for failover client
segfault-magnet 4ded54a
use initializer as associated method
segfault-magnet fc550a6
max transient errors is configurable
segfault-magnet c221709
websocket client factory
segfault-magnet d9a1819
inline websocket connection
segfault-magnet 97cb0ff
cleanup
segfault-magnet 0318913
extracted metrics
segfault-magnet f05db45
extracted factory
segfault-magnet c0e6e55
fix imports
segfault-magnet 4f58a56
group imports
segfault-magnet bdad767
rename
segfault-magnet 1ea96f4
health checker
segfault-magnet ffca714
enable logs
segfault-magnet 8aa7b3d
propagate multiple rpcs to config
segfault-magnet 086c06b
window for detecting too many failed transactions
segfault-magnet 580669d
tests
segfault-magnet 584dc7c
remove tests
segfault-magnet cb79a53
fix expectations on mock
segfault-magnet 8131fe4
add tests for note_tx_failure
segfault-magnet cb4b54d
remove multiple pending tx test since that currently cannot happen
segfault-magnet 871ba18
tests compiling
segfault-magnet 7b207dd
fix metrics not being tracked on transient errors and fatal
segfault-magnet af370ac
fix num of expectations
segfault-magnet d469d78
tests for parsing rpc config
segfault-magnet 0ad90fc
update readme with failover info
segfault-magnet 548c1c7
make failover config externally configurable
segfault-magnet b020580
update readme
segfault-magnet bb1b69b
use url instead of string in config
segfault-magnet 5cb9127
remove unused imports
segfault-magnet f81b8ca
remove unused new method
segfault-magnet 8b9d4b7
update comments
segfault-magnet e43e2b5
cleanup tests for rpc config reading
segfault-magnet 0df3e8b
remove arc from signers
segfault-magnet f34e149
remove comment
segfault-magnet 76a575e
remove comments
segfault-magnet 737640f
remove unused setters on e2e committer wrapper
segfault-magnet 7f25e96
remove debug statements
segfault-magnet 07cc3dd
turn off logs in e2e
segfault-magnet aa42d4d
remove tracing subscriber dep
segfault-magnet 3c34e67
revert uneccessary changes
segfault-magnet d594afb
dedup rpc config structs
segfault-magnet 3cbf396
move failover env variables to separate struct
segfault-magnet 79ee807
uncomment e2e tests
segfault-magnet 818efcc
change naming 'rpc configs' in config to 'endpoints'
segfault-magnet 4a20fa2
change http to wss
segfault-magnet ab76f76
endpoints to be NonEmpty
segfault-magnet e56f6a6
remove is empty check
segfault-magnet eb7ec2d
rename RpcEndpoint to endpoint
segfault-magnet 7d3270b
group imports
segfault-magnet 9485ab3
make naming around endpoints consistent between fuel and eth
segfault-magnet 661f8ac
nits
segfault-magnet 3d3b59c
move helpers below tests
segfault-magnet eac6f19
dry up tests
segfault-magnet 00073f6
fix all providers not available failover test
segfault-magnet 2c4df07
remove unused pub methods, improve tests
segfault-magnet 24425e0
group health related fields into struct
segfault-magnet 6fa14ca
move config to helpers
segfault-magnet 6d1a00f
make eth transient error configurable
segfault-magnet 336226d
endpoints to be part of the failover config
segfault-magnet cca73d1
add missing config in readme
segfault-magnet 2a67aa1
update readme
segfault-magnet 2df2440
fix compilation
segfault-magnet fa4e0bd
cqrs
segfault-magnet 2a02cca
don't test internals
segfault-magnet 7c17a6c
rwlock on window of tx times
segfault-magnet 3db2806
move endpoints out of the failover config
segfault-magnet 9437370
rename config structs
segfault-magnet d660432
remove unneeded comments
segfault-magnet 6f38c01
add label to metrics to track errors per endpoint
segfault-magnet 59b4c6e
separate out into files
segfault-magnet 33e6074
cleaning up tests
segfault-magnet f08dbe7
fix test
segfault-magnet 39fb050
cleanup tests
segfault-magnet 9c6fb1a
don't access internals in test
segfault-magnet 19e0a6b
remove redundant once
segfault-magnet 2229f5a
remove unused method
segfault-magnet 080cb43
remove comment from run tests script
segfault-magnet 8d26bcb
Merge branch 'master' into feat/failover_api
segfault-magnet ebad1d2
final nits
segfault-magnet 1c7c56c
Update e2e/helpers/src/committer.rs
segfault-magnet 70316e5
bump transient errors in e2e committer setup
segfault-magnet b5cf1f3
rename to mempool drop
segfault-magnet 69df084
merge usage of crate and super
segfault-magnet d7b9e8c
optimize window retention in the error tracker of the failover client
segfault-magnet eb64834
optimize too-many-tx-drops detection
segfault-magnet 75fa395
Update packages/adapters/eth/src/failover/client.rs
segfault-magnet 0a4c61f
save iterator instead of whole colection
segfault-magnet 941911a
health tracking to be moved out of the client into a separate struct
segfault-magnet File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -7,10 +7,13 @@ use std::{ | |
|
|
||
| use byte_unit::Byte; | ||
| use clap::{Parser, command}; | ||
| use eth::{Address, L1Keys}; | ||
| use eth::{Address, Endpoint, L1Keys}; | ||
| use fuel_block_committer_encoding::bundle::CompressionLevel; | ||
| use serde::Deserialize; | ||
| use services::state_committer::{AlgoConfig, FeeMultiplierRange, FeeThresholds, SmaPeriods}; | ||
| use services::{ | ||
| state_committer::{AlgoConfig, FeeMultiplierRange, FeeThresholds, SmaPeriods}, | ||
| types::NonEmpty, | ||
| }; | ||
| use storage::DbConfig; | ||
| use url::Url; | ||
|
|
||
|
|
@@ -79,13 +82,33 @@ impl Config { | |
| }; | ||
| Ok(algo_config) | ||
| } | ||
|
|
||
| pub fn eth_provider_health_thresholds(&self) -> eth::ProviderHealthThresholds { | ||
| eth::ProviderHealthThresholds { | ||
| transient_error_threshold: self.eth.failover.transient_error_threshold, | ||
| mempool_drop_threshold: self.eth.failover.mempool_drop_threshold, | ||
| mempool_drop_window: self.eth.failover.mempool_drop_window, | ||
| } | ||
| } | ||
|
|
||
| pub fn eth_tx_config(&self) -> eth::TxConfig { | ||
| eth::TxConfig { | ||
| tx_max_fee: u128::from(self.app.tx_fees.max), | ||
| send_tx_request_timeout: self.app.send_tx_request_timeout, | ||
| acceptable_priority_fee_percentage: eth::AcceptablePriorityFeePercentages::new( | ||
| self.app.tx_fees.min_reward_perc, | ||
| self.app.tx_fees.max_reward_perc, | ||
| ) | ||
| .expect("already validated via `validate` in main"), | ||
| } | ||
| } | ||
| } | ||
|
|
||
| #[derive(Debug, Clone, Deserialize)] | ||
| pub struct Fuel { | ||
| /// Fuel-core GraphQL endpoint URL. | ||
| /// Fuel-core endpoint URL. | ||
| #[serde(deserialize_with = "parse_url")] | ||
| pub graphql_endpoint: Url, | ||
| pub endpoint: Url, | ||
| /// Number of concurrent requests. | ||
| pub num_buffered_requests: NonZeroU32, | ||
| } | ||
|
|
@@ -94,11 +117,51 @@ pub struct Fuel { | |
| pub struct Eth { | ||
| /// L1 keys for state contract calls and postings. | ||
| pub l1_keys: L1Keys, | ||
| /// Ethereum RPC endpoint URL. | ||
| #[serde(deserialize_with = "parse_url")] | ||
| pub rpc: Url, | ||
| /// Multiple Ethereum RPC endpoints as a JSON array. | ||
| /// Format: '[{"name":"main","url":"wss://ethereum.example.com"}, {"name":"backup","url":"wss://backup.example.com"}]' | ||
| #[serde(deserialize_with = "parse_endpoints")] | ||
| pub endpoints: NonEmpty<Endpoint>, | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice use of |
||
| /// Ethereum address of the fuel chain state contract. | ||
| pub state_contract_address: Address, | ||
| /// Configuration for RPC failover behavior | ||
| pub failover: FailoverConfig, | ||
| } | ||
|
|
||
| /// Configuration for managing RPC failover behavior | ||
| #[derive(Debug, Clone, Deserialize)] | ||
| pub struct FailoverConfig { | ||
| /// Maximum number of transient errors before considering a provider unhealthy | ||
| pub transient_error_threshold: usize, | ||
| /// Maximum number of mempool drops within the specified time window before marking a provider as unhealthy. | ||
| pub mempool_drop_threshold: usize, | ||
| /// Time window to track mempool drops in. | ||
| /// Format: Human-readable duration (e.g., `5m`, `30m`) | ||
| #[serde(deserialize_with = "human_readable_duration")] | ||
| pub mempool_drop_window: Duration, | ||
| } | ||
|
|
||
| fn parse_endpoints<'de, D>(deserializer: D) -> Result<NonEmpty<Endpoint>, D::Error> | ||
| where | ||
| D: serde::Deserializer<'de>, | ||
| { | ||
| let value: String = Deserialize::deserialize(deserializer)?; | ||
| if value.is_empty() { | ||
| return Err(serde::de::Error::custom( | ||
| "Ethereum endpoints cannot be empty", | ||
| )); | ||
| } | ||
|
|
||
| let configs: Vec<Endpoint> = serde_json::from_str(&value).map_err(|err| { | ||
| serde::de::Error::custom(format!("Invalid JSON format for Ethereum endpoints: {err}",)) | ||
| })?; | ||
|
|
||
| let Some(configs) = NonEmpty::from_vec(configs) else { | ||
| return Err(serde::de::Error::custom( | ||
| "At least one Ethereum endpoint must be configured", | ||
| )); | ||
| }; | ||
|
|
||
| Ok(configs) | ||
| } | ||
|
|
||
| fn parse_url<'de, D>(deserializer: D) -> Result<Url, D::Error> | ||
|
|
@@ -187,7 +250,7 @@ pub struct BundleConfig { | |
| /// Time to wait for additional blocks before starting bundling. | ||
| /// | ||
| /// This timeout starts from the last time a bundle was created or from app startup. | ||
| /// Bundling will occur when this timeout expires, even if byte or block thresholds aren’t met. | ||
| /// Bundling will occur when this timeout expires, even if byte or block thresholds aren't met. | ||
| #[serde(deserialize_with = "human_readable_duration")] | ||
| pub accumulation_timeout: Duration, | ||
|
|
||
|
|
@@ -289,7 +352,6 @@ where | |
| #[derive(Debug, Clone)] | ||
| pub struct Internal { | ||
| pub fuel_errors_before_unhealthy: usize, | ||
| pub eth_errors_before_unhealthy: usize, | ||
| pub balance_update_interval: Duration, | ||
| pub cost_request_limit: usize, | ||
| pub l1_blocks_cached_for_fee_metrics_tracker: usize, | ||
|
|
@@ -319,7 +381,6 @@ impl Default for Internal { | |
| const ETH_BLOCKS_PER_DAY: usize = 24 * 3600 / ETH_BLOCK_TIME; | ||
| Self { | ||
| fuel_errors_before_unhealthy: 3, | ||
| eth_errors_before_unhealthy: 3, | ||
| balance_update_interval: Duration::from_secs(10), | ||
| cost_request_limit: 1000, | ||
| l1_blocks_cached_for_fee_metrics_tracker: ETH_BLOCKS_PER_DAY, | ||
|
|
@@ -352,3 +413,94 @@ pub fn parse() -> crate::errors::Result<Config> { | |
|
|
||
| Ok(config.try_deserialize()?) | ||
| } | ||
|
|
||
| #[cfg(test)] | ||
| mod tests { | ||
| use serde::de::value::{Error as SerdeError, StringDeserializer}; | ||
| use serde_json::json; | ||
| use services::types::nonempty; | ||
|
|
||
| use super::*; | ||
|
|
||
| #[test] | ||
| fn test_parse_endpoints() { | ||
| // given | ||
| let valid_configs = json!([ | ||
| {"name": "main", "url": "https://ethereum.example.com"}, | ||
| {"name": "backup", "url": "https://backup.example.com"} | ||
| ]) | ||
| .to_string(); | ||
|
|
||
| let expected_configs = nonempty![ | ||
| Endpoint { | ||
| name: "main".to_string(), | ||
| url: Url::parse("https://ethereum.example.com").unwrap(), | ||
| }, | ||
| Endpoint { | ||
| name: "backup".to_string(), | ||
| url: Url::parse("https://backup.example.com").unwrap(), | ||
| }, | ||
| ]; | ||
|
|
||
| let deserializer = StringDeserializer::<SerdeError>::new(valid_configs); | ||
|
|
||
| // when | ||
| let result = parse_endpoints(deserializer); | ||
|
|
||
| // then | ||
| let configs = result.unwrap(); | ||
| assert_eq!(configs, expected_configs); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_parse_endpoints_invalid_json() { | ||
| // given | ||
| let invalid_json = "not a valid json"; | ||
|
|
||
| let deserializer = StringDeserializer::<SerdeError>::new(invalid_json.to_string()); | ||
|
|
||
| // when | ||
| let result = parse_endpoints(deserializer); | ||
|
|
||
| // then | ||
| let err_msg = result | ||
| .expect_err("should have failed since the json is invalid") | ||
| .to_string(); | ||
| assert!(err_msg.contains("Invalid JSON format")); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_parse_endpoints_empty_array() { | ||
| // given | ||
| let empty_array = json!([]).to_string(); | ||
|
|
||
| let deserializer = StringDeserializer::<SerdeError>::new(empty_array); | ||
|
|
||
| // when | ||
| let result = parse_endpoints(deserializer); | ||
|
|
||
| // then | ||
| let err_msg = result | ||
| .expect_err("should have failed since the array is empty") | ||
| .to_string(); | ||
| assert!(err_msg.contains("At least one Ethereum endpoint must be configured")); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_parse_endpoints_invalid_url() { | ||
| // given | ||
| let invalid_url = json!([ | ||
| {"name": "main", "url": "not a valid url"} | ||
| ]) | ||
| .to_string(); | ||
|
|
||
| let deserializer = StringDeserializer::<SerdeError>::new(invalid_url); | ||
|
|
||
| // when | ||
| let result = parse_endpoints(deserializer); | ||
|
|
||
| // then | ||
| let err_msg = result.expect_err("because url was not valid").to_string(); | ||
| assert!(err_msg.contains("Invalid JSON format")); | ||
| } | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Opinion: Ideally we'd leverage the type system to guarantee this property, as validation logic is prone to change over time. While I think this is fine for now, in similar situations I'd consider defining a
ValidatedConfigstruct that holds the converted types directly and havevalidateconvert to this struct.