4545import org .slf4j .LoggerFactory ;
4646
4747import java .io .Serializable ;
48+ import java .net .InetSocketAddress ;
4849import java .time .Duration ;
4950import java .util .ArrayList ;
5051import java .util .Arrays ;
5152import java .util .HashMap ;
5253import java .util .HashSet ;
5354import java .util .List ;
5455import java .util .Map ;
56+ import java .util .Properties ;
5557import java .util .Set ;
5658import java .util .concurrent .ConcurrentHashMap ;
5759import java .util .concurrent .ExecutorService ;
6971import static org .hamcrest .Matchers .is ;
7072import static org .hamcrest .Matchers .nullValue ;
7173import static org .junit .Assert .fail ;
74+ import org .terracotta .connection .ConnectionException ;
75+ import org .terracotta .connection .Diagnostics ;
76+ import org .terracotta .connection .DiagnosticsFactory ;
7277
7378
7479/**
@@ -98,7 +103,7 @@ public static Consistency[] data() {
98103
99104 @ ClassRule @ Rule
100105 public static final ParallelTestCluster CLUSTER = new ParallelTestCluster (newCluster (2 ).in (clusterPath ())
101- .withServiceFragment (offheapResource ("primary-server-resource" , 24 )).build ());
106+ .withServiceFragment (offheapResource ("primary-server-resource" , 24 )).withTcProperty ( "tc.messages.grouping.enabled" , "false" ). build ());
102107
103108 @ Rule
104109 public final TestName testName = new TestName ();
@@ -110,6 +115,8 @@ public static Consistency[] data() {
110115 private final ThreadLocalRandom random = ThreadLocalRandom .current ();
111116
112117 private final ExecutorService executorService = Executors .newWorkStealingPool (NUM_OF_THREADS );
118+
119+ private final Probe probe = new Probe ();
113120
114121 @ Before
115122 public void startServers () throws Exception {
@@ -134,6 +141,7 @@ public void startServers() throws Exception {
134141 cache2 = cacheManager2 .createCache (testName .getMethodName (), config );
135142
136143 caches = Arrays .asList (cache1 , cache2 );
144+ probe .loop ();
137145 }
138146
139147 @ After
@@ -148,9 +156,10 @@ public void tearDown() throws Exception {
148156 if (cacheManager2 != null && cacheManager2 .getStatus () != Status .UNINITIALIZED ) {
149157 cacheManager2 .close ();
150158 }
159+
151160 }
152161
153- @ Test (timeout =180000 )
162+ @ Test (timeout =360000 )
154163 public void testCRUD () throws Exception {
155164 Set <Long > universalSet = ConcurrentHashMap .newKeySet ();
156165 List <Future <?>> futures = new ArrayList <>();
@@ -192,7 +201,7 @@ public void testCRUD() throws Exception {
192201
193202 }
194203
195- @ Test (timeout =180000 )
204+ @ Test (timeout =360000 )
196205 public void testBulkOps () throws Exception {
197206 Set <Long > universalSet = ConcurrentHashMap .newKeySet ();
198207 List <Future <?>> futures = new ArrayList <>();
@@ -237,7 +246,7 @@ public void testBulkOps() throws Exception {
237246
238247 }
239248
240- @ Test (timeout =180000 )
249+ @ Test (timeout =360000 )
241250 public void testClear () throws Exception {
242251 List <Future <?>> futures = new ArrayList <>();
243252 Set <Long > universalSet = ConcurrentHashMap .newKeySet ();
@@ -287,4 +296,44 @@ private static class BlobValue implements Serializable {
287296 private final byte [] data = new byte [10 * 1024 ];
288297 }
289298
299+ public class Probe {
300+
301+ /**
302+ * @param args the command line arguments
303+ */
304+ public void loop () {
305+ String [] servers = CLUSTER .getClusterHostPorts ();
306+ for (String hostPort : servers ) {
307+ String [] hp = hostPort .split ("[:]" );
308+ InetSocketAddress inet = InetSocketAddress .createUnresolved (hp [0 ], Integer .parseInt (hp [1 ]));
309+ log .info ("starting probe for " + hostPort );
310+ new Thread (()->{
311+ while (!executorService .isShutdown ()) {
312+
313+ try (Diagnostics d = DiagnosticsFactory .connect (inet , new Properties ())) {
314+ while (!executorService .isShutdown ()) {
315+ try {
316+ probe (d );
317+ log .info ("sleeping for 10 sec." );
318+ Thread .sleep (10_000L );
319+ } catch (InterruptedException ie ) {
320+ ie .printStackTrace ();
321+ }
322+ }
323+ } catch (ConnectionException e ) {
324+ e .printStackTrace ();
325+ }
326+ }
327+
328+ }).start ();
329+ }
330+ }
331+
332+ private void probe (Diagnostics d ) {
333+ log .info ("===== PROBE =====" );
334+ log .info (d .getClusterState ());
335+ log .info (d .getThreadDump ());
336+ }
337+ }
338+
290339}
0 commit comments