Skip to content

Commit e192613

Browse files
author
ffaraoneim
authored
Merge pull request #23 from cloudblue/LITE-19830-add-extra-metadata-to-logs
LITE-19830 add extra metadata to logging extras
2 parents dc19842 + 638e2c9 commit e192613

File tree

6 files changed

+259
-170
lines changed

6 files changed

+259
-170
lines changed

connect/eaas/dataclasses.py

Lines changed: 32 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -48,58 +48,52 @@ class TaskPayload:
4848
task_category: str
4949
task_type: str
5050
object_id: str
51-
result: str = None
51+
result: Optional[str] = None
5252
data: Any = None
5353
countdown: int = 0
54-
output: str = None
55-
correlation_id: str = None
56-
reply_to: str = None
57-
58-
def to_json(self):
59-
return dataclasses.asdict(self)
54+
output: Optional[str] = None
55+
correlation_id: Optional[str] = None
56+
reply_to: Optional[str] = None
6057

6158

6259
@dataclasses.dataclass
6360
class ConfigurationPayload:
64-
configuration: dict = None
65-
logging_api_key: str = None
66-
environment_type: str = None
67-
log_level: str = None
68-
runner_log_level: str = None
69-
70-
def to_json(self):
71-
return dataclasses.asdict(self)
61+
configuration: Optional[dict] = None
62+
logging_api_key: Optional[str] = None
63+
environment_type: Optional[str] = None
64+
account_id: Optional[str] = None
65+
account_name: Optional[str] = None
66+
log_level: Optional[str] = None
67+
runner_log_level: Optional[str] = None
7268

7369

7470
@dataclasses.dataclass
7571
class CapabilitiesPayload:
7672
capabilities: dict
77-
readme_url: str = None
78-
changelog_url: str = None
73+
readme_url: Optional[str] = None
74+
changelog_url: Optional[str] = None
7975

80-
def to_json(self):
81-
return dataclasses.asdict(self)
8276

83-
84-
@dataclasses.dataclass(init=False)
77+
@dataclasses.dataclass
8578
class Message:
8679
message_type: str
8780
data: Optional[Union[CapabilitiesPayload, ConfigurationPayload, TaskPayload]] = None
8881

89-
def __init__(self, message_type=None, data=None):
90-
self.message_type = message_type
91-
if isinstance(data, dict):
92-
if self.message_type == MessageType.CONFIGURATION:
93-
self.data = ConfigurationPayload(**data)
94-
elif self.message_type == MessageType.TASK:
95-
self.data = TaskPayload(**data)
96-
elif self.message_type == MessageType.CAPABILITIES:
97-
self.data = CapabilitiesPayload(**data)
98-
else:
99-
self.data = data
100-
101-
def to_json(self):
102-
payload = {'message_type': self.message_type}
103-
if self.data:
104-
payload['data'] = dataclasses.asdict(self.data)
105-
return payload
82+
83+
def from_dict(cls, data):
84+
field_names = set(f.name for f in dataclasses.fields(cls))
85+
return cls(**{k: v for k, v in data.items() if k in field_names})
86+
87+
88+
def parse_message(payload):
89+
message_type = payload['message_type']
90+
if message_type == MessageType.CONFIGURATION:
91+
data = from_dict(ConfigurationPayload, payload.get('data'))
92+
elif message_type == MessageType.TASK:
93+
data = from_dict(TaskPayload, payload.get('data'))
94+
elif message_type == MessageType.CAPABILITIES:
95+
data = from_dict(CapabilitiesPayload, payload.get('data'))
96+
else:
97+
data = payload.get('data')
98+
99+
return Message(message_type=message_type, data=data)

