feat: failover for eth rpcs #217
base: master
Conversation
AurelienFT
left a comment
It's my first look at this repository, so I don't have the full picture, but making everything use sync primitives everywhere only for the HealthChecker and the send operation seems like a lot of overhead. Maybe there is a part I'm missing or constraints I don't understand. Overall, I enjoyed reviewing this code; it is well organized and provides a good customized solution for failovers.
AurelienFT
left a comment
Nice improvement to the reliability of the code, really enjoyed reviewing this. Nice one!
netrome
left a comment
Some opinions and suggestions from my side; otherwise it looks good to me. The only strong opinion I have is about the naming of `operation_factory`. Apologies for taking so long with the review.
```rust
                self.app.tx_fees.min_reward_perc,
                self.app.tx_fees.max_reward_perc,
            )
            .expect("already validated via `validate` in main"),
```
Opinion: Ideally we'd leverage the type system to guarantee this property, as validation logic is prone to change over time. While I think this is fine for now, in similar situations I'd consider defining a ValidatedConfig struct that holds the converted types directly and have validate convert to this struct.
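For illustration, a minimal sketch of that idea, assuming hypothetical `RawTxFees`/`ValidatedTxFees` names and bounds that are not taken from this PR:

```rust
// Hypothetical sketch: `RawTxFees` mirrors the deserialized config, and
// `validate` converts it into a `ValidatedTxFees` whose invariants are
// enforced at construction, so later code needs no `.expect(...)`.
struct RawTxFees {
    min_reward_perc: f64,
    max_reward_perc: f64,
}

struct ValidatedTxFees {
    min_reward_perc: f64,
    max_reward_perc: f64,
}

impl RawTxFees {
    fn validate(self) -> Result<ValidatedTxFees, String> {
        // Assumed bounds for the sake of the example.
        if !(0.0..=100.0).contains(&self.min_reward_perc)
            || !(0.0..=100.0).contains(&self.max_reward_perc)
        {
            return Err("reward percentages must be within 0..=100".to_owned());
        }
        if self.min_reward_perc > self.max_reward_perc {
            return Err("min_reward_perc must not exceed max_reward_perc".to_owned());
        }
        Ok(ValidatedTxFees {
            min_reward_perc: self.min_reward_perc,
            max_reward_perc: self.max_reward_perc,
        })
    }
}
```

Downstream code would then take `ValidatedTxFees` and never need to re-check the invariant.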
```rust
    /// Multiple Ethereum RPC endpoints as a JSON array.
    /// Format: '[{"name":"main","url":"wss://ethereum.example.com"}, {"name":"backup","url":"wss://backup.example.com"}]'
    #[serde(deserialize_with = "parse_endpoints")]
    pub endpoints: NonEmpty<Endpoint>,
```
Nice use of NonEmpty 🤩
```rust
    async fn execute_operation<F, Fut, T>(&self, operation_factory: F) -> EthResult<T>
    where
        F: FnOnce(Arc<I::Provider>) -> Fut + Send + Sync,
        Fut: std::future::Future<Output = EthResult<T>> + Send,
        T: Send,
    {
        let provider = self.get_healthy_provider().await?;
        let provider_name = provider.name.as_str();

        debug!("Executing operation on provider '{}'", provider_name);

        let result = operation_factory(Arc::clone(&provider.provider)).await;

        match result {
            Ok(value) => {
```
If `operation_factory` were a factory, I'd expect the output of the function to be an operation, not the result of calling an operation. It seems like this is just the operation, so I think we should rename it.
Suggested change:

```diff
-    async fn execute_operation<F, Fut, T>(&self, operation_factory: F) -> EthResult<T>
+    async fn execute_operation<F, Fut, T>(&self, operation: F) -> EthResult<T>
     where
         F: FnOnce(Arc<I::Provider>) -> Fut + Send + Sync,
         Fut: std::future::Future<Output = EthResult<T>> + Send,
         T: Send,
     {
         let provider = self.get_healthy_provider().await?;
         let provider_name = provider.name.as_str();
         debug!("Executing operation on provider '{}'", provider_name);
-        let result = operation_factory(Arc::clone(&provider.provider)).await;
+        let result = operation(Arc::clone(&provider.provider)).await;
         match result {
             Ok(value) => {
```
```rust
pub type WsProvider = alloy::providers::fillers::FillProvider<
    alloy::providers::fillers::JoinFill<
        alloy::providers::fillers::JoinFill<
            alloy::providers::Identity,
            alloy::providers::fillers::JoinFill<
                alloy::providers::fillers::GasFiller,
                alloy::providers::fillers::JoinFill<
                    alloy::providers::fillers::BlobGasFiller,
                    alloy::providers::fillers::JoinFill<
                        alloy::providers::fillers::NonceFiller,
                        alloy::providers::fillers::ChainIdFiller,
                    >,
                >,
            >,
        >,
        alloy::providers::fillers::WalletFiller<EthereumWallet>,
    >,
    alloy::providers::RootProvider<alloy::pubsub::PubSubFrontend>,
    alloy::pubsub::PubSubFrontend,
    Ethereum,
>;
```
Ugh... This will render horribly in error messages and any place that isn't aware of the type alias. Perhaps we should define a newtype over it to make it more maintainable? Or any other ideas on how to manage this behemoth of a type signature?
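One possible shape for that newtype, as a hedged sketch only (the placeholder `InnerWsProvider` alias and the `WsProviderHandle` name are invented for illustration, not part of this PR):

```rust
use std::ops::Deref;
use std::sync::Arc;

// Placeholder standing in for the full alloy filler stack aliased above;
// the point is only to show the wrapping pattern, not the real type.
type InnerWsProvider = ();

/// Newtype over the provider stack so bounds and error messages can mention
/// `WsProviderHandle` instead of the fully expanded `FillProvider<...>` type.
pub struct WsProviderHandle(Arc<InnerWsProvider>);

impl WsProviderHandle {
    pub fn new(inner: InnerWsProvider) -> Self {
        Self(Arc::new(inner))
    }
}

impl Deref for WsProviderHandle {
    type Target = InnerWsProvider;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
```

The `Deref` impl keeps call sites unchanged while the alias itself stays an implementation detail of the wrapper.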
closes: #55
How the Failover Logic Works
The `FailoverClient` checks provider health whenever we're about to send a request. If the active provider is marked unhealthy, it automatically connects to the next one. If no providers are healthy, it returns an error. Here's the general flow:

1. **Initialization:** The `FailoverClient` is constructed with the configured list of endpoints (see `ETH_ENDPOINTS` below).
2. **When a Request Arrives:** The committer calls e.g. `get_block_number` or `submit` on the `FailoverClient`.
3. **Health Check:** The client checks whether the active provider is still healthy. Unhealthiness covers too many transient errors (which can result from things like alloy permanently dropping the background task which keeps the websocket connection alive) as well as too many suspected mempool drops, an issue we've seen with Alchemy -- transactions continually being squeezed out.
4. **Forward or Switch:** If the active provider is healthy, the request is forwarded to it; otherwise the client connects to the next provider and forwards the request there. If no provider can be reached, an error is returned.
5. **Mempool Drops:** When a transaction appears to have been dropped from the mempool, `note_mempool_drop` is called. This bumps the failure counter for the active provider, potentially making it unhealthy for future requests.

Below are two diagrams to illustrate this, followed by a short sketch of the same flow:
High-Level:

```mermaid
graph LR
    Committer --> FailoverClient;
    FailoverClient --> ProviderA["Provider A (Active)"];
    FailoverClient -.-> ProviderB["Provider B (Standby)"];
    FailoverClient -.-> ProviderC["Provider C (Standby)"];
    ProviderA --> L1NodeA["L1 Node A"];
    ProviderB --> L1NodeB["L1 Node B"];
    ProviderC --> L1NodeC["L1 Node C"];
    subgraph "L1 Providers"
        ProviderA
        ProviderB
        ProviderC
    end
```

On-Demand Failover Sequence:
```mermaid
sequenceDiagram
    participant C as Committer
    participant FC as FailoverClient
    participant PA as Provider A
    participant PB as Provider B
    C->>FC: Send L1 Request (e.g., submit)
    activate FC
    FC->>FC: Check Health of Provider A
    alt Provider A is Healthy
        FC->>PA: Forward Request
        activate PA
        alt Request Succeeds
            PA-->>FC: Success
            FC->>FC: Reset transient error count for PA
        else Request Fails (Transient Error)
            PA-->>FC: Error
            FC->>FC: Increment transient error count for PA
        end
        deactivate PA
        FC-->>C: Response (Success or Error)
    else Provider A is Unhealthy
        Note right of FC: Provider A marked Unhealthy
        FC->>FC: Try connecting to next provider (Provider B)
        alt Connection to Provider B Successful
            FC->>PB: Set Provider B as Active
            FC->>PB: Forward Original Request
            activate PB
            PB-->>FC: Success
            deactivate PB
            FC-->>C: Response (Success)
        else Connection to Provider B Fails
            Note right of FC: No healthy providers left!
            FC-->>C: Error (All providers unhealthy)
        end
    end
    deactivate FC
```
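To make the flow concrete, here is a minimal, non-async sketch of the selection-and-retry bookkeeping described above. All names, fields, and the simplified error handling are illustrative stand-ins, not the actual `FailoverClient` implementation:

```rust
// Illustrative sketch of the failover bookkeeping; the real logic lives in
// packages/adapters/eth/src/failover/client.rs and is async.
struct ProviderSlot {
    name: String,
    transient_errors: u32,
}

struct FailoverSketch {
    providers: Vec<ProviderSlot>,
    active: usize,
    transient_error_threshold: u32,
}

impl FailoverSketch {
    /// Return the index of a healthy provider, preferring the current one and
    /// otherwise switching to the next standby.
    fn healthy_provider(&mut self) -> Result<usize, &'static str> {
        let start = self.active;
        for offset in 0..self.providers.len() {
            let idx = (start + offset) % self.providers.len();
            if self.providers[idx].transient_errors < self.transient_error_threshold {
                self.active = idx;
                return Ok(idx);
            }
        }
        Err("all providers unhealthy")
    }

    /// Run `operation` against the active provider and update its error count
    /// so a later request can trigger a switch.
    fn execute<T>(
        &mut self,
        operation: impl FnOnce(&str) -> Result<T, String>,
    ) -> Result<T, String> {
        let idx = self.healthy_provider().map_err(|e| e.to_string())?;
        match operation(&self.providers[idx].name) {
            Ok(value) => {
                // Success resets the transient error count for this provider.
                self.providers[idx].transient_errors = 0;
                Ok(value)
            }
            Err(err) => {
                // A transient failure bumps the count; once it crosses the
                // threshold the provider is skipped on the next request.
                self.providers[idx].transient_errors += 1;
                Err(err)
            }
        }
    }
}

fn main() {
    let mut client = FailoverSketch {
        providers: vec![
            ProviderSlot { name: "main".into(), transient_errors: 0 },
            ProviderSlot { name: "backup".into(), transient_errors: 0 },
        ],
        active: 0,
        transient_error_threshold: 3,
    };
    // Pretend the request succeeds; the real client would call the provider.
    let block = client.execute(|provider| Ok::<u64, String>(provider.len() as u64));
    println!("{block:?}");
}
```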
Notable Changes

Failover Client (`FailoverClient`)
- `packages/adapters/eth/src/failover.rs` (main logic in `failover/client.rs`).

Refactored WebSocket Adapter
- `packages/adapters/eth/src/websocket/`.
- `websocket.rs` has been merged with the old `connection.rs`.

Health Tracking
- Moved from the `HealthTrackingMiddleware` into the `FailoverClient` (`execute_operation` and `get_healthy_provider`), including mempool drops reported via `note_mempool_drop`.

Mempool Drop Reporting
- The `StateListener` in `packages/services/src/state_listener.rs` calls `l1_adapter.note_mempool_drop()` when it suspects a transaction is lost.

Async Health Checks
- The `HealthCheck` trait in `packages/metrics` now uses `async fn healthy()`.

Network Error Metric
- `eth_network_errors` is now a counter labeled by the provider name.

Deployment Considerations

Configuration:
- Set `ETH_ENDPOINTS` as `[{"name":"main","url":"wss://ethereum.example.com"}, {"name":"backup","url":"wss://backup.example.com"}]`.
- `ETH_FAILOVER_TRANSIENT_ERROR_THRESHOLD`
- `ETH_FAILOVER_MEMPOOL_DROP_THRESHOLD`
- `ETH_FAILOVER_MEMPOOL_DROP_WINDOW`

Metrics Updates:
- `eth_mempool_drops` (counter, labeled by provider).
- `eth_network_errors` now increments inside the `FailoverClient`, labeled by the provider name.

Action Required
- Update dashboards and alerts to account for the `provider_name` labels and the new `eth_mempool_drops` metric.
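For reference when updating dashboards, here is a hedged sketch of how per-provider counters of this shape are typically defined with the `prometheus` crate. Only the metric and label names follow the description above; the registration code itself is illustrative, not the committer's actual code:

```rust
use prometheus::{IntCounterVec, Opts, Registry};

/// Sketch: registers per-provider counters matching the metric names above.
fn register_failover_metrics(
    registry: &Registry,
) -> prometheus::Result<(IntCounterVec, IntCounterVec)> {
    let network_errors = IntCounterVec::new(
        Opts::new("eth_network_errors", "Network errors per Ethereum RPC provider"),
        &["provider_name"],
    )?;
    let mempool_drops = IntCounterVec::new(
        Opts::new("eth_mempool_drops", "Suspected mempool drops per provider"),
        &["provider_name"],
    )?;
    registry.register(Box::new(network_errors.clone()))?;
    registry.register(Box::new(mempool_drops.clone()))?;
    Ok((network_errors, mempool_drops))
}

fn main() -> prometheus::Result<()> {
    let registry = Registry::new();
    let (network_errors, _mempool_drops) = register_failover_metrics(&registry)?;
    // e.g. after a transient failure on the provider named "main":
    network_errors.with_label_values(&["main"]).inc();
    Ok(())
}
```

Queries and alerts that previously summed `eth_network_errors` as a single series will need to aggregate over the `provider_name` label.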