Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 22 additions & 10 deletions stun/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
mod agent_test;

use std::collections::HashMap;
use std::sync::Arc;
use std::sync::{Arc, Mutex};

use rand::Rng;
use tokio::sync::mpsc;
Expand Down Expand Up @@ -34,7 +34,7 @@ pub struct Agent {
/// data races via unexpected concurrent access.
transactions: HashMap<TransactionId, AgentTransaction>,
/// all calls are invalid if true
closed: bool,
closed: Mutex<bool>,
/// handles transactions
handler: Handler,
}
Expand Down Expand Up @@ -118,15 +118,16 @@ impl Agent {
pub fn new(handler: Handler) -> Self {
Agent {
transactions: HashMap::new(),
closed: false,
closed: Mutex::new(false),
handler,
}
}

/// stop_with_error removes transaction from list and calls handler with
/// provided error. Can return ErrTransactionNotExists and ErrAgentClosed.
pub fn stop_with_error(&mut self, id: TransactionId, error: Error) -> Result<()> {
if self.closed {
let closed = self.closed.lock().unwrap();
if *closed {
return Err(Error::ErrAgentClosed);
}

Expand All @@ -135,6 +136,8 @@ impl Agent {
.remove(&id)
.ok_or(Error::ErrTransactionNotExists)?;

drop(closed);

if let Some(handler) = &self.handler {
handler.send(Event {
event_type: EventType::Callback(t.id),
Expand All @@ -146,12 +149,15 @@ impl Agent {

/// process incoming message, synchronously passing it to handler.
pub fn process(&mut self, message: Message) -> Result<()> {
if self.closed {
let closed = self.closed.lock().unwrap();
if *closed {
return Err(Error::ErrAgentClosed);
}

self.transactions.remove(&message.transaction_id);

drop(closed);

let e = Event {
event_type: EventType::Callback(message.transaction_id),
event_body: Ok(message),
Expand All @@ -167,7 +173,8 @@ impl Agent {
/// close terminates all transactions with ErrAgentClosed and renders Agent to
/// closed state.
pub fn close(&mut self) -> Result<()> {
if self.closed {
let mut closed = self.closed.lock().unwrap();
if *closed {
return Err(Error::ErrAgentClosed);
}

Expand All @@ -181,7 +188,7 @@ impl Agent {
}
}
self.transactions = HashMap::new();
self.closed = true;
*closed = true;
self.handler = noop_handler();

Ok(())
Expand All @@ -192,7 +199,8 @@ impl Agent {
///
/// Agent handler is guaranteed to be eventually called.
pub fn start(&mut self, id: TransactionId, deadline: Instant) -> Result<()> {
if self.closed {
let closed = self.closed.lock().unwrap();
if *closed {
return Err(Error::ErrAgentClosed);
}
if self.transactions.contains_key(&id) {
Expand All @@ -217,7 +225,8 @@ impl Agent {
///
/// It is safe to call Collect concurrently but makes no sense.
pub fn collect(&mut self, deadline: Instant) -> Result<()> {
if self.closed {
let closed = self.closed.lock().unwrap();
if *closed {
// Doing nothing if agent is closed.
// All transactions should be already closed
// during Close() call.
Expand All @@ -240,6 +249,8 @@ impl Agent {
self.transactions.remove(id);
}

drop(closed);

for id in to_remove {
let event = Event {
event_type: EventType::Callback(id),
Expand All @@ -255,7 +266,8 @@ impl Agent {

/// set_handler sets agent handler to h.
pub fn set_handler(&mut self, h: Handler) -> Result<()> {
if self.closed {
let closed = self.closed.lock().unwrap();
if *closed {
return Err(Error::ErrAgentClosed);
}
self.handler = h;
Expand Down
Loading