Skip to content

Commit 5f748ca

Browse files
authored
Clean API to control mocked behavior on proxy (#5775)
1 parent 23d13e6 commit 5f748ca

File tree

6 files changed

+239
-114
lines changed

6 files changed

+239
-114
lines changed

requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ filelock==3.12.2 # for parametric tests
99
jsonschema==4.16.0
1010
kubernetes==29.0.0 #lib-injection kubernetes
1111
matplotlib # for performance tests
12-
# mitmproxy==9.0.1 # sueful for proxy development, but failing on gitlab
12+
mitmproxy==9.0.1
1313
msgpack==1.0.4
1414
mypy==1.15.0
1515
opentelemetry-api==1.27.0 # for parametric tests

utils/_context/containers.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -541,8 +541,6 @@ def save_image_info(self, dir_path: str):
541541

542542

543543
class ProxyContainer(TestedContainer):
544-
command_host_port = 11111 # Which port exposed to host to sent proxy commands
545-
546544
def __init__(
547545
self,
548546
*,
@@ -574,12 +572,12 @@ def __init__(
574572
"SYSTEM_TESTS_IPV6": str(enable_ipv6),
575573
"SYSTEM_TEST_MOCKED_BACKEND": str(mocked_backend),
576574
},
577-
working_dir="/app",
575+
working_dir="/app/utils",
578576
volumes={
579577
"./utils/": {"bind": "/app/utils/", "mode": "ro"},
580578
},
581-
ports={f"{ProxyPorts.proxy_commands}/tcp": ("127.0.0.1", self.command_host_port)},
582-
command="python utils/proxy/core.py",
579+
ports={f"{ProxyPorts.proxy_commands}/tcp": ("127.0.0.1", ProxyPorts.proxy_commands)},
580+
command="python -m proxy.core",
583581
healthcheck={
584582
"test": f"python -c \"import socket; s=socket.socket({socket_family}); s.settimeout(2); s.connect(('{host_target}', {ProxyPorts.weblog})); s.close()\"", # noqa: E501
585583
"retries": 30,

utils/_remote_config.py

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,32 +5,17 @@
55
import base64
66
import hashlib
77
import json
8-
import os
98
import re
109
import time
1110
import uuid
1211
from typing import Any
1312
from collections.abc import Mapping
1413

15-
import requests
16-
1714
from utils._context.core import context
1815
from utils.dd_constants import RemoteConfigApplyState as ApplyState
1916
from utils.interfaces import library
2017
from utils._logger import logger
21-
from utils._context.containers import ProxyContainer
22-
23-
24-
def _post(path: str, payload: list[dict] | dict) -> None:
25-
if "SYSTEM_TESTS_PROXY_HOST" in os.environ:
26-
domain = os.environ["SYSTEM_TESTS_PROXY_HOST"]
27-
elif "DOCKER_HOST" in os.environ:
28-
m = re.match(r"(?:ssh:|tcp:|fd:|)//(?:[^@]+@|)([^:]+)", os.environ["DOCKER_HOST"])
29-
domain = m.group(1) if m is not None else "localhost"
30-
else:
31-
domain = "localhost"
32-
33-
requests.post(f"http://{domain}:{ProxyContainer.command_host_port}{path}", data=json.dumps(payload), timeout=30)
18+
from utils.proxy.mocked_response import StaticJsonMockedResponse, SequentialRemoteConfigJsonMockedResponse
3419

3520

3621
class RemoteConfigStateResults:
@@ -126,7 +111,8 @@ def remote_config_applied(data: dict) -> bool:
126111
current_states.state = ApplyState.ACKNOWLEDGED
127112
return True
128113

129-
_post("/unique_command", raw_payload)
114+
StaticJsonMockedResponse(path="/v0.7/config", mocked_json=raw_payload).send()
115+
130116
library.wait_for(remote_config_applied, timeout=30)
131117
# ensure the library has enough time to apply the config to all subprocesses
132118
time.sleep(2)
@@ -140,7 +126,7 @@ def send_sequential_commands(commands: list[dict], *, wait_for_all_command: bool
140126
if len(commands) == 0:
141127
raise ValueError("No commands to send")
142128

143-
_post("/sequential_commands", commands)
129+
SequentialRemoteConfigJsonMockedResponse(mocked_json_sequence=commands).send()
144130

145131
if not wait_for_all_command:
146132
return

utils/proxy/_deserializer.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@
2727
ExportLogsServiceRequest,
2828
ExportLogsServiceResponse,
2929
)
30-
from _decoders.protobuf_schemas import MetricPayload, TracePayload, SketchPayload, BackendResponsePayload
31-
from traces.trace_v1 import deserialize_v1_trace, _uncompress_agent_v1_trace
30+
from ._decoders.protobuf_schemas import MetricPayload, TracePayload, SketchPayload, BackendResponsePayload
31+
from .traces.trace_v1 import deserialize_v1_trace, _uncompress_agent_v1_trace
3232

3333

3434
logger = logging.getLogger(__name__)

utils/proxy/core.py

Lines changed: 47 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
# keep this import in first
2-
import scrubber # noqa: F401
2+
from . import scrubber # noqa: F401
33

44
import asyncio
55
from collections import defaultdict
@@ -11,11 +11,19 @@
1111

1212
from mitmproxy import master, options, http
1313
from mitmproxy.addons import errorcheck, default_addons
14-
from mitmproxy.flow import Error as FlowError, Flow
14+
from mitmproxy.flow import Error as FlowError
1515
from mitmproxy.http import HTTPFlow, Request
1616

17-
from _deserializer import deserialize
18-
from ports import ProxyPorts
17+
from ._deserializer import deserialize
18+
from .ports import ProxyPorts
19+
from .mocked_response import (
20+
MOCKED_RESPONSE_PATH,
21+
MockedResponse,
22+
AddRemoteConfigEndpoint,
23+
RemoveMetaStructsSupport,
24+
SetSpanEventFlags,
25+
StaticJsonMockedResponse,
26+
)
1927

2028
# prevent permission issues on file created by the proxy when the host is linux
2129
os.umask(0)
@@ -43,24 +51,29 @@ def __init__(self) -> None:
4351
logger.addHandler(handler)
4452
logger.setLevel(logging.DEBUG)
4553

46-
self.host_log_folder = "logs"
54+
self.host_log_folder = "/app/logs"
4755

4856
self.rc_api_enabled = os.environ.get("SYSTEM_TESTS_RC_API_ENABLED") == "True"
4957
self.mocked_backend = os.environ.get("SYSTEM_TEST_MOCKED_BACKEND") == "True"
50-
5158
self.span_meta_structs_disabled = os.environ.get("SYSTEM_TESTS_AGENT_SPAN_META_STRUCTS_DISABLED") == "True"
59+
self.span_events = os.environ.get("SYSTEM_TESTS_AGENT_SPAN_EVENTS") != "False"
5260

5361
self.tracing_agent_target_host = os.environ.get("PROXY_TRACING_AGENT_TARGET_HOST", "agent")
5462
self.tracing_agent_target_port = int(os.environ.get("PROXY_TRACING_AGENT_TARGET_PORT", "8127"))
5563

56-
span_events = os.environ.get("SYSTEM_TESTS_AGENT_SPAN_EVENTS")
57-
self.span_events = span_events != "False"
64+
self.mocked_response: MockedResponse | None = None
65+
self.internal_mocked_responses: list[MockedResponse] = [
66+
SetSpanEventFlags(span_events=self.span_events),
67+
]
5868

59-
self.rc_api_command = None
69+
if self.rc_api_enabled:
70+
# add the remote config endpoint on available agent endpoints
71+
self.internal_mocked_responses.append(AddRemoteConfigEndpoint())
72+
# mimic the default response from the agent
73+
self.internal_mocked_responses.append(StaticJsonMockedResponse(path="/v0.7/config", mocked_json={}))
6074

61-
# mimic the old API
62-
self.rc_api_sequential_commands = None
63-
self.rc_api_runtime_ids_request_count: dict = {}
75+
if self.span_meta_structs_disabled:
76+
self.internal_mocked_responses.append(RemoveMetaStructsSupport())
6477

6578
logger.info(f"rc_api_enabled: {self.rc_api_enabled}")
6679
logger.info(f"mocked_backend: {self.mocked_backend}")
@@ -75,20 +88,19 @@ def request(self, flow: HTTPFlow):
7588
# sockname is the local address (host, port) we received this connection on.
7689
port = flow.client_conn.sockname[1]
7790

78-
logger.info(f"{flow.request.method} {flow.request.pretty_url}, using proxy port {port}")
91+
logger.info(f"{flow.request.method} {flow.request.pretty_url} using proxy port {port}")
7992

8093
if port == ProxyPorts.proxy_commands:
81-
if not self.rc_api_enabled:
82-
flow.response = self.get_error_response(b"RC API is not enabled")
83-
elif flow.request.path == "/unique_command":
84-
logger.info("Store RC command to mock")
85-
self.rc_api_command = flow.request.content
86-
flow.response = http.Response.make(200, b"Ok")
87-
elif flow.request.path == "/sequential_commands":
88-
logger.info("Reset mocked RC sequential commands")
89-
self.rc_api_sequential_commands = json.loads(flow.request.content)
90-
self.rc_api_runtime_ids_request_count = defaultdict(int)
91-
flow.response = http.Response.make(200, b"Ok")
94+
if flow.request.path == MOCKED_RESPONSE_PATH and flow.request.method == "PUT":
95+
source = json.loads(flow.request.content)
96+
try:
97+
self.mocked_response = MockedResponse.build_from_json(source) if source else None
98+
except Exception as e:
99+
logger.exception(f"Failed to build mocked response from {source}")
100+
flow.response = self.get_error_response(f"Invalid mocked response definition: {e}".encode())
101+
else:
102+
logger.info(f"Store mocked response :{self.mocked_response}")
103+
flow.response = http.Response.make(200, b"Ok")
92104
else:
93105
flow.response = http.Response.make(404, b"Not found")
94106

@@ -239,70 +251,17 @@ def response(self, flow: HTTPFlow):
239251
except:
240252
logger.exception("Unexpected error")
241253

242-
def _modify_response(self, flow: Flow):
254+
def _modify_response(self, flow: HTTPFlow):
243255
if self.request_is_from_tracer(flow.request):
244-
if self.rc_api_enabled:
245-
self._add_rc_capabilities_in_info_request(flow)
246-
247-
if flow.request.path == "/v0.7/config":
248-
# mimic the default response from the agent
249-
flow.response.status_code = 200
250-
flow.response.content = b"{}"
251-
flow.response.headers["Content-Type"] = "application/json"
252-
253-
if self.rc_api_command is not None:
254-
request_content = json.loads(flow.request.content)
255-
logger.info(" => modifying rc response")
256-
flow.response.content = self.rc_api_command
257-
258-
elif self.rc_api_sequential_commands is not None:
259-
request_content = json.loads(flow.request.content)
260-
runtime_id = request_content["client"]["client_tracer"]["runtime_id"]
261-
nth_api_command = self.rc_api_runtime_ids_request_count[runtime_id]
262-
response = self.rc_api_sequential_commands[nth_api_command]
263-
264-
logger.info(f" => Modifying RC response for runtime ID {runtime_id}")
265-
logger.info(f" => Overwriting /v0.7/config response #{nth_api_command}")
266-
267-
flow.response.content = json.dumps(response).encode()
268-
flow.response.headers["st-proxy-overwrite-rc-response"] = f"{nth_api_command}"
269-
270-
if nth_api_command + 1 < len(self.rc_api_sequential_commands):
271-
self.rc_api_runtime_ids_request_count[runtime_id] = nth_api_command + 1
272-
273-
if self.span_meta_structs_disabled:
274-
self._remove_meta_structs_support(flow)
275-
276-
self._modify_span_events_flag(flow)
277-
278-
def _remove_meta_structs_support(self, flow: Flow):
279-
if flow.request.path == "/info" and str(flow.response.status_code) == "200":
280-
c = json.loads(flow.response.content)
281-
if "span_meta_structs" in c:
282-
logger.info(" => Overwriting /info response to remove span_meta_structs field")
283-
c.pop("span_meta_structs")
284-
flow.response.content = json.dumps(c).encode()
285-
286-
def _add_rc_capabilities_in_info_request(self, flow: Flow):
287-
if flow.request.path == "/info" and str(flow.response.status_code) == "200":
288-
c = json.loads(flow.response.content)
289-
290-
if "/v0.7/config" not in c["endpoints"]:
291-
logger.info(" => Overwriting /info response to include /v0.7/config")
292-
c["endpoints"].append("/v0.7/config")
293-
flow.response.content = json.dumps(c).encode()
294-
295-
def _modify_span_events_flag(self, flow: Flow):
296-
"""Modify the agent flag that signals support for native span event serialization.
297-
There are three possible cases:
298-
- Not configured: agent's response is not modified, the real agent behavior is preserved
299-
- `true`: agent advertises support for native span events serialization
300-
- `false`: agent advertises that it does not support native span events serialization
301-
"""
302-
if flow.request.path == "/info" and str(flow.response.status_code) == "200":
303-
c = json.loads(flow.response.content)
304-
c["span_events"] = self.span_events
305-
flow.response.content = json.dumps(c).encode()
256+
for internal_mocked_response in self.internal_mocked_responses:
257+
if internal_mocked_response.path == flow.request.path:
258+
logger.info(f" => applying {internal_mocked_response}")
259+
internal_mocked_response.execute(flow)
260+
261+
if self.mocked_response is not None:
262+
if self.mocked_response.path == flow.request.path:
263+
logger.info(f" => applying {self.mocked_response}")
264+
self.mocked_response.execute(flow)
306265

307266

308267
def start_proxy() -> None:
@@ -324,7 +283,7 @@ def start_proxy() -> None:
324283
loop = asyncio.new_event_loop()
325284
asyncio.set_event_loop(loop)
326285
listen_host = "::" if os.environ.get("SYSTEM_TESTS_IPV6") == "True" else "0.0.0.0" # noqa: S104
327-
opts = options.Options(mode=modes, listen_host=listen_host, confdir="utils/proxy/.mitmproxy")
286+
opts = options.Options(mode=modes, listen_host=listen_host, confdir="/app/utils/proxy/.mitmproxy")
328287
proxy = master.Master(opts, event_loop=loop)
329288
proxy.addons.add(*default_addons())
330289
proxy.addons.add(errorcheck.ErrorCheck())

0 commit comments

Comments
 (0)