Skip to content

Commit 8651d32

Browse files
authored
Merge pull request #179 from cloudblue/LITE-28586-simplify-transformations-manager
LITE-28586 simplify tr manager logic
2 parents 73f26ec + a3d6fcb commit 8651d32

File tree

6 files changed

+184
-388
lines changed

6 files changed

+184
-388
lines changed

connect/eaas/runner/managers/transformation.py

Lines changed: 50 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import asyncio
2+
import functools
23
import inspect
34
import json
45
import logging
@@ -56,9 +57,6 @@
5657
from connect.eaas.runner.managers.base import (
5758
TasksManagerBase,
5859
)
59-
from connect.eaas.runner.managers.utils import (
60-
ResultStore,
61-
)
6260

6361

6462
logger = logging.getLogger(__name__)
@@ -190,7 +188,6 @@ async def _fail_task(self, task_data, message):
190188

191189
async def process_transformation(self, task_data, tfn_request, method):
192190
extension_logger = self.get_extension_logger(task_data)
193-
semaphore = asyncio.Semaphore(TRANSFORMATION_TASK_MAX_PARALLEL_LINES)
194191
input_file = await asyncio.get_running_loop().run_in_executor(
195192
self.executor,
196193
self.download_excel,
@@ -203,7 +200,7 @@ async def process_transformation(self, task_data, tfn_request, method):
203200
)
204201

205202
read_queue = asyncio.Queue(TRANSFORMATION_TASK_MAX_PARALLEL_LINES)
206-
result_store = ResultStore()
203+
write_queue = asyncio.Queue()
207204

208205
loop = asyncio.get_event_loop()
209206

