Skip to content

Commit 35ce440

Browse files
committed
added a customParam config to connectionConfig
1 parent 7dc4185 commit 35ce440

File tree

11 files changed

+222
-16
lines changed

11 files changed

+222
-16
lines changed

document-store/src/main/java/org/hypertrace/core/documentstore/TypesafeDatastoreConfigAdapter.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import com.mongodb.MongoClientSettings;
77
import com.typesafe.config.Config;
88
import java.time.Duration;
9+
import java.util.Collections;
910
import org.hypertrace.core.documentstore.model.config.AggregatePipelineMode;
1011
import org.hypertrace.core.documentstore.model.config.DatabaseType;
1112
import org.hypertrace.core.documentstore.model.config.DatastoreConfig;
@@ -34,7 +35,8 @@ public DatastoreConfig convert(final Config config) {
3435
null,
3536
AggregatePipelineMode.DEFAULT_ALWAYS,
3637
DataFreshness.SYSTEM_DEFAULT,
37-
Duration.ofMinutes(20)) {
38+
Duration.ofMinutes(20),
39+
Collections.emptyMap()) {
3840
public MongoClientSettings toSettings() {
3941
final MongoClientSettings.Builder settingsBuilder =
4042
MongoClientSettings.builder()
@@ -88,7 +90,8 @@ public DatastoreConfig convert(final Config config) {
8890
connectionConfig.database(),
8991
connectionConfig.credentials(),
9092
connectionConfig.applicationName(),
91-
connectionConfig.connectionPoolConfig()) {
93+
connectionConfig.connectionPoolConfig(),
94+
connectionConfig.customParameters()) {
9295
@Override
9396
public String toConnectionString() {
9497
return config.hasPath("url")

document-store/src/main/java/org/hypertrace/core/documentstore/model/config/ConnectionConfig.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,10 @@
55
import com.google.common.base.Preconditions;
66
import java.time.Duration;
77
import java.util.ArrayList;
8+
import java.util.Collections;
9+
import java.util.HashMap;
810
import java.util.List;
11+
import java.util.Map;
912
import javax.annotation.Nullable;
1013
import lombok.AccessLevel;
1114
import lombok.AllArgsConstructor;
@@ -34,18 +37,21 @@ public class ConnectionConfig {
3437
@NonNull AggregatePipelineMode aggregationPipelineMode;
3538
@NonNull DataFreshness dataFreshness;
3639
@NonNull Duration queryTimeout;
40+
@NonNull Map<String, String> customParameters;
3741

3842
public ConnectionConfig(
3943
@NonNull List<@NonNull Endpoint> endpoints,
4044
@NonNull String database,
41-
@Nullable ConnectionCredentials credentials) {
45+
@Nullable ConnectionCredentials credentials,
46+
Map<String, String> customParameters) {
4247
this(
4348
endpoints,
4449
database,
4550
credentials,
4651
AggregatePipelineMode.DEFAULT_ALWAYS,
4752
DataFreshness.SYSTEM_DEFAULT,
48-
Duration.ofMinutes(20));
53+
Duration.ofMinutes(20),
54+
customParameters != null ? customParameters : Collections.emptyMap());
4955
}
5056

5157
public static ConnectionConfigBuilder builder() {
@@ -63,6 +69,13 @@ public static class ConnectionConfigBuilder {
6369
ConnectionCredentials credentials;
6470
String applicationName = DEFAULT_APP_NAME;
6571
String replicaSet;
72+
Map<String, String> customParameters = new HashMap<>();
73+
74+
public ConnectionConfigBuilder customParameters(String key, String value) {
75+
this.customParameters.put(key, value);
76+
return this;
77+
}
78+
6679
ConnectionPoolConfig connectionPoolConfig;
6780
AggregatePipelineMode aggregationPipelineMode = AggregatePipelineMode.DEFAULT_ALWAYS;
6881
DataFreshness dataFreshness = DataFreshness.SYSTEM_DEFAULT;
@@ -96,15 +109,17 @@ public ConnectionConfig build() {
96109
connectionPoolConfig,
97110
aggregationPipelineMode,
98111
dataFreshness,
99-
queryTimeout);
112+
queryTimeout,
113+
customParameters);
100114

101115
case POSTGRES:
102116
return new PostgresConnectionConfig(
103117
unmodifiableList(endpoints),
104118
database,
105119
credentials,
106120
applicationName,
107-
connectionPoolConfig);
121+
connectionPoolConfig,
122+
customParameters);
108123
}
109124

110125
throw new IllegalArgumentException("Unsupported database type: " + type);

document-store/src/main/java/org/hypertrace/core/documentstore/model/config/TypesafeConfigDatastoreConfigExtractor.java

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,13 @@
1111
import org.hypertrace.core.documentstore.model.config.DatastoreConfig.DatastoreConfigBuilder;
1212
import org.hypertrace.core.documentstore.model.config.Endpoint.EndpointBuilder;
1313
import org.hypertrace.core.documentstore.model.options.DataFreshness;
14+
import org.slf4j.Logger;
15+
import org.slf4j.LoggerFactory;
1416

1517
@Value
1618
public class TypesafeConfigDatastoreConfigExtractor {
19+
private static final Logger LOGGER =
20+
LoggerFactory.getLogger(TypesafeConfigDatastoreConfigExtractor.class);
1721
private static final String DEFAULT_HOST_KEY = "host";
1822
private static final String DEFAULT_PORT_KEY = "port";
1923
private static final String DEFAULT_ENDPOINTS_KEY = "endpoints";
@@ -29,6 +33,7 @@ public class TypesafeConfigDatastoreConfigExtractor {
2933
private static final String DEFAULT_AGGREGATION_PIPELINE_MODE_KEY = "aggregationPipelineMode";
3034
private static final String DEFAULT_DATA_FRESHNESS_KEY = "dataFreshness";
3135
private static final String DEFAULT_QUERY_TIMEOUT_KEY = "queryTimeout";
36+
private static final String DEFAULT_CUSTOM_PARAMETERS_PREFIX = "customParams";
3237

3338
@NonNull Config config;
3439
DatastoreConfigBuilder datastoreConfigBuilder;
@@ -74,7 +79,8 @@ private TypesafeConfigDatastoreConfigExtractor(
7479
.poolConnectionSurrenderTimeoutKey(DEFAULT_CONNECTION_IDLE_TIME_KEY)
7580
.aggregationPipelineMode(DEFAULT_AGGREGATION_PIPELINE_MODE_KEY)
7681
.dataFreshnessKey(DEFAULT_DATA_FRESHNESS_KEY)
77-
.queryTimeoutKey(DEFAULT_QUERY_TIMEOUT_KEY);
82+
.queryTimeoutKey(DEFAULT_QUERY_TIMEOUT_KEY)
83+
.customParametersKey(DEFAULT_CUSTOM_PARAMETERS_PREFIX);
7884
}
7985

8086
public static TypesafeConfigDatastoreConfigExtractor from(
@@ -169,6 +175,27 @@ public TypesafeConfigDatastoreConfigExtractor replicaSetKey(@NonNull final Strin
169175
return this;
170176
}
171177

178+
public TypesafeConfigDatastoreConfigExtractor customParametersKey(@NonNull final String key) {
179+
if (config.hasPath(key)) {
180+
try {
181+
// Try to extract parameters as an object (Config)
182+
Config paramConfig = config.getConfig(key);
183+
paramConfig
184+
.entrySet()
185+
.forEach(
186+
entry -> {
187+
connectionConfigBuilder.customParameters(
188+
entry.getKey(), paramConfig.getString(entry.getKey()));
189+
});
190+
} catch (Exception e) {
191+
// If not a Config object, log warning
192+
LOGGER.warn("Custom parameters key '{}' exists but is not a config object", key);
193+
}
194+
}
195+
196+
return this;
197+
}
198+
172199
public TypesafeConfigDatastoreConfigExtractor poolMaxConnectionsKey(@NonNull final String key) {
173200
if (config.hasPath(key)) {
174201
connectionPoolConfigBuilder.maxConnections(config.getInt(key));
@@ -228,6 +255,7 @@ public DatastoreConfig extract() {
228255
connectionConfigBuilder
229256
.connectionPoolConfig(connectionPoolConfigBuilder.build())
230257
.credentials(connectionCredentialsBuilder.build())
258+
.customParameters(connectionConfigBuilder.customParameters())
231259
.build())
232260
.build();
233261
}

document-store/src/main/java/org/hypertrace/core/documentstore/model/config/mongo/MongoConnectionConfig.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import java.time.Duration;
1414
import java.util.ArrayList;
1515
import java.util.List;
16+
import java.util.Map;
1617
import java.util.Optional;
1718
import java.util.concurrent.TimeUnit;
1819
import javax.annotation.Nullable;
@@ -53,14 +54,16 @@ public MongoConnectionConfig(
5354
@Nullable final ConnectionPoolConfig connectionPoolConfig,
5455
@NonNull final AggregatePipelineMode aggregationPipelineMode,
5556
@NonNull final DataFreshness dataFreshness,
56-
@NonNull final Duration queryTimeout) {
57+
@NonNull final Duration queryTimeout,
58+
@NonNull final Map<String, String> customParameters) {
5759
super(
5860
ensureAtLeastOneEndpoint(endpoints),
5961
getDatabaseOrDefault(database),
6062
getCredentialsOrDefault(credentials, database),
6163
aggregationPipelineMode,
6264
dataFreshness,
63-
queryTimeout);
65+
queryTimeout,
66+
customParameters);
6467
this.applicationName = applicationName;
6568
this.replicaSetName = replicaSetName;
6669
this.connectionPoolConfig = getConnectionPoolConfigOrDefault(connectionPoolConfig);

document-store/src/main/java/org/hypertrace/core/documentstore/model/config/postgres/PostgresConnectionConfig.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
import java.util.ArrayList;
77
import java.util.List;
8+
import java.util.Map;
89
import java.util.Optional;
910
import java.util.Properties;
1011
import javax.annotation.Nullable;
@@ -45,11 +46,13 @@ public PostgresConnectionConfig(
4546
@Nullable final String database,
4647
@Nullable final ConnectionCredentials credentials,
4748
@NonNull final String applicationName,
48-
@Nullable final ConnectionPoolConfig connectionPoolConfig) {
49+
@Nullable final ConnectionPoolConfig connectionPoolConfig,
50+
@NonNull final Map<String, String> customParameters) {
4951
super(
5052
ensureSingleEndpoint(endpoints),
5153
getDatabaseOrDefault(database),
52-
getCredentialsOrDefault(credentials));
54+
getCredentialsOrDefault(credentials),
55+
customParameters);
5356
this.applicationName = applicationName;
5457
this.connectionPoolConfig = getConnectionPoolConfigOrDefault(connectionPoolConfig);
5558
}

document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresClient.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import java.sql.ResultSet;
77
import java.sql.SQLException;
88
import java.time.Duration;
9+
import java.util.Map;
910
import java.util.concurrent.TimeUnit;
1011
import org.hypertrace.core.documentstore.model.config.postgres.PostgresConnectionConfig;
1112
import org.hypertrace.core.documentstore.model.config.postgres.PostgresDefaults;
@@ -51,6 +52,10 @@ public Connection getPooledConnection() throws SQLException {
5152
return connectionPool.getConnection();
5253
}
5354

55+
public Map<String, String> getCustomParameters() {
56+
return connectionConfig.customParameters();
57+
}
58+
5459
public void close() {
5560
if (connection != null) {
5661
try {

document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java

Lines changed: 130 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import com.google.common.annotations.VisibleForTesting;
3333
import java.io.IOException;
3434
import java.math.BigInteger;
35+
import java.sql.Array;
3536
import java.sql.BatchUpdateException;
3637
import java.sql.Connection;
3738
import java.sql.PreparedStatement;
@@ -96,6 +97,7 @@ public class PostgresCollection implements Collection {
9697
private static final String CREATED_NOW_ALIAS = "created_now_alias";
9798
private static final CloseableIterator<Document> EMPTY_ITERATOR =
9899
CloseableIterator.emptyIterator();
100+
private static final String FLAT_STRUCTURE_COLLECTION_KEY = "flatStructureCollection";
99101

100102
private final PostgresClient client;
101103
private final PostgresTableIdentifier tableIdentifier;
@@ -492,7 +494,9 @@ public CloseableIterator<Document> find(
492494
@Override
493495
public CloseableIterator<Document> query(
494496
final org.hypertrace.core.documentstore.query.Query query, final QueryOptions queryOptions) {
495-
return queryExecutor.execute(client.getConnection(), query);
497+
String flatStructureCollectionName =
498+
client.getCustomParameters().get(FLAT_STRUCTURE_COLLECTION_KEY);
499+
return queryExecutor.execute(client.getConnection(), query, flatStructureCollectionName);
496500
}
497501

498502
@Override
@@ -1254,11 +1258,135 @@ private String getSubDocUpdateQuery() {
12541258
tableIdentifier, DOCUMENT, DOCUMENT, ID);
12551259
}
12561260

1261+
static class PostgresResultIteratorWithBasicTypes extends PostgresResultIterator {
1262+
1263+
public PostgresResultIteratorWithBasicTypes(ResultSet resultSet) {
1264+
super(resultSet);
1265+
}
1266+
1267+
@Override
1268+
public Document next() {
1269+
try {
1270+
if (!cursorMovedForward) {
1271+
resultSet.next();
1272+
}
1273+
// reset the cursorMovedForward state, if it was forwarded in hasNext.
1274+
cursorMovedForward = false;
1275+
return prepareDocument();
1276+
} catch (IOException | SQLException e) {
1277+
System.out.println("prepare document failed!");
1278+
closeResultSet();
1279+
return JSONDocument.errorDocument(e.getMessage());
1280+
}
1281+
}
1282+
1283+
protected Document prepareDocument() throws SQLException, IOException {
1284+
ObjectNode jsonNode = MAPPER.createObjectNode();
1285+
1286+
// Get metadata to iterate through all columns
1287+
ResultSetMetaData metaData = resultSet.getMetaData();
1288+
int columnCount = metaData.getColumnCount();
1289+
1290+
for (int i = 1; i <= columnCount; i++) {
1291+
String columnName = metaData.getColumnName(i);
1292+
String columnType = metaData.getColumnTypeName(i);
1293+
1294+
addColumnToJsonNode(jsonNode, columnName, columnType, i);
1295+
}
1296+
1297+
// Remove document ID if needed
1298+
if (removeDocumentId) {
1299+
jsonNode.remove(DOCUMENT_ID);
1300+
}
1301+
1302+
return new JSONDocument(MAPPER.writeValueAsString(jsonNode));
1303+
}
1304+
1305+
private void addColumnToJsonNode(
1306+
ObjectNode jsonNode, String columnName, String columnType, int columnIndex)
1307+
throws SQLException {
1308+
switch (columnType.toLowerCase()) {
1309+
case "bool":
1310+
case "boolean":
1311+
boolean boolValue = resultSet.getBoolean(columnIndex);
1312+
if (!resultSet.wasNull()) {
1313+
jsonNode.put(columnName, boolValue);
1314+
}
1315+
break;
1316+
1317+
case "int4":
1318+
case "integer":
1319+
int intValue = resultSet.getInt(columnIndex);
1320+
if (!resultSet.wasNull()) {
1321+
jsonNode.put(columnName, intValue);
1322+
}
1323+
break;
1324+
1325+
case "int8":
1326+
case "bigint":
1327+
long longValue = resultSet.getLong(columnIndex);
1328+
if (!resultSet.wasNull()) {
1329+
jsonNode.put(columnName, longValue);
1330+
}
1331+
break;
1332+
1333+
case "float8":
1334+
case "double":
1335+
double doubleValue = resultSet.getDouble(columnIndex);
1336+
if (!resultSet.wasNull()) {
1337+
jsonNode.put(columnName, doubleValue);
1338+
}
1339+
break;
1340+
1341+
case "text":
1342+
case "varchar":
1343+
String stringValue = resultSet.getString(columnIndex);
1344+
if (stringValue != null) {
1345+
jsonNode.put(columnName, stringValue);
1346+
}
1347+
break;
1348+
1349+
case "_text": // text array
1350+
Array array = resultSet.getArray(columnIndex);
1351+
if (array != null) {
1352+
String[] stringArray = (String[]) array.getArray();
1353+
ArrayNode arrayNode = MAPPER.createArrayNode();
1354+
for (String item : stringArray) {
1355+
arrayNode.add(item);
1356+
}
1357+
jsonNode.set(columnName, arrayNode);
1358+
}
1359+
break;
1360+
1361+
case "jsonb":
1362+
case "json":
1363+
String jsonString = resultSet.getString(columnIndex);
1364+
if (jsonString != null) {
1365+
try {
1366+
JsonNode jsonValue = MAPPER.readTree(jsonString);
1367+
jsonNode.set(columnName, jsonValue);
1368+
} catch (IOException e) {
1369+
// Fallback to string if JSON parsing fails
1370+
jsonNode.put(columnName, jsonString);
1371+
}
1372+
}
1373+
break;
1374+
1375+
default:
1376+
Object objectValue = resultSet.getObject(columnIndex);
1377+
if (objectValue != null) {
1378+
jsonNode.put(columnName, objectValue.toString());
1379+
}
1380+
break;
1381+
}
1382+
}
1383+
}
1384+
12571385
static class PostgresResultIterator implements CloseableIterator<Document> {
12581386

12591387
protected final ObjectMapper MAPPER = new ObjectMapper();
12601388
protected ResultSet resultSet;
1261-
private final boolean removeDocumentId;
1389+
final boolean removeDocumentId;
12621390
protected boolean cursorMovedForward = false;
12631391
protected boolean hasNext = false;
12641392

0 commit comments

Comments
 (0)