Skip to content

Commit 5ac762c

Browse files
committed
add optional db connection pool support
1 parent 5a36940 commit 5ac762c

File tree

4 files changed

+258
-3
lines changed

4 files changed

+258
-3
lines changed

gateway-ha/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,12 @@
104104
</exclusions>
105105
</dependency>
106106

107+
<dependency>
108+
<groupId>com.zaxxer</groupId>
109+
<artifactId>HikariCP</artifactId>
110+
<version>7.0.2</version>
111+
</dependency>
112+
107113
<dependency>
108114
<groupId>io.airlift</groupId>
109115
<artifactId>aircompressor-v3</artifactId>

gateway-ha/src/main/java/io/trino/gateway/ha/config/DataStoreConfiguration.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,32 @@ public class DataStoreConfiguration
2121
private String driver;
2222
private Integer queryHistoryHoursRetention = 4;
2323
private boolean runMigrationsEnabled = true;
24+
private Integer maxPoolSize;
2425

25-
public DataStoreConfiguration(String jdbcUrl, String user, String password, String driver, Integer queryHistoryHoursRetention, boolean runMigrationsEnabled)
26+
public DataStoreConfiguration(String jdbcUrl, String user, String password, String driver, Integer queryHistoryHoursRetention, boolean runMigrationsEnabled, Integer maxPoolSize)
2627
{
2728
this.jdbcUrl = jdbcUrl;
2829
this.user = user;
2930
this.password = password;
3031
this.driver = driver;
3132
this.queryHistoryHoursRetention = queryHistoryHoursRetention;
3233
this.runMigrationsEnabled = runMigrationsEnabled;
34+
this.maxPoolSize = maxPoolSize;
35+
}
36+
37+
public DataStoreConfiguration(String jdbcUrl, String user, String password, String driver, Integer queryHistoryHoursRetention, boolean runMigrationsEnabled)
38+
{
39+
this(jdbcUrl, user, password, driver, queryHistoryHoursRetention, runMigrationsEnabled, null);
40+
}
41+
42+
public Integer getMaxPoolSize()
43+
{
44+
return this.maxPoolSize;
45+
}
46+
47+
public void setMaxPoolSize(Integer maxPoolSize)
48+
{
49+
this.maxPoolSize = maxPoolSize;
3350
}
3451

3552
public DataStoreConfiguration() {}

gateway-ha/src/main/java/io/trino/gateway/ha/persistence/JdbcConnectionManager.java

Lines changed: 59 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
package io.trino.gateway.ha.persistence;
1515

1616
import com.google.common.annotations.VisibleForTesting;
17+
import com.zaxxer.hikari.HikariConfig;
18+
import com.zaxxer.hikari.HikariDataSource;
1719
import io.airlift.log.Logger;
1820
import io.trino.gateway.ha.config.DataStoreConfiguration;
1921
import io.trino.gateway.ha.persistence.dao.QueryHistoryDao;
@@ -24,6 +26,8 @@
2426
import java.net.URI;
2527
import java.net.URISyntaxException;
2628
import java.nio.file.Path;
29+
import java.util.Map;
30+
import java.util.concurrent.ConcurrentHashMap;
2731
import java.util.concurrent.Executors;
2832
import java.util.concurrent.ScheduledExecutorService;
2933
import java.util.concurrent.TimeUnit;
@@ -39,6 +43,8 @@ public class JdbcConnectionManager
3943
private final ScheduledExecutorService executorService =
4044
Executors.newSingleThreadScheduledExecutor();
4145

46+
private final Map<String, HikariDataSource> pools = new ConcurrentHashMap<>();
47+
4248
public JdbcConnectionManager(Jdbi jdbi, DataStoreConfiguration configuration)
4349
{
4450
this.jdbi = requireNonNull(jdbi, "jdbi is null")
@@ -59,7 +65,18 @@ public Jdbi getJdbi(@Nullable String routingGroupDatabase)
5965
return jdbi;
6066
}
6167

