Skip to content

Commit 8ee93c0

Browse files
committed
Add development server
1 parent 4c57774 commit 8ee93c0

13 files changed

+221
-136
lines changed

etc/catalog/jmx.properties

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
connector.name=jmx

etc/catalog/memory.properties

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
connector.name=memory

etc/catalog/tpcds.properties

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
connector.name=tpcds

etc/catalog/tpch.properties

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
connector.name=tpch
2+
tpch.splits-per-node=4

etc/config.properties

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
node.id=coordinator
2+
node.environment=test
3+
4+
coordinator=true
5+
experimental.concurrent-startup=true
6+
node-scheduler.include-coordinator=true
7+
http-server.http.port=8080
8+
query.max-memory=1GB
9+
discovery.uri=http://localhost:8080
10+
11+
# Use task.min-writer-count > 1, as this allows to expose writer-concurrency related bugs.
12+
task.min-writer-count=2
13+
task.concurrency=2
14+
task.max-writer-count=2
15+
16+
# Experimental protocol spooling settings
17+
protocol.spooling.enabled=true
18+
protocol.spooling.shared-secret-key=jxTKysfCBuMZtFqUf8UJDQ1w9ez8rynEJsJqgJf66u0=
19+
protocol.spooling.retrieval-mode=coordinator_proxy
20+
21+
# Disable http request log
22+
http-server.log.enabled=false

etc/jvm.config

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
-server
2+
-Xmx2G
3+
-XX:G1HeapRegionSize=32M
4+
-XX:+ExplicitGCInvokesConcurrent
5+
-XX:+ExitOnOutOfMemoryError
6+
-XX:+HeapDumpOnOutOfMemoryError
7+
-XX:-OmitStackTraceInFastThrow
8+
-XX:ReservedCodeCacheSize=150M
9+
-XX:PerMethodRecompilationCutoff=10000
10+
-XX:PerBytecodeRecompilationCutoff=10000
11+
-Djdk.attach.allowAttachSelf=true
12+
# jdk.nio.maxCachedBufferSize controls what buffers can be allocated in per-thread "temporary buffer cache" (sun.nio.ch.Util). Value of 0 disables the cache.
13+
-Djdk.nio.maxCachedBufferSize=0
14+
# Allow loading dynamic agent used by JOL
15+
-XX:+EnableDynamicAgentLoading
16+
-XX:+UnlockDiagnosticVMOptions
17+
--enable-native-access=ALL-UNNAMED

etc/spooling-manager.properties

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
spooling-manager.name=filesystem
2+
fs.s3.enabled=true
3+
fs.location=s3://spooling/
4+
s3.endpoint=http://172.17.0.1:9000/
5+
s3.region=us-east-1
6+
s3.aws-access-key=minio-access-key
7+
s3.aws-secret-key=minio-secret-key
8+
s3.path-style-access=true
9+
fs.segment.ttl=5m
10+
fs.segment.pruning.interval=15s
11+
fs.segment.pruning.batch-size=250
12+
# Disable as we don't support SSE-C while writing/reading from S3
13+
fs.segment.encryption=false

setup.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,9 @@
4646
"pre-commit",
4747
"black",
4848
"isort",
49-
"keyring"
49+
"keyring",
50+
"testcontainers",
51+
"minio"
5052
]
5153

