Skip to content

Commit 1313a57

Browse files
authored
Merge pull request #41 from omnia-network/fix/duplicate-client-messages
fix: ignore duplicated messages from clients
2 parents 8808870 + bb17f9f commit 1313a57

File tree

2 files changed

+55
-51
lines changed

2 files changed

+55
-51
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,5 @@ scripts/gateway_test.log
1414
.idea
1515

1616
prometheus-prod.yml
17+
18+
.DS_Store

src/ic-websocket-gateway/src/client_session.rs

Lines changed: 53 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use ic_agent::{
1313
};
1414
use serde::{Deserialize, Serialize};
1515
use serde_cbor::{from_slice, to_vec};
16-
use std::sync::Arc;
16+
use std::{fmt::Display, sync::Arc};
1717
use tokio::{
1818
io::{AsyncRead, AsyncWrite},
1919
select,
@@ -47,6 +47,25 @@ pub enum IcWsSessionState {
4747
Closed,
4848
}
4949

50+
impl Display for IcWsSessionState {
51+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
52+
let label = match self {
53+
IcWsSessionState::Init => "Init",
54+
IcWsSessionState::Setup(_) => "Setup",
55+
IcWsSessionState::Open => "Open",
56+
IcWsSessionState::Closed => "Closed",
57+
};
58+
59+
write!(f, "{label}")
60+
}
61+
}
62+
63+
impl IcWsSessionState {
64+
pub fn is_closed(&self) -> bool {
65+
matches!(self, IcWsSessionState::Closed)
66+
}
67+
}
68+
5069
/// Possible errors that can occur during an IC WebSocket session
5170
#[derive(Debug, Clone)]
5271
#[allow(dead_code)]
@@ -146,61 +165,47 @@ impl<S: AsyncRead + AsyncWrite + Unpin> ClientSession<S> {
146165
&mut self,
147166
client_update: Result<Message, Error>,
148167
) -> Result<(), IcWsError> {
168+
// upon receiving an update, check if the message is valid and is not a close message
169+
let ws_message = self.check_client_update(client_update)?;
170+
if ws_message.is_close() && !self.session_state.is_closed() {
171+
trace!("Client disconnected while in {} state", self.session_state);
172+
self.session_state = IcWsSessionState::Closed;
173+
return Ok(());
174+
}
175+
149176
match self.session_state {
150177
IcWsSessionState::Init => {
151-
let ws_message = self.handle_ws_errors(client_update)?;
152-
if !ws_message.is_close() {
153-
// upon receiving a message while the session is Init, check if the message is valid
154-
// if not return an error, otherwise set the session state to Setup
155-
// if multiple messages are received while in Init state, 'handle_setup_transition' will
156-
// return an error as the client shall not send more than one message while in Init state
157-
let setup_state = self.handle_setup_transition(ws_message).await?;
158-
self.session_state = setup_state;
159-
Ok(())
160-
} else {
161-
trace!("Client disconnected while in Init state");
162-
self.session_state = IcWsSessionState::Closed;
163-
Ok(())
164-
}
178+
// upon receiving a message while the session is Init, check if the message is valid
179+
// if not return an error, otherwise set the session state to Setup
180+
// if multiple messages are received while in Init state, 'handle_setup_transition' will
181+
// ignore them as the client shall not send more than one message while in Init state
182+
let setup_state = self.handle_setup_transition(ws_message).await?;
183+
self.session_state = setup_state;
165184
},
166185
IcWsSessionState::Setup(_) => {
167-
// upon receiving an update while the session is Setup, check if it is due to
168-
// the client disconnecting without a closing handshake
169-
let _ = self.handle_ws_errors(client_update)?;
170-
// upon receiving a message while the session is Setup, discard the message
171-
// and return an error as the client shall not send a message while in Setup state
172-
// this implies a bug in the client SDK
173-
error!("Received client message while in Setup state");
174-
self.session_state = IcWsSessionState::Closed;
175-
Err(IcWsError::IcWsProtocol(String::from(
176-
"Client shall not send messages while in Setup state",
177-
)))
186+
// upon receiving a message while the session is Setup, ignore the message as the
187+
// client shall not send a message while in Setup state
188+
warn!("Received client message while in Setup state");
178189
},
179190
IcWsSessionState::Open => {
180-
let ws_message = self.handle_ws_errors(client_update)?;
181-
if !ws_message.is_close() {
182-
// upon receiving a message while the session is Open, immediately relay the client messages to the IC
183-
// this does not result in a state transition, which shall remain in Open state
184-
self.relay_client_message(ws_message)
185-
.instrument(Span::current())
186-
.await?;
187-
Ok(())
188-
} else {
189-
trace!("Client disconnected while in Open state");
190-
self.session_state = IcWsSessionState::Closed;
191-
Ok(())
192-
}
191+
// upon receiving a message while the session is Open, immediately relay the client messages to the IC
192+
// this does not result in a state transition, which shall remain in Open state
193+
self.relay_client_message(ws_message)
194+
.instrument(Span::current())
195+
.await?;
193196
},
194197
IcWsSessionState::Closed => {
195198
// upon receiving a message while the session is Closed, discard the message
196199
// and return an error as this shall not be possible
197200
// this implies a bug in the WS Gateway
198201
error!("Received client message while in Closed state");
199-
Err(IcWsError::IcWsProtocol(String::from(
202+
return Err(IcWsError::IcWsProtocol(String::from(
200203
"Client shall not send messages while in Closed state",
201-
)))
204+
)));
202205
},
203206
}
207+
208+
Ok(())
204209
}
205210

