Skip to content

Commit 3e2082c

Browse files
committed
Upgrade to TC Core 5.10.8 & Platform 5.10.7 : Adopt new async client implementation
1 parent 7818ce8 commit 3e2082c

File tree

14 files changed

+199
-146
lines changed

14 files changed

+199
-146
lines changed

clustered/ehcache-client/src/main/java/org/ehcache/clustered/client/internal/SimpleClusterTierManagerClientEntity.java

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,11 @@
3030
import org.terracotta.entity.EndpointDelegate;
3131
import org.terracotta.entity.EntityClientEndpoint;
3232
import org.terracotta.entity.EntityResponse;
33-
import org.terracotta.entity.InvokeFuture;
3433
import org.terracotta.entity.MessageCodecException;
35-
import org.terracotta.exception.EntityException;
3634

3735
import java.util.Set;
36+
import java.util.concurrent.ExecutionException;
37+
import java.util.concurrent.Future;
3838

3939
/**
4040
* The client-side {@link Entity} through which clustered cache operations are performed.
@@ -76,38 +76,38 @@ public void close() {
7676

7777
@Override
7878
public void validate(ServerSideConfiguration config) throws ClusterException {
79-
invokeInternal(messageFactory.validateStoreManager(config), false);
79+
invokeInternal(messageFactory.validateStoreManager(config));
8080
}
8181

8282
@Override
8383
public Set<String> prepareForDestroy() {
8484
try {
85-
PrepareForDestroy response = (PrepareForDestroy) invokeInternal(messageFactory.prepareForDestroy(), true);
85+
PrepareForDestroy response = (PrepareForDestroy) invokeInternal(messageFactory.prepareForDestroy());
8686
return response.getStores();
8787
} catch (ClusterException e) {
8888
// TODO handle this
8989
}
9090
return null;
9191
}
9292

93-
private EhcacheEntityResponse invokeInternal(EhcacheEntityMessage message, boolean replicate)
93+
private EhcacheEntityResponse invokeInternal(EhcacheEntityMessage message)
9494
throws ClusterException {
9595

9696
try {
97-
EhcacheEntityResponse response = waitFor(invokeAsync(message, replicate));
97+
EhcacheEntityResponse response = waitFor(invokeAsync(message));
9898
if (EhcacheResponseType.FAILURE.equals(response.getResponseType())) {
9999
throw ((Failure)response).getCause();
100100
} else {
101101
return response;
102102
}
103-
} catch (EntityException | MessageCodecException e) {
104-
throw new RuntimeException(message + " error: " + e.toString(), e);
103+
} catch (ExecutionException | MessageCodecException e) {
104+
throw new RuntimeException(message + " error: " + e, e);
105105
}
106106
}
107107

108-
private InvokeFuture<EhcacheEntityResponse> invokeAsync(EhcacheEntityMessage message, boolean replicate)
108+
private Future<EhcacheEntityResponse> invokeAsync(EhcacheEntityMessage message)
109109
throws MessageCodecException {
110-
return endpoint.beginInvoke().message(message).replicate(replicate).invoke();
110+
return endpoint.message(message).invoke();
111111
}
112112

113113
/**
@@ -117,10 +117,9 @@ private InvokeFuture<EhcacheEntityResponse> invokeAsync(EhcacheEntityMessage mes
117117
* @param future Future we want to get
118118
* @param <T> type of the response
119119
* @return the result of the get
120-
* @throws EntityException exception that might be thrown by the future in case of error
121120
*/
122-
private static <T extends EntityResponse> T waitFor(InvokeFuture<T> future)
123-
throws EntityException {
121+
private static <T extends EntityResponse> T waitFor(Future<T> future)
122+
throws ExecutionException {
124123
boolean interrupted = Thread.interrupted();
125124
try {
126125
while (true) {

clustered/ehcache-client/src/main/java/org/ehcache/clustered/client/internal/lock/VoltronReadWriteLockClient.java

Lines changed: 15 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package org.ehcache.clustered.client.internal.lock;
1818

19+
import java.util.concurrent.ExecutionException;
20+
import java.util.concurrent.Future;
1921
import java.util.concurrent.Semaphore;
2022
import org.ehcache.clustered.common.internal.lock.LockMessaging;
2123
import org.ehcache.clustered.common.internal.lock.LockMessaging.LockOperation;
@@ -24,9 +26,7 @@
2426
import org.terracotta.connection.entity.Entity;
2527
import org.terracotta.entity.EndpointDelegate;
2628
import org.terracotta.entity.EntityClientEndpoint;
27-
import org.terracotta.entity.InvokeFuture;
2829
import org.terracotta.entity.MessageCodecException;
29-
import org.terracotta.exception.EntityException;
3030

3131
public class VoltronReadWriteLockClient implements Entity {
3232

@@ -106,26 +106,22 @@ private LockOperation getCurrentState() {
106106
}
107107

108108
private LockTransition invoke(LockOperation operation) {
109+
Future<LockTransition> result = endpoint.message(operation).invoke();
110+
boolean interrupted = Thread.interrupted();
109111
try {
110-
InvokeFuture<LockTransition> result = endpoint.beginInvoke().message(operation).replicate(false).invoke();
111-
boolean interrupted = false;
112-
try {
113-
while (true) {
114-
try {
115-
return result.get();
116-
} catch (InterruptedException ex) {
117-
interrupted = true;
118-
} catch (EntityException ex) {
119-
throw new IllegalStateException(ex);
120-
}
121-
}
122-
} finally {
123-
if (interrupted) {
124-
Thread.currentThread().interrupt();
112+
while (true) {
113+
try {
114+
return result.get();
115+
} catch (InterruptedException ex) {
116+
interrupted = true;
117+
} catch (ExecutionException ex) {
118+
throw new IllegalStateException(ex.getCause());
125119
}
126120
}
127-
} catch (MessageCodecException ex) {
128-
throw new AssertionError(ex);
121+
} finally {
122+
if (interrupted) {
123+
Thread.currentThread().interrupt();
124+
}
129125
}
130126
}
131127
}

clustered/ehcache-client/src/main/java/org/ehcache/clustered/client/internal/service/ConnectionState.java

Lines changed: 61 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141

4242
import java.io.IOException;
4343
import java.util.Properties;
44-
import java.util.Random;
4544
import java.util.concurrent.ConcurrentHashMap;
4645
import java.util.concurrent.ConcurrentMap;
4746
import java.util.concurrent.Executor;
@@ -50,6 +49,7 @@
5049
import java.util.concurrent.atomic.AtomicInteger;
5150

5251
import static java.util.Objects.requireNonNull;
52+
import static org.ehcache.core.util.ExceptionUtil.containsCause;
5353

5454
class ConnectionState {
5555

@@ -114,9 +114,13 @@ public ClusterTierClientEntity createClusterTierClientEntity(String cacheId,
114114
break;
115115
} catch (EntityNotFoundException e) {
116116
throw new PerpetualCachePersistenceException("Cluster tier proxy '" + cacheId + "' for entity '" + entityIdentifier + "' does not exist.", e);
117-
} catch (ConnectionClosedException | ConnectionShutdownException e) {
118-
LOGGER.info("Disconnected from the server", e);
119-
handleConnectionClosedException(true);
117+
} catch (Throwable t) {
118+
if (containsCause(t, ConnectionClosedException.class) || containsCause(t, ConnectionShutdownException.class)) {
119+
LOGGER.info("Disconnected from the server", t);
120+
handleConnectionClosedException(true);
121+
} else {
122+
throw t;
123+
}
120124
}
121125
}
122126

@@ -151,6 +155,12 @@ private void reconnect() {
151155
break;
152156
} catch (ConnectionClosedException | ConnectionException e) {
153157
LOGGER.error("Re-connection to server failed, trying again", e);
158+
} catch (Throwable t) {
159+
if (containsCause(t, ConnectionClosedException.class) || containsCause(t, ConnectionShutdownException.class)) {
160+
LOGGER.error("Re-connection to server failed, trying again", t);
161+
} else {
162+
throw t;
163+
}
154164
}
155165
}
156166
}
@@ -180,6 +190,14 @@ private boolean silentDestroyUtil() {
180190
LOGGER.info("Disconnected from the server", e);
181191
reconnect();
182192
return false;
193+
} catch (Throwable t) {
194+
if (containsCause(t, ConnectionClosedException.class) || containsCause(t, ConnectionShutdownException.class)) {
195+
LOGGER.info("Disconnected from the server", t);
196+
reconnect();
197+
return false;
198+
} else {
199+
throw t;
200+
}
183201
}
184202
}
185203

@@ -262,6 +280,12 @@ public void destroyAll() throws CachePersistenceException {
262280
throw new CachePersistenceException("Cannot delete cluster tiers on " + connectionSource, e);
263281
} catch (ConnectionClosedException | ConnectionShutdownException e) {
264282
handleConnectionClosedException(false);
283+
} catch (Throwable t) {
284+
if (containsCause(t, ConnectionClosedException.class) || containsCause(t, ConnectionShutdownException.class)) {
285+
handleConnectionClosedException(false);
286+
} else {
287+
throw t;
288+
}
265289
}
266290
}
267291
}
@@ -285,6 +309,12 @@ public void destroy(String name) throws CachePersistenceException {
285309
}
286310
} catch (ConnectionClosedException | ConnectionShutdownException e) {
287311
reconnect();
312+
} catch (Throwable t) {
313+
if (containsCause(t, ConnectionClosedException.class) || containsCause(t, ConnectionShutdownException.class)) {
314+
reconnect();
315+
} else {
316+
throw t;
317+
}
288318
}
289319
}
290320

