Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,12 @@ public void close() throws SQLException {
@Override
public void close(boolean removeFromSession) throws DatabricksSQLException {
LOGGER.debug("public void close(boolean removeFromSession)");

if (statementId == null) {
if (isClosed) {
if (resultSet != null) {
this.resultSet.close();
this.resultSet = null;
}
} else if (statementId == null) {
String warningMsg = "The statement you are trying to close does not have an ID yet.";
LOGGER.warn(warningMsg);
warnings = WarningUtil.addWarning(warnings, warningMsg);
Expand Down Expand Up @@ -816,6 +820,24 @@ void checkIfClosed() throws DatabricksSQLException {
}
}

/**
* Marks the statement as closed without attempting to close it on the server or clean up local
* resources. This should be used when the server has already indicated the statement is closed.
*
* <p>This method sets the closed flag to prevent further operations, but defers resource cleanup
* (result set, executor) to the {@link #close(boolean)} method. When {@code close()} is
* subsequently called, it will detect the statement is already closed and skip the server-side
* close operation while still cleaning up local resources.
*
* @see #close(boolean)
*/
public void markAsClosed() {
LOGGER.debug("Marking statement {} as closed (server already closed)", statementId);
this.connection.closeStatement(this);
DatabricksThreadContextHolder.clearStatementInfo();
this.isClosed = true;
}

/**
* Shuts down the ExecutorService used for asynchronous execution.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,12 @@ public DatabricksResultSet executeStatement(
if (responseState != StatementState.SUCCEEDED && responseState != StatementState.CLOSED) {
handleFailedExecution(response, statementId, sql);
}

if (responseState == StatementState.CLOSED && parentStatement != null) {
LOGGER.debug("Statement {} returned CLOSED status, marking statement as closed", statementId);
((DatabricksStatement) parentStatement.getStatement()).markAsClosed();
}

return new DatabricksResultSet(
response.getStatus(),
typedStatementId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -734,6 +734,120 @@ public void testIsInsertQuery() {
assertTrue(DatabricksStatement.isInsertQuery("(INSERT INTO users (id) VALUES (?))"));
}

@Test
public void testMarkAsClosed() throws Exception {
IDatabricksConnectionContext connectionContext =
DatabricksConnectionContext.parse(JDBC_URL, new Properties());
DatabricksConnection connection = new DatabricksConnection(connectionContext, client);
DatabricksStatement statement = new DatabricksStatement(connection);

when(client.executeStatement(
eq(STATEMENT),
eq(new Warehouse(WAREHOUSE_ID)),
eq(new HashMap<>()),
eq(StatementType.QUERY),
any(IDatabricksSession.class),
eq(statement)))
.thenReturn(resultSet);

// Execute a query to set up result set
statement.executeQuery(STATEMENT);
assertFalse(statement.isClosed());

// Mark statement as closed without attempting to close on server or clean up resources
statement.markAsClosed();

// Verify statement is marked as closed
assertTrue(statement.isClosed());

// Verify that closeStatement was NOT called on the client (server already closed it)
verify(client, never()).closeStatement(any(StatementId.class));

// Verify that result set is NOT closed yet by markAsClosed
verify(resultSet, never()).close();

// Verify that the statement cannot be used anymore
assertThrows(DatabricksSQLException.class, () -> statement.executeQuery(STATEMENT));

// Now call close() - it should clean up the result set without trying to close on server
statement.close();

// Verify that result set was closed by close()
verify(resultSet, times(1)).close();

// Verify that closeStatement was still NOT called on the client (already closed on server)
verify(client, never()).closeStatement(any(StatementId.class));
}

@Test
public void testMarkAsClosedThenCloseWithResultSetError() throws Exception {
IDatabricksConnectionContext connectionContext =
DatabricksConnectionContext.parse(JDBC_URL, new Properties());
DatabricksConnection connection = new DatabricksConnection(connectionContext, client);
DatabricksStatement statement = new DatabricksStatement(connection);

// Create a mock result set that throws an exception on close
DatabricksResultSet mockResultSet = mock(DatabricksResultSet.class);
doThrow(new DatabricksSQLException("Error closing result set", "HY000"))
.when(mockResultSet)
.close();

when(client.executeStatement(
eq(STATEMENT),
eq(new Warehouse(WAREHOUSE_ID)),
eq(new HashMap<>()),
eq(StatementType.QUERY),
any(IDatabricksSession.class),
eq(statement)))
.thenReturn(mockResultSet);

// Execute a query to set up result set
statement.executeQuery(STATEMENT);
assertFalse(statement.isClosed());

// Mark statement as closed - should not throw since it doesn't close result set
assertDoesNotThrow(() -> statement.markAsClosed());

// Verify statement is marked as closed
assertTrue(statement.isClosed());

// Verify result set was NOT closed by markAsClosed
verify(mockResultSet, never()).close();

// Now call close() - it should attempt to close result set and throw the exception
assertThrows(DatabricksSQLException.class, () -> statement.close());

// Verify that result set close was attempted during close()
verify(mockResultSet, times(1)).close();

// Verify that closeStatement was NOT called on the client
verify(client, never()).closeStatement(any(StatementId.class));
}

@Test
public void testMarkAsClosedWithoutResultSet() throws Exception {
IDatabricksConnectionContext connectionContext =
DatabricksConnectionContext.parse(JDBC_URL, new Properties());
DatabricksConnection connection = new DatabricksConnection(connectionContext, client);
DatabricksStatement statement = new DatabricksStatement(connection);

// Mark statement as closed without executing any query (no result set)
assertFalse(statement.isClosed());
statement.markAsClosed();

// Verify statement is marked as closed
assertTrue(statement.isClosed());

// Verify that closeStatement was NOT called on the client
verify(client, never()).closeStatement(any(StatementId.class));

// Calling close() after markAsClosed should not throw
assertDoesNotThrow(() -> statement.close());

// Statement should still be closed
assertTrue(statement.isClosed());
}

@Test
public void testRemoveEmptyEscapeClauseFromQuery() throws Exception {
IDatabricksConnectionContext connectionContext =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -722,4 +722,114 @@ public void testSeaSyncMetadataHeaderNotAddedWhenDisabled() throws Exception {
}),
eq(ExecuteStatementResponse.class));
}

@Test
public void testExecuteStatementWithClosedStatus() throws Exception {
// Set up connection and statement
IDatabricksConnectionContext connectionContext =
DatabricksConnectionContext.parse(JDBC_URL, new Properties());
DatabricksSdkClient databricksSdkClient =
new DatabricksSdkClient(connectionContext, statementExecutionService, apiClient);
DatabricksConnection connection =
new DatabricksConnection(connectionContext, databricksSdkClient);

// Mock session creation
CreateSessionResponse sessionResponse = new CreateSessionResponse().setSessionId(SESSION_ID);
when(apiClient.execute(any(Request.class), eq(CreateSessionResponse.class)))
.thenReturn(sessionResponse);
connection.open();

DatabricksStatement statement = spy(new DatabricksStatement(connection));
statement.setMaxRows(100);

// Create a response with CLOSED status
StatementStatus closedStatus = new StatementStatus().setState(StatementState.CLOSED);
ExecuteStatementResponse closedResponse =
new ExecuteStatementResponse()
.setStatementId(STATEMENT_ID.toSQLExecStatementId())
.setStatus(closedStatus)
.setResult(resultData)
.setManifest(
new ResultManifest()
.setFormat(Format.JSON_ARRAY)
.setSchema(new ResultSchema().setColumns(new ArrayList<>()).setColumnCount(0L))
.setTotalRowCount(0L));

when(apiClient.execute(any(Request.class), any()))
.thenAnswer(
invocationOnMock -> {
Request req = invocationOnMock.getArgument(0, Request.class);
if (req.getUrl().equals(STATEMENT_PATH)) {
return closedResponse;
} else if (req.getUrl().equals(SESSION_PATH)) {
return sessionResponse;
}
return null;
});

// Execute statement
databricksSdkClient.executeStatement(
STATEMENT,
warehouse,
new HashMap<>(),
StatementType.QUERY,
connection.getSession(),
statement);

// Verify that markAsClosed was called on the statement
verify(statement, times(1)).markAsClosed();
}

@Test
public void testExecuteStatementWithClosedStatusAndNoParentStatement() throws Exception {
// Set up connection without parent statement
IDatabricksConnectionContext connectionContext =
DatabricksConnectionContext.parse(JDBC_URL, new Properties());
DatabricksSdkClient databricksSdkClient =
new DatabricksSdkClient(connectionContext, statementExecutionService, apiClient);
DatabricksConnection connection =
new DatabricksConnection(connectionContext, databricksSdkClient);

// Mock session creation
CreateSessionResponse sessionResponse = new CreateSessionResponse().setSessionId(SESSION_ID);
when(apiClient.execute(any(Request.class), eq(CreateSessionResponse.class)))
.thenReturn(sessionResponse);
connection.open();

// Create a response with CLOSED status
StatementStatus closedStatus = new StatementStatus().setState(StatementState.CLOSED);
ExecuteStatementResponse closedResponse =
new ExecuteStatementResponse()
.setStatementId(STATEMENT_ID.toSQLExecStatementId())
.setStatus(closedStatus)
.setResult(resultData)
.setManifest(
new ResultManifest()
.setFormat(Format.JSON_ARRAY)
.setSchema(new ResultSchema().setColumns(new ArrayList<>()).setColumnCount(0L))
.setTotalRowCount(0L));

when(apiClient.execute(any(Request.class), any()))
.thenAnswer(
invocationOnMock -> {
Request req = invocationOnMock.getArgument(0, Request.class);
if (req.getUrl().equals(STATEMENT_PATH)) {
return closedResponse;
} else if (req.getUrl().equals(SESSION_PATH)) {
return sessionResponse;
}
return null;
});

// Execute statement with null parent statement - should not throw
assertDoesNotThrow(
() ->
databricksSdkClient.executeStatement(
STATEMENT,
warehouse,
new HashMap<>(),
StatementType.QUERY,
connection.getSession(),
null));
}
}
Loading