Skip to content

Commit b3e23a0

Browse files
authored
Thrift - input cluster and build compute (#115)
1 parent d6dd2ed commit b3e23a0

25 files changed

+319
-162
lines changed

src/main/java/com/databricks/jdbc/client/DatabricksClient.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import com.databricks.jdbc.core.IDatabricksStatement;
77
import com.databricks.jdbc.core.ImmutableSessionInfo;
88
import com.databricks.jdbc.core.ImmutableSqlParameter;
9+
import com.databricks.jdbc.core.types.ComputeResource;
910
import java.sql.SQLException;
1011
import java.util.Collection;
1112
import java.util.Map;
@@ -23,15 +24,18 @@ public interface DatabricksClient {
2324
* @return created session
2425
*/
2526
ImmutableSessionInfo createSession(
26-
String warehouseId, String catalog, String schema, Map<String, String> sessionConf);
27+
ComputeResource computeResource,
28+
String catalog,
29+
String schema,
30+
Map<String, String> sessionConf);
2731

2832
/**
2933
* Deletes a session for given session-Id
3034
*
3135
* @param sessionId for which the session should be deleted
3236
* @param warehouseId underlying warehouse-Id
3337
*/
34-
void deleteSession(String sessionId, String warehouseId);
38+
void deleteSession(String sessionId, ComputeResource computeResource);
3539

3640
/**
3741
* Executes a statement in Databricks server
@@ -46,7 +50,7 @@ ImmutableSessionInfo createSession(
4650
*/
4751
DatabricksResultSet executeStatement(
4852
String sql,
49-
String warehouseId,
53+
ComputeResource computeResource,
5054
Map<Integer, ImmutableSqlParameter> parameters,
5155
StatementType statementType,
5256
IDatabricksSession session,

src/main/java/com/databricks/jdbc/client/impl/sdk/DatabricksMetadataSdkClient.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,12 @@ public DatabricksResultSet listTypeInfo(IDatabricksSession session) {
3838
@Override
3939
public DatabricksResultSet listCatalogs(IDatabricksSession session) throws SQLException {
4040
String showCatalogsSQL = "show catalogs";
41-
LOGGER.debug("SQL command to fetch catalogs: {}" + showCatalogsSQL);
41+
LOGGER.debug("SQL command to fetch catalogs: {}", showCatalogsSQL);
4242

4343
ResultSet rs =
4444
sdkClient.executeStatement(
4545
showCatalogsSQL,
46-
session.getWarehouseId(),
46+
session.getComputeResource(),
4747
new HashMap<Integer, ImmutableSqlParameter>(),
4848
StatementType.METADATA,
4949
session,
@@ -95,12 +95,12 @@ public DatabricksResultSet listSchemas(
9595
if (!WildcardUtil.isMatchAnything(schemaNamePattern)) {
9696
showSchemaSQL += " like '" + schemaNamePattern + "'";
9797
}
98-
LOGGER.debug("SQL command to fetch schemas: {}" + showSchemaSQL);
98+
LOGGER.debug("SQL command to fetch schemas: {}", showSchemaSQL);
9999
try {
100100
ResultSet rs =
101101
sdkClient.executeStatement(
102102
showSchemaSQL,
103-
session.getWarehouseId(),
103+
session.getComputeResource(),
104104
new HashMap<Integer, ImmutableSqlParameter>(),
105105
StatementType.METADATA,
106106
session,
@@ -163,12 +163,12 @@ public DatabricksResultSet listTables(
163163
if (!WildcardUtil.isMatchAnything(tableWithContext)) {
164164
showTablesSQL += " like '" + tableWithContext + "'";
165165
}
166-
LOGGER.debug("SQL command to fetch tables: {}" + showTablesSQL);
166+
LOGGER.debug("SQL command to fetch tables: {}", showTablesSQL);
167167
try {
168168
ResultSet rs =
169169
sdkClient.executeStatement(
170170
showTablesSQL,
171-
session.getWarehouseId(),
171+
session.getComputeResource(),
172172
new HashMap<Integer, ImmutableSqlParameter>(),
173173
StatementType.METADATA,
174174
session,
@@ -265,12 +265,12 @@ public DatabricksResultSet listColumns(
265265
String[] combination = catalogSchemaTableCombinations.poll();
266266
String showColumnsSQL =
267267
"show columns in " + combination[0] + "." + combination[1] + "." + combination[2];
268-
LOGGER.debug("SQL command to fetch columns: {}" + showColumnsSQL);
268+
LOGGER.debug("SQL command to fetch columns: {}", showColumnsSQL);
269269
try {
270270
ResultSet rs =
271271
sdkClient.executeStatement(
272272
showColumnsSQL,
273-
session.getWarehouseId(),
273+
session.getComputeResource(),
274274
new HashMap<Integer, ImmutableSqlParameter>(),
275275
StatementType.METADATA,
276276
session,

src/main/java/com/databricks/jdbc/client/impl/sdk/DatabricksNewMetadataSdkClient.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ public DatabricksResultSet listCatalogs(IDatabricksSession session) throws SQLEx
4343
CommandBuilder commandBuilder = new CommandBuilder(session);
4444
String SQL = commandBuilder.getSQLString(CommandName.LIST_CATALOGS);
4545
LOGGER.debug("SQL command to fetch catalogs: {}", SQL);
46-
ResultSet resultSet = getResultSet(SQL, session);
4746
return MetadataResultSetBuilder.getCatalogsResult(getResultSet(SQL, session));
4847
}
4948

@@ -122,7 +121,7 @@ public DatabricksResultSet listPrimaryKeys(
122121
private ResultSet getResultSet(String SQL, IDatabricksSession session) throws SQLException {
123122
return sdkClient.executeStatement(
124123
SQL,
125-
session.getWarehouseId(),
124+
session.getComputeResource(),
126125
new HashMap<Integer, ImmutableSqlParameter>(),
127126
StatementType.METADATA,
128127
session,

src/main/java/com/databricks/jdbc/client/impl/sdk/DatabricksSdkClient.java

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
import com.databricks.jdbc.client.sqlexec.GetStatementResponse;
1616
import com.databricks.jdbc.client.sqlexec.ResultData;
1717
import com.databricks.jdbc.core.*;
18+
import com.databricks.jdbc.core.types.ComputeResource;
19+
import com.databricks.jdbc.core.types.Warehouse;
1820
import com.databricks.jdbc.driver.IDatabricksConnectionContext;
1921
import com.databricks.sdk.WorkspaceClient;
2022
import com.databricks.sdk.core.ApiClient;
@@ -77,15 +79,16 @@ public DatabricksSdkClient(
7779

7880
@Override
7981
public ImmutableSessionInfo createSession(
80-
String warehouseId, String catalog, String schema, Map<String, String> sessionConf) {
82+
ComputeResource warehouse, String catalog, String schema, Map<String, String> sessionConf) {
8183
LOGGER.debug(
8284
"public Session createSession(String warehouseId = {}, String catalog = {}, String schema = {}, Map<String, String> sessionConf = {})",
83-
warehouseId,
85+
((Warehouse) warehouse).getWarehouseId(),
8486
catalog,
8587
schema,
8688
sessionConf);
8789
// TODO: [PECO-1460] Handle sessionConf in public session API
88-
CreateSessionRequest request = new CreateSessionRequest().setWarehouseId(warehouseId);
90+
CreateSessionRequest request =
91+
new CreateSessionRequest().setWarehouseId(((Warehouse) warehouse).getWarehouseId());
8992
if (catalog != null) {
9093
request.setCatalog(catalog);
9194
}
@@ -100,16 +103,18 @@ public ImmutableSessionInfo createSession(
100103
.apiClient()
101104
.POST(SESSION_PATH, request, CreateSessionResponse.class, getHeaders());
102105
return ImmutableSessionInfo.builder()
103-
.warehouseId(warehouseId)
106+
.computeResource(warehouse)
104107
.sessionId(createSessionResponse.getSessionId())
105108
.build();
106109
}
107110

108111
@Override
109-
public void deleteSession(String sessionId, String warehouseId) {
112+
public void deleteSession(String sessionId, ComputeResource warehouse) {
110113
LOGGER.debug("public void deleteSession(String sessionId = {})", sessionId);
111114
DeleteSessionRequest request =
112-
new DeleteSessionRequest().setSessionId(sessionId).setWarehouseId(warehouseId);
115+
new DeleteSessionRequest()
116+
.setSessionId(sessionId)
117+
.setWarehouseId(((Warehouse) warehouse).getWarehouseId());
113118
String path = String.format(SESSION_PATH_WITH_ID, request.getSessionId());
114119
Map<String, String> headers = new HashMap<>();
115120
workspaceClient.apiClient().DELETE(path, request, Void.class, headers);
@@ -118,22 +123,28 @@ public void deleteSession(String sessionId, String warehouseId) {
118123
@Override
119124
public DatabricksResultSet executeStatement(
120125
String sql,
121-
String warehouseId,
126+
ComputeResource computeResource,
122127
Map<Integer, ImmutableSqlParameter> parameters,
123128
StatementType statementType,
124129
IDatabricksSession session,
125130
IDatabricksStatement parentStatement)
126131
throws SQLException {
127132
LOGGER.debug(
128-
"public DatabricksResultSet executeStatement(String sql = {}, String warehouseId = {}, Map<Integer, ImmutableSqlParameter> parameters, StatementType statementType = {}, IDatabricksSession session)",
133+
"public DatabricksResultSet executeStatement(String sql = {}, compute resource = {}, Map<Integer, ImmutableSqlParameter> parameters, StatementType statementType = {}, IDatabricksSession session)",
129134
sql,
130-
warehouseId,
135+
computeResource.toString(),
131136
statementType);
132137

133138
long pollCount = 0;
134139
long executionStartTime = Instant.now().toEpochMilli();
135140
ExecuteStatementRequest request =
136-
getRequest(statementType, sql, warehouseId, session, parameters, parentStatement);
141+
getRequest(
142+
statementType,
143+
sql,
144+
((Warehouse) computeResource).getWarehouseId(),
145+
session,
146+
parameters,
147+
parentStatement);
137148
ExecuteStatementResponse response =
138149
workspaceClient
139150
.apiClient()

src/main/java/com/databricks/jdbc/client/impl/thrift/DatabricksThriftClient.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,26 +5,38 @@
55
import com.databricks.jdbc.client.StatementType;
66
import com.databricks.jdbc.client.sqlexec.ExternalLink;
77
import com.databricks.jdbc.core.*;
8+
import com.databricks.jdbc.core.types.ComputeResource;
9+
import com.databricks.jdbc.driver.IDatabricksConnectionContext;
810
import java.sql.SQLException;
911
import java.util.Collection;
1012
import java.util.Map;
13+
import org.slf4j.Logger;
14+
import org.slf4j.LoggerFactory;
1115

1216
public class DatabricksThriftClient implements DatabricksClient, DatabricksMetadataClient {
17+
18+
private static final Logger LOGGER = LoggerFactory.getLogger(DatabricksThriftClient.class);
19+
private final IDatabricksConnectionContext connectionContext;
20+
1321
@Override
1422
public ImmutableSessionInfo createSession(
15-
String warehouseId, String catalog, String schema, Map<String, String> sessionConf) {
23+
ComputeResource cluster, String catalog, String schema, Map<String, String> sessionConf) {
1624
throw new UnsupportedOperationException();
1725
}
1826

27+
public DatabricksThriftClient(IDatabricksConnectionContext connectionContext) {
28+
this.connectionContext = connectionContext;
29+
}
30+
1931
@Override
20-
public void deleteSession(String sessionId, String warehouseId) {
32+
public void deleteSession(String sessionId, ComputeResource cluster) {
2133
throw new UnsupportedOperationException();
2234
}
2335

2436
@Override
2537
public DatabricksResultSet executeStatement(
2638
String sql,
27-
String warehouseId,
39+
ComputeResource computeResource,
2840
Map<Integer, ImmutableSqlParameter> parameters,
2941
StatementType statementType,
3042
IDatabricksSession session,

src/main/java/com/databricks/jdbc/core/DatabricksConnection.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,16 @@ public class DatabricksConnection implements IDatabricksConnection, Connection {
3030
*
3131
* @param connectionContext underlying connection context
3232
*/
33-
public DatabricksConnection(IDatabricksConnectionContext connectionContext) {
33+
public DatabricksConnection(IDatabricksConnectionContext connectionContext)
34+
throws DatabricksSQLException {
3435
this.session = new DatabricksSession(connectionContext);
3536
this.session.open();
3637
}
3738

3839
@VisibleForTesting
3940
public DatabricksConnection(
40-
IDatabricksConnectionContext connectionContext, DatabricksClient databricksClient) {
41+
IDatabricksConnectionContext connectionContext, DatabricksClient databricksClient)
42+
throws DatabricksSQLException {
4143
this.session = new DatabricksSession(connectionContext, databricksClient);
4244
this.session.open();
4345
new DatabricksDriver().setUserAgent(connectionContext);

src/main/java/com/databricks/jdbc/core/DatabricksSession.java

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,10 @@
44
import com.databricks.jdbc.client.DatabricksMetadataClient;
55
import com.databricks.jdbc.client.impl.sdk.DatabricksMetadataSdkClient;
66
import com.databricks.jdbc.client.impl.sdk.DatabricksSdkClient;
7+
import com.databricks.jdbc.client.impl.thrift.DatabricksThriftClient;
78
import com.databricks.jdbc.core.types.CompressionType;
9+
import com.databricks.jdbc.core.types.ComputeResource;
10+
import com.databricks.jdbc.core.types.Warehouse;
811
import com.databricks.jdbc.driver.IDatabricksConnectionContext;
912
import com.databricks.sdk.support.ToStringer;
1013
import com.google.common.annotations.VisibleForTesting;
@@ -19,7 +22,7 @@ public class DatabricksSession implements IDatabricksSession {
1922
private static final Logger LOGGER = LoggerFactory.getLogger(DatabricksSession.class);
2023
private final DatabricksClient databricksClient;
2124
private final DatabricksMetadataClient databricksMetadataClient;
22-
private final String warehouseId;
25+
private final ComputeResource computeResource;
2326

2427
private boolean isSessionOpen;
2528
private ImmutableSessionInfo session;
@@ -37,13 +40,18 @@ public class DatabricksSession implements IDatabricksSession {
3740
*
3841
* @param connectionContext underlying connection context
3942
*/
40-
public DatabricksSession(IDatabricksConnectionContext connectionContext) {
41-
this.databricksClient = new DatabricksSdkClient(connectionContext);
43+
public DatabricksSession(IDatabricksConnectionContext connectionContext)
44+
throws DatabricksSQLException {
45+
if (connectionContext.isAllPurposeCluster()) {
46+
this.databricksClient = new DatabricksThriftClient(connectionContext);
47+
} else {
48+
this.databricksClient = new DatabricksSdkClient(connectionContext);
49+
}
4250
this.databricksMetadataClient =
4351
new DatabricksMetadataSdkClient((DatabricksSdkClient) databricksClient);
4452
this.isSessionOpen = false;
4553
this.session = null;
46-
this.warehouseId = connectionContext.getWarehouse();
54+
this.computeResource = connectionContext.getComputeResource();
4755
this.catalog = connectionContext.getCatalog();
4856
this.schema = connectionContext.getSchema();
4957
this.sessionConfigs = connectionContext.getSessionConfigs();
@@ -53,13 +61,14 @@ public DatabricksSession(IDatabricksConnectionContext connectionContext) {
5361
/** Construct method to be used for mocking in a test case. */
5462
@VisibleForTesting
5563
DatabricksSession(
56-
IDatabricksConnectionContext connectionContext, DatabricksClient databricksClient) {
64+
IDatabricksConnectionContext connectionContext, DatabricksClient databricksClient)
65+
throws DatabricksSQLException {
5766
this.databricksClient = databricksClient;
5867
this.databricksMetadataClient =
5968
new DatabricksMetadataSdkClient((DatabricksSdkClient) databricksClient);
6069
this.isSessionOpen = false;
6170
this.session = null;
62-
this.warehouseId = connectionContext.getWarehouse();
71+
this.computeResource = connectionContext.getComputeResource();
6372
this.catalog = connectionContext.getCatalog();
6473
this.schema = connectionContext.getSchema();
6574
this.sessionConfigs = connectionContext.getSessionConfigs();
@@ -74,9 +83,9 @@ public String getSessionId() {
7483
}
7584

7685
@Override
77-
public String getWarehouseId() {
86+
public ComputeResource getComputeResource() throws DatabricksSQLException {
7887
LOGGER.debug("public String getWarehouseId()");
79-
return warehouseId;
88+
return this.computeResource;
8089
}
8190

8291
@Override
@@ -101,7 +110,7 @@ public void open() {
101110
// TODO: handle errors
102111
this.session =
103112
databricksClient.createSession(
104-
this.warehouseId, this.catalog, this.schema, this.sessionConfigs);
113+
this.computeResource, this.catalog, this.schema, this.sessionConfigs);
105114
this.isSessionOpen = true;
106115
}
107116
}
@@ -114,7 +123,11 @@ public void close() {
114123
synchronized (this) {
115124
if (isSessionOpen) {
116125
// TODO: handle closed connections by server
117-
databricksClient.deleteSession(this.session.sessionId(), getWarehouseId());
126+
if (computeResource instanceof Warehouse) {
127+
databricksClient.deleteSession(this.session.sessionId(), computeResource);
128+
} else {
129+
130+
}
118131
this.session = null;
119132
this.isSessionOpen = false;
120133
}
@@ -160,7 +173,7 @@ public void setSchema(String schema) {
160173
@Override
161174
public String toString() {
162175
return (new ToStringer(DatabricksSession.class))
163-
.add("warehouseId", this.warehouseId)
176+
.add("compute", this.computeResource.toString())
164177
.add("catalog", this.catalog)
165178
.add("schema", this.schema)
166179
.add("sessionID", this.getSessionId())

src/main/java/com/databricks/jdbc/core/DatabricksStatement.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -437,7 +437,7 @@ DatabricksResultSet getResultFromClient(
437437
DatabricksClient client = connection.getSession().getDatabricksClient();
438438
return client.executeStatement(
439439
sql,
440-
connection.getSession().getWarehouseId(),
440+
connection.getSession().getComputeResource(),
441441
params,
442442
statementType,
443443
connection.getSession(),

src/main/java/com/databricks/jdbc/core/IDatabricksSession.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.databricks.jdbc.client.DatabricksClient;
44
import com.databricks.jdbc.client.DatabricksMetadataClient;
55
import com.databricks.jdbc.core.types.CompressionType;
6+
import com.databricks.jdbc.core.types.ComputeResource;
67
import java.util.Map;
78
import javax.annotation.Nullable;
89

@@ -22,7 +23,7 @@ public interface IDatabricksSession {
2223
*
2324
* @return warehouse-Id
2425
*/
25-
String getWarehouseId();
26+
ComputeResource getComputeResource() throws DatabricksSQLException;
2627

2728
/**
2829
* Checks if session is open and valid.

0 commit comments

Comments (0)