4545import org .slf4j .LoggerFactory ;
4646
4747import java .io .Serializable ;
48+ import java .net .InetSocketAddress ;
4849import java .time .Duration ;
4950import java .util .ArrayList ;
5051import java .util .Arrays ;
5152import java .util .HashMap ;
5253import java .util .HashSet ;
5354import java .util .List ;
5455import java .util .Map ;
56+ import java .util .Properties ;
5557import java .util .Set ;
5658import java .util .concurrent .ConcurrentHashMap ;
5759import java .util .concurrent .ExecutorService ;
6264import java .util .concurrent .TimeoutException ;
6365
6466import static org .ehcache .testing .StandardCluster .clusterPath ;
67+ import static org .ehcache .testing .StandardCluster .leaseLength ;
6568import static org .ehcache .testing .StandardCluster .newCluster ;
6669import static org .ehcache .testing .StandardCluster .offheapResource ;
6770import static org .hamcrest .MatcherAssert .assertThat ;
6871import static org .hamcrest .Matchers .equalTo ;
6972import static org .hamcrest .Matchers .is ;
7073import static org .hamcrest .Matchers .nullValue ;
7174import static org .junit .Assert .fail ;
75+ import org .terracotta .connection .ConnectionException ;
76+ import org .terracotta .connection .Diagnostics ;
77+ import org .terracotta .connection .DiagnosticsFactory ;
7278
7379
7480/**
@@ -98,7 +104,8 @@ public static Consistency[] data() {
98104
99105 @ ClassRule @ Rule
100106 public static final ParallelTestCluster CLUSTER = new ParallelTestCluster (newCluster (2 ).in (clusterPath ())
101- .withServiceFragment (offheapResource ("primary-server-resource" , 24 )).build ());
107+ .withServiceFragment (offheapResource ("primary-server-resource" , 24 )).withServiceFragment (leaseLength (Duration .ofDays (1 )))
108+ .build ());
102109
103110 @ Rule
104111 public final TestName testName = new TestName ();
@@ -110,6 +117,8 @@ public static Consistency[] data() {
110117 private final ThreadLocalRandom random = ThreadLocalRandom .current ();
111118
112119 private final ExecutorService executorService = Executors .newWorkStealingPool (NUM_OF_THREADS );
120+
121+ private final Probe probe = new Probe ();
113122
114123 @ Before
115124 public void startServers () throws Exception {
@@ -134,6 +143,7 @@ public void startServers() throws Exception {
134143 cache2 = cacheManager2 .createCache (testName .getMethodName (), config );
135144
136145 caches = Arrays .asList (cache1 , cache2 );
146+ probe .loop ();
137147 }
138148
139149 @ After
@@ -148,6 +158,7 @@ public void tearDown() throws Exception {
148158 if (cacheManager2 != null && cacheManager2 .getStatus () != Status .UNINITIALIZED ) {
149159 cacheManager2 .close ();
150160 }
161+
151162 }
152163
153164 @ Test (timeout =180000 )
@@ -287,4 +298,44 @@ private static class BlobValue implements Serializable {
287298 private final byte [] data = new byte [10 * 1024 ];
288299 }
289300
301+ public class Probe {
302+
303+ /**
304+ * @param args the command line arguments
305+ */
306+ public void loop () {
307+ String [] servers = CLUSTER .getClusterHostPorts ();
308+ for (String hostPort : servers ) {
309+ String [] hp = hostPort .split ("[:]" );
310+ InetSocketAddress inet = InetSocketAddress .createUnresolved (hp [0 ], Integer .parseInt (hp [1 ]));
311+ log .info ("starting probe for " + hostPort );
312+ new Thread (()->{
313+ while (!executorService .isShutdown ()) {
314+
315+ try (Diagnostics d = DiagnosticsFactory .connect (inet , new Properties ())) {
316+ while (!executorService .isShutdown ()) {
317+ try {
318+ probe (d );
319+ log .info ("sleeping for 10 sec." );
320+ Thread .sleep (10_000L );
321+ } catch (InterruptedException ie ) {
322+ ie .printStackTrace ();
323+ }
324+ }
325+ } catch (ConnectionException e ) {
326+ e .printStackTrace ();
327+ }
328+ }
329+
330+ }).start ();
331+ }
332+ }
333+
334+ private void probe (Diagnostics d ) {
335+ log .info ("===== PROBE =====" );
336+ log .info (d .getClusterState ());
337+ log .info (d .getThreadDump ());
338+ }
339+ }
340+
290341}
0 commit comments