Skip to content

Commit 50f6ab2

Browse files
committed
LITE-30582 Increase backoff factor between task generation
1 parent 3843454 commit 50f6ab2

File tree

6 files changed

+85
-10
lines changed

6 files changed

+85
-10
lines changed

connect_bi_reporter/constants.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
# Delay in seconds for schedule to process Upload task
1010
SECONDS_DELAY = 120
1111
# Backoff factor in seconds between Upload tasks creation
12-
SECONDS_BACKOFF_FACTOR = 10
12+
SECONDS_BACKOFF_FACTOR = 120
1313
CREATE_UPLOADS_METHOD_NAME = 'create_uploads'
1414
PROCESS_UPLOADS_METHOD_NAME = 'process_upload'
1515
PROCESS_UPLOAD_TAKS_BASE_METHOD_PAYLOAD = {

connect_bi_reporter/scheduler.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from datetime import datetime, timedelta
2-
from logging import Logger
32
import enum
3+
from logging import Logger
44
from typing import Any, Dict, Optional
55

66
from connect.client import ClientError, ConnectClient
@@ -23,6 +23,7 @@ class TriggerTypeEnum(str, enum.Enum):
2323
class ResponseTypeEnum(str, enum.Enum):
2424
SUCCESS = 'done'
2525
ERROR = 'reschedule'
26+
FAIL = 'fail'
2627

2728

2829
class TriggerType:

connect_bi_reporter/uploads/services.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from datetime import datetime, timedelta
2+
import copy
23

34
from sqlalchemy import util
45
from connect.client import ClientError
@@ -161,7 +162,7 @@ def create_uploads(db, client, logger, feeds):
161162

162163

163164
def get_process_upload_task_payload(installation_id, upload_id, account_id):
164-
payload = PROCESS_UPLOAD_TAKS_BASE_METHOD_PAYLOAD
165+
payload = copy.deepcopy(PROCESS_UPLOAD_TAKS_BASE_METHOD_PAYLOAD)
165166
payload.update({'name': f'Process Uploads - {account_id}'})
166167
parameters = {
167168
'installation_id': installation_id,

connect_bi_reporter/uploads/tasks.py

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import io
22
from datetime import datetime
3+
import time
34
from zipfile import ZipFile
45

56
from connect.client import ClientError
@@ -18,7 +19,7 @@
1819
from connect_bi_reporter.uploads.models import Upload
1920
from connect_bi_reporter.uploads.services import create_process_upload_tasks, create_uploads
2021
from connect_bi_reporter.uploads.storage_utils import upload_file
21-
from connect_bi_reporter.scheduler import Scheduler
22+
from connect_bi_reporter.scheduler import ResponseTypeEnum, Scheduler
2223

2324

2425
class UploadTaskApplicationMixin:
@@ -78,6 +79,7 @@ def process_upload(self, schedule):
7879
if 'installation_id' not in schedule['parameter']:
7980
return ScheduledExecutionResponse.fail(output='Parameter installation_id is missing.')
8081

82+
begin_time = time.monotonic()
8183
instalation_client = self.get_installation_admin_client(
8284
schedule['parameter']['installation_id'],
8385
)
@@ -90,15 +92,16 @@ def process_upload(self, schedule):
9092
if not upload:
9193
return ScheduledExecutionResponse.fail(output=f'Invalid upload `{upload_id}`.')
9294

93-
if upload.status != 'pending':
95+
if upload.status != Upload.STATUSES.pending:
9496
return ScheduledExecutionResponse.fail(
9597
output=f'Cannot process upload in status `{upload.status}`.',
9698
)
9799

98-
upload.status = 'processing'
100+
upload.status = Upload.STATUSES.processing
99101
db.add(upload)
100102
db.commit()
101103

104+
execution_method_result = ResponseTypeEnum.SUCCESS
102105
try:
103106
report_data = download_report(instalation_client, upload.report_id)
104107

@@ -112,14 +115,25 @@ def process_upload(self, schedule):
112115
)
113116
upload.size = uploaded_file_props.get('size', 0)
114117

115-
upload.status = 'uploaded'
118+
upload.status = Upload.STATUSES.uploaded
116119
upload.name = file_name
117120
db.add(upload)
118121
db.commit()
119-
return ScheduledExecutionResponse.done()
120122
except Exception:
121123
self.logger.exception(msg='Error processing upload')
122-
upload.status = 'failed'
124+
upload.status = Upload.STATUSES.failed
123125
db.add(upload)
124126
db.commit()
125-
return ScheduledExecutionResponse.fail()
127+
execution_method_result = ResponseTypeEnum.FAIL
128+
129+
took = time.monotonic() - begin_time
130+
self.logger.info(
131+
'Execution of `process_upload` task for Upload {0} finished (took "{1}"): '
132+
'Upload status: `{2}`, Taks result: `{3}`.'.format(
133+
upload.id,
134+
took,
135+
upload.status,
136+
execution_method_result,
137+
),
138+
)
139+
return getattr(ScheduledExecutionResponse, execution_method_result)()

pyproject.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,9 @@ build-backend = "poetry.core.masonry.api"
5050
[tool.pytest.ini_options]
5151
testpaths = "tests"
5252
addopts = "--cov=connect_bi_reporter --cov-report=term-missing --cov-report=html --cov-report=xml"
53+
filterwarnings = [
54+
"ignore::sqlalchemy.exc.SADeprecationWarning"
55+
]
5356

5457
[tool.coverage.run]
5558
relative_files = true

tests/uploads/test_tasks.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from datetime import datetime, timedelta, timezone
12
import re
23
from unittest.mock import call
34

@@ -7,6 +8,7 @@
78
from connect.eaas.core.inject.models import Context
89
from sqlalchemy.exc import DBAPIError
910

11+
from connect_bi_reporter.constants import SECONDS_BACKOFF_FACTOR, SECONDS_DELAY
1012
from connect_bi_reporter.events import ConnectBiReporterEventsApplication
1113

1214

@@ -16,6 +18,8 @@ def test_process_upload(dbsession, connect_client, installation, logger, mocker,
1618
logger,
1719
config={},
1820
)
21+
p_time = mocker.patch('connect_bi_reporter.uploads.tasks.time')
22+
p_time.monotonic.side_effect = [10, 12]
1923
ext.get_installation_admin_client = lambda self: connect_client
2024

2125
with open('./tests/uploads/test-zip.zip', 'rb') as zf:
@@ -41,6 +45,10 @@ def test_process_upload(dbsession, connect_client, installation, logger, mocker,
4145
assert re.match(feed.file_name + '_\\d{8} \\d{2}:\\d{2}:\\d{2}.csv', upload.name)
4246
assert upload.size == 1024
4347
assert upload.status == upload.STATUSES.uploaded
48+
assert logger.method_calls[0].args[0] == (
49+
f'Execution of `process_upload` task for Upload {upload.id} '
50+
f'finished (took "2"): Upload status: `uploaded`, Taks result: `done`.'
51+
)
4452

4553

4654
def test_process_upload_report_download_failed(
@@ -229,6 +237,11 @@ def test_create_upload_schedule_task(
229237
),
230238
)
231239
ext.get_installation_admin_client = lambda self: connect_client
240+
241+
_now = datetime(2024, 10, 15, 10, 0, 0, tzinfo=timezone.utc)
242+
p_datetime = mocker.patch('connect_bi_reporter.uploads.services.datetime')
243+
p_datetime.utcnow = lambda: _now
244+
232245
mocker.patch(
233246
'connect_bi_reporter.uploads.tasks.get_extension_owner_client',
234247
return_value=connect_client,
@@ -245,6 +258,9 @@ def test_create_upload_schedule_task(
245258
'connect_bi_reporter.scheduler.create_schedule_task',
246259
return_value=eaas_schedule_task,
247260
)
261+
p_get_task_payload = mocker.patch(
262+
'connect_bi_reporter.scheduler.EaasScheduleTask.get_task_payload',
263+
)
248264
feed1 = feed_factory(
249265
schedule_id=report_schedule['id'],
250266
account_id=installation['owner']['id'],
@@ -274,6 +290,44 @@ def test_create_upload_schedule_task(
274290
),
275291
],
276292
)
293+
delay = SECONDS_DELAY
294+
new_delay = SECONDS_DELAY + SECONDS_BACKOFF_FACTOR
295+
p_get_task_payload.assert_has_calls(
296+
[
297+
call(
298+
trigger_type='onetime',
299+
trigger_data={
300+
'date': (_now + timedelta(seconds=delay)).isoformat(),
301+
},
302+
method_payload={
303+
'method': 'process_upload',
304+
'description': 'This task will download the report from'
305+
' connect and published it in the respective storage.',
306+
'parameter': {
307+
'installation_id': 'EIN-8436-7221-8308',
308+
'upload_id': f'ULF-{feed1.id.split("-", 1)[-1]}-000',
309+
},
310+
'name': 'Process Uploads - PA-000-000',
311+
},
312+
),
313+
call(
314+
trigger_type='onetime',
315+
trigger_data={
316+
'date': (_now + timedelta(seconds=new_delay)).isoformat(),
317+
},
318+
method_payload={
319+
'method': 'process_upload',
320+
'description': 'This task will download the report from'
321+
' connect and published it in the respective storage.',
322+
'parameter': {
323+
'installation_id': 'EIN-8436-7221-8308',
324+
'upload_id': f'ULF-{feed2.id.split("-", 1)[-1]}-000',
325+
},
326+
'name': 'Process Uploads - PA-000-000',
327+
},
328+
),
329+
],
330+
)
277331
for idx, zipped in enumerate(zip(uploads, [feed1, feed2])):
278332
upload, feed = zipped
279333
assert result.status == 'success'
@@ -298,6 +352,8 @@ def test_create_upload_schedule_task(
298352
f' created for Upload `{uploads[1].id}`: '
299353
f'Will process Report File `{report_file[1]["id"]}`'
300354
)
355+
assert delay == 120
356+
assert new_delay == 240
301357

302358

303359
def test_create_upload_schedule_task_no_feeds(

0 commit comments

Comments
 (0)