5254
setup(

tests/development_server.py

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
import os
2+
import time
3+
from contextlib import contextmanager
4+
from pathlib import Path
5+
6+
from minio import Minio, S3Error
7+
from testcontainers.core.container import DockerContainer
8+
from testcontainers.core.waiting_utils import wait_for_logs
9+
10+
from trino.constants import DEFAULT_PORT
11+
12+
MINIO_ROOT_USER = "minio-access-key"
13+
MINIO_ROOT_PASSWORD = "minio-secret-key"
14+
15+
TRINO_VERSION = os.environ.get("TRINO_VERSION") or "latest"
16+
TRINO_HOST = "localhost"
17+
18+
19+
def create_bucket():
20+
client = Minio(
21+
"localhost:9000",
22+
access_key=MINIO_ROOT_USER,
23+
secret_key=MINIO_ROOT_PASSWORD,
24+
secure=False
25+
)
26+
bucket_name = "spooling"
27+
try:
28+
# Check if the bucket exists
29+
if not client.bucket_exists(bucket_name):
30+
# Create the bucket if it does not exist
31+
client.make_bucket(bucket_name)
32+
print(f"Bucket {bucket_name} created successfully.")
33+
else:
34+
print(f"Bucket {bucket_name} already exists.")
35+
except S3Error as e:
36+
print(f"Error occurred: {e}")
37+
38+
39+
@contextmanager
40+
def start_development_server(port=None, trino_version=TRINO_VERSION):
41+
minio = None
42+
trino = None
43+
44+
try:
45+
if TRINO_VERSION >= "465":
46+
minio = DockerContainer("minio/minio:latest") \
47+
.with_name("minio") \
48+
.with_command(f"server --address '0.0.0.0:{9000}' --console-address '0.0.0.0:{9001}' -- /data") \
49+
.with_env("MINIO_ROOT_USER", "minio-access-key") \
50+
.with_env("MINIO_ROOT_PASSWORD", "minio-secret-key") \
51+
.with_exposed_ports(9000, 9001) \
52+
.with_bind_ports(9000, 9000) \
53+
.with_bind_ports(9001, 9001)
54+
55+
# Start the container
56+
print("Starting MinIO container...")
57+
minio.start()
58+
59+
# Wait for logs indicating MinIO has started
60+
wait_for_logs(minio, "API: http://", timeout=30)
61+
62+
# create spooling bucket
63+
create_bucket()
64+
65+
trino = DockerContainer(f"trinodb/trino:{trino_version}") \
66+
.with_name("trino") \
67+
.with_env("TRINO_CONFIG_DIR", "/etc/trino") \
68+
.with_bind_ports(DEFAULT_PORT, port)
69+
70+
root = Path(__file__).parent.parent
71+
72+
trino = trino \
73+
.with_volume_mapping(str(root / "etc/config.properties"), "/etc/trino/config.properties") \
74+
.with_volume_mapping(str(root / "etc/jvm.config"), "/etc/trino/jvm.config") \
75+
.with_volume_mapping(str(root / "etc/catalog"), "/etc/trino/catalog")
76+
if TRINO_VERSION >= "465":
77+
trino.with_volume_mapping(
78+
str(root / "etc/spooling-manager.properties"),
79+
"/etc/trino/spooling-manager.properties", "rw")
80+
81+
print("Starting Trino container...")
82+
trino.start()
83+
84+
# Wait for logs indicating the service has started
85+
wait_for_logs(trino, "SERVER STARTED", timeout=60)
86+
87+
# Otherwise some tests fail with No nodes available
88+
time.sleep(2)
89+
90+
yield minio, trino
91+
finally:
92+
# Stop containers when exiting the context
93+
if trino:
94+
print("Stopping Trino container...")
95+
trino.stop()
96+
if minio:
97+
print("Stopping MinIO container...")
98+
minio.stop()
99+
100+
101+
def main():
102+
"""Run Trino setup independently from pytest."""
103+
with start_development_server(port=DEFAULT_PORT):
104+
print(f"Trino started at {TRINO_HOST}:{DEFAULT_PORT}")
105+
106+
# Keep the process running so that the containers stay up
107+
input("Press Enter to stop containers...")
108+
109+
110+
if __name__ == "__main__":
111+
main()

tests/integration/conftest.py

Lines changed: 16 additions & 117 deletions
Original file line numberDiff line numberDiff line change
@@ -12,31 +12,24 @@
1212

1313
import os
1414
import socket
15-
import subprocess
16-
import time
1715
from contextlib import closing
18-
from uuid import uuid4
1916

2017
import pytest
2118

2219
import trino.logging
23-
from trino.client import ClientSession, TrinoQuery, TrinoRequest
20+
from tests.development_server import TRINO_HOST, TRINO_VERSION, start_development_server
2421
from trino.constants import DEFAULT_PORT
2522

2623
logger = trino.logging.get_logger(__name__)
2724

2825

29-
TRINO_VERSION = os.environ.get("TRINO_VERSION") or "latest"
30-
TRINO_HOST = "127.0.0.1"
31-
TRINO_PORT = 8080
32-
33-
34-
def is_trino_available():
26+
def is_trino_available(host, port):
3527
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
3628
sock.settimeout(2)
37-
result = sock.connect_ex((TRINO_HOST, DEFAULT_PORT))
29+
result = sock.connect_ex((host, port))
3830
if result == 0:
3931
return True
32+
return False
4033

4134

4235
def get_local_port():
@@ -45,115 +38,21 @@ def get_local_port():
4538
return s.getsockname()[1]
4639

4740

48-
def get_default_trino_image_tag():
49-
return "trinodb/trino:" + TRINO_VERSION
50-
51-
52-
def start_trino(image_tag=None):
53-
if not image_tag:
54-
image_tag = get_default_trino_image_tag()
55-
56-
container_id = "trino-python-client-tests-" + uuid4().hex[:7]
57-
local_port = get_local_port()
58-
logger.info("starting Docker container")
59-
docker_run = [
60-
"docker",
61-
"run",
62-
"--rm",
63-
"-p",
64-
"{host_port}:{cont_port}".format(host_port=local_port, cont_port=TRINO_PORT),
65-
"--name",
66-
container_id,
67-
image_tag,
68-
]
69-
run = subprocess.Popen(docker_run, universal_newlines=True, stderr=subprocess.PIPE)
70-
return (container_id, run, "localhost", local_port)
71-
72-
73-
def wait_for_trino_workers(host, port, timeout=180):
74-
request = TrinoRequest(
75-
host=host,
76-
port=port,
77-
client_session=ClientSession(
78-
user="test_fixture"
79-
)
80-
)
81-
sql = "SELECT state FROM system.runtime.nodes"
82-
t0 = time.time()
83-
while True:
84-
query = TrinoQuery(request, sql)
85-
rows = list(query.execute())
86-
if any(row[0] == "active" for row in rows):
87-
break
88-
if time.time() - t0 > timeout:
89-
raise TimeoutError
90-
time.sleep(1)
91-
92-
93-
def wait_for_trino_coordinator(stream, timeout=180):
94-
started_tag = "======== SERVER STARTED ========"
95-
t0 = time.time()
96-
for line in iter(stream.readline, b""):
97-
if line:
98-
print(line)
99-
if started_tag in line:
100-
time.sleep(5)
101-
return True
102-
if time.time() - t0 > timeout:
103-
logger.error("coordinator took longer than %s to start", timeout)
104-
raise TimeoutError
105-
return False
106-
107-
108-
def start_local_trino_server(image_tag):
109-
container_id, proc, host, port = start_trino(image_tag)
110-
print("trino.server.state starting")
111-
trino_ready = wait_for_trino_coordinator(proc.stderr)
112-
if not trino_ready:
113-
raise Exception("Trino server did not start")
114-
wait_for_trino_workers(host, port)
115-
print("trino.server.state ready")
116-
return container_id, proc, host, port
117-
118-
119-
def start_trino_and_wait(image_tag=None):
120-
container_id = None
121-
proc = None
122-
host = os.environ.get("TRINO_RUNNING_HOST", None)
123-
if host:
124-
port = os.environ.get("TRINO_RUNNING_PORT", DEFAULT_PORT)
125-
else:
126-
container_id, proc, host, port = start_local_trino_server(
127-
image_tag
128-
)
129-
130-
print("trino.server.hostname {}".format(host))
131-
print("trino.server.port {}".format(port))
132-
if proc:
133-
print("trino.server.pid {}".format(proc.pid))
134-
if container_id:
135-
print("trino.server.contained_id {}".format(container_id))
136-
return container_id, proc, host, port
137-
138-
139-
def stop_trino(container_id, proc):
140-
subprocess.check_call(["docker", "kill", container_id])
141-
142-
143-
@pytest.fixture(scope="module")
41+
@pytest.fixture(scope="session")
14442
def run_trino():
145-
if is_trino_available():
146-
yield None, TRINO_HOST, DEFAULT_PORT
147-
return
43+
host = os.environ.get("TRINO_RUNNING_HOST", TRINO_HOST)
44+
port = os.environ.get("TRINO_RUNNING_PORT", DEFAULT_PORT)
14845

149-
image_tag = os.environ.get("TRINO_IMAGE")
150-
if not image_tag:
151-
image_tag = get_default_trino_image_tag()
46+
# Is there any local Trino available
47+
if is_trino_available(host, port):
48+
yield host, port
49+
return
15250

153-
container_id, proc, host, port = start_trino_and_wait(image_tag)
154-
yield proc, host, port
155-
if container_id or proc:
156-
stop_trino(container_id, proc)
51+
# Start Trino and MinIO server
52+
print(f"Could not connect to Trino at {host}:{port}, starting server...")
53+
local_port = get_local_port()
54+
with start_development_server(port=local_port):
55+
yield TRINO_HOST, local_port
15756

15857

15958
def trino_version():

0 commit comments

Comments
 (0)