62-
return Jdbi.create(buildJdbcUrl(routingGroupDatabase), configuration.getUser(), configuration.getPassword())
68+
Integer maxPoolSize = configuration.getMaxPoolSize();
69+
if (maxPoolSize != null && maxPoolSize > 0) {
70+
HikariDataSource ds = getOrCreateDataSource(routingGroupDatabase, maxPoolSize);
71+
return Jdbi.create(ds)
72+
.installPlugin(new SqlObjectPlugin())
73+
.registerRowMapper(new RecordAndAnnotatedConstructorMapper());
74+
}
75+
76+
return Jdbi.create(
77+
buildJdbcUrl(routingGroupDatabase),
78+
configuration.getUser(),
79+
configuration.getPassword())
6380
.installPlugin(new SqlObjectPlugin())
6481
.registerRowMapper(new RecordAndAnnotatedConstructorMapper());
6582
}
@@ -107,11 +124,51 @@ private void startCleanUps()
107124
executorService.scheduleWithFixedDelay(
108125
() -> {
109126
log.info("Performing query history cleanup task");
110-
long created = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(this.configuration.getQueryHistoryHoursRetention());
127+
long created = System.currentTimeMillis()
128+
- TimeUnit.HOURS.toMillis(this.configuration.getQueryHistoryHoursRetention());
111129
jdbi.onDemand(QueryHistoryDao.class).deleteOldHistory(created);
112130
},
113131
1,
114132
120,
115133
TimeUnit.MINUTES);
116134
}
135+
136+
private HikariDataSource getOrCreateDataSource(String routingGroupDatabase, int maxPoolSize)
137+
{
138+
return pools.compute(routingGroupDatabase, (key, existing) -> {
139+
if (existing != null && !existing.isClosed()) {
140+
return existing;
141+
}
142+
143+
HikariConfig cfg = new HikariConfig();
144+
cfg.setJdbcUrl(buildJdbcUrl(key));
145+
cfg.setUsername(configuration.getUser());
146+
cfg.setPassword(configuration.getPassword());
147+
if (configuration.getDriver() != null) {
148+
cfg.setDriverClassName(configuration.getDriver());
149+
}
150+
cfg.setMaximumPoolSize(maxPoolSize);
151+
cfg.setPoolName("gateway-ha-" + key);
152+
153+
return new HikariDataSource(cfg);
154+
});
155+
}
156+
157+
public void close()
158+
{
159+
for (Map.Entry<String, HikariDataSource> e : pools.entrySet()) {
160+
HikariDataSource ds = e.getValue();
161+
if (ds != null && !ds.isClosed()) {
162+
try {
163+
ds.close();
164+
}
165+
catch (RuntimeException ex) {
166+
log.warn(ex, "Failed to close datasource for key: %s", e.getKey());
167+
}
168+
}
169+
}
170+
pools.clear();
171+
172+
executorService.shutdownNow();
173+
}
117174
}
Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.gateway.ha.persistence;
15+
16+
import io.trino.gateway.ha.config.DataStoreConfiguration;
17+
import org.jdbi.v3.core.Jdbi;
18+
import org.junit.jupiter.api.Test;
19+
20+
import java.nio.file.Path;
21+
import java.sql.Connection;
22+
import java.util.ArrayList;
23+
import java.util.List;
24+
import java.util.concurrent.CountDownLatch;
25+
import java.util.concurrent.ExecutorService;
26+
import java.util.concurrent.Executors;
27+
import java.util.concurrent.Future;
28+
import java.util.concurrent.TimeUnit;
29+
import java.util.concurrent.TimeoutException;
30+
31+
import static org.assertj.core.api.Assertions.assertThat;
32+
33+
final class TestJdbcConnectionManagerPool
34+
{
35+
@Test
36+
void blocksWhenExceedingMaxPoolSize()
37+
throws Exception
38+
{
39+
String dbPath = Path.of(System.getProperty("java.io.tmpdir"), "h2db-pool-" + System.currentTimeMillis()).toString();
40+
String jdbcUrl = "jdbc:h2:" + dbPath;
41+
42+
DataStoreConfiguration cfg = new DataStoreConfiguration(
43+
jdbcUrl, "sa", "sa", "org.h2.Driver",
44+
4, true,
45+
2);
46+
47+
JdbcConnectionManager cm = new JdbcConnectionManager(Jdbi.create(jdbcUrl, "sa", "sa"), cfg);
48+
Jdbi jdbi = cm.getJdbi("testdb");
49+
50+
try (ExecutorService es = Executors.newFixedThreadPool(3)) {
51+
List<Future<Connection>> acquired = new ArrayList<>();
52+
53+
CountDownLatch hold = new CountDownLatch(1);
54+
CountDownLatch acquiredLatch = new CountDownLatch(2);
55+
56+
// Open exactly maxPoolSize connections and keep them open
57+
for (int i = 0; i < 2; i++) {
58+
acquired.add(es.submit(() -> {
59+
try (var h = jdbi.open()) {
60+
acquiredLatch.countDown();
61+
boolean released = hold.await(10, TimeUnit.SECONDS);
62+
assertThat(released).as("hold latch should be released by the test").isTrue();
63+
}
64+
return null;
65+
}));
66+
}
67+
68+
// Wait until both connections are actually acquired (avoid race)
69+
boolean bothAcquired = acquiredLatch.await(3, TimeUnit.SECONDS);
70+
assertThat(bothAcquired).as("both connections should be acquired before third attempt").isTrue();
71+
72+
// Third attempt should block since the pool is full
73+
Future<Boolean> third = es.submit(() -> {
74+
var h = jdbi.open();
75+
h.close();
76+
return true;
77+
});
78+
79+
boolean completedIn200ms = false;
80+
try {
81+
third.get(200, TimeUnit.MILLISECONDS);
82+
completedIn200ms = true; // if this happens when connection was not blocked, which is wrong
83+
}
84+
catch (TimeoutException expected) {
85+
// expected, means the request was blocked on the pool
86+
}
87+
88+
assertThat(completedIn200ms)
89+
.as("third getJdbi().open() should be blocked by maxPoolSize=2")
90+
.isFalse();
91+
92+
// Release the first two connections, the third one should complete now
93+
hold.countDown();
94+
assertThat(third.get(3, TimeUnit.SECONDS)).isTrue();
95+
96+
// Wait for the first two to finish gracefully
97+
for (Future<Connection> f : acquired) {
98+
f.get(3, TimeUnit.SECONDS);
99+
}
100+
}
101+
}
102+
103+
@Test
104+
void doesNotBlockWhenMaxPoolSizeIsNull()
105+
throws Exception
106+
{
107+
String dbPath = Path.of(System.getProperty("java.io.tmpdir"), "h2db-nopool-" + System.currentTimeMillis()).toString();
108+
String jdbcUrl = "jdbc:h2:" + dbPath;
109+
110+
// maxPoolSize == null -> no pool path
111+
DataStoreConfiguration cfg = new DataStoreConfiguration(
112+
jdbcUrl, "sa", "sa", "org.h2.Driver",
113+
4, true);
114+
115+
JdbcConnectionManager cm = new JdbcConnectionManager(Jdbi.create(jdbcUrl, "sa", "sa"), cfg);
116+
Jdbi jdbi = cm.getJdbi("testdb");
117+
118+
try (ExecutorService es = Executors.newFixedThreadPool(3)) {
119+
try {
120+
CountDownLatch hold = new CountDownLatch(1);
121+
CountDownLatch acquiredLatch = new CountDownLatch(2);
122+
123+
// Open two connections and keep them open
124+
for (int i = 0; i < 2; i++) {
125+
es.submit(() -> {
126+
try (var h = jdbi.open()) {
127+
acquiredLatch.countDown();
128+
boolean released = hold.await(10, TimeUnit.SECONDS);
129+
assertThat(released).isTrue();
130+
}
131+
return null;
132+
});
133+
}
134+
135+
// Wait until both connections are really open (avoid race conditions)
136+
boolean bothAcquired = acquiredLatch.await(3, TimeUnit.SECONDS);
137+
assertThat(bothAcquired).isTrue();
138+
139+
// Third connection attempt should NOT block since no pool is used
140+
Future<Boolean> third = es.submit(() -> {
141+
var h = jdbi.open();
142+
h.close();
143+
return true;
144+
});
145+
146+
boolean completedIn200ms;
147+
try {
148+
third.get(200, TimeUnit.MILLISECONDS);
149+
completedIn200ms = true; // not blocked - expected behavior
150+
}
151+
catch (TimeoutException ignore) {
152+
completedIn200ms = false; // blocked - incorrect for no-pool case
153+
}
154+
155+
assertThat(completedIn200ms)
156+
.as("third getJdbi().open() should NOT block when no pool is configured")
157+
.isTrue();
158+
159+
// check H2 session count to confirm multiple physical connections were opened
160+
int sessions = jdbi.withHandle(h ->
161+
h.createQuery("SELECT COUNT(*) FROM INFORMATION_SCHEMA.SESSIONS")
162+
.mapTo(int.class)
163+
.one());
164+
assertThat(sessions).isGreaterThanOrEqualTo(3);
165+
166+
// Release the first two connections
167+
hold.countDown();
168+
assertThat(third.get(3, TimeUnit.SECONDS)).isTrue();
169+
}
170+
finally {
171+
es.shutdownNow();
172+
}
173+
}
174+
}
175+
}

0 commit comments

Comments
 (0)