Skip to content

Commit 6e32ef4

Browse files
authored
Merge pull request #162 from cloudblue/log_transformation_progress_to_logz
Log transformations progress using extension logger; relax openpyxl dependency constraint to ">=3.0.0,<4"
2 parents aae6af1 + 3747e31 commit 6e32ef4

File tree

3 files changed

+73
-22
lines changed

3 files changed

+73
-22
lines changed

connect/eaas/runner/managers/transformation.py

Lines changed: 65 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,11 @@ def get_client(self, task_data):
7575

7676
return self.client
7777

78+
def get_extension_logger(self, task_data):
    """Return an extension logger whose records carry the current task id.

    The task id from *task_data* is attached via the ``extra`` mapping so
    every log record emitted through this logger can be correlated with
    the transformation task that produced it.
    """
    task_id = task_data.options.task_id
    return self.handler.get_logger(extra={'task_id': task_id})
82+
7883
def get_sync_client(self, task_data):
7984
return ConnectClient(
8085
task_data.options.api_key,
@@ -173,12 +178,14 @@ async def _fail_task(self, task_data, message):
173178
logger.exception(f'Cannot fail the transformation request {task_data.input.object_id}')
174179

175180
async def process_transformation(self, task_data, tfn_request, method):
181+
extension_logger = self.get_extension_logger(task_data)
176182
semaphore = asyncio.Semaphore(TRANSFORMATION_TASK_MAX_PARALLEL_LINES)
177183
input_file = await asyncio.get_running_loop().run_in_executor(
178184
self.executor,
179185
self.download_excel,
180186
tfn_request,
181187
task_data.options.api_key,
188+
extension_logger,
182189
)
183190
output_file = NamedTemporaryFile(
184191
suffix=f'.{tfn_request["files"]["input"]["name"].split(".")[-1]}',
@@ -192,8 +199,10 @@ async def process_transformation(self, task_data, tfn_request, method):
192199
reader_task = loop.run_in_executor(
193200
self.executor,
194201
self.read_excel,
202+
tfn_request,
195203
input_file,
196204
read_queue,
205+
extension_logger,
197206
loop,
198207
)
199208
writer_task = loop.run_in_executor(
@@ -204,14 +213,16 @@ async def process_transformation(self, task_data, tfn_request, method):
204213
tfn_request['stats']['rows']['total'],
205214
tfn_request['transformation']['columns']['output'],
206215
task_data,
216+
extension_logger,
207217
loop,
208218
)
209219
processor_task = asyncio.create_task(self.process_rows(
210220
semaphore,
211221
read_queue,
212222
result_store,
213223
method,
214-
tfn_request['stats']['rows']['total'],
224+
tfn_request,
225+
extension_logger,
215226
))
216227

217228
tasks = [reader_task, writer_task, processor_task]
@@ -237,15 +248,20 @@ async def process_transformation(self, task_data, tfn_request, method):
237248
)
238249
return TransformationResponse.fail(output=str(e))
239250

240-
await self.send_output_file(task_data, tfn_request['batch']['id'], output_file)
251+
await self.send_output_file(
252+
task_data, tfn_request['batch']['id'], output_file, extension_logger,
253+
)
241254
input_file.close()
242255
output_file.close()
243256
return TransformationResponse.done()
244257

245-
def download_excel(self, tfn_request, api_key):
258+
def download_excel(self, tfn_request, api_key, logger):
246259
input_file_name = tfn_request['files']['input']['name']
247260
input_file = NamedTemporaryFile(suffix=f'.{input_file_name.split(".")[-1]}')
248-
261+
logger.info(
262+
f'Downloading input file for {tfn_request["id"]} '
263+
f'from {self.config.get_api_address()}{input_file_name}',
264+
)
249265
with requests.get(
250266
url=f'{self.config.get_api_address()}{input_file_name}',
251267
stream=True,
@@ -255,16 +271,27 @@ def download_excel(self, tfn_request, api_key):
255271
},
256272
) as response:
257273
response.raise_for_status()
274+
content_length = response.headers.get('Content-Length')
275+
progress = 0
258276
for chunk in response.iter_content(chunk_size=DOWNLOAD_CHUNK_SIZE):
259277
input_file.write(chunk)
260-
278+
progress += len(chunk)
279+
logger.debug(
280+
f'Input file download progress for {tfn_request["id"]}:'
281+
f' {progress}/{content_length} bytes',
282+
)
283+
logger.info(
284+
f'Input file for {tfn_request["id"]} '
285+
f'from {self.config.get_api_address()}{input_file_name} downloaded',
286+
)
261287
return input_file
262288

