Skip to content

Commit e13e836

Browse files
author
Francesco Faraone
committed
LITE-28771 increase transformation task timeout to 16h; fix logging mem consumption; improve exception handling
1 parent b4bf8f4 commit e13e836

File tree

12 files changed

+141
-39
lines changed

12 files changed

+141
-39
lines changed

connect/eaas/runner/handlers/anvil.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,15 @@
1010
from connect.client import (
1111
ConnectClient,
1212
)
13-
from connect.eaas.core.logging import (
14-
RequestLogger,
15-
)
1613
from connect.eaas.runner.config import (
1714
ConfigHelper,
1815
)
1916
from connect.eaas.runner.handlers.base import (
2017
ApplicationHandlerBase,
2118
)
19+
from connect.eaas.runner.logging import (
20+
RequestLogger,
21+
)
2222

2323

2424
logger = logging.getLogger(__name__)

connect/eaas/runner/handlers/base.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,13 @@
1414
)
1515
from connect.eaas.core.logging import (
1616
ExtensionLogHandler,
17-
RequestLogger,
1817
)
1918
from connect.eaas.runner.helpers import (
2019
iter_entry_points,
2120
)
21+
from connect.eaas.runner.logging import (
22+
RequestLogger,
23+
)
2224

2325

2426
class ApplicationHandlerBase(ABC):

connect/eaas/runner/logging.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
from connect.eaas.core.logging import (
2+
RequestLogger as _RequestLogger,
3+
)
4+
5+
6+
class RequestLogger(_RequestLogger):
7+
def log_request(self, method, url, kwargs):
8+
if not self.logger.isEnabledFor(self.level):
9+
return
10+
super().log_request(method, url, kwargs)
11+
12+
def log_response(self, response):
13+
if not self.logger.isEnabledFor(self.level):
14+
return
15+
super().log_response(response)

connect/eaas/runner/managers/transformation.py

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,6 @@
3535
from connect.eaas.core.enums import (
3636
ResultType,
3737
)
38-
from connect.eaas.core.logging import (
39-
RequestLogger,
40-
)
4138
from connect.eaas.core.proto import (
4239
Task,
4340
TaskOutput,
@@ -54,6 +51,9 @@
5451
TRANSFORMATION_TASK_MAX_PARALLEL_LINES,
5552
UPLOAD_CHUNK_SIZE,
5653
)
54+
from connect.eaas.runner.logging import (
55+
RequestLogger,
56+
)
5757
from connect.eaas.runner.managers.base import (
5858
TasksManagerBase,
5959
)
@@ -163,7 +163,7 @@ async def build_response(self, task_data, future):
163163
except Exception as e:
164164
cause = (
165165
str(e) if not isinstance(e, asyncio.TimeoutError)
166-
else 'timed out after {timeout} s'
166+
else f'timed out after {timeout} s'
167167
)
168168
self.log_exception(task_data, e)
169169
await self._fail_task(task_data, cause)
@@ -251,7 +251,10 @@ async def process_transformation(self, task_data, tfn_request, method):
251251
await client.conversations[task_data.input.object_id].messages.create(
252252
payload={
253253
'type': 'message',
254-
'text': f'Transformation request processing failed: {str(e) or "timed out"}',
254+
'text': (
255+
'Transformation request processing failed: '
256+
f'{self.format_exception_message(e)}'
257+
),
255258
},
256259
)
257260
return TransformationResponse.fail(output=str(e))
@@ -381,7 +384,6 @@ async def process_rows(self, read_queue, result_store, method, tfn_request, logg
381384
async def transform_row(self, method, row_idx, row, row_styles):
382385
try:
383386
if ROW_DELETED_MARKER in list(row.values()):
384-
# await result_store.put((row_idx, RowTransformationResponse.delete()))
385387
return RowTransformationResponse.delete()
386388
kwargs = {}
387389
if 'row_styles' in inspect.signature(method).parameters:
@@ -394,19 +396,24 @@ async def transform_row(self, method, row_idx, row, row_styles):
394396
self.executor,
395397
functools.partial(method, row, **kwargs),
396398
)
399+
timeout = self.config.get_timeout('row_transformation')
397400
response = await asyncio.wait_for(
398401
awaitable,
399-
timeout=self.config.get_timeout('row_transformation'),
402+
timeout=timeout,
400403
)
401404
if not isinstance(response, RowTransformationResponse):
402405
raise RowTransformationError(f'invalid row tranformation response: {response}')
403406
if response.status == ResultType.FAIL:
404407
raise RowTransformationError(f'row transformation failed: {response.output}')
405408
return response
406409
except Exception as e:
410+
cause = (
411+
str(e) if not isinstance(e, asyncio.TimeoutError)
412+
else f'timed out after {timeout} s'
413+
)
407414
raise RowTransformationError(
408415
f'Error applying transformation function {method.__name__} '
409-
f'to row #{row_idx}: {str(e)}.',
416+
f'to row #{row_idx}: {cause}.',
410417
) from e
411418

