Skip to content

Commit a77c874

Browse files
author
ffaraoneim
authored
Merge pull request #5 from cloudblue/CP_1
Cumulative fixes
2 parents cd470ea + 50f560c commit a77c874

File tree

9 files changed

+73
-103
lines changed

9 files changed

+73
-103
lines changed

Dockerfile

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ RUN poetry build
1818

1919
RUN pip install dist/*.whl
2020

21-
WORKDIR /
22-
2321
RUN rm -rf /install_temp
22+
23+
COPY ./entrypoint.sh /entrypoint.sh
24+
RUN chmod 755 /entrypoint.sh
25+
26+
ENTRYPOINT [ "/entrypoint.sh" ]

connect/eaas/helpers.py

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,19 +13,6 @@
1313
from connect.eaas.exceptions import EaaSError
1414

1515

16-
def install_extension(root_dir):
17-
result = subprocess.run(
18-
['poetry', 'install'],
19-
cwd=root_dir,
20-
stdin=subprocess.DEVNULL,
21-
start_new_session=True,
22-
)
23-
try:
24-
result.check_returncode()
25-
except subprocess.CalledProcessError:
26-
raise EaaSError(result.stderr.decode())
27-
28-
2916
def get_container_id():
3017
result = subprocess.run(
3118
['cat', '/proc/1/cpuset'],

connect/eaas/main.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
import pathlib
1111
import signal
1212

13-
from connect.eaas.helpers import install_extension
1413
from connect.eaas.worker import Worker
1514

1615

@@ -42,18 +41,20 @@ def configure_logger(debug):
4241
'handlers': ['console'],
4342
'level': 'DEBUG' if debug else 'INFO',
4443
},
44+
'eaas.extension': {
45+
'handlers': ['console'],
46+
'level': 'DEBUG' if debug else 'INFO',
47+
},
4548
},
4649
},
4750
)
4851

4952

5053
def start(data):
5154
logger.info('Starting Connect EaaS runtime....')
52-
logger.info('Installing the extension package...')
53-
install_extension(data.extension_dir)
54-
logger.info('The extension has been installed, starting the control worker...')
5555
if data.unsecure:
5656
logger.warning('Websocket connections will be established using unsecure protocol (ws).')
57+
5758
worker = Worker(secure=not data.unsecure)
5859
loop = asyncio.get_event_loop()
5960
loop.add_signal_handler(

connect/eaas/manager.py

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,8 @@ class TasksManager:
4141
Once completed, it will push the result to a queue from which, the
4242
result_sender task will pick it and send the result to the backend.
4343
"""
44-
def __init__(self, worker, extension):
44+
def __init__(self, worker):
4545
self.worker = worker
46-
self.extension = extension
4746
self.background_executor = ThreadPoolExecutor()
4847
self.interactive_executor = ThreadPoolExecutor()
4948
self.run_event = asyncio.Event()
@@ -87,17 +86,22 @@ async def submit_task(self, data):
8786
object_id = data.object_id
8887
task_type = data.task_type
8988
method_name = TASK_TYPE_EXT_METHOD_MAP[task_type]
90-
method = getattr(self.extension, method_name)
89+
extension = self.worker.get_extension()
90+
method = getattr(extension, method_name)
9191
logger.debug(f'invoke {method_name}')
9292
self.running_tasks += 1
93-
try:
94-
request = await self.get_request(object_id, task_type)
95-
except ClientError as e:
96-
logger.warning(f'Cannot retrieve object {data.object_id} for task {data.task_id}')
97-
future = Future()
98-
future.set_exception(e)
99-
asyncio.create_task(self.enqueue_result(data, future))
100-
return
93+
request = None
94+
if data.task_category == TaskCategory.BACKGROUND:
95+
try:
96+
request = await self.get_request(object_id, task_type)
97+
except ClientError as e:
98+
logger.warning(f'Cannot retrieve object {data.object_id} for task {data.task_id}')
99+
future = Future()
100+
future.set_exception(e)
101+
asyncio.create_task(self.enqueue_result(data, future))
102+
return
103+
else:
104+
request = data.data
101105

102106
if inspect.iscoroutinefunction(method):
103107
future = asyncio.create_task(method(request))

connect/eaas/worker.py

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -118,12 +118,7 @@ async def stop_tasks_manager(self):
118118

119119
def start_tasks_manager(self):
120120
logger.info('Starting tasks worker...')
121-
extension = self.extension_class(
122-
self.get_client(),
123-
self.get_extension_logger(self.logging_api_key),
124-
self.extension_config,
125-
)
126-
self.tasks_manager = TasksManager(self, extension)
121+
self.tasks_manager = TasksManager(self)
127122
self.tasks_manager.start()
128123
logger.info('Task worker started')
129124

@@ -134,6 +129,13 @@ def ensure_tasks_manager_running(self):
134129
def get_url(self):
135130
return f'{self.base_ws_url}/{self.environment_id}/{self.instance_id}'
136131

132+
def get_extension(self):
133+
return self.extension_class(
134+
self.get_client(),
135+
self.get_extension_logger(self.logging_api_key),
136+
self.extension_config,
137+
)
138+
137139
async def run(self): # noqa: CCR001
138140
"""
139141
Main loop for the websocket connection.
@@ -195,12 +197,8 @@ async def configuration(self, data):
195197
It will stop the tasks manager so the extension can be
196198
reconfigured, then restart the tasks manager.
197199
"""
198-
self.paused = True
199-
await self.stop_tasks_manager()
200200
self.extension_config = data.configuration
201201
self.logging_api_key = data.logging_api_key
202-
self.paused = False
203-
self.start_tasks_manager()
204202

205203
async def pause(self):
206204
"""

entrypoint.sh

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
#!/bin/bash
2+
set -e
3+
4+
EXTENSION_DIR=${EXTENSION_DIR:-'/extension'}
5+
6+
DIST_DIR=$EXTENSION_DIR/dist
7+
8+
echo "Installing the extension package from $EXTENSION_DIR..."
9+
10+
pushd .
11+
12+
cd $EXTENSION_DIR
13+
14+
if [ -d "$DIST_DIR" ]; then rm -Rf $DIST_DIR; fi
15+
16+
poetry build
17+
18+
pip install -U pip && pip install $DIST_DIR/*.whl
19+
20+
popd
21+
22+
echo "Extension installed."
23+
24+
exec "$@"

tests/test_helpers.py

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -15,33 +15,9 @@
1515
get_environment,
1616
get_extension_class,
1717
get_extension_type,
18-
install_extension,
1918
)
2019

2120

22-
def test_install_extension(mocker):
23-
result = mocker.MagicMock()
24-
result.returnvalue = 0
25-
mocked = mocker.patch('connect.eaas.helpers.subprocess.run', return_value=result)
26-
install_extension('/root_dir')
27-
28-
assert mocked.mock_calls[0].args[0] == ['poetry', 'install']
29-
assert mocked.mock_calls[0].kwargs['cwd'] == '/root_dir'
30-
31-
32-
def test_install_extension_ko(mocker):
33-
result = mocker.MagicMock()
34-
result.check_returncode = mocker.MagicMock(
35-
side_effect=subprocess.CalledProcessError(128, cmd=[]),
36-
)
37-
result.stderr = 'error message'.encode('utf-8')
38-
mocker.patch('connect.eaas.helpers.subprocess.run', return_value=result)
39-
with pytest.raises(EaaSError) as cv:
40-
install_extension('/root_dir')
41-
42-
assert str(cv.value) == 'error message'
43-
44-
4521
def test_get_container_id(mocker):
4622
result = mocker.MagicMock()
4723
result.returnvalue = 0

tests/test_main.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ def process_asset_purchase_request(self, request):
3131
def validate_asset_purchase_request(self, request):
3232
pass
3333
run_mock = mocker.AsyncMock()
34-
mocker.patch('connect.eaas.main.install_extension')
34+
3535
mocker.patch('connect.eaas.worker.get_extension_class', return_value=MyExtension)
3636
mocker.patch.object(Worker, 'run', run_mock)
3737
parsed_args = namedtuple('_Args', ('unsecure', 'extension_dir'))

tests/test_manager.py

Lines changed: 15 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222

2323
@pytest.mark.asyncio
2424
async def test_start_stop_is_running(mocker, caplog):
25-
manager = TasksManager(mocker.MagicMock(), mocker.MagicMock())
25+
manager = TasksManager(mocker.MagicMock())
2626
manager.start()
2727
assert manager.run_event.is_set() is True
2828
assert manager.is_running() is True
@@ -43,9 +43,10 @@ async def test_background_task_sync(mocker, extension_cls, task_type):
4343
extension = extension_class(None, None, None)
4444

4545
worker = mocker.MagicMock()
46+
worker.get_extension.return_value = extension
4647
worker.send = mocker.AsyncMock()
4748

48-
manager = TasksManager(worker, extension)
49+
manager = TasksManager(worker)
4950
manager.get_request = mocker.AsyncMock(return_value={'id': 'PR-000'})
5051

5152
manager.start()
@@ -77,9 +78,10 @@ async def test_background_task_async(mocker, extension_cls, task_type):
7778
extension = extension_class(None, None, None)
7879

7980
worker = mocker.MagicMock()
81+
worker.get_extension.return_value = extension
8082
worker.send = mocker.AsyncMock()
8183

82-
manager = TasksManager(worker, extension)
84+
manager = TasksManager(worker)
8385
manager.get_request = mocker.AsyncMock(return_value={'id': 'PR-000'})
8486

8587
manager.start()
@@ -111,9 +113,10 @@ async def test_interactive_task_sync(mocker, extension_cls, task_type):
111113
extension = extension_class(None, None, None)
112114

113115
worker = mocker.MagicMock()
116+
worker.get_extension.return_value = extension
114117
worker.send = mocker.AsyncMock()
115118

116-
manager = TasksManager(worker, extension)
119+
manager = TasksManager(worker)
117120
manager.get_request = mocker.AsyncMock(return_value={'id': 'PR-000'})
118121

119122
manager.start()
@@ -145,9 +148,10 @@ async def test_interactive_task_async(mocker, extension_cls, task_type):
145148
extension = extension_class(None, None, None)
146149

147150
worker = mocker.MagicMock()
151+
worker.get_extension.return_value = extension
148152
worker.send = mocker.AsyncMock()
149153

150-
manager = TasksManager(worker, extension)
154+
manager = TasksManager(worker)
151155
manager.get_request = mocker.AsyncMock(return_value={'id': 'PR-000'})
152156

153157
manager.start()
@@ -175,9 +179,10 @@ async def test_background_task_request_error(mocker, extension_cls):
175179
extension = extension_class(None, None, None)
176180

177181
worker = mocker.MagicMock()
182+
worker.get_extension.return_value = extension
178183
worker.send = mocker.AsyncMock()
179184

180-
manager = TasksManager(worker, extension)
185+
manager = TasksManager(worker)
181186
manager.get_request = mocker.AsyncMock(side_effect=ClientError('Request not found', 404))
182187

183188
manager.start()
@@ -199,45 +204,16 @@ async def test_background_task_request_error(mocker, extension_cls):
199204
worker.send.assert_awaited_once_with(json_msg)
200205

201206

202-
@pytest.mark.asyncio
203-
async def test_interactive_task_request_error(mocker, extension_cls):
204-
extension_class = extension_cls('validate_asset_purchase_request')
205-
extension = extension_class(None, None, None)
206-
207-
worker = mocker.MagicMock()
208-
worker.send = mocker.AsyncMock()
209-
210-
manager = TasksManager(worker, extension)
211-
manager.get_request = mocker.AsyncMock(side_effect=ClientError('Request not found', 404))
212-
213-
manager.start()
214-
215-
task = TaskPayload(
216-
'TQ-000',
217-
TaskCategory.INTERACTIVE,
218-
TaskType.ASSET_PURCHASE_REQUEST_VALIDATION,
219-
'PR-000',
220-
)
221-
222-
await manager.submit_task(task)
223-
await asyncio.sleep(.1)
224-
await manager.stop()
225-
message = Message(message_type=MessageType.TASK, data=task)
226-
json_msg = message.to_json()
227-
json_msg['data']['result'] = 'failed'
228-
json_msg['data']['failure_output'] = 'Request not found'
229-
worker.send.assert_awaited_once_with(json_msg)
230-
231-
232207
@pytest.mark.asyncio
233208
async def test_result_sender_retries(mocker, extension_cls):
234209
extension_class = extension_cls('process_asset_purchase_request')
235210
extension = extension_class(None, None, None)
236211

237212
worker = mocker.MagicMock()
213+
worker.get_extension.return_value = extension
238214
worker.send = mocker.AsyncMock(side_effect=[Exception('retry'), None])
239215

240-
manager = TasksManager(worker, extension)
216+
manager = TasksManager(worker)
241217
manager.get_request = mocker.AsyncMock(return_value={'id': 'PR-000'})
242218

243219
manager.start()
@@ -264,11 +240,12 @@ async def test_result_sender_max_retries_exceeded(mocker, extension_cls, caplog)
264240
extension = extension_class(None, None, None)
265241

266242
worker = mocker.MagicMock()
243+
worker.get_extension.return_value = extension
267244
worker.send = mocker.AsyncMock(
268245
side_effect=[Exception('retry') for _ in range(RESULT_SENDER_MAX_RETRIES)],
269246
)
270247

271-
manager = TasksManager(worker, extension)
248+
manager = TasksManager(worker)
272249
manager.get_request = mocker.AsyncMock(return_value={'id': 'PR-000'})
273250

274251
manager.start()

0 commit comments

Comments
 (0)