Skip to content

Commit f20d621

Browse files
authored
Merge pull request #23 from mbonneau/keepalive-hang
Fix keepalive from hanging the connection when input completes.
2 parents 6461c4f + d088f90 commit f20d621

File tree

2 files changed

+43
-3
lines changed

2 files changed

+43
-3
lines changed

src/MessageSubject.php

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
use Rx\Disposable\CompositeDisposable;
1515
use Rx\DisposableInterface;
1616
use Rx\Exception\TimeoutException;
17+
use Rx\Notification\OnNextNotification;
1718
use Rx\Observable;
1819
use Rx\ObserverInterface;
1920
use Rx\Subject\Subject;
@@ -100,15 +101,21 @@ function (FrameInterface $frame) {
100101
});
101102
})
102103
->switch()
103-
->flatMapTo(Observable::never());
104+
->flatMapTo(Observable::never())
105+
// This detects close or error notifications from the raw data input and stops the keepalive
106+
->takeUntil($this->rawDataIn
107+
->materialize()
108+
->filter(function($notification) { return ! $notification instanceof OnNextNotification; })
109+
->take(1));
104110
}
105111

106112
$this->rawDataDisp = $this->rawDataIn
107113
->merge($keepAliveObs)
108114
->subscribe(
109115
[$messageBuffer, 'onData'],
110116
function (\Throwable $e) { parent::onError($e); },
111-
function () { parent::onCompleted(); }
117+
// onCompleted needs to send an error. If a close frame comes in, this should be disposed already
118+
function () { parent::onError(new WebsocketErrorException(Frame::CLOSE_ABNORMAL)); }
112119
);
113120

114121
$this->subProtocol = $subProtocol;

test/MessageSubjectTest.php

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,4 +183,37 @@ public function testDisposeOnMessageSubjectClosesConnection()
183183
onCompleted(300)
184184
], $dataOut->getMessages());
185185
}
186-
}
186+
187+
public function testMessageSubjectErrorsIfDataInStreamEndsClosesOrErrors() {
188+
$dataIn = $this->createHotObservable([
189+
onNext(201, (new Frame('', true, Frame::OP_TEXT))->getContents()),
190+
onCompleted(205)
191+
]);
192+
193+
$dataOut = new MockObserver($this->scheduler);
194+
195+
$ms = new MessageSubject(
196+
$dataIn,
197+
$dataOut,
198+
true,
199+
false,
200+
'',
201+
new Request('GET', '/ws'),
202+
new Response(),
203+
300
204+
);
205+
206+
$result = $this->scheduler->startWithDispose(function () use ($ms) {
207+
return $ms;
208+
}, 500);
209+
210+
$this->assertMessages([
211+
onNext(201, ''),
212+
onError(205, new WebsocketErrorException(Frame::CLOSE_ABNORMAL))
213+
], $result->getMessages());
214+
215+
$this->assertSubscriptions([
216+
subscribe(0,205)
217+
], $dataIn->getSubscriptions());
218+
}
219+
}

0 commit comments

Comments
 (0)