Skip to content

Commit 298dc68

Browse files
author
Francesco Faraone
authored
Merge pull request #71 from cloudblue/support_ui_extensions
LITE-23831, LITE-23832 support ui extensions.
2 parents 94bb115 + 27dccf3 commit 298dc68

File tree

16 files changed

+1468
-351
lines changed

16 files changed

+1468
-351
lines changed

connect/eaas/runner/base.py

Lines changed: 279 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,279 @@
1+
#
2+
# This file is part of the Ingram Micro CloudBlue Connect EaaS Extension Runner.
3+
#
4+
# Copyright (c) 2021 Ingram Micro. All Rights Reserved.
5+
#
6+
import asyncio
7+
import json
8+
import logging
9+
import time
10+
from abc import ABC, abstractmethod
11+
from asyncio.exceptions import TimeoutError
12+
13+
import backoff
14+
import websockets
15+
from websockets.exceptions import (
16+
ConnectionClosedError,
17+
ConnectionClosedOK,
18+
InvalidStatusCode,
19+
)
20+
21+
from connect.eaas.core.proto import (
22+
Message,
23+
MessageType,
24+
)
25+
from connect.eaas.runner.constants import (
26+
DELAY_ON_CONNECT_EXCEPTION_SECONDS,
27+
MAX_RETRY_DELAY_TIME_SECONDS,
28+
MAX_RETRY_TIME_GENERIC_SECONDS,
29+
MAX_RETRY_TIME_MAINTENANCE_SECONDS,
30+
)
31+
from connect.eaas.runner.exceptions import (
32+
CommunicationError,
33+
MaintenanceError,
34+
StopBackoffError,
35+
)
36+
from connect.eaas.runner.helpers import to_ordinal
37+
38+
39+
logger = logging.getLogger(__name__)
40+
41+
42+
def _get_max_retry_time_maintenance():
43+
return MAX_RETRY_TIME_MAINTENANCE_SECONDS
44+
45+
46+
def _get_max_retry_time_generic():
47+
return MAX_RETRY_TIME_GENERIC_SECONDS
48+
49+
50+
def _get_max_retry_delay_time():
51+
return MAX_RETRY_DELAY_TIME_SECONDS
52+
53+
54+
class WorkerBase(ABC):
55+
"""
56+
The Worker is responsible to handle the websocket connection
57+
with the server. It will send the extension capabilities to
58+
the server and wait for tasks that need to be processed using
59+
the tasks manager.
60+
"""
61+
def __init__(self, config):
62+
self.config = config
63+
self.lock = asyncio.Lock()
64+
self.run_event = asyncio.Event()
65+
self.stop_event = asyncio.Event()
66+
self.ws = None
67+
self.main_task = None
68+
self.results_task = None
69+
70+
async def ensure_connection(self): # noqa: CCR001
71+
"""
72+
Ensure that a websocket connection is established.
73+
"""
74+
@backoff.on_exception(
75+
backoff.expo,
76+
CommunicationError,
77+
max_time=_get_max_retry_time_generic,
78+
max_value=_get_max_retry_delay_time,
79+
on_backoff=self._backoff_log,
80+
giveup=self._backoff_shutdown,
81+
)
82+
@backoff.on_exception(
83+
backoff.expo,
84+
MaintenanceError,
85+
max_time=_get_max_retry_time_maintenance,
86+
max_value=_get_max_retry_delay_time,
87+
on_backoff=self._backoff_log,
88+
giveup=self._backoff_shutdown,
89+
)
90+
async def _connect():
91+
async with self.lock:
92+
if self.ws is None or not self.ws.open:
93+
try:
94+
url = self.get_url()
95+
self.ws = await websockets.connect(
96+
url,
97+
extra_headers=self.config.get_headers(),
98+
ping_interval=60,
99+
ping_timeout=60,
100+
max_queue=128,
101+
)
102+
await (await self.ws.ping())
103+
await self.do_handshake()
104+
logger.info(f'Connected to {url}')
105+
except InvalidStatusCode as ic:
106+
if ic.status_code == 502:
107+
logger.warning('Maintenance in progress...')
108+
raise MaintenanceError()
109+
else:
110+
logger.warning(
111+
f'Received an unexpected status from server: {ic.status_code}...',
112+
)
113+
raise CommunicationError()
114+
except ConnectionClosedError:
115+
logger.warning(
116+
'Connection closed by the host...',
117+
)
118+
raise CommunicationError()
119+
except Exception as e:
120+
logger.exception(f'Received an unexpected exception: {e}...')
121+
raise CommunicationError()
122+
123+
await _connect()
124+
125+
async def do_handshake(self):
126+
await self.send(self.get_setup_request())
127+
message = await asyncio.wait_for(self.ws.recv(), timeout=5)
128+
message = Message.deserialize(json.loads(message))
129+
self.process_setup_response(message.data)
130+
131+
async def send(self, message):
132+
"""
133+
Send a message to the websocket server.
134+
"""
135+
await self.ws.send(json.dumps(message))
136+
137+
async def receive(self):
138+
"""
139+
Receive a message from the websocket server.
140+
"""
141+
try:
142+
message = await asyncio.wait_for(self.ws.recv(), timeout=1)
143+
print(message)
144+
return json.loads(message)
145+
except TimeoutError: # pragma: no cover
146+
pass
147+
148+
async def run(self): # noqa: CCR001
149+
"""
150+
Main loop for the websocket connection.
151+
Once started, this worker will send the capabilities message to
152+
the websocket server and start a loop to receive messages from the
153+
websocket server.
154+
"""
155+
await self.run_event.wait()
156+
while self.run_event.is_set():
157+
try:
158+
await self.ensure_connection()
159+
while self.run_event.is_set():
160+
message = await self.receive()
161+
if not message:
162+
continue
163+
logger.debug('New message received via WS')
164+
await self.process_message(message)
165+
except (ConnectionClosedOK, StopBackoffError):
166+
self.run_event.clear()
167+
continue
168+
except (CommunicationError, MaintenanceError):
169+
logger.error('Max connection attemps reached, exit!')
170+
self.run_event.clear()
171+
continue
172+
except ConnectionClosedError:
173+
logger.warning(
174+
f'Disconnected from: {self.get_url()}'
175+
f', try to reconnect in {DELAY_ON_CONNECT_EXCEPTION_SECONDS}s',
176+
)
177+
await asyncio.sleep(DELAY_ON_CONNECT_EXCEPTION_SECONDS)
178+
except InvalidStatusCode as ic:
179+
if ic.status_code == 502:
180+
logger.warning(
181+
'Maintenance in progress'
182+
f', try to reconnect in {DELAY_ON_CONNECT_EXCEPTION_SECONDS}s',
183+
)
184+
await asyncio.sleep(DELAY_ON_CONNECT_EXCEPTION_SECONDS)
185+
else:
186+
logger.warning(
187+
f'Received an unexpected status from server: {ic.status_code}'
188+
f', try to reconnect in {DELAY_ON_CONNECT_EXCEPTION_SECONDS}s',
189+
)
190+
await asyncio.sleep(DELAY_ON_CONNECT_EXCEPTION_SECONDS)
191+
except Exception as e:
192+
logger.exception(
193+
f'Unexpected exception {e}'
194+
f', try to reconnect in {DELAY_ON_CONNECT_EXCEPTION_SECONDS}s',
195+
)
196+
await asyncio.sleep(DELAY_ON_CONNECT_EXCEPTION_SECONDS)
197+
logger.info('Consumer loop exited!')
198+
199+
def process_setup_response(self, data):
200+
"""
201+
Process the configuration message.
202+
It will stop the tasks manager so the extension can be
203+
reconfigured, then restart the tasks manager.
204+
"""
205+
self.config.update_dynamic_config(data)
206+
logger.info('Extension configuration has been updated.')
207+
208+
async def shutdown(self):
209+
"""
210+
Shutdown the extension runner.
211+
"""
212+
logger.info('Shutdown extension runner.')
213+
self.stop()
214+
215+
async def start(self):
216+
"""
217+
Start the runner.
218+
"""
219+
logger.info('Starting control worker...')
220+
self.main_task = asyncio.create_task(self.run())
221+
self.run_event.set()
222+
logger.info('Control worker started')
223+
await self.stop_event.wait()
224+
await self.stopping()
225+
await self.main_task
226+
if self.ws:
227+
await self.ws.close()
228+
logger.info('Control worker stopped')
229+
230+
def stop(self):
231+
"""
232+
Stop the runner.
233+
"""
234+
logger.info('Stopping control worker...')
235+
self.run_event.clear()
236+
self.stop_event.set()
237+
238+
async def send_shutdown(self):
239+
try:
240+
msg = Message(version=2, message_type=MessageType.SHUTDOWN)
241+
await self.send(msg.serialize())
242+
except ConnectionClosedError:
243+
pass
244+
except Exception:
245+
logger.exception('Cannot send shutdown message')
246+
247+
def handle_signal(self):
248+
asyncio.create_task(self.send_shutdown())
249+
time.sleep(1)
250+
self.stop()
251+
252+
@abstractmethod
253+
def get_url(self):
254+
raise NotImplementedError()
255+
256+
@abstractmethod
257+
def get_setup_request(self):
258+
raise NotImplementedError()
259+
260+
@abstractmethod
261+
async def stopping(self):
262+
raise NotImplementedError()
263+
264+
@abstractmethod
265+
async def process_message(self, data):
266+
raise NotImplementedError()
267+
268+
def _backoff_shutdown(self, _):
269+
if not self.run_event.is_set():
270+
logger.info('Worker exiting, stop backoff loop')
271+
raise StopBackoffError()
272+
273+
def _backoff_log(self, details):
274+
logger.info(
275+
f'{to_ordinal(details["tries"])} communication attempt failed, backing off waiting '
276+
f'{details["wait"]:.2f} seconds after next retry. '
277+
f'Elapsed time: {details["elapsed"]:.2f}'
278+
' seconds.',
279+
)

