Skip to content

Commit 50f560c

Browse files
author
Francesco Faraone
committed
Install extension from docker entrypoint.
Create new extension instance for each task so the instance will be thread safe. For interactive tasks, call handler with task data instead of retrieving the object through API.
1 parent cd470ea commit 50f560c

File tree

9 files changed

+73
-103
lines changed

9 files changed

+73
-103
lines changed

Dockerfile

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ RUN poetry build
1818

1919
RUN pip install dist/*.whl
2020

21-
WORKDIR /
22-
2321
RUN rm -rf /install_temp
22+
23+
COPY ./entrypoint.sh /entrypoint.sh
24+
RUN chmod 755 /entrypoint.sh
25+
26+
ENTRYPOINT [ "/entrypoint.sh" ]

connect/eaas/helpers.py

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,19 +13,6 @@
1313
from connect.eaas.exceptions import EaaSError
1414

1515

16-
def install_extension(root_dir):
17-
result = subprocess.run(
18-
['poetry', 'install'],
19-
cwd=root_dir,
20-
stdin=subprocess.DEVNULL,
21-
start_new_session=True,
22-
)
23-
try:
24-
result.check_returncode()
25-
except subprocess.CalledProcessError:
26-
raise EaaSError(result.stderr.decode())
27-
28-
2916
def get_container_id():
3017
result = subprocess.run(
3118
['cat', '/proc/1/cpuset'],

connect/eaas/main.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
import pathlib
1111
import signal
1212

13-
from connect.eaas.helpers import install_extension
1413
from connect.eaas.worker import Worker
1514

1615

@@ -42,18 +41,20 @@ def configure_logger(debug):
4241
'handlers': ['console'],
4342
'level': 'DEBUG' if debug else 'INFO',
4443
},
44+
'eaas.extension': {
45+
'handlers': ['console'],
46+
'level': 'DEBUG' if debug else 'INFO',
47+
},
4548
},
4649
},
4750
)
4851

4952

5053
def start(data):
5154
logger.info('Starting Connect EaaS runtime....')
52-
logger.info('Installing the extension package...')
53-
install_extension(data.extension_dir)
54-
logger.info('The extension has been installed, starting the control worker...')
5555
if data.unsecure:
5656
logger.warning('Websocket connections will be established using unsecure protocol (ws).')
57+
5758
worker = Worker(secure=not data.unsecure)
5859
loop = asyncio.get_event_loop()
5960
loop.add_signal_handler(

connect/eaas/manager.py

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,8 @@ class TasksManager:
4141
Once completed, it will push the result to a queue from which, the
4242
result_sender task will pick it and send the result to the backend.
4343
"""
44-
def __init__(self, worker, extension):
44+
def __init__(self, worker):
4545
self.worker = worker
46-
self.extension = extension
4746
self.background_executor = ThreadPoolExecutor()
4847
self.interactive_executor = ThreadPoolExecutor()
4948
self.run_event = asyncio.Event()
@@ -87,17 +86,22 @@ async def submit_task(self, data):
8786
object_id = data.object_id
8887
task_type = data.task_type
8988
method_name = TASK_TYPE_EXT_METHOD_MAP[task_type]
90-
method = getattr(self.extension, method_name)
89+
extension = self.worker.get_extension()
90+
method = getattr(extension, method_name)
9191
logger.debug(f'invoke {method_name}')
9292
self.running_tasks += 1
93-
try:
94-
request = await self.get_request(object_id, task_type)
95-
except ClientError as e:
96-
logger.warning(f'Cannot retrieve object {data.object_id} for task {data.task_id}')
97-
future = Future()
98-
future.set_exception(e)
99-
asyncio.create_task(self.enqueue_result(data, future))
100-
return
93+
request = None
94+
if data.task_category == TaskCategory.BACKGROUND:
95+
try:
96+
request = await self.get_request(object_id, task_type)
97+
except ClientError as e:
98+
logger.warning(f'Cannot retrieve object {data.object_id} for task {data.task_id}')
99+
future = Future()
100+
future.set_exception(e)
101+
asyncio.create_task(self.enqueue_result(data, future))
102+
return
103+
else:
104+
request = data.data
101105

102106
if inspect.iscoroutinefunction(method):
103107
future = asyncio.create_task(method(request))

connect/eaas/worker.py

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -118,12 +118,7 @@ async def stop_tasks_manager(self):
118118

119119
def start_tasks_manager(self):
120120
logger.info('Starting tasks worker...')
121-
extension = self.extension_class(
122-
self.get_client(),
123-
self.get_extension_logger(self.logging_api_key),
124-
self.extension_config,
125-
)
126-
self.tasks_manager = TasksManager(self, extension)
121+
self.tasks_manager = TasksManager(self)
127122
self.tasks_manager.start()
128123
logger.info('Task worker started')
129124

@@ -134,6 +129,13 @@ def ensure_tasks_manager_running(self):
134129
def get_url(self):
135130
return f'{self.base_ws_url}/{self.environment_id}/{self.instance_id}'
136131

132+
def get_extension(self):
133+
return self.extension_class(
134+
self.get_client(),
135+
self.get_extension_logger(self.logging_api_key),
136+
self.extension_config,
137+
)
138+
137139
async def run(self): # noqa: CCR001
138140
"""
139141
Main loop for the websocket connection.
@@ -195,12 +197,8 @@ async def configuration(self, data):
195197
It will stop the tasks manager so the extension can be
196198
reconfigured, then restart the tasks manager.
197199
"""
198-
self.paused = True
199-
await self.stop_tasks_manager()
200200
self.extension_config = data.configuration
201201
self.logging_api_key = data.logging_api_key
202-
self.paused = False
203-
self.start_tasks_manager()
204202

205203
async def pause(self):
206204
"""

entrypoint.sh

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
#!/bin/bash
2+
set -e
3+
4+
EXTENSION_DIR=${EXTENSION_DIR:-'/extension'}
5+
6+
DIST_DIR=$EXTENSION_DIR/dist
7+
8+
echo "Installing the extension package from $EXTENSION_DIR..."
9+
10+
pushd .
11+
12+
cd $EXTENSION_DIR
13+
14+
if [ -d "$DIST_DIR" ]; then rm -Rf $DIST_DIR; fi
15+
16+
poetry build
17+
18+
pip install -U pip && pip install $DIST_DIR/*.whl
19+
20+
popd
21+
22+
echo "Extension installed."
23+
24+
exec "$@"

tests/test_helpers.py

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -15,33 +15,9 @@
1515
get_environment,
1616
get_extension_class,
1717
get_extension_type,
18-
install_extension,
1918
)
2019

2120

22-
def test_install_extension(mocker):
23-
result = mocker.MagicMock()
24-
result.returnvalue = 0
25-
mocked = mocker.patch('connect.eaas.helpers.subprocess.run', return_value=result)
26-
install_extension('/root_dir')
27-
28-
assert mocked.mock_calls[0].args[0] == ['poetry', 'install']
29-
assert mocked.mock_calls[0].kwargs['cwd'] == '/root_dir'
30-
31-
32-
def test_install_extension_ko(mocker):
33-
result = mocker.MagicMock()
34-
result.check_returncode = mocker.MagicMock(
35-
side_effect=subprocess.CalledProcessError(128, cmd=[]),
36-
)
37-
result.stderr = 'error message'.encode('utf-8')
38-
mocker.patch('connect.eaas.helpers.subprocess.run', return_value=result)
39-
with pytest.raises(EaaSError) as cv:
40-
install_extension('/root_dir')
41-
42-
assert str(cv.value) == 'error message'
43-
44-
4521
def test_get_container_id(mocker):
4622
result = mocker.MagicMock()
4723
result.returnvalue = 0

tests/test_main.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ def process_asset_purchase_request(self, request):
3131
def validate_asset_purchase_request(self, request):
3232
pass
3333
run_mock = mocker.AsyncMock()
34-
mocker.patch('connect.eaas.main.install_extension')
34+
3535
mocker.patch('connect.eaas.worker.get_extension_class', return_value=MyExtension)
3636
mocker.patch.object(Worker, 'run', run_mock)
3737
parsed_args = namedtuple('_Args', ('unsecure', 'extension_dir'))

tests/test_manager.py

Lines changed: 15 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222

2323
@pytest.mark.asyncio
2424
async def test_start_stop_is_running(mocker, caplog):
25-
manager = TasksManager(mocker.MagicMock(), mocker.MagicMock())
25+
manager = TasksManager(mocker.MagicMock())
2626
manager.start()
2727
assert manager.run_event.is_set() is True
2828
assert manager.is_running() is True
@@ -43,9 +43,10 @@ async def test_background_task_sync(mocker, extension_cls, task_type):
4343
extension = extension_class(None, None, None)
4444

4545
worker = mocker.MagicMock()
46+
worker.get_extension.return_value = extension
4647
worker.send = mocker.AsyncMock()
4748

48-
manager = TasksManager(worker, extension)
49+
manager = TasksManager(worker)
4950
manager.get_request = mocker.AsyncMock(return_value={'id': 'PR-000'})
5051

5152
manager.start()
@@ -77,9 +78,10 @@ async def test_background_task_async(mocker, extension_cls, task_type):
7778
extension = extension_class(None, None, None)
7879

7980
worker = mocker.MagicMock()
81+
worker.get_extension.return_value = extension
8082
worker.send = mocker.AsyncMock()
8183

82-
manager = TasksManager(worker, extension)
84+
manager = TasksManager(worker)
8385
manager.get_request = mocker.AsyncMock(return_value={'id': 'PR-000'})
8486

8587
manager.start()
@@ -111,9 +113,10 @@ async def test_interactive_task_sync(mocker, extension_cls, task_type):
111113
extension = extension_class(None, None, None)
112114

113115
worker = mocker.MagicMock()
116+
worker.get_extension.return_value = extension
114117
worker.send = mocker.AsyncMock()
115118

116-
manager = TasksManager(worker, extension)
119+
manager = TasksManager(worker)
117120
manager.get_request = mocker.AsyncMock(return_value={'id': 'PR-000'})
118121

119122
manager.start()
@@ -145,9 +148,10 @@ async def test_interactive_task_async(mocker, extension_cls, task_type):
145148
extension = extension_class(None, None, None)
146149

147150
worker = mocker.MagicMock()
151+
worker.get_extension.return_value = extension
148152
worker.send = mocker.AsyncMock()
149153

150-
manager = TasksManager(worker, extension)
154+
manager = TasksManager(worker)
151155
manager.get_request = mocker.AsyncMock(return_value={'id': 'PR-000'})
152156

153157
manager.start()
@@ -175,9 +179,10 @@ async def test_background_task_request_error(mocker, extension_cls):
175179
extension = extension_class(None, None, None)
176180

177181
worker = mocker.MagicMock()
182+
worker.get_extension.return_value = extension
178183
worker.send = mocker.AsyncMock()
179184

180-
manager = TasksManager(worker, extension)
185+
manager = TasksManager(worker)
181186
manager.get_request = mocker.AsyncMock(side_effect=ClientError('Request not found', 404))
182187

183188
manager.start()
@@ -199,45 +204,16 @@ async def test_background_task_request_error(mocker, extension_cls):
199204
worker.send.assert_awaited_once_with(json_msg)
200205

201206

202-
@pytest.mark.asyncio
203-
async def test_interactive_task_request_error(mocker, extension_cls):
204-
extension_class = extension_cls('validate_asset_purchase_request')
205-
extension = extension_class(None, None, None)
206-
207-
worker = mocker.MagicMock()
208-
worker.send = mocker.AsyncMock()
209-
210-
manager = TasksManager(worker, extension)
211-
manager.get_request = mocker.AsyncMock(side_effect=ClientError('Request not found', 404))
212-
213-
manager.start()
214-
215-
task = TaskPayload(
216-
'TQ-000',
217-
TaskCategory.INTERACTIVE,
218-
TaskType.ASSET_PURCHASE_REQUEST_VALIDATION,
219-
'PR-000',
220-
)
221-
222-
await manager.submit_task(task)
223-
await asyncio.sleep(.1)
224-
await manager.stop()
225-
message = Message(message_type=MessageType.TASK, data=task)
226-
json_msg = message.to_json()
227-
json_msg['data']['result'] = 'failed'
228-
json_msg['data']['failure_output'] = 'Request not found'
229-
worker.send.assert_awaited_once_with(json_msg)
230-
231-
232207
@pytest.mark.asyncio
233208
async def test_result_sender_retries(mocker, extension_cls):
234209
extension_class = extension_cls('process_asset_purchase_request')
235210
extension = extension_class(None, None, None)
236211

237212
worker = mocker.MagicMock()
213+
worker.get_extension.return_value = extension
238214
worker.send = mocker.AsyncMock(side_effect=[Exception('retry'), None])
239215

240-
manager = TasksManager(worker, extension)
216+
manager = TasksManager(worker)
241217
manager.get_request = mocker.AsyncMock(return_value={'id': 'PR-000'})
242218

243219
manager.start()
@@ -264,11 +240,12 @@ async def test_result_sender_max_retries_exceeded(mocker, extension_cls, caplog)
264240
extension = extension_class(None, None, None)
265241

266242
worker = mocker.MagicMock()
243+
worker.get_extension.return_value = extension
267244
worker.send = mocker.AsyncMock(
268245
side_effect=[Exception('retry') for _ in range(RESULT_SENDER_MAX_RETRIES)],
269246
)
270247

271-
manager = TasksManager(worker, extension)
248+
manager = TasksManager(worker)
272249
manager.get_request = mocker.AsyncMock(return_value={'id': 'PR-000'})
273250

274251
manager.start()

0 commit comments

Comments
 (0)