Skip to content

Commit b49f088

Browse files
authored
Merge pull request #144 from cloudblue/lite-26666_add_transformation_execution
LITE-26666 Add execution of transformations to extension runner
2 parents 0c47d15 + 9eeabaf commit b49f088

File tree

20 files changed

+1575
-123
lines changed

20 files changed

+1575
-123
lines changed

connect/eaas/runner/config.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,9 @@ def get_tfnapp_ws_url(self):
130130
def get_api_url(self):
131131
return f'https://{self.env["api_address"]}/public/v1'
132132

133+
def get_api_address(self):
134+
return f'https://{self.env["api_address"]}'
135+
133136
def get_user_agent(self):
134137
version = get_version()
135138
pimpl = platform.python_implementation()

connect/eaas/runner/constants.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,13 @@
9898
BACKGROUND_TASK_MAX_EXECUTION_TIME = 300
9999
INTERACTIVE_TASK_MAX_EXECUTION_TIME = 120
100100
SCHEDULED_TASK_MAX_EXECUTION_TIME = 60 * 60 * 12
101+
TRANSFORMATION_TASK_MAX_EXECUTION_TIME = 300
101102
RESULT_SENDER_MAX_RETRIES = 5
102103
RESULT_SENDER_WAIT_GRACE_SECONDS = 90
104+
TRANSFORMATION_TASK_MAX_PARALLEL_LINES = 20
105+
DOWNLOAD_CHUNK_SIZE = 1024
106+
UPLOAD_CHUNK_SIZE = 65535
107+
TRANSFORMATION_WRITE_QUEUE_TIMEOUT = 600
103108

104109
MAX_RETRY_TIME_GENERIC_SECONDS = 15 * 60
105110
MAX_RETRY_TIME_MAINTENANCE_SECONDS = 3 * 60 * 60

connect/eaas/runner/handlers/base.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,21 @@
11
import importlib
22
import inspect
33
import logging
4+
import os
45
import sys
56
from abc import (
67
ABC,
78
abstractmethod,
89
)
910

