From dc38997e6aeae629d8bc36adc08976c293a2b6e5 Mon Sep 17 00:00:00 2001 From: Madhav Sainanee Date: Tue, 25 Nov 2025 05:30:31 +0000 Subject: [PATCH 1/3] sea close statement --- .metals/metals.lock.db | 6 + .../jdbc/api/impl/DatabricksStatement.java | 20 ++++ .../impl/sqlexec/DatabricksSdkClient.java | 6 + .../api/impl/DatabricksStatementTest.java | 87 ++++++++++++++ .../impl/sqlexec/DatabricksSdkClientTest.java | 110 ++++++++++++++++++ 5 files changed, 229 insertions(+) create mode 100644 .metals/metals.lock.db diff --git a/.metals/metals.lock.db b/.metals/metals.lock.db new file mode 100644 index 000000000..9da718346 --- /dev/null +++ b/.metals/metals.lock.db @@ -0,0 +1,6 @@ +#FileLock +#Tue Nov 25 05:29:53 UTC 2025 +server=localhost\:37449 +hostName=localhost +method=file +id=19ab96ccaa237406155a471e7b05c1681a464efbe7f diff --git a/src/main/java/com/databricks/jdbc/api/impl/DatabricksStatement.java b/src/main/java/com/databricks/jdbc/api/impl/DatabricksStatement.java index 76ef97132..dc854beea 100644 --- a/src/main/java/com/databricks/jdbc/api/impl/DatabricksStatement.java +++ b/src/main/java/com/databricks/jdbc/api/impl/DatabricksStatement.java @@ -816,6 +816,26 @@ void checkIfClosed() throws DatabricksSQLException { } } + /** + * Marks the statement as closed without attempting to close it on the server. This should be used + * when the server has already indicated the statement is closed. + */ + public void markAsClosed() { + LOGGER.debug("Marking statement {} as closed (server already closed)", statementId); + if (resultSet != null) { + try { + this.resultSet.close(); + } catch (DatabricksSQLException e) { + LOGGER.warn("Error closing result set: {}", e.getMessage()); + } + this.resultSet = null; + } + this.connection.closeStatement(this); + DatabricksThreadContextHolder.clearStatementInfo(); + shutDownExecutor(); + this.isClosed = true; + } + /** * Shuts down the ExecutorService used for asynchronous execution. * diff --git a/src/main/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClient.java b/src/main/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClient.java index b8dfed049..51a1ee71d 100644 --- a/src/main/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClient.java +++ b/src/main/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClient.java @@ -274,6 +274,12 @@ public DatabricksResultSet executeStatement( if (responseState != StatementState.SUCCEEDED && responseState != StatementState.CLOSED) { handleFailedExecution(response, statementId, sql); } + + if (responseState == StatementState.CLOSED && parentStatement != null) { + LOGGER.debug("Statement {} returned CLOSED status, marking statement as closed", statementId); + ((DatabricksStatement) parentStatement.getStatement()).markAsClosed(); + } + return new DatabricksResultSet( response.getStatus(), typedStatementId, diff --git a/src/test/java/com/databricks/jdbc/api/impl/DatabricksStatementTest.java b/src/test/java/com/databricks/jdbc/api/impl/DatabricksStatementTest.java index 25ca3697d..f4180bdba 100644 --- a/src/test/java/com/databricks/jdbc/api/impl/DatabricksStatementTest.java +++ b/src/test/java/com/databricks/jdbc/api/impl/DatabricksStatementTest.java @@ -734,6 +734,93 @@ public void testIsInsertQuery() { assertTrue(DatabricksStatement.isInsertQuery("(INSERT INTO users (id) VALUES (?))")); } + @Test + public void testMarkAsClosed() throws Exception { + IDatabricksConnectionContext connectionContext = + DatabricksConnectionContext.parse(JDBC_URL, new Properties()); + DatabricksConnection connection = new DatabricksConnection(connectionContext, client); + DatabricksStatement statement = new DatabricksStatement(connection); + + when(client.executeStatement( + eq(STATEMENT), + eq(new Warehouse(WAREHOUSE_ID)), + eq(new HashMap<>()), + eq(StatementType.QUERY), + any(IDatabricksSession.class), + eq(statement))) + .thenReturn(resultSet); + + // Execute a query to set up result set + statement.executeQuery(STATEMENT); + assertFalse(statement.isClosed()); + + // Mark statement as closed without attempting to close on server + statement.markAsClosed(); + + // Verify statement is closed + assertTrue(statement.isClosed()); + + // Verify that closeStatement was NOT called on the client (only on connection) + verify(client, never()).closeStatement(any(StatementId.class)); + + // Verify that the statement cannot be used anymore + assertThrows(DatabricksSQLException.class, () -> statement.executeQuery(STATEMENT)); + } + + @Test + public void testMarkAsClosedWithResultSetCloseError() throws Exception { + IDatabricksConnectionContext connectionContext = + DatabricksConnectionContext.parse(JDBC_URL, new Properties()); + DatabricksConnection connection = new DatabricksConnection(connectionContext, client); + DatabricksStatement statement = new DatabricksStatement(connection); + + // Create a mock result set that throws an exception on close + DatabricksResultSet mockResultSet = mock(DatabricksResultSet.class); + doThrow(new DatabricksSQLException("Error closing result set", "HY000")) + .when(mockResultSet) + .close(); + + when(client.executeStatement( + eq(STATEMENT), + eq(new Warehouse(WAREHOUSE_ID)), + eq(new HashMap<>()), + eq(StatementType.QUERY), + any(IDatabricksSession.class), + eq(statement))) + .thenReturn(mockResultSet); + + // Execute a query to set up result set + statement.executeQuery(STATEMENT); + assertFalse(statement.isClosed()); + + // Mark statement as closed - should not throw even if result set close fails + assertDoesNotThrow(() -> statement.markAsClosed()); + + // Verify statement is still closed despite result set close error + assertTrue(statement.isClosed()); + + // Verify that closeStatement was NOT called on the client + verify(client, never()).closeStatement(any(StatementId.class)); + } + + @Test + public void testMarkAsClosedWithoutResultSet() throws Exception { + IDatabricksConnectionContext connectionContext = + DatabricksConnectionContext.parse(JDBC_URL, new Properties()); + DatabricksConnection connection = new DatabricksConnection(connectionContext, client); + DatabricksStatement statement = new DatabricksStatement(connection); + + // Mark statement as closed without executing any query (no result set) + assertFalse(statement.isClosed()); + statement.markAsClosed(); + + // Verify statement is closed + assertTrue(statement.isClosed()); + + // Verify that closeStatement was NOT called on the client + verify(client, never()).closeStatement(any(StatementId.class)); + } + @Test public void testRemoveEmptyEscapeClauseFromQuery() throws Exception { IDatabricksConnectionContext connectionContext = diff --git a/src/test/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClientTest.java b/src/test/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClientTest.java index f657f91d4..a38ca96cc 100644 --- a/src/test/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClientTest.java +++ b/src/test/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClientTest.java @@ -722,4 +722,114 @@ public void testSeaSyncMetadataHeaderNotAddedWhenDisabled() throws Exception { }), eq(ExecuteStatementResponse.class)); } + + @Test + public void testExecuteStatementWithClosedStatus() throws Exception { + // Set up connection and statement + IDatabricksConnectionContext connectionContext = + DatabricksConnectionContext.parse(JDBC_URL, new Properties()); + DatabricksSdkClient databricksSdkClient = + new DatabricksSdkClient(connectionContext, statementExecutionService, apiClient); + DatabricksConnection connection = + new DatabricksConnection(connectionContext, databricksSdkClient); + + // Mock session creation + CreateSessionResponse sessionResponse = new CreateSessionResponse().setSessionId(SESSION_ID); + when(apiClient.execute(any(Request.class), eq(CreateSessionResponse.class))) + .thenReturn(sessionResponse); + connection.open(); + + DatabricksStatement statement = spy(new DatabricksStatement(connection)); + statement.setMaxRows(100); + + // Create a response with CLOSED status + StatementStatus closedStatus = new StatementStatus().setState(StatementState.CLOSED); + ExecuteStatementResponse closedResponse = + new ExecuteStatementResponse() + .setStatementId(STATEMENT_ID.toSQLExecStatementId()) + .setStatus(closedStatus) + .setResult(resultData) + .setManifest( + new ResultManifest() + .setFormat(Format.JSON_ARRAY) + .setSchema(new ResultSchema().setColumns(new ArrayList<>()).setColumnCount(0L)) + .setTotalRowCount(0L)); + + when(apiClient.execute(any(Request.class), any())) + .thenAnswer( + invocationOnMock -> { + Request req = invocationOnMock.getArgument(0, Request.class); + if (req.getUrl().equals(STATEMENT_PATH)) { + return closedResponse; + } else if (req.getUrl().equals(SESSION_PATH)) { + return sessionResponse; + } + return null; + }); + + // Execute statement + databricksSdkClient.executeStatement( + STATEMENT, + warehouse, + new HashMap<>(), + StatementType.QUERY, + connection.getSession(), + statement); + + // Verify that markAsClosed was called on the statement + verify(statement, times(1)).markAsClosed(); + } + + @Test + public void testExecuteStatementWithClosedStatusAndNoParentStatement() throws Exception { + // Set up connection without parent statement + IDatabricksConnectionContext connectionContext = + DatabricksConnectionContext.parse(JDBC_URL, new Properties()); + DatabricksSdkClient databricksSdkClient = + new DatabricksSdkClient(connectionContext, statementExecutionService, apiClient); + DatabricksConnection connection = + new DatabricksConnection(connectionContext, databricksSdkClient); + + // Mock session creation + CreateSessionResponse sessionResponse = new CreateSessionResponse().setSessionId(SESSION_ID); + when(apiClient.execute(any(Request.class), eq(CreateSessionResponse.class))) + .thenReturn(sessionResponse); + connection.open(); + + // Create a response with CLOSED status + StatementStatus closedStatus = new StatementStatus().setState(StatementState.CLOSED); + ExecuteStatementResponse closedResponse = + new ExecuteStatementResponse() + .setStatementId(STATEMENT_ID.toSQLExecStatementId()) + .setStatus(closedStatus) + .setResult(resultData) + .setManifest( + new ResultManifest() + .setFormat(Format.JSON_ARRAY) + .setSchema(new ResultSchema().setColumns(new ArrayList<>()).setColumnCount(0L)) + .setTotalRowCount(0L)); + + when(apiClient.execute(any(Request.class), any())) + .thenAnswer( + invocationOnMock -> { + Request req = invocationOnMock.getArgument(0, Request.class); + if (req.getUrl().equals(STATEMENT_PATH)) { + return closedResponse; + } else if (req.getUrl().equals(SESSION_PATH)) { + return sessionResponse; + } + return null; + }); + + // Execute statement with null parent statement - should not throw + assertDoesNotThrow( + () -> + databricksSdkClient.executeStatement( + STATEMENT, + warehouse, + new HashMap<>(), + StatementType.QUERY, + connection.getSession(), + null)); + } } From 20833a3fcf7349988baa2f5bc55a0a38d6721efe Mon Sep 17 00:00:00 2001 From: Madhav Sainanee Date: Wed, 26 Nov 2025 05:05:26 +0000 Subject: [PATCH 2/3] remove rogue file --- .metals/metals.lock.db | 6 ------ 1 file changed, 6 deletions(-) delete mode 100644 .metals/metals.lock.db diff --git a/.metals/metals.lock.db b/.metals/metals.lock.db deleted file mode 100644 index 9da718346..000000000 --- a/.metals/metals.lock.db +++ /dev/null @@ -1,6 +0,0 @@ -#FileLock -#Tue Nov 25 05:29:53 UTC 2025 -server=localhost\:37449 -hostName=localhost -method=file -id=19ab96ccaa237406155a471e7b05c1681a464efbe7f From 7da511948a58d4d6501dd81b0d9ec0ae617a6102 Mon Sep 17 00:00:00 2001 From: Madhav Sainanee Date: Thu, 27 Nov 2025 06:38:19 +0000 Subject: [PATCH 3/3] address comments --- .../jdbc/api/impl/DatabricksStatement.java | 28 +++++++------ .../api/impl/DatabricksStatementTest.java | 41 +++++++++++++++---- 2 files changed, 49 insertions(+), 20 deletions(-) diff --git a/src/main/java/com/databricks/jdbc/api/impl/DatabricksStatement.java b/src/main/java/com/databricks/jdbc/api/impl/DatabricksStatement.java index dc854beea..509d8cdc8 100644 --- a/src/main/java/com/databricks/jdbc/api/impl/DatabricksStatement.java +++ b/src/main/java/com/databricks/jdbc/api/impl/DatabricksStatement.java @@ -106,8 +106,12 @@ public void close() throws SQLException { @Override public void close(boolean removeFromSession) throws DatabricksSQLException { LOGGER.debug("public void close(boolean removeFromSession)"); - - if (statementId == null) { + if (isClosed) { + if (resultSet != null) { + this.resultSet.close(); + this.resultSet = null; + } + } else if (statementId == null) { String warningMsg = "The statement you are trying to close does not have an ID yet."; LOGGER.warn(warningMsg); warnings = WarningUtil.addWarning(warnings, warningMsg); @@ -817,22 +821,20 @@ void checkIfClosed() throws DatabricksSQLException { } /** - * Marks the statement as closed without attempting to close it on the server. This should be used - * when the server has already indicated the statement is closed. + * Marks the statement as closed without attempting to close it on the server or clean up local + * resources. This should be used when the server has already indicated the statement is closed. + * + *

