@@ -773,6 +773,80 @@ public Client alias(final Map<String, String> aliases) {
773773 }
774774 }
775775
776+ /**
777+ * A {@code Client} implementation that operates in the context of a session. {@code ChildSessionClient} is tied to
778+ * another client as a child, it borrows the connection from the parent client's pool for the transaction. Requests are
779+ * sent to a single server, where each request is bound to the same thread and same connection with the same set of
780+ * bindings across requests.
781+ * Transaction are not automatically committed. It is up the client to issue commit/rollback commands.
782+ */
783+ public final static class SessionedChildClient extends Client {
784+ private final String sessionId ;
785+ private final boolean manageTransactions ;
786+ private final boolean maintainStateAfterException ;
787+ private final Client parentClient ;
788+ private Connection borrowedConnection ;
789+ private boolean closed ;
790+
791+ public SessionedChildClient (final Client parentClient , String sessionId ) {
792+ super (parentClient .cluster , parentClient .settings );
793+ this .parentClient = parentClient ;
794+ this .sessionId = sessionId ;
795+ this .closed = false ;
796+ this .manageTransactions = parentClient .settings .getSession ().map (s -> s .manageTransactions ).orElse (false );
797+ this .maintainStateAfterException = parentClient .settings .getSession ().map (s -> s .maintainStateAfterException ).orElse (false );
798+ }
799+
800+ public String getSessionId () {
801+ return sessionId ;
802+ }
803+
804+ @ Override
805+ public RequestMessage .Builder buildMessage (final RequestMessage .Builder builder ) {
806+ builder .processor ("session" );
807+ builder .addArg (Tokens .ARGS_SESSION , sessionId );
808+ builder .addArg (Tokens .ARGS_MANAGE_TRANSACTION , manageTransactions );
809+ builder .addArg (Tokens .ARGS_MAINTAIN_STATE_AFTER_EXCEPTION , maintainStateAfterException );
810+ return builder ;
811+ }
812+
813+ @ Override
814+ protected void initializeImplementation () {
815+ // do nothing, parentClient is already initialized
816+ }
817+
818+ @ Override
819+ protected synchronized Connection chooseConnection (RequestMessage msg ) throws TimeoutException , ConnectionException {
820+ if (borrowedConnection == null ) {
821+ //Borrow from parentClient's pool instead of creating new connection
822+ borrowedConnection = parentClient .chooseConnection (msg );
823+ }
824+ //Increment everytime, the connection is chosen, all these will be decremented when transaction is commited/rolledback
825+ borrowedConnection .borrowed .incrementAndGet ();
826+ return borrowedConnection ;
827+ }
828+
829+ @ Override
830+ public synchronized CompletableFuture <Void > closeAsync () {
831+ if (borrowedConnection != null && !borrowedConnection .isDead ()) {
832+
833+ //Decrement one last time which was incremented by parentClient when the connection is borrowed initially
834+ borrowedConnection .borrowed .decrementAndGet ();
835+ borrowedConnection .returnToPool ();
836+
837+ borrowedConnection = null ;
838+ }
839+ closed = true ;
840+ return CompletableFuture .completedFuture (null );
841+ }
842+
843+ @ Override
844+ public boolean isClosing () {
845+ return parentClient .isClosing () || closed ;
846+ }
847+ }
848+
849+
776850 /**
777851 * A {@code Client} implementation that operates in the context of a session. Requests are sent to a single
778852 * server, where each request is bound to the same thread with the same set of bindings across requests.
0 commit comments