File tree Expand file tree Collapse file tree 2 files changed +42
-3
lines changed
driver-core/src/test/java/com/datastax/driver/core Expand file tree Collapse file tree 2 files changed +42
-3
lines changed Original file line number Diff line number Diff line change @@ -50,7 +50,12 @@ public void connectionLeakTest() throws Exception {
5050 assertOpenConnections (1 , cluster );
5151
5252 // ensure sessions.size() returns with 1 control connection + core pool size.
53- int corePoolSize = TestUtils .numberOfLocalCoreConnections (cluster );
53+ int corePoolSize ;
54+ if (ccm ().getScyllaVersion () != null ) {
55+ corePoolSize = TestUtils .numberOfLocalCoreConnectionsSharded (cluster );
56+ } else {
57+ corePoolSize = TestUtils .numberOfLocalCoreConnections (cluster );
58+ }
5459 Session session = cluster .connect ();
5560
5661 assertThat (cluster .manager .sessions .size ()).isEqualTo (1 );
@@ -114,13 +119,21 @@ public void should_not_leak_session_when_wrong_keyspace() throws Exception {
114119 // Ensure no channels remain open.
115120 channelMonitor .stop ();
116121 channelMonitor .report ();
117- assertThat (channelMonitor .openChannels (ccm ().addressOfNode (1 ), ccm ().addressOfNode (2 )).size ())
122+ assertThat (
123+ channelMonitor
124+ .openChannelsPortAgnostic (
125+ ccm ().addressOfNode (1 ).getAddress (), ccm ().addressOfNode (2 ).getAddress ())
126+ .size ())
118127 .isEqualTo (0 );
119128 }
120129
121130 private void assertOpenConnections (int expected , Cluster cluster ) {
122131 assertThat (cluster .getMetrics ().getOpenConnections ().getValue ()).isEqualTo (expected );
123- assertThat (channelMonitor .openChannels (ccm ().addressOfNode (1 ), ccm ().addressOfNode (2 )).size ())
132+ assertThat (
133+ channelMonitor
134+ .openChannelsPortAgnostic (
135+ ccm ().addressOfNode (1 ).getAddress (), ccm ().addressOfNode (2 ).getAddress ())
136+ .size ())
124137 .isEqualTo (expected );
125138 }
126139}
Original file line number Diff line number Diff line change 2626import io .netty .channel .socket .SocketChannel ;
2727import java .io .Closeable ;
2828import java .io .IOException ;
29+ import java .net .InetAddress ;
2930import java .net .InetSocketAddress ;
3031import java .util .Arrays ;
3132import java .util .Collection ;
@@ -192,6 +193,31 @@ public boolean apply(SocketChannel input) {
192193 return channels ;
193194 }
194195
196+ public Collection <SocketChannel > openChannelsPortAgnostic (InetAddress ... addresses ) {
197+ return openChannelsPortAgnostic (Arrays .asList (addresses ));
198+ }
199+
200+ /**
201+ * @param addresses The InetAddresses to include. The port is ignored in this case.
202+ * @return Open channels matching the given InetAddresses.
203+ */
204+ public Collection <SocketChannel > openChannelsPortAgnostic (
205+ final Collection <InetAddress > addresses ) {
206+ List <SocketChannel > channels =
207+ Lists .newArrayList (
208+ matchingChannels (
209+ new Predicate <SocketChannel >() {
210+ @ Override
211+ public boolean apply (SocketChannel input ) {
212+ return input .isOpen ()
213+ && input .remoteAddress () != null
214+ && addresses .contains (input .remoteAddress ().getAddress ());
215+ }
216+ }));
217+ Collections .sort (channels , BY_REMOTE_ADDRESS );
218+ return channels ;
219+ }
220+
195221 /**
196222 * @param channelFilter {@link Predicate} to use to determine whether or not a socket shall be
197223 * considered.
You can’t perform that action at this time.
0 commit comments