Skip to content

Commit 7f11356

Browse files
Fixed RxBleAdapterStateObservable leak.
1 parent bc5d0b4 commit 7f11356

File tree

4 files changed

+40
-13
lines changed

4 files changed

+40
-13
lines changed

rxandroidble/src/main/java/com/polidea/rxandroidble2/internal/connection/DisconnectAction.java

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,23 @@
55

66
import bleshadow.javax.inject.Inject;
77

8-
import io.reactivex.internal.functions.Functions;
8+
import io.reactivex.functions.Action;
99

1010
@ConnectionScope
1111
class DisconnectAction implements ConnectionSubscriptionWatcher {
1212

1313
private final ClientOperationQueue clientOperationQueue;
1414
private final DisconnectOperation operationDisconnect;
15+
private final DisconnectionRouterInput disconnectionRouterInput;
1516

1617
@Inject
17-
DisconnectAction(ClientOperationQueue clientOperationQueue, DisconnectOperation operationDisconnect) {
18+
DisconnectAction(
19+
ClientOperationQueue clientOperationQueue,
20+
DisconnectOperation operationDisconnect,
21+
DisconnectionRouterInput disconnectionRouterInput) {
1822
this.clientOperationQueue = clientOperationQueue;
1923
this.operationDisconnect = operationDisconnect;
24+
this.disconnectionRouterInput = disconnectionRouterInput;
2025
}
2126

2227
@Override
@@ -28,9 +33,13 @@ public void onConnectionSubscribed() {
2833
public void onConnectionUnsubscribed() {
2934
clientOperationQueue
3035
.queue(operationDisconnect)
31-
.subscribe(
32-
Functions.emptyConsumer(),
33-
Functions.emptyConsumer()
34-
);
36+
.ignoreElements()
37+
.onErrorComplete()
38+
.subscribe(new Action() {
39+
@Override
40+
public void run() {
41+
disconnectionRouterInput.close();
42+
}
43+
});
3544
}
3645
}

rxandroidble/src/main/java/com/polidea/rxandroidble2/internal/connection/DisconnectionRouter.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package com.polidea.rxandroidble2.internal.connection;
22

33

4-
import com.jakewharton.rxrelay2.BehaviorRelay;
54
import com.polidea.rxandroidble2.RxBleAdapterStateObservable;
65
import com.polidea.rxandroidble2.exceptions.BleDisconnectedException;
76
import com.polidea.rxandroidble2.exceptions.BleException;
@@ -19,14 +18,15 @@
1918
import io.reactivex.functions.Consumer;
2019
import io.reactivex.functions.Function;
2120
import io.reactivex.functions.Predicate;
21+
import io.reactivex.subjects.BehaviorSubject;
2222

2323
/**
2424
* A class that is responsible for routing all potential sources of disconnection to an Observable that emits only errors.
2525
*/
2626
@ConnectionScope
2727
class DisconnectionRouter implements DisconnectionRouterInput, DisconnectionRouterOutput {
2828

29-
private final BehaviorRelay<BleException> bleExceptionBehaviorRelay = BehaviorRelay.create();
29+
private final BehaviorSubject<BleException> bleExceptionBehaviorSubject = BehaviorSubject.create();
3030
private final Observable<BleException> firstDisconnectionValueObs;
3131
private final Observable<Object> firstDisconnectionExceptionObs;
3232

@@ -59,14 +59,19 @@ public void accept(BleException exception) {
5959
RxBleLog.v("An exception received, indicating that the adapter has became unusable.");
6060
}
6161
})
62-
.subscribe(bleExceptionBehaviorRelay, new Consumer<Throwable>() {
62+
.subscribe(new Consumer<BleException>() {
63+
@Override
64+
public void accept(BleException t) {
65+
bleExceptionBehaviorSubject.onNext(t);
66+
}
67+
}, new Consumer<Throwable>() {
6368
@Override
6469
public void accept(Throwable throwable) {
6570
RxBleLog.e(throwable, "Failed to monitor adapter state.");
6671
}
6772
});
6873

69-
firstDisconnectionValueObs = bleExceptionBehaviorRelay
74+
firstDisconnectionValueObs = bleExceptionBehaviorSubject
7075
.firstElement()
7176
.toObservable()
7277
.doOnTerminate(new Action() {
@@ -107,12 +112,17 @@ public boolean test(Boolean isAdapterUsable) {
107112

108113
@Override
109114
public void onDisconnectedException(BleDisconnectedException disconnectedException) {
110-
bleExceptionBehaviorRelay.accept(disconnectedException);
115+
bleExceptionBehaviorSubject.onNext(disconnectedException);
111116
}
112117

113118
@Override
114119
public void onGattConnectionStateException(BleGattException disconnectedGattException) {
115-
bleExceptionBehaviorRelay.accept(disconnectedGattException);
120+
bleExceptionBehaviorSubject.onNext(disconnectedGattException);
121+
}
122+
123+
@Override
124+
public void close() {
125+
bleExceptionBehaviorSubject.onComplete();
116126
}
117127

118128
@Override

rxandroidble/src/main/java/com/polidea/rxandroidble2/internal/connection/DisconnectionRouterInput.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,4 +24,9 @@ interface DisconnectionRouterInput {
2424
* @param disconnectedGattException the exception that happened
2525
*/
2626
void onGattConnectionStateException(BleGattException disconnectedGattException);
27+
28+
/**
29+
* Method to be called when no other calls are expected. Used for cleanup.
30+
*/
31+
void close();
2732
}

rxandroidble/src/test/groovy/com/polidea/rxandroidble2/internal/connection/DisconnectionRouterTest.groovy

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,7 @@ class DisconnectionRouterTest extends Specification {
240240
}
241241

242242
@Unroll
243-
def "should unsubscribe from adapterStateObservable if it emits STATE_OFF/STATE_TURNING_* or if .on*Exception() is called"() {
243+
def "should unsubscribe from adapterStateObservable if it emits STATE_OFF/STATE_TURNING_* or .on*Exception()/.close() is called"() {
244244

245245
given:
246246
createObjectUnderTest(true)
@@ -268,6 +268,9 @@ class DisconnectionRouterTest extends Specification {
268268
{ PublishSubject<RxBleAdapterStateObservable.BleAdapterState> mockAdapterStateSubject, DisconnectionRouter objectUnderTest ->
269269
objectUnderTest.onDisconnectedException(new BleDisconnectedException("test"))
270270
},
271+
{ PublishSubject<RxBleAdapterStateObservable.BleAdapterState> mockAdapterStateSubject, DisconnectionRouter objectUnderTest ->
272+
objectUnderTest.close()
273+
},
271274
]
272275
}
273276
}

0 commit comments

Comments
 (0)