diff --git a/stun/src/agent.rs b/stun/src/agent.rs index 5938c7917..1ddad2b7b 100644 --- a/stun/src/agent.rs +++ b/stun/src/agent.rs @@ -2,7 +2,7 @@ mod agent_test; use std::collections::HashMap; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use rand::Rng; use tokio::sync::mpsc; @@ -34,7 +34,7 @@ pub struct Agent { /// data races via unexpected concurrent access. transactions: HashMap, /// all calls are invalid if true - closed: bool, + closed: Mutex, /// handles transactions handler: Handler, } @@ -118,7 +118,7 @@ impl Agent { pub fn new(handler: Handler) -> Self { Agent { transactions: HashMap::new(), - closed: false, + closed: Mutex::new(false), handler, } } @@ -126,7 +126,8 @@ impl Agent { /// stop_with_error removes transaction from list and calls handler with /// provided error. Can return ErrTransactionNotExists and ErrAgentClosed. pub fn stop_with_error(&mut self, id: TransactionId, error: Error) -> Result<()> { - if self.closed { + let closed = self.closed.lock().unwrap(); + if *closed { return Err(Error::ErrAgentClosed); } @@ -135,6 +136,8 @@ impl Agent { .remove(&id) .ok_or(Error::ErrTransactionNotExists)?; + drop(closed); + if let Some(handler) = &self.handler { handler.send(Event { event_type: EventType::Callback(t.id), @@ -146,12 +149,15 @@ impl Agent { /// process incoming message, synchronously passing it to handler. pub fn process(&mut self, message: Message) -> Result<()> { - if self.closed { + let closed = self.closed.lock().unwrap(); + if *closed { return Err(Error::ErrAgentClosed); } self.transactions.remove(&message.transaction_id); + drop(closed); + let e = Event { event_type: EventType::Callback(message.transaction_id), event_body: Ok(message), @@ -167,7 +173,8 @@ impl Agent { /// close terminates all transactions with ErrAgentClosed and renders Agent to /// closed state. pub fn close(&mut self) -> Result<()> { - if self.closed { + let mut closed = self.closed.lock().unwrap(); + if *closed { return Err(Error::ErrAgentClosed); } @@ -181,7 +188,7 @@ impl Agent { } } self.transactions = HashMap::new(); - self.closed = true; + *closed = true; self.handler = noop_handler(); Ok(()) @@ -192,7 +199,8 @@ impl Agent { /// /// Agent handler is guaranteed to be eventually called. pub fn start(&mut self, id: TransactionId, deadline: Instant) -> Result<()> { - if self.closed { + let closed = self.closed.lock().unwrap(); + if *closed { return Err(Error::ErrAgentClosed); } if self.transactions.contains_key(&id) { @@ -217,7 +225,8 @@ impl Agent { /// /// It is safe to call Collect concurrently but makes no sense. pub fn collect(&mut self, deadline: Instant) -> Result<()> { - if self.closed { + let closed = self.closed.lock().unwrap(); + if *closed { // Doing nothing if agent is closed. // All transactions should be already closed // during Close() call. @@ -240,6 +249,8 @@ impl Agent { self.transactions.remove(id); } + drop(closed); + for id in to_remove { let event = Event { event_type: EventType::Callback(id), @@ -255,7 +266,8 @@ impl Agent { /// set_handler sets agent handler to h. pub fn set_handler(&mut self, h: Handler) -> Result<()> { - if self.closed { + let closed = self.closed.lock().unwrap(); + if *closed { return Err(Error::ErrAgentClosed); } self.handler = h;