diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java index fc706b85369..8dbd22072c2 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java @@ -773,6 +773,80 @@ public Client alias(final Map aliases) { } } + /** + * A {@code Client} implementation that operates in the context of a session. {@code ChildSessionClient} is tied to + * another client as a child, it borrows the connection from the parent client's pool for the transaction. Requests are + * sent to a single server, where each request is bound to the same thread and same connection with the same set of + * bindings across requests. + * Transaction are not automatically committed. It is up the client to issue commit/rollback commands. + */ + public final static class SessionedChildClient extends Client { + private final String sessionId; + private final boolean manageTransactions; + private final boolean maintainStateAfterException; + private final Client parentClient; + private Connection borrowedConnection; + private boolean closed; + + public SessionedChildClient(final Client parentClient, String sessionId) { + super(parentClient.cluster, parentClient.settings); + this.parentClient = parentClient; + this.sessionId = sessionId; + this.closed = false; + this.manageTransactions = parentClient.settings.getSession().map(s -> s.manageTransactions).orElse(false); + this.maintainStateAfterException = parentClient.settings.getSession().map(s -> s.maintainStateAfterException).orElse(false); + } + + public String getSessionId() { + return sessionId; + } + + @Override + public RequestMessage.Builder buildMessage(final RequestMessage.Builder builder) { + builder.processor("session"); + builder.addArg(Tokens.ARGS_SESSION, sessionId); + builder.addArg(Tokens.ARGS_MANAGE_TRANSACTION, manageTransactions); + builder.addArg(Tokens.ARGS_MAINTAIN_STATE_AFTER_EXCEPTION, maintainStateAfterException); + return builder; + } + + @Override + protected void initializeImplementation() { + // do nothing, parentClient is already initialized + } + + @Override + protected synchronized Connection chooseConnection(RequestMessage msg) throws TimeoutException, ConnectionException { + if (borrowedConnection == null) { + //Borrow from parentClient's pool instead of creating new connection + borrowedConnection = parentClient.chooseConnection(msg); + } + //Increment everytime, the connection is chosen, all these will be decremented when transaction is commited/rolledback + borrowedConnection.borrowed.incrementAndGet(); + return borrowedConnection; + } + + @Override + public synchronized CompletableFuture closeAsync() { + if (borrowedConnection != null && !borrowedConnection.isDead()) { + + //Decrement one last time which was incremented by parentClient when the connection is borrowed initially + borrowedConnection.borrowed.decrementAndGet(); + borrowedConnection.returnToPool(); + + borrowedConnection = null; + } + closed = true; + return CompletableFuture.completedFuture(null); + } + + @Override + public boolean isClosing() { + return parentClient.isClosing() || closed; + } + } + + /** * A {@code Client} implementation that operates in the context of a session. Requests are sent to a single * server, where each request is bound to the same thread with the same set of bindings across requests. diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java index 749789a86c4..3633d397c68 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java @@ -267,7 +267,7 @@ public ChannelPromise write(final RequestMessage requestMessage, final Completab return requestPromise; } - private void returnToPool() { + public void returnToPool() { try { if (pool != null) pool.returnConnection(this); } catch (ConnectionException ce) { diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java index ee2364ec445..f2db3ef99f9 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java @@ -19,6 +19,7 @@ package org.apache.tinkerpop.gremlin.driver.remote; import org.apache.commons.configuration2.Configuration; +import org.apache.tinkerpop.gremlin.driver.Channelizer; import org.apache.tinkerpop.gremlin.driver.Client; import org.apache.tinkerpop.gremlin.driver.Cluster; import org.apache.tinkerpop.gremlin.process.remote.RemoteConnection; @@ -259,9 +260,16 @@ public void close() throws Exception { */ @Override public Transaction tx() { - final DriverRemoteConnection session = new DriverRemoteConnection( - client.getCluster().connect(UUID.randomUUID().toString()), remoteTraversalSourceName, true); - return new DriverRemoteTransaction(session); + if (client.getCluster().getChannelizer().equalsIgnoreCase(Channelizer.HttpChannelizer.class.getName())) { + throw new IllegalStateException(String.format("Cannot use sessions or tx() with %s", Channelizer.HttpChannelizer.class.getSimpleName())); + } + client.init(); + final Client.SessionedChildClient childSessionClient = new Client.SessionedChildClient( + this.client, // parent client with existing connection pool + UUID.randomUUID().toString()); + + return new DriverRemoteTransaction( + new DriverRemoteConnection(childSessionClient, remoteTraversalSourceName, true)); } @Override