Skip to content

Commit a7c7dfe

Browse files
authored
Merge branch 'avaje:master' into andponlin-error-for-missing-dep
2 parents 9881419 + eb8e053 commit a7c7dfe

File tree

7 files changed

+110
-37
lines changed

7 files changed

+110
-37
lines changed

inject-events/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
<dependency>
1313
<groupId>io.avaje</groupId>
1414
<artifactId>avaje-inject</artifactId>
15-
<version>10.0</version>
15+
<version>12.0</version>
1616
<scope>provided</scope>
1717
</dependency>
1818
<dependency>
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,20 @@
1-
package io.avaje.inject.events.spi;
1+
package io.avaje.inject.events;
22

33
import java.lang.reflect.Type;
44
import java.util.ArrayList;
55
import java.util.HashMap;
66
import java.util.List;
77
import java.util.Map;
8-
9-
import io.avaje.inject.events.Observer;
10-
import io.avaje.inject.events.ObserverManager;
8+
import java.util.concurrent.Executor;
119

1210
final class DObserverManager implements ObserverManager {
1311

14-
private final Map<Type, List<Observer<?>>> observeMap = new HashMap<>();
12+
private final Map<Type, List<Observer<?>>> observeMap = new HashMap<>();
13+
private final Executor executor;
14+
15+
DObserverManager(Executor executor) {
16+
this.executor = executor;
17+
}
1518

1619
@Override
1720
public <T> void registerObserver(Type type, Observer<T> observer) {
@@ -23,4 +26,9 @@ public <T> void registerObserver(Type type, Observer<T> observer) {
2326
public List<Observer<?>> observersByType(Type eventType) {
2427
return observeMap.computeIfAbsent(eventType, k -> new ArrayList<>());
2528
}
29+
30+
@Override
31+
public Executor asyncExecutor() {
32+
return executor;
33+
}
2634
}

inject-events/src/main/java/io/avaje/inject/events/Event.java

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,18 @@
1515
* injected:
1616
*
1717
* <pre>{@code
18-
*
19-
* @Inject
20-
* Event<LoggedInEvent> loggedInEvent;
18+
* @Inject
19+
* Event<LoggedInEvent> loggedInEvent;
2120
*
2221
* }</pre>
2322
*
2423
* <p>The <code>fire()</code> method accepts an event object:
2524
*
2625
* <pre>{@code
27-
*
28-
* public void login() {
29-
* ...
30-
* loggedInEvent.fire(new LoggedInEvent(user));
31-
* }
26+
* public void login() {
27+
* ...
28+
* loggedInEvent.fire(new LoggedInEvent(user));
29+
* }
3230
*
3331
* }</pre>
3432
*
@@ -37,7 +35,7 @@
3735
public abstract class Event<T> {
3836

3937
private static final Comparator<Observer<?>> PRIORITY = Comparator.comparing(Observer::priority);
40-
38+
protected final ObserverManager manager;
4139
protected final List<Observer<T>> observers;
4240
protected final String defaultQualifier;
4341

@@ -46,6 +44,7 @@ protected Event(ObserverManager manager, Type type) {
4644
}
4745

4846
protected Event(ObserverManager manager, Type type, String qualifier) {
47+
this.manager = manager;
4948
this.observers = manager.observersByType(type);
5049
this.defaultQualifier = qualifier;
5150
}
@@ -72,7 +71,6 @@ public void fire(T event, String qualifier) {
7271
*/
7372
public CompletionStage<T> fireAsync(T event, String qualifier) {
7473
var exceptionHandler = new CollectingExceptionHandler();
75-
7674
return observers.stream()
7775
.sorted(PRIORITY)
7876
.reduce(CompletableFuture.<Void>completedFuture(null), (future, observer) ->
@@ -82,7 +80,7 @@ public CompletionStage<T> fireAsync(T event, String qualifier) {
8280
} catch (Exception e) {
8381
exceptionHandler.handle(e);
8482
}
85-
}),
83+
}, manager.asyncExecutor()),
8684
(future1, future2) -> future1)
8785
.thenApply(v -> {
8886
handleExceptions(exceptionHandler);

inject-events/src/main/java/io/avaje/inject/events/ObserverManager.java

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,29 +2,59 @@
22

33
import java.lang.reflect.Type;
44
import java.util.List;
5+
import java.util.concurrent.Executor;
56

67
/**
78
* Manages all {@link Observer} instances in the BeanScope.
8-
* <p>
9-
* A default implementation is provided by avaje-inject.
9+
*
10+
* <p>A default implementation is provided by avaje-inject.
1011
*/
1112
public interface ObserverManager {
1213

14+
/**
15+
* Return a builder for the default ObserverManager implementation.
16+
*/
17+
static Builder builder() {
18+
return new ObserverManagerBuilder();
19+
}
20+
1321
/**
1422
* Registers the given Consumer as an observer.
1523
*
16-
* @param <T> the type of the event
24+
* @param <T> the type of the event
1725
* @param eventType the type of the event ()
18-
* @param observer the consumer to execute when a matching event is found
26+
* @param observer the consumer to execute when a matching event is found
1927
*/
2028
<T> void registerObserver(Type eventType, Observer<T> observer);
2129

2230
/**
2331
* Retrieves a list of all Observers registered by the given type
2432
*
25-
* @param <T> the Type of the Event
33+
* @param <T> the Type of the Event
2634
* @param eventType the type of the event
2735
* @return all observers registered
2836
*/
2937
<T> List<Observer<T>> observersByType(Type eventType);
38+
39+
/**
40+
* The Executor used for sending async events
41+
*/
42+
Executor asyncExecutor();
43+
44+
/**
45+
* Builder for the default ObserverManager implementation.
46+
*/
47+
interface Builder {
48+
49+
/**
50+
* Specify an Executor to run asynchronous events. Defaults to using
51+
* the Fork join common pool when not specified.
52+
*/
53+
Builder asyncExecutor(Executor executor);
54+
55+
/**
56+
* Build the ObserverManager.
57+
*/
58+
ObserverManager build();
59+
}
3060
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package io.avaje.inject.events;
2+
3+
import java.util.Objects;
4+
import java.util.concurrent.Executor;
5+
import java.util.concurrent.ForkJoinPool;
6+
7+
final class ObserverManagerBuilder implements ObserverManager.Builder {
8+
9+
private Executor executor = ForkJoinPool.commonPool();
10+
11+
@Override
12+
public ObserverManager.Builder asyncExecutor(Executor executor) {
13+
this.executor = Objects.requireNonNull(executor);
14+
return this;
15+
}
16+
17+
@Override
18+
public ObserverManager build() {
19+
return new DObserverManager(executor);
20+
}
21+
}
Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,19 @@
11
package io.avaje.inject.events.spi;
22

33
import io.avaje.inject.BeanScopeBuilder;
4+
import io.avaje.inject.events.ObserverManager;
45
import io.avaje.inject.spi.InjectPlugin;
56

67
/** Plugin for avaje inject that provides a default ObserverManager instance. */
78
public final class ObserverManagerPlugin implements InjectPlugin {
89

910
@Override
1011
public Class<?>[] provides() {
11-
return new Class<?>[] {DObserverManager.class};
12+
return new Class<?>[] {ObserverManager.class};
1213
}
1314

1415
@Override
1516
public void apply(BeanScopeBuilder builder) {
16-
builder.provideDefault(null, DObserverManager.class, DObserverManager::new);
17+
builder.provideDefault(null, ObserverManager.class, () -> ObserverManager.builder().build());
1718
}
1819
}

inject-events/src/test/java/io/avaje/inject/events/spi/DObserverManagerTest.java

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,20 +6,20 @@
66
import java.util.ArrayList;
77
import java.util.List;
88
import java.util.concurrent.CompletionException;
9-
import java.util.concurrent.ExecutionException;
9+
import java.util.concurrent.Executors;
1010
import java.util.concurrent.atomic.AtomicBoolean;
1111

12+
import io.avaje.inject.events.ObserverManager;
1213
import org.junit.jupiter.api.Test;
1314

1415
import io.avaje.inject.events.Observer;
15-
import io.avaje.inject.events.ObserverManager;
1616
import io.avaje.inject.events.events.TestEvent;
1717
import io.avaje.inject.events.events.TestGenericEvent;
1818
import io.avaje.inject.spi.GenericType;
1919

2020
class DObserverManagerTest {
2121

22-
ObserverManager manager = new DObserverManager();
22+
ObserverManager manager = ObserverManager.builder().build();
2323

2424
@Test
2525
void test() {
@@ -36,7 +36,7 @@ void test() {
3636
}
3737

3838
@Test
39-
void testPriority() throws InterruptedException, ExecutionException {
39+
void testPriority() {
4040
var l = new ArrayList<String>();
4141

4242
manager.<String>registerObserver(String.class, new Observer<>(0, false, s -> l.add("1"), ""));
@@ -48,42 +48,57 @@ void testPriority() throws InterruptedException, ExecutionException {
4848
}
4949

5050
@Test
51-
void testAsync() throws InterruptedException, ExecutionException {
51+
void testAsync() {
5252
AtomicBoolean aBoolean = new AtomicBoolean();
5353

5454
manager.<String>registerObserver(
5555
String.class, new Observer<>(0, true, s -> aBoolean.set(true), ""));
5656

57-
new TestEvent(manager).fireAsync("str").toCompletableFuture().get();
57+
new TestEvent(manager).fireAsync("str").toCompletableFuture().join();
5858
assertThat(aBoolean.get()).isTrue();
5959
}
6060

6161
@Test
62-
void testAsyncPriority() throws InterruptedException, ExecutionException {
63-
var l = new ArrayList<String>();
62+
void testAsyncExecutor() {
63+
AtomicBoolean aBoolean = new AtomicBoolean();
64+
var exec = Executors.newSingleThreadScheduledExecutor();
65+
var manager = ObserverManager.builder()
66+
.asyncExecutor(exec)
67+
.build();
68+
69+
assertThat(exec).isSameAs(manager.asyncExecutor());
70+
manager.<String>registerObserver(
71+
String.class, new Observer<>(0, true, s -> aBoolean.set(true), ""));
72+
73+
new TestEvent(manager).fireAsync("str").toCompletableFuture().join();
74+
assertThat(aBoolean.get()).isTrue();
75+
}
6476

77+
@Test
78+
void testAsyncPriority() {
79+
var l = new ArrayList<String>();
6580
manager.<String>registerObserver(String.class, new Observer<>(0, true, s -> l.add("1"), ""));
6681
manager.<String>registerObserver(String.class, new Observer<>(5, true, s -> l.add("5"), ""));
6782
manager.<String>registerObserver(String.class, new Observer<>(2, true, s -> l.add("2"), ""));
6883

69-
new TestEvent(manager).fireAsync("str").toCompletableFuture().get();
84+
new TestEvent(manager).fireAsync("str").toCompletableFuture().join();
7085
assertThat(l).containsExactly("1", "2", "5");
7186
}
7287

7388
@Test
74-
void testGenericAsync() throws InterruptedException, ExecutionException {
89+
void testGenericAsync() {
7590
AtomicBoolean aBoolean = new AtomicBoolean();
7691

7792
manager.<List<String>>registerObserver(
7893
new GenericType<List<String>>() {}.type(),
7994
new Observer<>(0, true, s -> aBoolean.set(true), ""));
8095

81-
new TestGenericEvent(manager).fireAsync(List.of("str")).toCompletableFuture().get();
96+
new TestGenericEvent(manager).fireAsync(List.of("str")).toCompletableFuture().join();
8297
assertThat(aBoolean.get()).isTrue();
8398
}
8499

85100
@Test
86-
void testError() throws InterruptedException {
101+
void testError() {
87102
final var testEvent = new TestEvent(manager);
88103
testEvent.fire("sus");
89104

@@ -102,7 +117,7 @@ void testError() throws InterruptedException {
102117
}
103118

104119
@Test
105-
void testAsyncError() throws InterruptedException, ExecutionException {
120+
void testAsyncError() {
106121
final var testEvent = new TestEvent(manager);
107122

108123
manager.<String>registerObserver(

0 commit comments

Comments
 (0)