connect/eaas/runner/config.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,10 @@ def logging_api_key(self):
6969
def variables(self):
7070
return self.dyn_config.variables
7171

72+
@property
73+
def webapp_port(self):
74+
return self.env['webapp_port']
75+
7276
@property
7377
def metadata(self):
7478
return {
@@ -88,13 +92,20 @@ def event_definitions(self):
8892
for definition in (self.dyn_config.event_definitions or [])
8993
}
9094

91-
def get_ws_url(self):
95+
def get_events_ws_url(self):
9296
proto = 'wss' if self.secure else 'ws'
9397
return (
9498
f'{proto}://{self.env["ws_address"]}/public/v1/devops/ws'
9599
f'/{self.env["environment_id"]}/{self.env["instance_id"]}'
96100
)
97101

102+
def get_webapp_ws_url(self):
103+
proto = 'wss' if self.secure else 'ws'
104+
return (
105+
f'{proto}://{self.env["ws_address"]}/public/v1/devops/ws'
106+
f'/{self.env["environment_id"]}/{self.env["instance_id"]}/webapp'
107+
)
108+
98109
def get_api_url(self):
99110
return f'https://{self.env["api_address"]}/public/v1'
100111

connect/eaas/runner/handler.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,17 @@ class ExtensionHandler:
1919
def __init__(self, config: ConfigHelper):
2020
self._config = config
2121
self._extension_class = self.get_extension_class()
22-
self._descriptor = self._extension_class.get_descriptor()
23-
self._events = self.get_events()
24-
self._schedulables = self.get_schedulables()
25-
self._variables = self.get_variables()
22+
if self._extension_class:
23+
self._descriptor = self._extension_class.get_descriptor()
24+
self._events = self.get_events()
25+
self._schedulables = self.get_schedulables()
26+
self._variables = self.get_variables()
2627
self._logging_handler = None
2728

29+
@property
30+
def should_start(self):
31+
return self._extension_class is not None
32+
2833
@property
2934
def events(self):
3035
return self._events

connect/eaas/runner/helpers.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ def get_environment():
6363
'instance_id': os.getenv('INSTANCE_ID', get_container_id()),
6464
'ws_address': os.getenv('SERVER_ADDRESS', 'api.cnct.info'),
6565
'api_address': os.getenv('API_ADDRESS', os.getenv('SERVER_ADDRESS', 'api.cnct.info')),
66+
'webapp_port': int(os.getenv('WEBAPP_PORT', '53537')),
6667
'background_task_max_execution_time': int(os.getenv(
6768
'BACKGROUND_TASK_MAX_EXECUTION_TIME', BACKGROUND_TASK_MAX_EXECUTION_TIME,
6869
)),

0 commit comments

Comments
 (0)