diff --git a/src/main/java/com/databricks/jdbc/api/impl/DatabricksStatement.java b/src/main/java/com/databricks/jdbc/api/impl/DatabricksStatement.java index c5da4498d..140e665b1 100644 --- a/src/main/java/com/databricks/jdbc/api/impl/DatabricksStatement.java +++ b/src/main/java/com/databricks/jdbc/api/impl/DatabricksStatement.java @@ -107,8 +107,12 @@ public void close() throws SQLException { @Override public void close(boolean removeFromSession) throws DatabricksSQLException { LOGGER.debug("public void close(boolean removeFromSession)"); - - if (statementId == null) { + if (isClosed) { + if (resultSet != null) { + this.resultSet.close(); + this.resultSet = null; + } + } else if (statementId == null) { String warningMsg = "The statement you are trying to close does not have an ID yet."; LOGGER.warn(warningMsg); warnings = WarningUtil.addWarning(warnings, warningMsg); @@ -817,6 +821,24 @@ void checkIfClosed() throws DatabricksSQLException { } } + /** + * Marks the statement as closed without attempting to close it on the server or clean up local + * resources. This should be used when the server has already indicated the statement is closed. + * + *

This method sets the closed flag to prevent further operations, but defers resource cleanup + * (result set, executor) to the {@link #close(boolean)} method. When {@code close()} is + * subsequently called, it will detect the statement is already closed and skip the server-side + * close operation while still cleaning up local resources. + * + * @see #close(boolean) + */ + public void markAsClosed() { + LOGGER.debug("Marking statement {} as closed (server already closed)", statementId); + this.connection.closeStatement(this); + DatabricksThreadContextHolder.clearStatementInfo(); + this.isClosed = true; + } + /** * Shuts down the ExecutorService used for asynchronous execution. * diff --git a/src/main/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClient.java b/src/main/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClient.java index b8dfed049..51a1ee71d 100644 --- a/src/main/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClient.java +++ b/src/main/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClient.java @@ -274,6 +274,12 @@ public DatabricksResultSet executeStatement( if (responseState != StatementState.SUCCEEDED && responseState != StatementState.CLOSED) { handleFailedExecution(response, statementId, sql); } + + if (responseState == StatementState.CLOSED && parentStatement != null) { + LOGGER.debug("Statement {} returned CLOSED status, marking statement as closed", statementId); + ((DatabricksStatement) parentStatement.getStatement()).markAsClosed(); + } + return new DatabricksResultSet( response.getStatus(), typedStatementId, diff --git a/src/test/java/com/databricks/jdbc/api/impl/DatabricksStatementTest.java b/src/test/java/com/databricks/jdbc/api/impl/DatabricksStatementTest.java index 25ca3697d..5f4f4d784 100644 --- a/src/test/java/com/databricks/jdbc/api/impl/DatabricksStatementTest.java +++ b/src/test/java/com/databricks/jdbc/api/impl/DatabricksStatementTest.java @@ -734,6 +734,120 @@ public void testIsInsertQuery() { assertTrue(DatabricksStatement.isInsertQuery("(INSERT INTO users (id) VALUES (?))")); } + @Test + public void testMarkAsClosed() throws Exception { + IDatabricksConnectionContext connectionContext = + DatabricksConnectionContext.parse(JDBC_URL, new Properties()); + DatabricksConnection connection = new DatabricksConnection(connectionContext, client); + DatabricksStatement statement = new DatabricksStatement(connection); + + when(client.executeStatement( + eq(STATEMENT), + eq(new Warehouse(WAREHOUSE_ID)), + eq(new HashMap<>()), + eq(StatementType.QUERY), + any(IDatabricksSession.class), + eq(statement))) + .thenReturn(resultSet); + + // Execute a query to set up result set + statement.executeQuery(STATEMENT); + assertFalse(statement.isClosed()); + + // Mark statement as closed without attempting to close on server or clean up resources + statement.markAsClosed(); + + // Verify statement is marked as closed + assertTrue(statement.isClosed()); + + // Verify that closeStatement was NOT called on the client (server already closed it) + verify(client, never()).closeStatement(any(StatementId.class)); + + // Verify that result set is NOT closed yet by markAsClosed + verify(resultSet, never()).close(); + + // Verify that the statement cannot be used anymore + assertThrows(DatabricksSQLException.class, () -> statement.executeQuery(STATEMENT)); + + // Now call close() - it should clean up the result set without trying to close on server + statement.close(); + + // Verify that result set was closed by close() + verify(resultSet, times(1)).close(); + + // Verify that closeStatement was still NOT called on the client (already closed on server) + verify(client, never()).closeStatement(any(StatementId.class)); + } + + @Test + public void testMarkAsClosedThenCloseWithResultSetError() throws Exception { + IDatabricksConnectionContext connectionContext = + DatabricksConnectionContext.parse(JDBC_URL, new Properties()); + DatabricksConnection connection = new DatabricksConnection(connectionContext, client); + DatabricksStatement statement = new DatabricksStatement(connection); + + // Create a mock result set that throws an exception on close + DatabricksResultSet mockResultSet = mock(DatabricksResultSet.class); + doThrow(new DatabricksSQLException("Error closing result set", "HY000")) + .when(mockResultSet) + .close(); + + when(client.executeStatement( + eq(STATEMENT), + eq(new Warehouse(WAREHOUSE_ID)), + eq(new HashMap<>()), + eq(StatementType.QUERY), + any(IDatabricksSession.class), + eq(statement))) + .thenReturn(mockResultSet); + + // Execute a query to set up result set + statement.executeQuery(STATEMENT); + assertFalse(statement.isClosed()); + + // Mark statement as closed - should not throw since it doesn't close result set + assertDoesNotThrow(() -> statement.markAsClosed()); + + // Verify statement is marked as closed + assertTrue(statement.isClosed()); + + // Verify result set was NOT closed by markAsClosed + verify(mockResultSet, never()).close(); + + // Now call close() - it should attempt to close result set and throw the exception + assertThrows(DatabricksSQLException.class, () -> statement.close()); + + // Verify that result set close was attempted during close() + verify(mockResultSet, times(1)).close(); + + // Verify that closeStatement was NOT called on the client + verify(client, never()).closeStatement(any(StatementId.class)); + } + + @Test + public void testMarkAsClosedWithoutResultSet() throws Exception { + IDatabricksConnectionContext connectionContext = + DatabricksConnectionContext.parse(JDBC_URL, new Properties()); + DatabricksConnection connection = new DatabricksConnection(connectionContext, client); + DatabricksStatement statement = new DatabricksStatement(connection); + + // Mark statement as closed without executing any query (no result set) + assertFalse(statement.isClosed()); + statement.markAsClosed(); + + // Verify statement is marked as closed + assertTrue(statement.isClosed()); + + // Verify that closeStatement was NOT called on the client + verify(client, never()).closeStatement(any(StatementId.class)); + + // Calling close() after markAsClosed should not throw + assertDoesNotThrow(() -> statement.close()); + + // Statement should still be closed + assertTrue(statement.isClosed()); + } + @Test public void testRemoveEmptyEscapeClauseFromQuery() throws Exception { IDatabricksConnectionContext connectionContext = diff --git a/src/test/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClientTest.java b/src/test/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClientTest.java index f657f91d4..a38ca96cc 100644 --- a/src/test/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClientTest.java +++ b/src/test/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClientTest.java @@ -722,4 +722,114 @@ public void testSeaSyncMetadataHeaderNotAddedWhenDisabled() throws Exception { }), eq(ExecuteStatementResponse.class)); } + + @Test + public void testExecuteStatementWithClosedStatus() throws Exception { + // Set up connection and statement + IDatabricksConnectionContext connectionContext = + DatabricksConnectionContext.parse(JDBC_URL, new Properties()); + DatabricksSdkClient databricksSdkClient = + new DatabricksSdkClient(connectionContext, statementExecutionService, apiClient); + DatabricksConnection connection = + new DatabricksConnection(connectionContext, databricksSdkClient); + + // Mock session creation + CreateSessionResponse sessionResponse = new CreateSessionResponse().setSessionId(SESSION_ID); + when(apiClient.execute(any(Request.class), eq(CreateSessionResponse.class))) + .thenReturn(sessionResponse); + connection.open(); + + DatabricksStatement statement = spy(new DatabricksStatement(connection)); + statement.setMaxRows(100); + + // Create a response with CLOSED status + StatementStatus closedStatus = new StatementStatus().setState(StatementState.CLOSED); + ExecuteStatementResponse closedResponse = + new ExecuteStatementResponse() + .setStatementId(STATEMENT_ID.toSQLExecStatementId()) + .setStatus(closedStatus) + .setResult(resultData) + .setManifest( + new ResultManifest() + .setFormat(Format.JSON_ARRAY) + .setSchema(new ResultSchema().setColumns(new ArrayList<>()).setColumnCount(0L)) + .setTotalRowCount(0L)); + + when(apiClient.execute(any(Request.class), any())) + .thenAnswer( + invocationOnMock -> { + Request req = invocationOnMock.getArgument(0, Request.class); + if (req.getUrl().equals(STATEMENT_PATH)) { + return closedResponse; + } else if (req.getUrl().equals(SESSION_PATH)) { + return sessionResponse; + } + return null; + }); + + // Execute statement + databricksSdkClient.executeStatement( + STATEMENT, + warehouse, + new HashMap<>(), + StatementType.QUERY, + connection.getSession(), + statement); + + // Verify that markAsClosed was called on the statement + verify(statement, times(1)).markAsClosed(); + } + + @Test + public void testExecuteStatementWithClosedStatusAndNoParentStatement() throws Exception { + // Set up connection without parent statement + IDatabricksConnectionContext connectionContext = + DatabricksConnectionContext.parse(JDBC_URL, new Properties()); + DatabricksSdkClient databricksSdkClient = + new DatabricksSdkClient(connectionContext, statementExecutionService, apiClient); + DatabricksConnection connection = + new DatabricksConnection(connectionContext, databricksSdkClient); + + // Mock session creation + CreateSessionResponse sessionResponse = new CreateSessionResponse().setSessionId(SESSION_ID); + when(apiClient.execute(any(Request.class), eq(CreateSessionResponse.class))) + .thenReturn(sessionResponse); + connection.open(); + + // Create a response with CLOSED status + StatementStatus closedStatus = new StatementStatus().setState(StatementState.CLOSED); + ExecuteStatementResponse closedResponse = + new ExecuteStatementResponse() + .setStatementId(STATEMENT_ID.toSQLExecStatementId()) + .setStatus(closedStatus) + .setResult(resultData) + .setManifest( + new ResultManifest() + .setFormat(Format.JSON_ARRAY) + .setSchema(new ResultSchema().setColumns(new ArrayList<>()).setColumnCount(0L)) + .setTotalRowCount(0L)); + + when(apiClient.execute(any(Request.class), any())) + .thenAnswer( + invocationOnMock -> { + Request req = invocationOnMock.getArgument(0, Request.class); + if (req.getUrl().equals(STATEMENT_PATH)) { + return closedResponse; + } else if (req.getUrl().equals(SESSION_PATH)) { + return sessionResponse; + } + return null; + }); + + // Execute statement with null parent statement - should not throw + assertDoesNotThrow( + () -> + databricksSdkClient.executeStatement( + STATEMENT, + warehouse, + new HashMap<>(), + StatementType.QUERY, + connection.getSession(), + null)); + } }