Skip to content

Commit 9585196

Browse files
author
Francesco Faraone
committed
Catch generic exception to have last chance to exit clearly.
Improve giveup handler. Apply backoff policy to generic websocket errors.
1 parent 110b86e commit 9585196

File tree

3 files changed

+59
-6
lines changed

3 files changed

+59
-6
lines changed

connect/eaas/exceptions.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,7 @@ class MaintenanceError(EaaSError):
1313

1414
class CommunicationError(EaaSError):
1515
pass
16+
17+
18+
class StopBackoffError(EaaSError):
19+
pass

connect/eaas/worker.py

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
from connect.eaas.exceptions import (
3535
CommunicationError,
3636
MaintenanceError,
37+
StopBackoffError,
3738
)
3839
from connect.eaas.handler import ExtensionHandler
3940
from connect.eaas.managers import (
@@ -168,7 +169,7 @@ def get_capabilities(self):
168169
),
169170
)
170171

171-
async def communicate(self):
172+
async def communicate(self): # noqa: CCR001
172173
@backoff.on_exception(
173174
backoff.expo,
174175
CommunicationError,
@@ -177,7 +178,7 @@ async def communicate(self):
177178
max_value=_get_max_retry_delay_time,
178179
jitter=backoff.random_jitter,
179180
on_backoff=_on_communication_backoff,
180-
giveup=lambda _: not self.run_event.is_set(),
181+
giveup=self._backoff_shutdown,
181182
)
182183
@backoff.on_exception(
183184
backoff.expo,
@@ -187,7 +188,7 @@ async def communicate(self):
187188
max_value=_get_max_retry_delay_time,
188189
jitter=backoff.random_jitter,
189190
on_backoff=_on_communication_backoff,
190-
giveup=lambda _: not self.run_event.is_set(),
191+
giveup=self._backoff_shutdown,
191192
)
192193
async def _do_communicate():
193194
try:
@@ -209,6 +210,12 @@ async def _do_communicate():
209210
else:
210211
logger.warning(f'InvalidStatusCode {ic.status_code} raised.')
211212
raise CommunicationError()
213+
except WebSocketException as wse:
214+
logger.warning(f'Unexpected websocket exception {wse}.')
215+
raise CommunicationError()
216+
except Exception as e:
217+
logger.warning(f'Unexpected error in communicate: {e}.')
218+
raise CommunicationError()
212219

213220
await _do_communicate()
214221

@@ -225,9 +232,6 @@ async def run(self): # noqa: CCR001
225232
await self.communicate()
226233
except ConnectionClosedOK:
227234
self.run_event.clear()
228-
except WebSocketException:
229-
logger.exception('Unexpected websocket exception. Retrying in 2 seconds.')
230-
await asyncio.sleep(2)
231235
except CommunicationError:
232236
logger.error(
233237
f'Max retries exceeded after {MAX_RETRY_TIME_GENERIC_SECONDS} seconds',
@@ -239,6 +243,8 @@ async def run(self): # noqa: CCR001
239243
'seconds',
240244
)
241245
self.run_event.clear()
246+
except StopBackoffError:
247+
pass
242248
if self.ws:
243249
await self.ws.close()
244250

@@ -370,3 +376,8 @@ def stop(self):
370376
"""
371377
logger.info('Stopping control worker...')
372378
self.run_event.clear()
379+
380+
def _backoff_shutdown(self, _):
381+
if not self.run_event.is_set():
382+
logger.info('Worker exiting, stop backoff loop')
383+
raise StopBackoffError()

tests/test_worker.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1083,3 +1083,41 @@ def test__on_communication_backoff(caplog, tries, ordinal):
10831083
with caplog.at_level(logging.INFO):
10841084
_on_communication_backoff(details)
10851085
assert expected in caplog.records[0].message
1086+
1087+
1088+
@pytest.mark.asyncio
1089+
async def test_connection_unexpected_error(mocker, ws_server, unused_port, caplog):
1090+
mocker.patch('connect.eaas.worker.MAX_RETRY_TIME_MAINTENANCE_SECONDS', 1)
1091+
mocker.patch('connect.eaas.worker.MAX_RETRY_DELAY_TIME_SECONDS', 1)
1092+
mocker.patch('connect.eaas.handler.get_extension_class')
1093+
mocker.patch('connect.eaas.handler.get_extension_type')
1094+
mocker.patch(
1095+
'connect.eaas.config.get_environment',
1096+
return_value={
1097+
'ws_address': f'127.0.0.1:{unused_port}',
1098+
'api_address': f'127.0.0.1:{unused_port}',
1099+
'api_key': 'SU-000:XXXX',
1100+
'environment_id': 'ENV-000-0001',
1101+
'instance_id': 'INS-000-0002',
1102+
'background_task_max_execution_time': 300,
1103+
'interactive_task_max_execution_time': 120,
1104+
'scheduled_task_max_execution_time': 43200,
1105+
},
1106+
)
1107+
handler = WSHandler(
1108+
'/public/v1/devops/ws/ENV-000-0001/INS-000-0002?running_tasks=0&running_scheduled_tasks=0',
1109+
None,
1110+
[],
1111+
)
1112+
1113+
async with ws_server(handler):
1114+
worker = Worker(secure=False)
1115+
worker.send = mocker.AsyncMock(side_effect=RuntimeError('generic error'))
1116+
with caplog.at_level(logging.INFO):
1117+
task = asyncio.create_task(worker.start())
1118+
await asyncio.sleep(.5)
1119+
worker.stop()
1120+
await task
1121+
1122+
assert 'Unexpected error in communicate: generic error' in caplog.text
1123+
assert 'Backing off ' in caplog.text

0 commit comments

Comments
 (0)