263-
def read_excel(self, filename, queue, loop):
289+
def read_excel(self, tfn_request, filename, queue, logger, loop):
264290
wb = load_workbook(filename=filename, read_only=True)
265291
ws = wb['Data']
266292
lookup_columns = {}
267-
293+
total_rows = tfn_request['stats']['rows']['total']
294+
delta = 1 if total_rows <= 10 else round(total_rows / 10)
268295
for idx, row in enumerate(ws.rows, start=1):
269296
if idx == 1:
270297
for col_idx, col_value in enumerate(row, start=1):
@@ -279,12 +306,20 @@ def read_excel(self, filename, queue, loop):
279306
queue.put((idx, row_data)),
280307
loop,
281308
)
309+
if idx % delta == 0 or idx == total_rows:
310+
logger.info(
311+
f'Input file read progress for {tfn_request["id"]}:'
312+
f' {idx}/{total_rows} rows',
313+
)
282314

315+
logger.info(f'Input file read complete for {tfn_request["id"]}')
283316
wb.close()
284317

285-
async def process_rows(self, semaphore, read_queue, result_store, method, total_rows):
318+
async def process_rows(self, semaphore, read_queue, result_store, method, tfn_request, logger):
286319
rows_processed = 0
287320
tasks = []
321+
total_rows = tfn_request['stats']['rows']['total']
322+
delta = 1 if total_rows <= 10 else round(total_rows / 10)
288323
while rows_processed < total_rows:
289324
await semaphore.acquire()
290325
row_idx, row = await read_queue.get()
@@ -324,8 +359,14 @@ async def process_rows(self, semaphore, read_queue, result_store, method, total_
324359
)
325360

326361
rows_processed += 1
362+
if rows_processed % delta == 0 or rows_processed == total_rows:
363+
logger.info(
364+
f'Starting transformation tasks for {tfn_request["id"]}:'
365+
f' {rows_processed}/{total_rows} started',
366+
)
327367

328368
try:
369+
logger.debug('gathering transformation tasks...')
329370
await asyncio.gather(*tasks)
330371
except Exception as e:
331372
logger.exception('Error during applying transformations.')
@@ -373,7 +414,9 @@ def sync_process_row(self, semaphore, method, row_idx, row, result_store, loop):
373414
finally:
374415
semaphore.release()
375416

376-
def write_excel(self, filename, result_store, total_rows, output_columns, task_data, loop):
417+
def write_excel(
418+
self, filename, result_store, total_rows, output_columns, task_data, logger, loop,
419+
):
377420
wb = Workbook(write_only=True)
378421

379422
ws = wb.create_sheet('Data')
@@ -407,9 +450,9 @@ def write_excel(self, filename, result_store, total_rows, output_columns, task_d
407450
rows_processed += 1
408451
if rows_processed % delta == 0 or rows_processed == total_rows:
409452
self.send_stat_update(task_data, rows_processed, total_rows)
410-
logger.debug(
411-
f'{task_data.input.object_id} processed {rows_processed}'
412-
f' of {total_rows} rows',
453+
logger.info(
454+
f'Writing to output file for {task_data.input.object_id}: {rows_processed}/'
455+
f'{total_rows} written',
413456
)
414457

415458
wb.save(filename)
@@ -436,7 +479,7 @@ def send_stat_update(self, task_data, rows_processed, total_rows):
436479
payload={'stats': {'rows': {'total': total_rows, 'processed': rows_processed}}},
437480
)
438481

439-
async def send_output_file(self, task_data, batch_id, output_file):
482+
async def send_output_file(self, task_data, batch_id, output_file, logger):
440483
client = self.get_client(task_data)
441484

442485
fileobj = open(output_file.name, 'rb')
@@ -452,8 +495,14 @@ async def send_output_file(self, task_data, batch_id, output_file):
452495
}
453496

454497
async def chunks_iterator(): # pragma: no cover
498+
progress = 0
455499
while data := fileobj.read(UPLOAD_CHUNK_SIZE):
456500
yield data
501+
progress += len(data)
502+
logger.debug(
503+
f'Output file upload progress for {task_data.input.object_id}:'
504+
f' {progress}/{file_size} bytes',
505+
)
457506

458507
media_file = await client.ns('media').ns('folders').collection(
459508
'streams_batches',
@@ -463,7 +512,9 @@ async def chunks_iterator(): # pragma: no cover
463512
content=chunks_iterator(),
464513
headers=headers,
465514
)
466-
515+
logger.info(
516+
f'Output file upload completed for {task_data.input.object_id}',
517+
)
467518
media_file_id = json.loads(media_file)['id']
468519
await client('billing').requests[task_data.input.object_id].update(
469520
payload={'files': {'output': {'id': media_file_id}}},

poetry.lock

Lines changed: 7 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ rich = ">=12"
3737
pyfiglet = "^0.8.post1"
3838
devtools = "^0.10.0"
3939
watchfiles = "^0.19.0"
40-
openpyxl = "^3.1.2"
40+
openpyxl = ">=3.0.0,<4"
4141
lxml = "^4.9.2"
4242
uvloop = "^0.17.0"
4343

0 commit comments

Comments
 (0)