@@ -299,6 +329,12 @@ public void destroy(String name) throws CachePersistenceException {
299329
break;
300330
} catch (ConnectionClosedException | ConnectionShutdownException e) {
301331
handleConnectionClosedException(false);
332+
} catch (Throwable t) {
333+
if (containsCause(t, ConnectionClosedException.class) || containsCause(t, ConnectionShutdownException.class)) {
334+
handleConnectionClosedException(false);
335+
} else {
336+
throw t;
337+
}
302338
}
303339
}
304340
}
@@ -315,6 +351,14 @@ private void autoCreateEntity() throws ClusterTierManagerValidationException, Il
315351
LOGGER.info("Disconnected from the server", e);
316352
reconnect();
317353
continue;
354+
} catch (Throwable t) {
355+
if (containsCause(t, ConnectionClosedException.class) || containsCause(t, ConnectionShutdownException.class)) {
356+
LOGGER.info("Disconnected from the server", t);
357+
reconnect();
358+
continue;
359+
} else {
360+
throw t;
361+
}
318362
}
319363

320364
try {
@@ -330,6 +374,13 @@ private void autoCreateEntity() throws ClusterTierManagerValidationException, Il
330374
} catch (ConnectionClosedException | ConnectionShutdownException e) {
331375
LOGGER.info("Disconnected from the server", e);
332376
reconnect();
377+
} catch (Throwable t) {
378+
if (containsCause(t, ConnectionClosedException.class) || containsCause(t, ConnectionShutdownException.class)) {
379+
LOGGER.info("Disconnected from the server", t);
380+
reconnect();
381+
} else {
382+
throw t;
383+
}
333384
}
334385
}
335386

