99import logging
1010from asyncio .exceptions import TimeoutError
1111
12+ import backoff
1213import websockets
1314from websockets .exceptions import (
1415 ConnectionClosedError ,
1516 ConnectionClosedOK ,
1617 InvalidStatusCode ,
17- WebSocketException ,
1818)
19- import backoff
19+
2020
2121from connect .eaas .config import ConfigHelper
2222from connect .eaas .constants import (
23+ DELAY_ON_CONNECT_EXCEPTION_SECONDS ,
2324 MAX_RETRY_DELAY_TIME_SECONDS ,
2425 MAX_RETRY_TIME_GENERIC_SECONDS ,
2526 MAX_RETRY_TIME_MAINTENANCE_SECONDS ,
3738 StopBackoffError ,
3839)
3940from connect .eaas .handler import ExtensionHandler
41+ from connect .eaas .helpers import to_ordinal
4042from connect .eaas .managers import (
4143 BackgroundTasksManager ,
4244 InteractiveTasksManager ,
@@ -59,28 +61,6 @@ def _get_max_retry_delay_time():
5961 return MAX_RETRY_DELAY_TIME_SECONDS
6062
6163
62- _ORDINAL_DICT = {
63- 1 : 'st' ,
64- 2 : 'nd' ,
65- 3 : 'rd' ,
66- 11 : 'th' ,
67- 12 : 'th' ,
68- 13 : 'th' ,
69- }
70-
71-
72- def _on_communication_backoff (details ):
73- if details ['tries' ] > 14 :
74- ordinal_attempt = _ORDINAL_DICT .get (int (str (details ['tries' ])[- 1 ]), 'th' )
75- else :
76- ordinal_attempt = _ORDINAL_DICT .get (details ['tries' ], 'th' )
77- logger .info (
78- f'{ details ["tries" ]} { ordinal_attempt } communication attempt failed, backing off waiting '
79- f'{ details ["wait" ]:.2f} seconds after next retry. Elapsed time: { details ["elapsed" ]:.2f} '
80- ' seconds.' ,
81- )
82-
83-
8464class Worker :
8565 """
8666 The Worker is responsible to handle the websocket connection
@@ -126,18 +106,50 @@ def get_url(self):
126106 url = f'{ url } ?running_tasks={ self .background_manager .running_tasks } '
127107 return f'{ url } &running_scheduled_tasks={ self .scheduled_manager .running_tasks } '
128108
129- async def ensure_connection (self ):
109+ async def ensure_connection (self ): # noqa: CCR001
130110 """
131111 Ensure that a websocket connection is established.
132112 """
133- if self .ws is None or self .ws .closed :
134- url = self .get_url ()
135- self .ws = await websockets .connect (
136- url ,
137- extra_headers = self .config .get_headers (),
138- )
139- await (await self .ws .ping ())
140- logger .info (f'Connected to { url } ' )
113+ @backoff .on_exception (
114+ backoff .expo ,
115+ CommunicationError ,
116+ max_time = _get_max_retry_time_generic ,
117+ max_value = _get_max_retry_delay_time ,
118+ on_backoff = self ._backoff_log ,
119+ giveup = self ._backoff_shutdown ,
120+ )
121+ @backoff .on_exception (
122+ backoff .expo ,
123+ MaintenanceError ,
124+ max_time = _get_max_retry_time_maintenance ,
125+ max_value = _get_max_retry_delay_time ,
126+ on_backoff = self ._backoff_log ,
127+ giveup = self ._backoff_shutdown ,
128+ )
129+ async def _connect ():
130+ if self .ws is None or self .ws .closed :
131+ try :
132+ url = self .get_url ()
133+ self .ws = await websockets .connect (
134+ url ,
135+ extra_headers = self .config .get_headers (),
136+ )
137+ await (await self .ws .ping ())
138+ logger .info (f'Connected to { url } ' )
139+ except InvalidStatusCode as ic :
140+ if ic .status_code == 502 :
141+ logger .warning ('Maintenance in progress...' )
142+ raise MaintenanceError ()
143+ else :
144+ logger .warning (
145+ f'Received an unexpected status from server: { ic .status_code } ...' ,
146+ )
147+ raise CommunicationError ()
148+ except Exception as e :
149+ logger .warning (f'Received an unexpected exception: { e } ...' )
150+ raise CommunicationError ()
151+
152+ await _connect ()
141153
142154 async def send (self , message ):
143155 """
@@ -169,82 +181,51 @@ def get_capabilities(self):
169181 ),
170182 )
171183
172- async def communicate (self ): # noqa: CCR001
173- @backoff .on_exception (
174- backoff .expo ,
175- CommunicationError ,
176- factor = 10 ,
177- max_time = _get_max_retry_time_generic ,
178- max_value = _get_max_retry_delay_time ,
179- jitter = backoff .random_jitter ,
180- on_backoff = _on_communication_backoff ,
181- giveup = self ._backoff_shutdown ,
182- )
183- @backoff .on_exception (
184- backoff .expo ,
185- MaintenanceError ,
186- factor = 10 ,
187- max_time = _get_max_retry_time_maintenance ,
188- max_value = _get_max_retry_delay_time ,
189- jitter = backoff .random_jitter ,
190- on_backoff = _on_communication_backoff ,
191- giveup = self ._backoff_shutdown ,
192- )
193- async def _do_communicate ():
184+ async def run (self ): # noqa: CCR001
185+ """
186+ Main loop for the websocket connection.
187+ Once started, this worker will send the capabilities message to
188+ the websocket server and start a loop to receive messages from the
189+ websocket server.
190+ """
191+ await self .run_event .wait ()
192+ while self .run_event .is_set ():
194193 try :
195194 await self .ensure_connection ()
196195 await self .send (self .get_capabilities ())
197196 while self .run_event .is_set ():
198- await self .ensure_connection ()
199197 message = await self .receive ()
200198 if not message :
201199 continue
202200 await self .process_message (message )
203- except ConnectionClosedError as e :
204- logger .warning (f'Connection closed with code { e .rcvd } from: { self .get_url ()} ' )
205- raise CommunicationError ()
201+ except (ConnectionClosedOK , StopBackoffError ):
202+ break
203+ except ConnectionClosedError :
204+ logger .warning (
205+ f'Disconnected from: { self .get_url ()} '
206+ f', try to reconnect in { DELAY_ON_CONNECT_EXCEPTION_SECONDS } s' ,
207+ )
208+ await asyncio .sleep (DELAY_ON_CONNECT_EXCEPTION_SECONDS )
206209 except InvalidStatusCode as ic :
207210 if ic .status_code == 502 :
208- logger .warning ('InvalidStatusCode 502 raised. Maintenance in progress.' )
209- raise MaintenanceError ()
211+ logger .warning (
212+ 'Maintenance in progress'
213+ f', try to reconnect in { DELAY_ON_CONNECT_EXCEPTION_SECONDS } s' ,
214+ )
215+ await asyncio .sleep (DELAY_ON_CONNECT_EXCEPTION_SECONDS )
210216 else :
211- logger .warning (f'InvalidStatusCode { ic . status_code } raised.' )
212- raise CommunicationError ()
213- except WebSocketException as wse :
214- logger . warning ( f'Unexpected websocket exception { wse } .' )
215- raise CommunicationError ( )
217+ logger .warning (
218+ f'Received an unexpected status from server: { ic . status_code } '
219+ f', try to reconnect in { DELAY_ON_CONNECT_EXCEPTION_SECONDS } s' ,
220+ )
221+ await asyncio . sleep ( DELAY_ON_CONNECT_EXCEPTION_SECONDS )
216222 except Exception as e :
217- logger .warning (f'Unexpected error in communicate: { e } .' )
218- raise CommunicationError ()
219-
220- await _do_communicate ()
221-
222- async def run (self ): # noqa: CCR001
223- """
224- Main loop for the websocket connection.
225- Once started, this worker will send the capabilities message to
226- the websocket server and start a loop to receive messages from the
227- websocket server.
228- """
229- await self .run_event .wait ()
230- while self .run_event .is_set ():
231- try :
232- await self .communicate ()
233- except ConnectionClosedOK :
234- self .run_event .clear ()
235- except CommunicationError :
236- logger .error (
237- f'Max retries exceeded after { MAX_RETRY_TIME_GENERIC_SECONDS } seconds' ,
223+ logger .exception (
224+ f'Unexpected exception { e } '
225+ f', try to reconnect in { DELAY_ON_CONNECT_EXCEPTION_SECONDS } s' ,
238226 )
239- self .run_event .clear ()
240- except MaintenanceError :
241- logger .error (
242- f'Max retries exceeded after { MAX_RETRY_TIME_MAINTENANCE_SECONDS } '
243- 'seconds' ,
244- )
245- self .run_event .clear ()
246- except StopBackoffError :
247- pass
227+ await asyncio .sleep (DELAY_ON_CONNECT_EXCEPTION_SECONDS )
228+
248229 if self .ws :
249230 await self .ws .close ()
250231
@@ -381,3 +362,11 @@ def _backoff_shutdown(self, _):
381362 if not self .run_event .is_set ():
382363 logger .info ('Worker exiting, stop backoff loop' )
383364 raise StopBackoffError ()
365+
366+ def _backoff_log (self , details ):
367+ logger .info (
368+ f'{ to_ordinal (details ["tries" ])} communication attempt failed, backing off waiting '
369+ f'{ details ["wait" ]:.2f} seconds after next retry. '
370+ f'Elapsed time: { details ["elapsed" ]:.2f} '
371+ ' seconds.' ,
372+ )
0 commit comments