connect/eaas/manager.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
# Copyright (c) 2021 Ingram Micro. All Rights Reserved.
55
#
66
import asyncio
7+
import dataclasses
78
import inspect
89
import logging
910
import traceback
@@ -90,7 +91,7 @@ async def submit_task(self, data):
9091
object_id = data.object_id
9192
task_type = data.task_type
9293
method_name = TASK_TYPE_EXT_METHOD_MAP[task_type]
93-
extension = self.worker.get_extension()
94+
extension = self.worker.get_extension(data.task_id)
9495
method = getattr(extension, method_name)
9596
logger.debug(f'invoke {method_name}')
9697
self.running_tasks += 1
@@ -175,7 +176,7 @@ async def result_sender(self): # noqa: CCR001
175176
message_type=MessageType.TASK,
176177
data=result,
177178
)
178-
await self.worker.send(message.to_json())
179+
await self.worker.send(dataclasses.asdict(message))
179180
logger.info(f'Result for task {result.task_id} has been sent.')
180181
break
181182
except Exception:
@@ -222,13 +223,13 @@ async def build_bg_response(self, task_data, future):
222223
"""
223224
Wait for a background task to be completed and than uild the task result message.
224225
"""
225-
result_message = TaskPayload(**task_data.to_json())
226+
result_message = TaskPayload(**dataclasses.asdict(task_data))
226227
result = None
227228
try:
228229
result = await asyncio.wait_for(future, timeout=BACKGROUND_TASK_MAX_EXECUTION_TIME)
229230
except Exception as e:
230231
logger.warning(f'Got exception during execution of task {task_data.task_id}: {e}')
231-
self.worker.get_extension().logger.exception(
232+
self.worker.get_extension(task_data.task_id).logger.exception(
232233
f'Unhandled exception during execution of task {task_data.task_id}',
233234
)
234235
result_message.result = ResultType.RETRY
@@ -249,12 +250,12 @@ async def build_interactive_response(self, task_data, future):
249250
Wait for an interactive task to be completed and than uild the task result message.
250251
"""
251252
result = None
252-
result_message = TaskPayload(**task_data.to_json())
253+
result_message = TaskPayload(**dataclasses.asdict(task_data))
253254
try:
254255
result = await asyncio.wait_for(future, timeout=INTERACTIVE_TASK_MAX_EXECUTION_TIME)
255256
except Exception as e:
256257
logger.warning(f'Got exception during execution of task {task_data.task_id}: {e}')
257-
self.worker.get_extension().logger.exception(
258+
self.worker.get_extension(task_data.task_id).logger.exception(
258259
f'Unhandled exception during execution of task {task_data.task_id}',
259260
)
260261
result_message.result = ResultType.FAIL

connect/eaas/worker.py

Lines changed: 33 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
# Copyright (c) 2021 Ingram Micro. All Rights Reserved.
55
#
66
import asyncio
7+
import dataclasses
78
import json
89
import logging
910
from asyncio.exceptions import TimeoutError
@@ -17,13 +18,21 @@
1718
)
1819

1920
from connect.client import AsyncConnectClient, ConnectClient
20-
from connect.eaas.dataclasses import CapabilitiesPayload, Message, MessageType
21+
from connect.eaas.dataclasses import (
22+
CapabilitiesPayload,
23+
Message,
24+
MessageType,
25+
parse_message,
26+
)
2127
from connect.eaas.helpers import (
2228
get_environment,
2329
get_extension_class,
2430
get_extension_type,
2531
)
26-
from connect.eaas.logging import ExtensionLogHandler, RequestLogger
32+
from connect.eaas.logging import (
33+
ExtensionLogHandler,
34+
RequestLogger,
35+
)
2736
from connect.eaas.manager import TasksManager
2837

2938

@@ -63,6 +72,8 @@ def __init__(self, secure=True):
6372
self.paused = False
6473
self.logging_handler = None
6574
self.environment_type = None
75+
self.account_id = None
76+
self.account_name = None
6677

6778
async def ensure_connection(self):
6879
"""
@@ -93,7 +104,7 @@ async def receive(self):
93104
except TimeoutError:
94105
pass
95106

96-
def get_client(self):
107+
def get_client(self, task_id):
97108
"""
98109
Get an instance of the Connect Openapi Client. If the extension is asyncrhonous
99110
it returns an instance of the AsyncConnectClient otherwise the ConnectClient.
@@ -104,7 +115,10 @@ def get_client(self):
104115
endpoint=f'https://{self.api_address}/public/v1',
105116
use_specs=False,
106117
logger=RequestLogger(
107-
self.get_extension_logger(self.logging_api_key),
118+
logging.LoggerAdapter(
119+
self.get_extension_logger(self.logging_api_key),
120+
{'task_id': task_id},
121+
),
108122
),
109123
)
110124

@@ -122,6 +136,8 @@ def get_extension_logger(self, token):
122136
'environment_id': self.environment_id,
123137
'instance_id': self.instance_id,
124138
'environment_type': self.environment_type,
139+
'account_id': self.account_id,
140+
'account_name': self.account_name,
125141
'api_address': self.api_address,
126142
},
127143
)
@@ -151,10 +167,13 @@ def get_url(self):
151167
url = f'{self.base_ws_url}/{self.environment_id}/{self.instance_id}'
152168
return f'{url}?running_tasks={running_tasks}'
153169

154-
def get_extension(self):
170+
def get_extension(self, task_id):
155171
return self.extension_class(
156-
self.get_client(),
157-
self.get_extension_logger(self.logging_api_key),
172+
self.get_client(task_id),
173+
logging.LoggerAdapter(
174+
self.get_extension_logger(self.logging_api_key),
175+
{'task_id': task_id},
176+
),
158177
self.extension_config,
159178
)
160179

@@ -176,7 +195,7 @@ async def run(self): # noqa: CCR001
176195
self.changelog_url,
177196
),
178197
)
179-
await self.send(message.to_json())
198+
await self.send(dataclasses.asdict(message))
180199
while self.run_event.is_set():
181200
await self.ensure_connection()
182201
self.ensure_tasks_manager_running()
@@ -208,7 +227,7 @@ async def process_message(self, data):
208227
"""
209228
Process a message received from the websocket server.
210229
"""
211-
message = Message(**data)
230+
message = parse_message(data)
212231
if message.message_type == MessageType.CONFIGURATION:
213232
await self.configuration(message.data)
214233
elif message.message_type == MessageType.TASK:
@@ -232,6 +251,11 @@ async def configuration(self, data):
232251
self.logging_api_key = data.logging_api_key
233252
if data.environment_type:
234253
self.environment_type = data.environment_type
254+
if data.account_id:
255+
self.account_id = data.account_id
256+
if data.account_name:
257+
self.account_name = data.account_name
258+
235259
if data.log_level:
236260
logger.info(f'Change extesion logger level to {data.log_level}')
237261
logging.getLogger('eaas.extension').setLevel(

tests/test_dataclasses.py

Lines changed: 76 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,54 +1,93 @@
1+
import dataclasses
2+
13
from connect.eaas.dataclasses import (
24
CapabilitiesPayload,
35
ConfigurationPayload,
6+
from_dict,
47
Message,
58
MessageType,
6-
9+
parse_message,
10+
TaskPayload,
711
)
812

913

10-
def test_capabilities_payload():
11-
assert CapabilitiesPayload(
12-
{'cap1': 'val1'},
13-
'https://example.com/readme',
14-
'https://example.com/changelog',
15-
).to_json() == {
16-
'capabilities': {'cap1': 'val1'},
17-
'readme_url': 'https://example.com/readme',
18-
'changelog_url': 'https://example.com/changelog',
14+
def test_from_dict():
15+
data = {
16+
'capabilities': {'test': 'data'},
17+
'readme_url': 'https://read.me',
18+
'changelog_url': 'https://change.log',
19+
'extra': 'data',
1920
}
21+
capabilities = from_dict(CapabilitiesPayload, data)
2022

23+
assert capabilities.capabilities == data['capabilities']
24+
assert capabilities.changelog_url == data['changelog_url']
25+
assert capabilities.readme_url == data['readme_url']
2126

22-
def test_configuration_payload():
23-
assert ConfigurationPayload(
24-
{'conf1': 'val1'},
25-
'logging-token',
26-
'environ-type',
27-
'log-level',
28-
'runner-log-level',
29-
).to_json() == {
30-
'configuration': {'conf1': 'val1'},
31-
'logging_api_key': 'logging-token',
32-
'environment_type': 'environ-type',
33-
'log_level': 'log-level',
34-
'runner_log_level': 'runner-log-level',
27+
28+
def test_parse_task_message():
29+
msg_data = {
30+
'message_type': 'task',
31+
'data': {
32+
'task_id': 'task_id',
33+
'task_category': 'task_category',
34+
'task_type': 'task_type',
35+
'object_id': 'object_id',
36+
'result': 'result',
37+
'data': {'data': 'value'},
38+
'countdown': 10,
39+
'output': 'output',
40+
'correlation_id': 'correlation_id',
41+
'reply_to': 'reply_to',
42+
},
3543
}
3644

45+
message = parse_message(msg_data)
46+
47+
assert isinstance(message, Message)
48+
assert message.message_type == MessageType.TASK
49+
assert isinstance(message.data, TaskPayload)
3750

38-
def test_message_capabilities():
39-
cap = CapabilitiesPayload(
40-
{'cap1': 'val1'},
41-
'https://example.com/readme',
42-
'https://example.com/changelog',
43-
)
51+
assert dataclasses.asdict(message) == msg_data
4452

45-
msg = Message(
46-
MessageType.CAPABILITIES,
47-
cap.to_json(),
48-
)
49-
assert msg.data == cap
5053

51-
assert msg.to_json() == {
52-
'message_type': MessageType.CAPABILITIES,
53-
'data': cap.to_json(),
54+
def test_parse_capabilities_message():
55+
msg_data = {
56+
'message_type': 'capabilities',
57+
'data': {
58+
'capabilities': {'test': 'data'},
59+
'readme_url': 'https://read.me',
60+
'changelog_url': 'https://change.log',
61+
},
5462
}
63+
64+
message = parse_message(msg_data)
65+
66+
assert isinstance(message, Message)
67+
assert message.message_type == MessageType.CAPABILITIES
68+
assert isinstance(message.data, CapabilitiesPayload)
69+
70+
assert dataclasses.asdict(message) == msg_data
71+
72+
73+
def test_parse_configuration_message():
74+
msg_data = {
75+
'message_type': 'configuration',
76+
'data': {
77+
'configuration': {'conf1': 'val1'},
78+
'logging_api_key': 'logging-token',
79+
'environment_type': 'environ-type',
80+
'log_level': 'log-level',
81+
'runner_log_level': 'runner-log-level',
82+
'account_id': 'account_id',
83+
'account_name': 'account_name',
84+
},
85+
}
86+
87+
message = parse_message(msg_data)
88+
89+
assert isinstance(message, Message)
90+
assert message.message_type == MessageType.CONFIGURATION
91+
assert isinstance(message.data, ConfigurationPayload)
92+
93+
assert dataclasses.asdict(message) == msg_data

0 commit comments

Comments
 (0)