@@ -349,8 +400,12 @@ private void handleConnectionClosedException(boolean retrieve) throws ClusterTie
349400
}
350401
connectionRecoveryListener.run();
351402
break;
352-
} catch (ConnectionClosedException | ConnectionShutdownException e) {
353-
LOGGER.info("Disconnected from the server", e);
403+
} catch (Throwable t) {
404+
if (containsCause(t, ConnectionClosedException.class) || containsCause(t, ConnectionShutdownException.class)) {
405+
LOGGER.info("Disconnected from the server", t);
406+
} else {
407+
throw t;
408+
}
354409
}
355410
}
356411
}

clustered/ehcache-client/src/main/java/org/ehcache/clustered/client/internal/store/CommonServerStoreProxy.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ public ChainEntry get(long key) throws TimeoutException {
143143
@Override
144144
public void append(long key, ByteBuffer payLoad) {
145145
try {
146-
entity.invokeAndWaitForReceive(new AppendMessage(key, payLoad), true);
146+
entity.invokeAndWaitForComplete(new AppendMessage(key, payLoad), true);
147147
} catch (Exception e) {
148148
throw new ServerStoreProxyException(e);
149149
}

clustered/ehcache-client/src/main/java/org/ehcache/clustered/client/internal/store/ReconnectingServerStoreProxy.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
package org.ehcache.clustered.client.internal.store;
1717

1818
import org.ehcache.clustered.client.internal.store.lock.LockingServerStoreProxy;
19-
import org.ehcache.clustered.client.internal.store.lock.LockingServerStoreProxyImpl;
2019
import org.ehcache.clustered.common.internal.store.Chain;
2120
import org.slf4j.Logger;
2221
import org.slf4j.LoggerFactory;
@@ -29,6 +28,8 @@
2928
import java.util.concurrent.TimeoutException;
3029
import java.util.concurrent.atomic.AtomicReference;
3130

31+
import static org.ehcache.core.util.ExceptionUtil.containsCause;
32+
3233
public class ReconnectingServerStoreProxy implements LockingServerStoreProxy {
3334

3435
private static final Logger LOGGER = LoggerFactory.getLogger(ReconnectingServerStoreProxy.class);
@@ -54,8 +55,12 @@ public String getCacheId() {
5455
public void close() {
5556
try {
5657
proxy().close();
57-
} catch (ConnectionClosedException | ConnectionShutdownException e) {
58-
LOGGER.debug("Store was already closed, since connection was closed");
58+
} catch (Throwable t) {
59+
if (containsCause(t, ConnectionClosedException.class) || containsCause(t, ConnectionShutdownException.class)) {
60+
LOGGER.debug("Store was already closed, since connection was closed");
61+
} else {
62+
throw t;
63+
}
5964
}
6065
}
6166

@@ -119,7 +124,7 @@ private <T> T onStoreProxy(TimeoutExceptionFunction<LockingServerStoreProxy, T>
119124
try {
120125
return function.apply(storeProxy);
121126
} catch (ServerStoreProxyException sspe) {
122-
if (sspe.getCause() instanceof ConnectionClosedException) {
127+
if (containsCause(sspe, ConnectionClosedException.class)) {
123128
if (delegateRef.compareAndSet(storeProxy, new ReconnectInProgressProxy(storeProxy.getCacheId()))) {
124129
onReconnect.run();
125130
}

0 commit comments

Comments
 (0)