Skip to content

Commit 4f26691

Browse files
Merge pull request #22 from omnia-network/dev
Polling timeout
2 parents f8384de + 3673c8a commit 4f26691

File tree

7 files changed

+119
-47
lines changed

7 files changed

+119
-47
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/canister-utils/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ pub struct WebsocketMessage {
9393
}
9494

9595
/// Element of the list of messages returned to the WS Gateway after polling.
96-
#[derive(CandidType, Clone, Deserialize, Serialize, Eq, PartialEq)]
96+
#[derive(Debug, CandidType, Clone, Deserialize, Serialize, Eq, PartialEq)]
9797
pub struct CanisterOutputMessage {
9898
/// The client that the gateway will forward the message to or that sent the message.
9999
pub client_key: ClientKey,
@@ -122,7 +122,7 @@ pub enum CanisterServiceMessage {
122122
}
123123

124124
/// List of messages returned to the WS Gateway after polling.
125-
#[derive(CandidType, Clone, Deserialize, Serialize, Eq, PartialEq)]
125+
#[derive(Debug, CandidType, Clone, Deserialize, Serialize, Eq, PartialEq)]
126126
pub struct CanisterOutputCertifiedMessages {
127127
pub messages: Vec<CanisterOutputMessage>, // List of messages.
128128
#[serde(with = "serde_bytes")]

src/ic-websocket-gateway/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "ic_websocket_gateway"
3-
version = "1.3.1"
3+
version = "1.3.2"
44
edition.workspace = true
55
rust-version.workspace = true
66
repository.workspace = true

src/ic-websocket-gateway/src/canister_poller.rs

Lines changed: 61 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,18 @@ use canister_utils::{
66
use gateway_state::{CanisterEntry, CanisterPrincipal, ClientSender, GatewayState, PollerState};
77
use ic_agent::{Agent, AgentError};
88
use std::{sync::Arc, time::Duration};
9-
use tokio::sync::mpsc::Sender;
9+
use tokio::{sync::mpsc::Sender, time::timeout};
1010
use tracing::{error, span, trace, warn, Instrument, Level, Span};
1111

12-
enum PollingStatus {
12+
pub(crate) const POLLING_TIMEOUT_MS: u64 = 5_000;
13+
14+
type PollingTimeout = Duration;
15+
16+
#[derive(Debug, PartialEq, Eq)]
17+
pub(crate) enum PollingStatus {
1318
NoMessagesPolled,
1419
MessagesPolled(CanisterOutputCertifiedMessages),
20+
PollerTimedOut,
1521
}
1622

1723
/// Poller which periodically queries a canister for new messages and relays them to the client
@@ -59,7 +65,7 @@ impl CanisterPoller {
5965
// initially set to None as the first iteration will not have a previous span
6066
let mut previous_polling_iteration_span: Option<Span> = None;
6167
loop {
62-
let polling_iteration_span = span!(Level::TRACE, "Polling Iteration", canister_id = %self.canister_id, polling_iteration = self.polling_iteration);
68+
let polling_iteration_span = span!(Level::TRACE, "Polling Iteration", canister_id = %self.canister_id, polling_iteration = self.polling_iteration, cargo_version = env!("CARGO_PKG_VERSION"));
6369
if let Some(previous_polling_iteration_span) = previous_polling_iteration_span {
6470
// create a follow from relationship between the current and previous polling iteration
6571
// this enables to crawl polling iterations in reverse chronological order
@@ -97,33 +103,39 @@ impl CanisterPoller {
97103
pub async fn poll_and_relay(&mut self) -> Result<(), String> {
98104
let start_polling_instant = tokio::time::Instant::now();
99105

100-
if let PollingStatus::MessagesPolled(certified_canister_output) =
101-
self.poll_canister().await?
102-
{
103-
let relay_messages_span =
104-
span!(parent: &Span::current(), Level::TRACE, "Relay Canister Messages");
105-
let end_of_queue_reached = {
106-
match certified_canister_output.is_end_of_queue {
107-
Some(is_end_of_queue_reached) => is_end_of_queue_reached,
108-
// if 'is_end_of_queue' is None, the CDK version is < 0.3.1 and does not have such a field
109-
// in this case, assume that the queue is fully drained and therefore will be polled again
110-
// after waiting for 'polling_interval_ms'
111-
None => true,
106+
match self.poll_canister().await? {
107+
PollingStatus::MessagesPolled(certified_canister_output) => {
108+
let relay_messages_span =
109+
span!(parent: &Span::current(), Level::TRACE, "Relay Canister Messages");
110+
let end_of_queue_reached = {
111+
match certified_canister_output.is_end_of_queue {
112+
Some(is_end_of_queue_reached) => is_end_of_queue_reached,
113+
// if 'is_end_of_queue' is None, the CDK version is < 0.3.1 and does not have such a field
114+
// in this case, assume that the queue is fully drained and therefore will be polled again
115+
// after waiting for 'polling_interval_ms'
116+
None => true,
117+
}
118+
};
119+
self.update_nonce(&certified_canister_output)?;
120+
// relaying of messages cannot be done in a separate task for each polling iteration
121+
// as they might interleave and break the correct ordering of messages
122+
// TODO: create a separate task dedicated to relaying messages which receives the messages from the poller via a queue
123+
// and relays them in FIFO order
124+
self.relay_messages(certified_canister_output)
125+
.instrument(relay_messages_span)
126+
.await;
127+
if !end_of_queue_reached {
128+
// if the queue is not fully drained, return immediately so that the next polling iteration can be started
129+
warn!("Canister queue is not fully drained. Polling immediately");
130+
return Ok(());
112131
}
113-
};
114-
self.update_nonce(&certified_canister_output)?;
115-
// relaying of messages cannot be done in a separate task for each polling iteration
116-
// as they might interleave and break the correct ordering of messages
117-
// TODO: create a separate task dedicated to relaying messages which receives the messages from the poller via a queue
118-
// and relays them in FIFO order
119-
self.relay_messages(certified_canister_output)
120-
.instrument(relay_messages_span)
121-
.await;
122-
if !end_of_queue_reached {
123-
// if the queue is not fully drained, return immediately so that the next polling iteration can be started
124-
warn!("Canister queue is not fully drained. Polling immediately");
132+
},
133+
PollingStatus::PollerTimedOut => {
134+
// if the poller timed out, it already waited way too long... return immediately so that the next polling iteration can be started
135+
warn!("Poller timed out. Polling immediately");
125136
return Ok(());
126-
}
137+
},
138+
PollingStatus::NoMessagesPolled => (),
127139
}
128140

129141
// compute the amout of time to sleep for before polling again
@@ -135,20 +147,26 @@ impl CanisterPoller {
135147
}
136148

137149
/// Polls the canister for messages
138-
async fn poll_canister(&mut self) -> Result<PollingStatus, String> {
150+
pub(crate) async fn poll_canister(&mut self) -> Result<PollingStatus, String> {
139151
trace!("Started polling iteration");
140152

141153
// get messages to be relayed to clients from canister (starting from 'message_nonce')
142-
match ws_get_messages(
143-
&self.agent,
144-
&self.canister_id,
145-
CanisterWsGetMessagesArguments {
146-
nonce: self.next_message_nonce,
147-
},
154+
// the response timeout of the IC CDK is 2 minutes which implies that the poller would be stuck for that long waiting for a response
155+
// to prevent this, we set a timeout of 5 seconds, if the poller does not receive a response in time, it polls immediately
156+
// in case of a timeout, the message nonce is not updated so that no messages are lost by polling immediately again
157+
match timeout(
158+
PollingTimeout::from_millis(POLLING_TIMEOUT_MS),
159+
ws_get_messages(
160+
&self.agent,
161+
&self.canister_id,
162+
CanisterWsGetMessagesArguments {
163+
nonce: self.next_message_nonce,
164+
},
165+
),
148166
)
149167
.await
150168
{
151-
Ok(certified_canister_output) => {
169+
Ok(Ok(certified_canister_output)) => {
152170
let number_of_polled_messages = certified_canister_output.messages.len();
153171
if number_of_polled_messages == 0 {
154172
trace!("No messages polled from canister");
@@ -161,7 +179,7 @@ impl CanisterPoller {
161179
Ok(PollingStatus::MessagesPolled(certified_canister_output))
162180
}
163181
},
164-
Err(IcError::Agent(e)) => {
182+
Ok(Err(IcError::Agent(e))) => {
165183
if is_recoverable_error(&e) {
166184
// if the error is due to a replica which is either actively malicious or simply unavailable
167185
// or to a malfunctioning boundary node,
@@ -174,8 +192,12 @@ impl CanisterPoller {
174192
Err(format!("Unrecoverable agent error: {:?}", e))
175193
}
176194
},
177-
Err(IcError::Candid(e)) => Err(format!("Unrecoverable candid error: {:?}", e)),
178-
Err(IcError::Cdk(e)) => Err(format!("Unrecoverable CDK error: {:?}", e)),
195+
Ok(Err(IcError::Candid(e))) => Err(format!("Unrecoverable candid error: {:?}", e)),
196+
Ok(Err(IcError::Cdk(e))) => Err(format!("Unrecoverable CDK error: {:?}", e)),
197+
Err(e) => {
198+
warn!("Poller took too long to retrieve messages: {:?}", e);
199+
Ok(PollingStatus::PollerTimedOut)
200+
},
179201
}
180202
}
181203

src/ic-websocket-gateway/src/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ async fn main() -> Result<(), String> {
8484

8585
// must be printed after initializing tracing to ensure that the info are captured
8686
info!("Deployment info: {:?}", deployment_info);
87+
info!("Cargo version: {}", env!("CARGO_PKG_VERSION"));
8788
info!("Gateway Agent principal: {}", gateway_principal);
8889

8990
let tls_config = if deployment_info.tls_certificate_pem_path.is_some()

src/ic-websocket-gateway/src/tests/canister_poller.rs

Lines changed: 51 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,15 @@ mod test {
1111
use lazy_static::lazy_static;
1212
use std::{
1313
sync::{Arc, Mutex},
14+
thread,
1415
time::Duration,
1516
};
1617
use tokio::sync::mpsc::{self, Receiver, Sender};
1718
use tracing::Span;
1819

19-
use crate::canister_poller::{get_nonce_from_message, CanisterPoller};
20+
use crate::canister_poller::{
21+
get_nonce_from_message, CanisterPoller, PollingStatus, POLLING_TIMEOUT_MS,
22+
};
2023

2124
struct MockCanisterOutputCertifiedMessages(CanisterOutputCertifiedMessages);
2225

@@ -237,7 +240,7 @@ mod test {
237240
let end_polling_instant = tokio::time::Instant::now();
238241
let elapsed = end_polling_instant - start_polling_instant;
239242
// run 'cargo test -- --nocapture' to see the elapsed time
240-
println!("Elapsed: {:?}", elapsed);
243+
println!("Elapsed after relaying (should not sleep): {:?}", elapsed);
241244
assert!(
242245
elapsed > Duration::from_millis(polling_interval_ms)
243246
// Reasonable to expect that the time it takes to sleep
@@ -294,7 +297,7 @@ mod test {
294297
poller.poll_and_relay().await.expect("Failed to poll");
295298
let end_polling_instant = tokio::time::Instant::now();
296299
let elapsed = end_polling_instant - start_polling_instant;
297-
println!("Elapsed: {:?}", elapsed);
300+
println!("Elapsed after relaying (should sleep): {:?}", elapsed);
298301
assert!(
299302
// The `poll_and_relay` function should not sleep for `polling_interval_ms`
300303
// if the queue is not empty.
@@ -319,6 +322,51 @@ mod test {
319322
drop(guard);
320323
}
321324

325+
#[tokio::test]
326+
async fn should_not_sleep_after_timeout() {
327+
let server = &*MOCK_SERVER;
328+
let path = "/ws_get_messages";
329+
let mut guard = server.lock().unwrap();
330+
// do not drop the guard until the end of this test to make sure that no other test interleaves and overwrites the mock response
331+
let mock = guard
332+
.mock("GET", path)
333+
.with_chunked_body(|w| {
334+
thread::sleep(Duration::from_millis(POLLING_TIMEOUT_MS + 10));
335+
w.write_all(&vec![])
336+
})
337+
.expect(2)
338+
.create_async()
339+
.await;
340+
341+
let polling_interval_ms = 100;
342+
let (client_channel_tx, _): (Sender<IcWsCanisterMessage>, Receiver<IcWsCanisterMessage>) =
343+
mpsc::channel(100);
344+
345+
let mut poller = create_poller(polling_interval_ms, client_channel_tx);
346+
347+
// check that the poller times out
348+
assert_eq!(
349+
Ok(PollingStatus::PollerTimedOut),
350+
poller.poll_canister().await
351+
);
352+
353+
// check that the poller does not wait for a polling interval after timing out
354+
let start_polling_instant = tokio::time::Instant::now();
355+
poller.poll_and_relay().await.expect("Failed to poll");
356+
let end_polling_instant = tokio::time::Instant::now();
357+
let elapsed = end_polling_instant - start_polling_instant;
358+
println!("Elapsed due to timeout: {:?}", elapsed);
359+
assert!(
360+
// The `poll_canister` function should not sleep for `polling_interval_ms`
361+
// after the poller times out.
362+
elapsed < Duration::from_millis(POLLING_TIMEOUT_MS + polling_interval_ms)
363+
);
364+
365+
mock.assert_async().await;
366+
// just to make it explicit that the guard should be kept for the whole duration of the test
367+
drop(guard);
368+
}
369+
322370
#[tokio::test]
323371
async fn should_terminate_polling_with_error() {
324372
let server = &*MOCK_SERVER;

src/ic-websocket-gateway/src/ws_listener.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,8 @@ impl WsListener {
146146
Level::DEBUG,
147147
"Accept Connection",
148148
client_addr = ?client_addr.ip(),
149-
client_id = self.next_client_id
149+
client_id = self.next_client_id,
150+
cargo_version = env!("CARGO_PKG_VERSION"),
150151
);
151152
let client_id = self.next_client_id;
152153
let tls_acceptor = self.tls_acceptor.clone();

0 commit comments

Comments
 (0)