File tree Expand file tree Collapse file tree 2 files changed +28
-5
lines changed
main/java/com/netflix/hollow/api/consumer
test/java/com/netflix/hollow/api/consumer Expand file tree Collapse file tree 2 files changed +28
-5
lines changed Original file line number Diff line number Diff line change @@ -244,8 +244,8 @@ public void triggerRefresh() {
244244 * <p>
245245 * This is an asynchronous call.
246246 */
247- public void triggerAsyncRefresh () {
248- triggerAsyncRefreshWithDelay (0 );
247+ public CompletableFuture < Void > triggerAsyncRefresh () {
248+ return triggerAsyncRefreshWithDelay (0 );
249249 }
250250
251251 /**
@@ -256,10 +256,10 @@ public void triggerAsyncRefresh() {
256256 *
257257 * @param delayMillis the delay, in millseconds, before triggering the refresh
258258 */
259- public void triggerAsyncRefreshWithDelay (int delayMillis ) {
259+ public CompletableFuture < Void > triggerAsyncRefreshWithDelay (int delayMillis ) {
260260 final long targetBeginTime = System .currentTimeMillis () + delayMillis ;
261261
262- refreshExecutor . execute (() -> {
262+ return CompletableFuture . runAsync (() -> {
263263 try {
264264 long delay = targetBeginTime - System .currentTimeMillis ();
265265 if (delay > 0 )
@@ -278,7 +278,7 @@ public void triggerAsyncRefreshWithDelay(int delayMillis) {
278278 LOG .log (Level .SEVERE , "Async refresh failed" , e );
279279 throw e ;
280280 }
281- });
281+ }, refreshExecutor );
282282 }
283283
284284 /**
Original file line number Diff line number Diff line change 3232import com .netflix .hollow .tools .compact .HollowCompactor .CompactionConfig ;
3333import java .time .Duration ;
3434import java .util .BitSet ;
35+ import java .util .concurrent .CompletionException ;
3536import java .util .concurrent .atomic .AtomicInteger ;
3637import org .junit .Assert ;
3738import org .junit .Before ;
@@ -487,6 +488,28 @@ public void consumerFilteringSupport() {
487488 Assert .fail (); // fail if UnsupportedOperationException was not thrown
488489 }
489490
491+ @ Test
492+ public void consumerErrorsDuringRefreshArePropagated () {
493+ HollowProducer producer = HollowProducer .withPublisher (blobStore )
494+ .withAnnouncer (announcement )
495+ .withBlobStager (new HollowInMemoryBlobStager ())
496+ .build ();
497+ long v1 = runCycle (producer , 1 );
498+
499+ InMemoryBlobStore otherBlobStore = new InMemoryBlobStore ();
500+ HollowConsumer consumer = HollowConsumer .withBlobRetriever (otherBlobStore )
501+ .withAnnouncementWatcher (announcement )
502+ .build ();
503+
504+ try {
505+ consumer .triggerAsyncRefresh ().toCompletableFuture ().join ();
506+ Assert .fail ("Expected exception to be thrown by async refresh." );
507+ } catch (Exception e ) {
508+ Assert .assertTrue (e instanceof CompletionException );
509+ Assert .assertTrue (e .getCause () instanceof IllegalArgumentException );
510+ }
511+ }
512+
490513 private long runCycle (HollowProducer producer , final int cycleNumber ) {
491514 return producer .runCycle (state -> state .add (cycleNumber ));
492515 }
You can’t perform that action at this time.
0 commit comments