Skip to content

Commit d91f94d

Browse files
authored
Merge pull request #18 from cloudblue/LITE-19459-result-sender-wait-ws-reconnect
LITE-19459 wait ws to reconnect before sending results
2 parents 913c5df + c74c760 commit d91f94d

File tree

2 files changed

+50
-0
lines changed

2 files changed

+50
-0
lines changed

connect/eaas/manager.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,13 @@ async def result_sender(self): # noqa: CCR001
157157
return
158158
await asyncio.sleep(.1)
159159
continue
160+
if self.worker.ws is None or self.worker.ws.closed:
161+
if not self.run_event.is_set() and self.running_tasks == 0:
162+
return
163+
logger.debug('wait WS reconnection before resuming result sender')
164+
await asyncio.sleep(.1)
165+
continue
166+
160167
result = await self.results_queue.get()
161168
logger.debug(f'got result from queue: {result}')
162169
retries = 0

tests/test_manager.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ async def test_background_task_sync(mocker, extension_cls, task_type):
4545
extension = extension_class(None, None, None)
4646

4747
worker = mocker.MagicMock()
48+
worker.ws = mocker.MagicMock(closed=False)
4849
worker.get_extension.return_value = extension
4950
worker.send = mocker.AsyncMock()
5051
worker.capabilities = {task_type: ['pending']}
@@ -79,6 +80,7 @@ async def test_background_task_sync_unsupported_status(mocker, extension_cls):
7980
extension = extension_class(None, None, None)
8081

8182
worker = mocker.MagicMock()
83+
worker.ws = mocker.MagicMock(closed=False)
8284
worker.get_extension.return_value = extension
8385
worker.send = mocker.AsyncMock()
8486
worker.capabilities = {TaskType.ASSET_PURCHASE_REQUEST_PROCESSING: ['pending']}
@@ -116,6 +118,7 @@ async def test_background_task_async(mocker, extension_cls, task_type):
116118
extension = extension_class(None, None, None)
117119

118120
worker = mocker.MagicMock()
121+
worker.ws = mocker.MagicMock(closed=False)
119122
worker.get_extension.return_value = extension
120123
worker.send = mocker.AsyncMock()
121124
worker.capabilities = {task_type: ['pending']}
@@ -155,6 +158,7 @@ async def test_interactive_task_sync(mocker, extension_cls, task_type):
155158
extension = extension_class(None, None, None)
156159

157160
worker = mocker.MagicMock()
161+
worker.ws = mocker.MagicMock(closed=False)
158162
worker.get_extension.return_value = extension
159163
worker.send = mocker.AsyncMock()
160164

@@ -194,6 +198,7 @@ async def test_interactive_task_async(mocker, extension_cls, task_type):
194198
extension = extension_class(None, None, None)
195199

196200
worker = mocker.MagicMock()
201+
worker.ws = mocker.MagicMock(closed=False)
197202
worker.get_extension.return_value = extension
198203
worker.send = mocker.AsyncMock()
199204

@@ -225,6 +230,7 @@ async def test_background_task_request_error(mocker, extension_cls):
225230
extension = extension_class(None, None, None)
226231

227232
worker = mocker.MagicMock()
233+
worker.ws = mocker.MagicMock(closed=False)
228234
worker.get_extension.return_value = extension
229235
worker.send = mocker.AsyncMock()
230236

@@ -259,6 +265,7 @@ async def test_result_sender_retries(mocker, extension_cls):
259265
extension = extension_class(None, None, None)
260266

261267
worker = mocker.MagicMock()
268+
worker.ws = mocker.MagicMock(closed=False)
262269
worker.get_extension.return_value = extension
263270
worker.send = mocker.AsyncMock(side_effect=[Exception('retry'), None])
264271
worker.capabilities = {TaskType.ASSET_PURCHASE_REQUEST_PROCESSING: ['pending']}
@@ -293,6 +300,7 @@ async def test_result_sender_max_retries_exceeded(mocker, extension_cls, caplog)
293300
extension = extension_class(None, None, None)
294301

295302
worker = mocker.MagicMock()
303+
worker.ws = mocker.MagicMock(closed=False)
296304
worker.get_extension.return_value = extension
297305
worker.send = mocker.AsyncMock(
298306
side_effect=[Exception('retry') for _ in range(RESULT_SENDER_MAX_RETRIES)],
@@ -316,3 +324,38 @@ async def test_result_sender_max_retries_exceeded(mocker, extension_cls, caplog)
316324
await manager.stop()
317325

318326
assert 'max retries exceeded for sending results of task TQ-000' in caplog.text
327+
328+
329+
@pytest.mark.asyncio
330+
async def test_result_sender_wait_reconnection(mocker, extension_cls, caplog):
331+
extension_class = extension_cls(
332+
'process_asset_purchase_request',
333+
result=ProcessingResponse.done(),
334+
)
335+
extension = extension_class(None, None, None)
336+
337+
worker = mocker.MagicMock()
338+
worker.ws = mocker.MagicMock(closed=True)
339+
worker.get_extension.return_value = extension
340+
worker.send = mocker.AsyncMock()
341+
worker.capabilities = {TaskType.ASSET_PURCHASE_REQUEST_PROCESSING: ['pending']}
342+
343+
manager = TasksManager(worker)
344+
manager.get_request = mocker.AsyncMock(return_value={'id': 'PR-000', 'status': 'pending'})
345+
346+
manager.start()
347+
348+
task = TaskPayload(
349+
'TQ-000',
350+
TaskCategory.BACKGROUND,
351+
TaskType.ASSET_PURCHASE_REQUEST_PROCESSING,
352+
'PR-000',
353+
)
354+
with caplog.at_level(logging.DEBUG):
355+
await manager.submit_task(task)
356+
await asyncio.sleep(.2)
357+
await manager.stop()
358+
359+
assert 'wait WS reconnection before resuming result sender' in [
360+
record.message for record in caplog.records
361+
]

0 commit comments

Comments
 (0)