412419
def write_excel(
@@ -525,3 +532,11 @@ async def chunks_iterator(): # pragma: no cover
525532
payload={'files': {'output': {'id': media_file_id}}},
526533
)
527534
await client(ns).requests[task_data.input.object_id]('process').post()
535+
536+
def format_exception_message(self, e):
537+
if isinstance(e, asyncio.CancelledError):
538+
return 'cancelled'
539+
elif isinstance(e, asyncio.TimeoutError):
540+
return 'timed out'
541+
else:
542+
return str(e) or repr(e)

connect/eaas/runner/workers/anvil.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,6 @@
77
import logging
88
import signal
99

10-
from devtools import (
11-
pformat,
12-
)
13-
1410
from connect.eaas.core.proto import (
1511
Message,
1612
MessageType,
@@ -54,15 +50,15 @@ def get_setup_request(self):
5450
runner_version=get_version(),
5551
),
5652
)
57-
logger.debug(f'Sending setup request: {pformat(msg)}')
53+
logger.debug(f'Sending setup request: {self.prettify(msg)}')
5854
return msg.dict()
5955

6056
async def stopping(self):
6157
pass
6258

6359
async def process_message(self, data):
6460
message = Message.deserialize(data)
65-
logger.debug(f'Received message: {pformat(message)}')
61+
logger.debug(f'Received message: {self.prettify(message)}')
6662
if message.message_type == MessageType.SETUP_RESPONSE:
6763
await self.process_setup_response(message.data)
6864
self.handler.start()

connect/eaas/runner/workers/base.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,7 @@ def stop(self):
303303
async def send_shutdown(self):
304304
try:
305305
msg = Message(version=2, message_type=MessageType.SHUTDOWN)
306-
logger.debug(f'Sending message: {pformat(msg)}')
306+
logger.debug(f'Sending message: {self.prettify(msg)}')
307307
await self.send(msg.serialize())
308308
except ConnectionClosedError:
309309
pass
@@ -331,6 +331,11 @@ async def stopping(self):
331331
async def process_message(self, data):
332332
raise NotImplementedError()
333333

334+
def prettify(self, msg):
335+
if logger.isEnabledFor(logging.DEBUG):
336+
return pformat(msg)
337+
return '<...>'
338+
334339
def _backoff_shutdown(self, _):
335340
if not self.run_event.is_set():
336341
logger.info(f'{self} exiting, stop backoff loop')

connect/eaas/runner/workers/events.py

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,6 @@
77
import logging
88
import signal
99

10-
from devtools import (
11-
pformat,
12-
)
13-
1410
from connect.eaas.core.proto import (
1511
Message,
1612
MessageType,
@@ -103,15 +99,15 @@ def get_setup_request(self):
10399
runner_version=get_version(),
104100
),
105101
)
106-
logger.debug(f'Sending setup request: {pformat(msg)}')
102+
logger.debug(f'Sending setup request: {self.prettify(msg)}')
107103
return msg.dict()
108104

109105
async def process_message(self, data):
110106
"""
111107
Process a message received from the websocket server.
112108
"""
113109
message = Message.deserialize(data)
114-
logger.debug(f'Received message: {pformat(message)}')
110+
logger.debug(f'Received message: {self.prettify(message)}')
115111
if message.message_type == MessageType.SETUP_RESPONSE:
116112
await self.process_setup_response(message.data)
117113
elif message.message_type == MessageType.TASK:
@@ -156,7 +152,7 @@ async def result_sender(self): # noqa: CCR001
156152
data=result,
157153
)
158154
await self.ensure_connection()
159-
logger.debug(f'Sending message: {pformat(message)}')
155+
logger.debug(f'Sending message: {self.prettify(message)}')
160156
await self.send(message.serialize())
161157
logger.info(f'Result for task {result.options.task_id} has been sent.')
162158
break

