3434from connect .eaas .exceptions import (
3535 CommunicationError ,
3636 MaintenanceError ,
37+ StopBackoffError ,
3738)
3839from connect .eaas .handler import ExtensionHandler
3940from connect .eaas .managers import (
@@ -168,7 +169,7 @@ def get_capabilities(self):
168169 ),
169170 )
170171
171- async def communicate (self ):
172+ async def communicate (self ): # noqa: CCR001
172173 @backoff .on_exception (
173174 backoff .expo ,
174175 CommunicationError ,
@@ -177,7 +178,7 @@ async def communicate(self):
177178 max_value = _get_max_retry_delay_time ,
178179 jitter = backoff .random_jitter ,
179180 on_backoff = _on_communication_backoff ,
180- giveup = lambda _ : not self .run_event . is_set () ,
181+ giveup = self ._backoff_shutdown ,
181182 )
182183 @backoff .on_exception (
183184 backoff .expo ,
@@ -187,7 +188,7 @@ async def communicate(self):
187188 max_value = _get_max_retry_delay_time ,
188189 jitter = backoff .random_jitter ,
189190 on_backoff = _on_communication_backoff ,
190- giveup = lambda _ : not self .run_event . is_set () ,
191+ giveup = self ._backoff_shutdown ,
191192 )
192193 async def _do_communicate ():
193194 try :
@@ -209,6 +210,12 @@ async def _do_communicate():
209210 else :
210211 logger .warning (f'InvalidStatusCode { ic .status_code } raised.' )
211212 raise CommunicationError ()
213+ except WebSocketException as wse :
214+ logger .warning (f'Unexpected websocket exception { wse } .' )
215+ raise CommunicationError ()
216+ except Exception as e :
217+ logger .warning (f'Unexpected error in communicate: { e } .' )
218+ raise CommunicationError ()
212219
213220 await _do_communicate ()
214221
@@ -225,9 +232,6 @@ async def run(self): # noqa: CCR001
225232 await self .communicate ()
226233 except ConnectionClosedOK :
227234 self .run_event .clear ()
228- except WebSocketException :
229- logger .exception ('Unexpected websocket exception. Retrying in 2 seconds.' )
230- await asyncio .sleep (2 )
231235 except CommunicationError :
232236 logger .error (
233237 f'Max retries exceeded after { MAX_RETRY_TIME_GENERIC_SECONDS } seconds' ,
@@ -239,6 +243,8 @@ async def run(self): # noqa: CCR001
239243 'seconds' ,
240244 )
241245 self .run_event .clear ()
246+ except StopBackoffError :
247+ pass
242248 if self .ws :
243249 await self .ws .close ()
244250
@@ -370,3 +376,8 @@ def stop(self):
370376 """
371377 logger .info ('Stopping control worker...' )
372378 self .run_event .clear ()
379+
380+ def _backoff_shutdown (self , _ ):
381+ if not self .run_event .is_set ():
382+ logger .info ('Worker exiting, stop backoff loop' )
383+ raise StopBackoffError ()
0 commit comments