This method sets the closed flag to prevent further operations, but defers resource cleanup + * (result set, executor) to the {@link #close(boolean)} method. When {@code close()} is + * subsequently called, it will detect the statement is already closed and skip the server-side + * close operation while still cleaning up local resources. + * + * @see #close(boolean) */ public void markAsClosed() { LOGGER.debug("Marking statement {} as closed (server already closed)", statementId); - if (resultSet != null) { - try { - this.resultSet.close(); - } catch (DatabricksSQLException e) { - LOGGER.warn("Error closing result set: {}", e.getMessage()); - } - this.resultSet = null; - } this.connection.closeStatement(this); DatabricksThreadContextHolder.clearStatementInfo(); - shutDownExecutor(); this.isClosed = true; } diff --git a/src/test/java/com/databricks/jdbc/api/impl/DatabricksStatementTest.java b/src/test/java/com/databricks/jdbc/api/impl/DatabricksStatementTest.java index f4180bdba..5f4f4d784 100644 --- a/src/test/java/com/databricks/jdbc/api/impl/DatabricksStatementTest.java +++ b/src/test/java/com/databricks/jdbc/api/impl/DatabricksStatementTest.java @@ -754,21 +754,33 @@ public void testMarkAsClosed() throws Exception { statement.executeQuery(STATEMENT); assertFalse(statement.isClosed()); - // Mark statement as closed without attempting to close on server + // Mark statement as closed without attempting to close on server or clean up resources statement.markAsClosed(); - // Verify statement is closed + // Verify statement is marked as closed assertTrue(statement.isClosed()); - // Verify that closeStatement was NOT called on the client (only on connection) + // Verify that closeStatement was NOT called on the client (server already closed it) verify(client, never()).closeStatement(any(StatementId.class)); + // Verify that result set is NOT closed yet by markAsClosed + verify(resultSet, never()).close(); + // Verify that the statement cannot be used anymore assertThrows(DatabricksSQLException.class, () -> statement.executeQuery(STATEMENT)); + + // Now call close() - it should clean up the result set without trying to close on server + statement.close(); + + // Verify that result set was closed by close() + verify(resultSet, times(1)).close(); + + // Verify that closeStatement was still NOT called on the client (already closed on server) + verify(client, never()).closeStatement(any(StatementId.class)); } @Test - public void testMarkAsClosedWithResultSetCloseError() throws Exception { + public void testMarkAsClosedThenCloseWithResultSetError() throws Exception { IDatabricksConnectionContext connectionContext = DatabricksConnectionContext.parse(JDBC_URL, new Properties()); DatabricksConnection connection = new DatabricksConnection(connectionContext, client); @@ -793,12 +805,21 @@ public void testMarkAsClosedWithResultSetCloseError() throws Exception { statement.executeQuery(STATEMENT); assertFalse(statement.isClosed()); - // Mark statement as closed - should not throw even if result set close fails + // Mark statement as closed - should not throw since it doesn't close result set assertDoesNotThrow(() -> statement.markAsClosed()); - // Verify statement is still closed despite result set close error + // Verify statement is marked as closed assertTrue(statement.isClosed()); + // Verify result set was NOT closed by markAsClosed + verify(mockResultSet, never()).close(); + + // Now call close() - it should attempt to close result set and throw the exception + assertThrows(DatabricksSQLException.class, () -> statement.close()); + + // Verify that result set close was attempted during close() + verify(mockResultSet, times(1)).close(); + // Verify that closeStatement was NOT called on the client verify(client, never()).closeStatement(any(StatementId.class)); } @@ -814,11 +835,17 @@ public void testMarkAsClosedWithoutResultSet() throws Exception { assertFalse(statement.isClosed()); statement.markAsClosed(); - // Verify statement is closed + // Verify statement is marked as closed assertTrue(statement.isClosed()); // Verify that closeStatement was NOT called on the client verify(client, never()).closeStatement(any(StatementId.class)); + + // Calling close() after markAsClosed should not throw + assertDoesNotThrow(() -> statement.close()); + + // Statement should still be closed + assertTrue(statement.isClosed()); } @Test