connect/eaas/runner/workers/transformations.py

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,6 @@
77
import logging
88
import signal
99

10-
from devtools import (
11-
pformat,
12-
)
13-
1410
from connect.eaas.core.proto import (
1511
Message,
1612
MessageType,
@@ -70,7 +66,7 @@ def get_setup_request(self):
7066
runner_version=get_version(),
7167
),
7268
)
73-
logger.debug(f'Sending setup request: {pformat(msg)}')
69+
logger.debug(f'Sending setup request: {self.prettify(msg)}')
7470
return msg.dict()
7571

7672
async def stopping(self):
@@ -92,7 +88,7 @@ async def stopping(self):
9288

9389
async def process_message(self, data):
9490
message = Message.deserialize(data)
95-
logger.debug(f'Received message: {pformat(message)}')
91+
logger.debug(f'Received message: {self.prettify(message)}')
9692
if message.message_type == MessageType.SETUP_RESPONSE:
9793
await self.process_setup_response(message.data)
9894
elif message.message_type == MessageType.TASK:
@@ -132,7 +128,7 @@ async def result_sender(self): # noqa: CCR001
132128
data=result,
133129
)
134130
await self.ensure_connection()
135-
logger.debug(f'Sending message: {pformat(message)}')
131+
logger.debug(f'Sending message: {self.prettify(message)}')
136132
await self.send(message.serialize())
137133
logger.info(
138134
f'Result for transformation task {result.options.task_id} has been sent.',

connect/eaas/runner/workers/web.py

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,6 @@
1111
import signal
1212

1313
import httpx
14-
from devtools import (
15-
pformat,
16-
)
1714

1815
from connect.eaas.core.proto import (
1916
HttpRequest,
@@ -63,15 +60,15 @@ def get_setup_request(self):
6360
proxied_connect_api=self.handler.proxied_connect_api,
6461
),
6562
)
66-
logger.debug(f'Sending setup request: {pformat(msg)}')
63+
logger.debug(f'Sending setup request: {self.prettify(msg)}')
6764
return msg.dict()
6865

6966
async def stopping(self):
7067
pass
7168

7269
async def process_message(self, data):
7370
message = Message.deserialize(data)
74-
logger.debug(f'Received message: {pformat(message)}')
71+
logger.debug(f'Received message: {self.prettify(message)}')
7572
if message.message_type == MessageType.SETUP_RESPONSE:
7673
await self.process_setup_response(message.data)
7774
elif message.message_type == MessageType.WEB_TASK:
@@ -184,7 +181,7 @@ def build_response(self, task, status, headers, body):
184181
message_type=MessageType.WEB_TASK,
185182
data=task_response,
186183
)
187-
logger.debug(f'Sending message: {pformat(message)}')
184+
logger.debug(f'Sending message: {self.prettify(message)}')
188185
return message.serialize()
189186

190187

tests/managers/test_transformation.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -739,3 +739,23 @@ def test_generate_output_row_invalid_status(mocker):
739739
manager.generate_output_row(mocker.MagicMock(), column_names, response)
740740

741741
assert str(cv.value) == 'Invalid row transformation response status: reschedule.'
742+
743+
744+
def test_format_exception_msg(mocker):
745+
manager = TransformationTasksManager(mocker.MagicMock(), mocker.MagicMock(), mocker.MagicMock())
746+
747+
assert manager.format_exception_message(
748+
asyncio.CancelledError(asyncio.Future()),
749+
) == 'cancelled'
750+
751+
assert manager.format_exception_message(
752+
asyncio.TimeoutError(20),
753+
) == 'timed out'
754+
755+
assert manager.format_exception_message(
756+
Exception('hello'),
757+
) == 'hello'
758+
759+
assert manager.format_exception_message(
760+
Exception(),
761+
) == 'Exception()'

0 commit comments

Comments
 (0)