Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -773,6 +773,80 @@ public Client alias(final Map<String, String> aliases) {
}
}

/**
* A {@code Client} implementation that operates in the context of a session. {@code ChildSessionClient} is tied to
* another client as a child, it borrows the connection from the parent client's pool for the transaction. Requests are
* sent to a single server, where each request is bound to the same thread and same connection with the same set of
* bindings across requests.
* Transaction are not automatically committed. It is up the client to issue commit/rollback commands.
*/
public final static class SessionedChildClient extends Client {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would have been best if there was a solution that kept this class package-private, as it now differs from the other Clients that are only supposed to be exposed via Cluster. It's fine for this PR, but I'd probably recommend adding a Jira to look into this again in the future after this is merged.

private final String sessionId;
private final boolean manageTransactions;
private final boolean maintainStateAfterException;
private final Client parentClient;
private Connection borrowedConnection;
private boolean closed;

public SessionedChildClient(final Client parentClient, String sessionId) {
super(parentClient.cluster, parentClient.settings);
this.parentClient = parentClient;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it make sense for any type of Client to be the parent? SessionedClient only allows for one connection so maybe there should be a check here that disallows that particular client?

this.sessionId = sessionId;
this.closed = false;
this.manageTransactions = parentClient.settings.getSession().map(s -> s.manageTransactions).orElse(false);
this.maintainStateAfterException = parentClient.settings.getSession().map(s -> s.maintainStateAfterException).orElse(false);
}

public String getSessionId() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return sessionId;
}

@Override
public RequestMessage.Builder buildMessage(final RequestMessage.Builder builder) {
builder.processor("session");
builder.addArg(Tokens.ARGS_SESSION, sessionId);
builder.addArg(Tokens.ARGS_MANAGE_TRANSACTION, manageTransactions);
builder.addArg(Tokens.ARGS_MAINTAIN_STATE_AFTER_EXCEPTION, maintainStateAfterException);
return builder;
}

@Override
protected void initializeImplementation() {
// do nothing, parentClient is already initialized
}

@Override
protected synchronized Connection chooseConnection(RequestMessage msg) throws TimeoutException, ConnectionException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit:

Suggested change
protected synchronized Connection chooseConnection(RequestMessage msg) throws TimeoutException, ConnectionException {
protected synchronized Connection chooseConnection(final RequestMessage msg) throws TimeoutException, ConnectionException {

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes that's the overhead we are trying to reduce. I have updated the PR description as well.

if (borrowedConnection == null) {
//Borrow from parentClient's pool instead of creating new connection
borrowedConnection = parentClient.chooseConnection(msg);
}
//Increment everytime, the connection is chosen, all these will be decremented when transaction is commited/rolledback
borrowedConnection.borrowed.incrementAndGet();
return borrowedConnection;
}

@Override
public synchronized CompletableFuture<Void> closeAsync() {
if (borrowedConnection != null && !borrowedConnection.isDead()) {

//Decrement one last time which was incremented by parentClient when the connection is borrowed initially
borrowedConnection.borrowed.decrementAndGet();
borrowedConnection.returnToPool();

borrowedConnection = null;
}
closed = true;
return CompletableFuture.completedFuture(null);
}

@Override
public boolean isClosing() {
return parentClient.isClosing() || closed;
}
}


/**
* A {@code Client} implementation that operates in the context of a session. Requests are sent to a single
* server, where each request is bound to the same thread with the same set of bindings across requests.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ public ChannelPromise write(final RequestMessage requestMessage, final Completab
return requestPromise;
}

private void returnToPool() {
public void returnToPool() {
try {
if (pool != null) pool.returnConnection(this);
} catch (ConnectionException ce) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.tinkerpop.gremlin.driver.remote;

import org.apache.commons.configuration2.Configuration;
import org.apache.tinkerpop.gremlin.driver.Channelizer;
import org.apache.tinkerpop.gremlin.driver.Client;
import org.apache.tinkerpop.gremlin.driver.Cluster;
import org.apache.tinkerpop.gremlin.process.remote.RemoteConnection;
Expand Down Expand Up @@ -259,9 +260,16 @@ public void close() throws Exception {
*/
@Override
public Transaction tx() {
final DriverRemoteConnection session = new DriverRemoteConnection(
client.getCluster().connect(UUID.randomUUID().toString()), remoteTraversalSourceName, true);
return new DriverRemoteTransaction(session);
if (client.getCluster().getChannelizer().equalsIgnoreCase(Channelizer.HttpChannelizer.class.getName())) {
throw new IllegalStateException(String.format("Cannot use sessions or tx() with %s", Channelizer.HttpChannelizer.class.getSimpleName()));
}
client.init();
final Client.SessionedChildClient childSessionClient = new Client.SessionedChildClient(
this.client, // parent client with existing connection pool
UUID.randomUUID().toString());

return new DriverRemoteTransaction(
new DriverRemoteConnection(childSessionClient, remoteTraversalSourceName, true));
}

@Override
Expand Down
Loading