@@ -220,17 +217,16 @@ async def process_transformation(self, task_data, tfn_request, method):
220217
self.executor,
221218
self.write_excel,
222219
output_file.name,
223-
result_store,
220+
write_queue,
224221
tfn_request['stats']['rows']['total'],
225222
tfn_request['transformation']['columns']['output'],
226223
task_data,
227224
extension_logger,
228225
loop,
229226
)
230227
processor_task = asyncio.create_task(self.process_rows(
231-
semaphore,
232228
read_queue,
233-
result_store,
229+
write_queue,
234230
method,
235231
tfn_request,
236232
extension_logger,
@@ -255,7 +251,7 @@ async def process_transformation(self, task_data, tfn_request, method):
255251
await client.conversations[task_data.input.object_id].messages.create(
256252
payload={
257253
'type': 'message',
258-
'text': f'Transformation request processing failed: {str(e)}',
254+
'text': f'Transformation request processing failed: {str(e) or "timed out"}',
259255
},
260256
)
261257
return TransformationResponse.fail(output=str(e))
@@ -322,10 +318,11 @@ def read_excel(self, tfn_request, filename, queue, logger, loop):
322318
style.number_format = col_value.number_format
323319
row_styles[lookup_columns[col_idx]] = style
324320

325-
asyncio.run_coroutine_threadsafe(
321+
future = asyncio.run_coroutine_threadsafe(
326322
queue.put((idx, row_data, row_styles)),
327323
loop,
328324
)
325+
future.result()
329326
if idx % delta == 0 or idx == total_rows:
330327
logger.info(
331328
f'Input file read progress for {tfn_request["id"]}:'
@@ -335,112 +332,82 @@ def read_excel(self, tfn_request, filename, queue, logger, loop):
335332
logger.info(f'Input file read complete for {tfn_request["id"]}')
336333
wb.close()
337334

338-
async def process_rows(self, semaphore, read_queue, result_store, method, tfn_request, logger):
335+
async def process_rows(self, read_queue, result_store, method, tfn_request, logger):
339336
rows_processed = 0
340337
tasks = []
341338
total_rows = tfn_request['stats']['rows']['total']
342339
delta = 1 if total_rows <= 10 else round(total_rows / 10)
343340
while rows_processed < total_rows:
344-
await semaphore.acquire()
345341
row_idx, row, row_styles = await read_queue.get()
346-
if inspect.iscoroutinefunction(method):
347-
tasks.append(
348-
asyncio.create_task(
349-
asyncio.wait_for(
350-
self.async_process_row(
351-
semaphore,
352-
method,
353-
row_idx,
354-
row,
355-
row_styles,
356-
result_store,
357-
),
358-
self.config.get_timeout('row_transformation'),
359-
),
360-
),
361-
)
362-
else:
363-
loop = asyncio.get_running_loop()
364-
tasks.append(
365-
asyncio.create_task(
366-
asyncio.wait_for(
367-
loop.run_in_executor(
368-
self.executor,
369-
self.sync_process_row,
370-
semaphore,
371-
method,
372-
row_idx,
373-
row,
374-
row_styles,
375-
result_store,
376-
loop,
377-
),
378-
self.config.get_timeout('row_transformation'),
379-
),
342+
tasks.append(
343+
asyncio.create_task(
344+
self.transform_row(
345+
method,
346+
row_idx,
347+
row,
348+
row_styles,
380349
),
381-
)
350+
),
351+
)
382352

383353
rows_processed += 1
354+
if rows_processed % TRANSFORMATION_TASK_MAX_PARALLEL_LINES == 0:
355+
group = asyncio.gather(*tasks)
356+
try:
357+
results = await group
358+
for result in results:
359+
await result_store.put(result)
360+
except Exception as e:
361+
logger.exception('Error during applying transformations.')
362+
group.cancel()
363+
raise e
364+
tasks = []
384365
if rows_processed % delta == 0 or rows_processed == total_rows:
385366
logger.info(
386367
f'Starting transformation tasks for {tfn_request["id"]}:'
387368
f' {rows_processed}/{total_rows} started',
388369
)
389370

371+
group = asyncio.gather(*tasks)
390372
try:
391-
logger.debug('gathering transformation tasks...')
392-
await asyncio.gather(*tasks)
373+
results = await group
374+
for result in results:
375+
await result_store.put(result)
393376
except Exception as e:
394377
logger.exception('Error during applying transformations.')
395-
for task in tasks:
396-
task.cancel()
378+
group.cancel()
397379
raise e
398380

399-
async def async_process_row(self, semaphore, method, row_idx, row, row_styles, result_store):
381+
async def transform_row(self, method, row_idx, row, row_styles):
400382
try:
401383
if ROW_DELETED_MARKER in list(row.values()):
402-
await result_store.put(row_idx, RowTransformationResponse.delete())
403-
return
384+
# await result_store.put((row_idx, RowTransformationResponse.delete()))
385+
return RowTransformationResponse.delete()
404386
kwargs = {}
405387
if 'row_styles' in inspect.signature(method).parameters:
406388
kwargs['row_styles'] = row_styles
407-
response = await method(row, **kwargs)
408-
if not isinstance(response, RowTransformationResponse):
409-
raise RowTransformationError(f'invalid row tranformation response: {response}')
410-
if response.status == ResultType.FAIL:
411-
raise RowTransformationError(f'row transformation failed: {response.output}')
412-
await result_store.put(row_idx, response)
413-
except Exception as e:
414-
raise RowTransformationError(
415-
f'Error applying transformation function {method.__name__} '
416-
f'to row #{row_idx}: {str(e)}.',
417-
) from e
418-
finally:
419-
semaphore.release()
420-
421-
def sync_process_row(self, semaphore, method, row_idx, row, row_styles, result_store, loop):
422-
try:
423-
if ROW_DELETED_MARKER in list(row.values()):
424-
asyncio.run_coroutine_threadsafe(
425-
result_store.put(row_idx, RowTransformationResponse.delete()), loop,
389+
if inspect.iscoroutinefunction(method):
390+
awaitable = method(row, **kwargs)
391+
else:
392+
loop = asyncio.get_running_loop()
393+
awaitable = loop.run_in_executor(
394+
self.executor,
395+
functools.partial(method, row, **kwargs),
426396
)
427-
return
428-
kwargs = {}
429-
if 'row_styles' in inspect.signature(method).parameters:
430-
kwargs['row_styles'] = row_styles
431-
response = method(row, **kwargs)
397+
response = await asyncio.wait_for(
398+
awaitable,
399+
timeout=self.config.get_timeout('row_transformation'),
400+
)
432401
if not isinstance(response, RowTransformationResponse):
433402
raise RowTransformationError(f'invalid row tranformation response: {response}')
434403
if response.status == ResultType.FAIL:
435404
raise RowTransformationError(f'row transformation failed: {response.output}')
436-
asyncio.run_coroutine_threadsafe(result_store.put(row_idx, response), loop)
405+
return response
437406
except Exception as e:
438407
raise RowTransformationError(
439408
f'Error applying transformation function {method.__name__} '
440409
f'to row #{row_idx}: {str(e)}.',
441410
) from e
442-
finally:
443-
semaphore.release()
444411

445412
def write_excel(
446413
self, filename, result_store, total_rows, output_columns, task_data, logger, loop,
@@ -467,9 +434,9 @@ def write_excel(
467434
REPORT_EVERY_ROW_MAX,
468435
)
469436

470-
for idx in range(2, total_rows + 2):
437+
for _ in range(2, total_rows + 2):
471438
future = asyncio.run_coroutine_threadsafe(
472-
result_store.get(idx),
439+
result_store.get(),
473440
loop,
474441
)
475442
response = future.result(

connect/eaas/runner/managers/utils.py

Lines changed: 0 additions & 20 deletions
This file was deleted.

0 commit comments

Comments
 (0)