22mod agent_test;
33
44use std:: collections:: HashMap ;
5- use std:: sync:: Arc ;
5+ use std:: sync:: { Arc , Mutex } ;
66
77use rand:: Rng ;
88use tokio:: sync:: mpsc;
@@ -34,7 +34,7 @@ pub struct Agent {
3434 /// data races via unexpected concurrent access.
3535 transactions : HashMap < TransactionId , AgentTransaction > ,
3636 /// all calls are invalid if true
37- closed : bool ,
37+ closed : Mutex < bool > ,
3838 /// handles transactions
3939 handler : Handler ,
4040}
@@ -118,15 +118,16 @@ impl Agent {
118118 pub fn new ( handler : Handler ) -> Self {
119119 Agent {
120120 transactions : HashMap :: new ( ) ,
121- closed : false ,
121+ closed : Mutex :: new ( false ) ,
122122 handler,
123123 }
124124 }
125125
126126 /// stop_with_error removes transaction from list and calls handler with
127127 /// provided error. Can return ErrTransactionNotExists and ErrAgentClosed.
128128 pub fn stop_with_error ( & mut self , id : TransactionId , error : Error ) -> Result < ( ) > {
129- if self . closed {
129+ let closed = self . closed . lock ( ) . unwrap ( ) ;
130+ if * closed {
130131 return Err ( Error :: ErrAgentClosed ) ;
131132 }
132133
@@ -135,6 +136,8 @@ impl Agent {
135136 . remove ( & id)
136137 . ok_or ( Error :: ErrTransactionNotExists ) ?;
137138
139+ drop ( closed) ;
140+
138141 if let Some ( handler) = & self . handler {
139142 handler. send ( Event {
140143 event_type : EventType :: Callback ( t. id ) ,
@@ -146,12 +149,15 @@ impl Agent {
146149
147150 /// process incoming message, synchronously passing it to handler.
148151 pub fn process ( & mut self , message : Message ) -> Result < ( ) > {
149- if self . closed {
152+ let closed = self . closed . lock ( ) . unwrap ( ) ;
153+ if * closed {
150154 return Err ( Error :: ErrAgentClosed ) ;
151155 }
152156
153157 self . transactions . remove ( & message. transaction_id ) ;
154158
159+ drop ( closed) ;
160+
155161 let e = Event {
156162 event_type : EventType :: Callback ( message. transaction_id ) ,
157163 event_body : Ok ( message) ,
@@ -167,7 +173,8 @@ impl Agent {
167173 /// close terminates all transactions with ErrAgentClosed and renders Agent to
168174 /// closed state.
169175 pub fn close ( & mut self ) -> Result < ( ) > {
170- if self . closed {
176+ let mut closed = self . closed . lock ( ) . unwrap ( ) ;
177+ if * closed {
171178 return Err ( Error :: ErrAgentClosed ) ;
172179 }
173180
@@ -181,7 +188,7 @@ impl Agent {
181188 }
182189 }
183190 self . transactions = HashMap :: new ( ) ;
184- self . closed = true ;
191+ * closed = true ;
185192 self . handler = noop_handler ( ) ;
186193
187194 Ok ( ( ) )
@@ -192,7 +199,8 @@ impl Agent {
192199 ///
193200 /// Agent handler is guaranteed to be eventually called.
194201 pub fn start ( & mut self , id : TransactionId , deadline : Instant ) -> Result < ( ) > {
195- if self . closed {
202+ let closed = self . closed . lock ( ) . unwrap ( ) ;
203+ if * closed {
196204 return Err ( Error :: ErrAgentClosed ) ;
197205 }
198206 if self . transactions . contains_key ( & id) {
@@ -217,7 +225,8 @@ impl Agent {
217225 ///
218226 /// It is safe to call Collect concurrently but makes no sense.
219227 pub fn collect ( & mut self , deadline : Instant ) -> Result < ( ) > {
220- if self . closed {
228+ let closed = self . closed . lock ( ) . unwrap ( ) ;
229+ if * closed {
221230 // Doing nothing if agent is closed.
222231 // All transactions should be already closed
223232 // during Close() call.
@@ -240,6 +249,8 @@ impl Agent {
240249 self . transactions . remove ( id) ;
241250 }
242251
252+ drop ( closed) ;
253+
243254 for id in to_remove {
244255 let event = Event {
245256 event_type : EventType :: Callback ( id) ,
@@ -255,7 +266,8 @@ impl Agent {
255266
256267 /// set_handler sets agent handler to h.
257268 pub fn set_handler ( & mut self , h : Handler ) -> Result < ( ) > {
258- if self . closed {
269+ let closed = self . closed . lock ( ) . unwrap ( ) ;
270+ if * closed {
259271 return Err ( Error :: ErrAgentClosed ) ;
260272 }
261273 self . handler = h;
0 commit comments