206211
async fn handle_canister_update(
@@ -238,7 +243,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> ClientSession<S> {
238243
// this implies a bug in the WS Gateway
239244
error!("Received canister message while in Closed state");
240245
Err(IcWsError::IcWsProtocol(String::from(
241-
"Poller shall not be able tosend messages while the session is in Closed state",
246+
"Poller shall not be able to send messages while the session is in Closed state",
242247
)))
243248
},
244249
}
@@ -253,11 +258,11 @@ impl<S: AsyncRead + AsyncWrite + Unpin> ClientSession<S> {
253258
}
254259
}
255260

256-
fn handle_ws_errors(
261+
fn check_client_update(
257262
&mut self,
258-
canister_update: Result<Message, Error>,
263+
client_update: Result<Message, Error>,
259264
) -> Result<Message, IcWsError> {
260-
match canister_update {
265+
match client_update {
261266
Ok(ws_message) => Ok(ws_message),
262267
Err(e) => {
263268
// no need to update the session state to Closed as 'update_state' will return an error
@@ -284,17 +289,14 @@ impl<S: AsyncRead + AsyncWrite + Unpin> ClientSession<S> {
284289
Ok((client_key, canister_id)) => {
285290
// replace the field with the canister_id received in the first envelope
286291
// this shall not be updated anymore
287-
// if canister_id is already set in the struct, we return an error as inspect_ic_ws_open_message shall only be called once
292+
// if canister_id is already set in the struct, we log a warning as inspect_ic_ws_open_message shall only be called once
288293
if self.canister_id.replace(canister_id).is_some()
289294
|| self.client_key.replace(client_key.clone()).is_some()
290295
{
291296
// if the canister_id or client_key field was already set,
292297
// it means that the client sent the WS open message twice,
293298
// which it shall not do
294-
// therefore, return an error
295-
return Err(IcWsError::IcWsProtocol(String::from(
296-
"canister_id or client_key field was set twice",
297-
)));
299+
warn!("canister_id or client_key field was set twice");
298300
}
299301
trace!("Validated WS open message");
300302

@@ -366,7 +368,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> ClientSession<S> {
366368
.await
367369
.map_err(|e| IcWsError::IcWsProtocol(e.to_string()))?;
368370

369-
// there is no need to relay the response back to the client as the response to a request to the /call enpoint is not certified by the canister
371+
// there is no need to relay the response back to the client as the response to a request to the /call endpoint is not certified by the canister
370372
// and therefore could be manufactured by the gateway
371373

372374
trace!("Relayed client message to canister");

0 commit comments

Comments
 (0)