11+
from connect.client import (
12+
AsyncConnectClient,
13+
ConnectClient,
14+
)
15+
1016
from connect.eaas.core.logging import (
1117
ExtensionLogHandler,
18+
RequestLogger,
1219
)
1320
from connect.eaas.runner.helpers import (
1421
iter_entry_points,
@@ -115,3 +122,37 @@ def get_application(self):
115122
@abstractmethod
116123
def get_features(self):
117124
raise NotImplementedError()
125+
126+
def _create_client(self, event_type, task_id, method_name, api_key, connect_correlation_id):
127+
"""
128+
Get an instance of Connect Openapi Client. Returns an instance of the AsyncConnectClient
129+
or the ConnectClient depending on method type.
130+
"""
131+
method = getattr(self.get_application(), method_name)
132+
133+
Client = ConnectClient if not inspect.iscoroutinefunction(method) else AsyncConnectClient
134+
135+
default_headers = {
136+
'EAAS_EXT_ID': self._config.service_id,
137+
'EAAS_TASK_ID': task_id,
138+
'EAAS_TASK_TYPE': event_type,
139+
}
140+
141+
default_headers.update(self._config.get_user_agent())
142+
143+
if connect_correlation_id:
144+
operation_id = connect_correlation_id[3:34]
145+
span_id = os.urandom(8).hex()
146+
correlation_id = f'00-{operation_id}-{span_id}-01'
147+
default_headers['ext-traceparent'] = correlation_id
148+
149+
return Client(
150+
api_key,
151+
endpoint=self._config.get_api_url(),
152+
use_specs=False,
153+
max_retries=3,
154+
logger=RequestLogger(
155+
self.get_logger(extra={'task_id': task_id}),
156+
),
157+
default_headers=default_headers,
158+
)

connect/eaas/runner/handlers/events.py

Lines changed: 0 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,6 @@
11
import inspect
22
import logging
3-
import os
43

5-
from connect.client import (
6-
AsyncConnectClient,
7-
ConnectClient,
8-
)
9-
10-
from connect.eaas.core.logging import (
11-
RequestLogger,
12-
)
134
from connect.eaas.core.models import (
145
Context,
156
)
@@ -95,40 +86,6 @@ def get_method(
9586

9687
return getattr(ext, method_name, None)
9788

98-
def _create_client(self, event_type, task_id, method_name, api_key, connect_correlation_id):
99-
"""
100-
Get an instance of the Connect Openapi Client. If the extension is asyncrhonous
101-
it returns an instance of the AsyncConnectClient otherwise the ConnectClient.
102-
"""
103-
method = getattr(self.get_application(), method_name)
104-
105-
Client = ConnectClient if not inspect.iscoroutinefunction(method) else AsyncConnectClient
106-
107-
default_headers = {
108-
'EAAS_EXT_ID': self._config.service_id,
109-
'EAAS_TASK_ID': task_id,
110-
'EAAS_TASK_TYPE': event_type,
111-
}
112-
113-
default_headers.update(self._config.get_user_agent())
114-
115-
if connect_correlation_id:
116-
operation_id = connect_correlation_id[3:34]
117-
span_id = os.urandom(8).hex()
118-
correlation_id = f'00-{operation_id}-{span_id}-01'
119-
default_headers['ext-traceparent'] = correlation_id
120-
121-
return Client(
122-
api_key,
123-
endpoint=self._config.get_api_url(),
124-
use_specs=False,
125-
max_retries=3,
126-
logger=RequestLogger(
127-
self.get_logger(extra={'task_id': task_id}),
128-
),
129-
default_headers=default_headers,
130-
)
131-
13289
def get_events(self):
13390
if 'capabilities' in self.get_descriptor():
13491
logger.warning(

connect/eaas/runner/handlers/transformations.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,12 @@
33
#
44
# Copyright (c) 2022 Ingram Micro. All Rights Reserved.
55
#
6+
import inspect
67
import logging
78

9+
from connect.eaas.core.models import (
10+
Context,
11+
)
812
from connect.eaas.runner.config import (
913
ConfigHelper,
1014
)
@@ -20,8 +24,11 @@ class TfnApp(ApplicationHandlerBase):
2024
"""
2125
Handle the lifecycle of a Transformation extension.
2226
"""
27+
LOGGER_NAME = 'eaas.tfnapp'
28+
2329
def __init__(self, config: ConfigHelper):
2430
super().__init__(config)
31+
self._config = config
2532
self._transformations = None
2633

2734
def get_application(self):
@@ -35,3 +42,50 @@ def get_features(self):
3542
return {
3643
'transformations': self.transformations,
3744
}
45+
46+
def get_method(
47+
self,
48+
event_type,
49+
task_id,
50+
method_name,
51+
transformation_request=None,
52+
installation=None,
53+
api_key=None,
54+
connect_correlation_id=None,
55+
):
56+
if not method_name: # pragma: no cover
57+
return
58+
59+
args = (
60+
self._create_client(
61+
event_type, task_id, method_name, self._config.api_key, connect_correlation_id,
62+
),
63+
self.get_logger(extra={'task_id': task_id}),
64+
self._config.variables,
65+
)
66+
67+
kwargs = {}
68+
if installation:
69+
kwargs['installation'] = installation
70+
kwargs['installation_client'] = self._create_client(
71+
event_type,
72+
task_id,
73+
method_name,
74+
api_key,
75+
connect_correlation_id,
76+
)
77+
if transformation_request:
78+
kwargs['transformation_request'] = transformation_request
79+
80+
app_class = self.get_application()
81+
82+
if 'context' in inspect.signature(app_class.__init__).parameters:
83+
kwargs['context'] = Context(
84+
extension_id=self.config.service_id,
85+
environment_id=self.config.environment_id,
86+
environment_type=self.config.environment_type,
87+
)
88+
89+
ext = app_class(*args, **kwargs)
90+
91+
return getattr(ext, method_name, None)

connect/eaas/runner/helpers.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@
5252
ORDINAL_SUFFIX,
5353
PYPI_EXTENSION_RUNNER_URL,
5454
SCHEDULED_TASK_MAX_EXECUTION_TIME,
55+
TRANSFORMATION_TASK_MAX_EXECUTION_TIME,
56+
TRANSFORMATION_WRITE_QUEUE_TIMEOUT,
5557
)
5658

5759

@@ -110,6 +112,12 @@ def get_environment():
110112
'scheduled_task_max_execution_time': int(os.getenv(
111113
'SCHEDULED_TASK_MAX_EXECUTION_TIME', SCHEDULED_TASK_MAX_EXECUTION_TIME,
112114
)),
115+
'transformation_task_max_execution_time': int(os.getenv(
116+
'TRANSFORMATION_TASK_MAX_EXECUTION_TIME', TRANSFORMATION_TASK_MAX_EXECUTION_TIME,
117+
)),
118+
'transformation_write_queue_timeout': int(os.getenv(
119+
'TRANSFORMATION_WRITE_QUEUE_TIMEOUT', TRANSFORMATION_WRITE_QUEUE_TIMEOUT,
120+
)),
113121
}
114122

115123

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
from connect.eaas.runner.managers.background import BackgroundTasksManager # noqa
22
from connect.eaas.runner.managers.interactive import InteractiveTasksManager # noqa
33
from connect.eaas.runner.managers.scheduled import ScheduledTasksManager # noqa
4+
from connect.eaas.runner.managers.transformation import TransformationTasksManager # noqa

connect/eaas/runner/managers/background.py

Lines changed: 11 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,11 @@
77
import logging
88
import time
99
import traceback
10-
from string import (
11-
Template,
12-
)
1310

1411
from connect.client import (
1512
AsyncConnectClient,
1613
)
1714
from connect.client.models import (
18-
AsyncCollection,
1915
AsyncResource,
2016
)
2117

@@ -27,7 +23,7 @@
2723
TaskOutput,
2824
)
2925
from connect.eaas.core.responses import (
30-
ProcessingResponse,
26+
BackgroundResponse,
3127
)
3228
from connect.eaas.runner.managers.base import (
3329
TasksManagerBase,
@@ -42,6 +38,11 @@ class BackgroundTasksManager(TasksManagerBase):
4238
def get_method_name(self, task_data, argument):
4339
return self.handler.events[task_data.input.event_type]['method']
4440

41+
def send_skip_response(self, data, output):
42+
future = asyncio.Future()
43+
future.set_result(BackgroundResponse.skip(output))
44+
asyncio.create_task(self.enqueue_result(data, future))
45+
4546
async def get_argument(self, task_data):
4647
"""
4748
Get the request object through the Connect public API
@@ -56,28 +57,14 @@ async def get_argument(self, task_data):
5657
default_headers=self.config.get_user_agent(),
5758
)
5859

59-
definition = self.config.event_definitions[task_data.input.event_type]
60-
supported_statuses = self.handler.events[task_data.input.event_type]['statuses']
61-
rql_filter = Template(definition.api_collection_filter).substitute(
62-
{
63-
'_statuses_': f'({",".join(supported_statuses)})',
64-
'_object_id_': task_data.input.object_id,
65-
},
60+
object_exists = await self.filter_collection_by_event_definition(
61+
client,
62+
task_data,
6663
)
67-
68-
collection = AsyncCollection(client, definition.api_collection_endpoint)
69-
if await collection.filter(rql_filter).count() == 0:
70-
logger.info(
71-
f'Send skip response for {task_data.options.task_id} since '
72-
'the current request status is not supported.',
73-
)
74-
self.send_skip_response(
75-
task_data,
76-
'The request status does not match the '
77-
f'supported statuses: {",".join(supported_statuses)}.',
78-
)
64+
if not object_exists:
7965
return
8066

67+
definition = self.config.event_definitions[task_data.input.event_type]
8168
url = definition.api_resource_endpoint.format(pk=task_data.input.object_id)
8269
resource = AsyncResource(client, url)
8370

@@ -112,8 +99,3 @@ async def build_response(self, task_data, future):
11299
result_message.output.message = traceback.format_exc()[:4000]
113100

114101
return result_message
115-
116-
def send_skip_response(self, data, output):
117-
future = asyncio.Future()
118-
future.set_result(ProcessingResponse.skip(output))
119-
asyncio.create_task(self.enqueue_result(data, future))

0 commit comments

Comments
 (0)