Skip to content

Commit 334933f

Browse files
committed
Merge branch '0.4.5'
2 parents fcc9e7c + c3647ea commit 334933f

File tree

4 files changed

+181
-58
lines changed

4 files changed

+181
-58
lines changed

src/Buffer.php

Lines changed: 32 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -10,17 +10,19 @@ class Buffer extends EventEmitter implements WritableStreamInterface
1010
{
1111
public $stream;
1212
public $listening = false;
13-
public $softLimit = 2048;
13+
public $softLimit = 65536;
1414
private $writable = true;
1515
private $loop;
1616
private $data = '';
17-
private $lastError;
1817

1918
public function __construct($stream, LoopInterface $loop)
2019
{
20+
if (!is_resource($stream) || get_resource_type($stream) !== "stream") {
21+
throw new \InvalidArgumentException('First parameter must be a valid stream resource');
22+
}
23+
2124
$this->stream = $stream;
2225
$this->loop = $loop;
23-
$this->lastErrorFlush();
2426
}
2527

2628
public function isWritable()
@@ -42,9 +44,7 @@ public function write($data)
4244
$this->loop->addWriteStream($this->stream, array($this, 'handleWrite'));
4345
}
4446

45-
$belowSoftLimit = strlen($this->data) < $this->softLimit;
46-
47-
return $belowSoftLimit;
47+
return !isset($this->data[$this->softLimit - 1]);
4848
}
4949

5050
public function end($data = null)
@@ -73,15 +73,15 @@ public function close()
7373

7474
public function handleWrite()
7575
{
76-
if (!is_resource($this->stream)) {
77-
$this->emit('error', array(new \RuntimeException('Tried to write to invalid stream.'), $this));
78-
79-
return;
80-
}
81-
82-
$this->lastErrorFlush();
83-
84-
set_error_handler(array($this, 'errorHandler'));
76+
$error = null;
77+
set_error_handler(function ($errno, $errstr, $errfile, $errline) use (&$error) {
78+
$error = array(
79+
'message' => $errstr,
80+
'number' => $errno,
81+
'file' => $errfile,
82+
'line' => $errline
83+
);
84+
});
8585

8686
$sent = fwrite($this->stream, $this->data);
8787

@@ -94,55 +94,38 @@ public function handleWrite()
9494
// to keep the stream open for further tries to write.
9595
// Should this turn out to be a permanent error later, it will eventually
9696
// send *nothing* and we can detect this.
97-
if ($sent === 0 && $this->lastError['number'] > 0) {
98-
$this->emit('error', array(
99-
new \ErrorException(
100-
$this->lastError['message'],
97+
if ($sent === 0 || $sent === false) {
98+
if ($error === null) {
99+
$error = new \RuntimeException('Send failed');
100+
} else {
101+
$error = new \ErrorException(
102+
$error['message'],
101103
0,
102-
$this->lastError['number'],
103-
$this->lastError['file'],
104-
$this->lastError['line']
105-
),
106-
$this
107-
));
104+
$error['number'],
105+
$error['file'],
106+
$error['line']
107+
);
108+
}
108109

109-
return;
110-
}
110+
$this->emit('error', array(new \RuntimeException('Unable to write to stream: ' . $error->getMessage(), 0, $error), $this));
111111

112-
if ($sent === 0) {
113-
$this->emit('error', array(new \RuntimeException('Send failed'), $this));
114112
return;
115113
}
116114

117-
$len = strlen($this->data);
115+
$exceeded = isset($this->data[$this->softLimit - 1]);
118116
$this->data = (string) substr($this->data, $sent);
119117

120-
if ($len >= $this->softLimit && $len - $sent < $this->softLimit) {
118+
// buffer has been above limit and is now below limit
119+
if ($exceeded && !isset($this->data[$this->softLimit - 1])) {
121120
$this->emit('drain', array($this));
122121
}
123122

124-
if (0 === strlen($this->data)) {
123+
// buffer is now completely empty (and not closed already)
124+
if ($this->data === '' && $this->listening) {
125125
$this->loop->removeWriteStream($this->stream);
126126
$this->listening = false;
127127

128128
$this->emit('full-drain', array($this));
129129
}
130130
}
131-
132-
private function errorHandler($errno, $errstr, $errfile, $errline)
133-
{
134-
$this->lastError['number'] = $errno;
135-
$this->lastError['message'] = $errstr;
136-
$this->lastError['file'] = $errfile;
137-
$this->lastError['line'] = $errline;
138-
}
139-
140-
private function lastErrorFlush() {
141-
$this->lastError = array(
142-
'number' => 0,
143-
'message' => '',
144-
'file' => '',
145-
'line' => 0,
146-
);
147-
}
148131
}

src/Stream.php

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,24 @@
88

99
class Stream extends EventEmitter implements DuplexStreamInterface
1010
{
11-
public $bufferSize = 4096;
11+
/**
12+
* Controls the maximum buffer size in bytes to ready at once from the stream.
13+
*
14+
* This can be a positive number which means that up to X bytes will be read
15+
* at once from the underlying stream resource. Note that the actual number
16+
* of bytes read may be lower if the stream resource has less than X bytes
17+
* currently available.
18+
*
19+
* This can be `null` which means read everything available from the
20+
* underlying stream resource.
21+
* This should read until the stream resource is not readable anymore
22+
* (i.e. underlying buffer drained), note that this does not neccessarily
23+
* mean it reached EOF.
24+
*
25+
* @var int|null
26+
*/
27+
public $bufferSize = 65536;
28+
1229
public $stream;
1330
protected $readable = true;
1431
protected $writable = true;
@@ -139,7 +156,7 @@ public function handleData($stream)
139156
);
140157
});
141158

142-
$data = fread($stream, $this->bufferSize);
159+
$data = stream_get_contents($stream, $this->bufferSize === null ? -1 : $this->bufferSize);
143160

144161
restore_error_handler();
145162

tests/BufferTest.php

Lines changed: 78 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,18 @@ public function testConstructor()
1818
$buffer->on('error', $this->expectCallableNever());
1919
}
2020

21+
/**
22+
* @covers React\Stream\Buffer::__construct
23+
* @expectedException InvalidArgumentException
24+
*/
25+
public function testConstructorThrowsIfNotAValidStreamResource()
26+
{
27+
$stream = null;
28+
$loop = $this->createLoopMock();
29+
30+
new Buffer($stream, $loop);
31+
}
32+
2133
/**
2234
* @covers React\Stream\Buffer::write
2335
* @covers React\Stream\Buffer::handleWrite
@@ -84,6 +96,20 @@ public function testWriteReturnsFalseWhenBufferIsFull()
8496
$this->assertFalse($buffer->write("bar\n"));
8597
}
8698

99+
/**
100+
* @covers React\Stream\Buffer::write
101+
*/
102+
public function testWriteReturnsFalseWhenBufferIsExactlyFull()
103+
{
104+
$stream = fopen('php://temp', 'r+');
105+
$loop = $this->createLoopMock();
106+
107+
$buffer = new Buffer($stream, $loop);
108+
$buffer->softLimit = 3;
109+
110+
$this->assertFalse($buffer->write("foo"));
111+
}
112+
87113
/**
88114
* @covers React\Stream\Buffer::write
89115
* @covers React\Stream\Buffer::handleWrite
@@ -152,8 +178,6 @@ public function testDrain()
152178
*/
153179
public function testWriteInDrain()
154180
{
155-
$writeStreams = array();
156-
157181
$stream = fopen('php://temp', 'r+');
158182
$loop = $this->createWriteableLoopMock();
159183
$loop->preventWrites = true;
@@ -176,6 +200,47 @@ public function testWriteInDrain()
176200
$this->assertSame("foo\nbar\n", stream_get_contents($stream));
177201
}
178202

203+
/**
204+
* @covers React\Stream\Buffer::write
205+
* @covers React\Stream\Buffer::handleWrite
206+
*/
207+
public function testDrainAndFullDrainAfterWrite()
208+
{
209+
$stream = fopen('php://temp', 'r+');
210+
$loop = $this->createLoopMock();
211+
212+
$buffer = new Buffer($stream, $loop);
213+
$buffer->softLimit = 2;
214+
215+
$buffer->on('drain', $this->expectCallableOnce());
216+
$buffer->on('full-drain', $this->expectCallableOnce());
217+
218+
$buffer->write("foo");
219+
$buffer->handleWrite();
220+
}
221+
222+
/**
223+
* @covers React\Stream\Buffer::write
224+
* @covers React\Stream\Buffer::handleWrite
225+
*/
226+
public function testCloseDuringDrainWillNotEmitFullDrain()
227+
{
228+
$stream = fopen('php://temp', 'r+');
229+
$loop = $this->createLoopMock();
230+
231+
$buffer = new Buffer($stream, $loop);
232+
$buffer->softLimit = 2;
233+
234+
// close buffer on drain event => expect close event, but no full-drain after
235+
$buffer->on('drain', $this->expectCallableOnce());
236+
$buffer->on('drain', array($buffer, 'close'));
237+
$buffer->on('close', $this->expectCallableOnce());
238+
$buffer->on('full-drain', $this->expectCallableNever());
239+
240+
$buffer->write("foo");
241+
$buffer->handleWrite();
242+
}
243+
179244
/**
180245
* @covers React\Stream\Buffer::end
181246
*/
@@ -249,11 +314,10 @@ public function testWritingToClosedBufferShouldNotWriteToStream()
249314

250315
/**
251316
* @covers React\Stream\Buffer::handleWrite
252-
* @covers React\Stream\Buffer::errorHandler
253317
*/
254-
public function testError()
318+
public function testErrorWhenStreamResourceIsInvalid()
255319
{
256-
$stream = null;
320+
$stream = fopen('php://temp', 'r+');
257321
$loop = $this->createWriteableLoopMock();
258322

259323
$error = null;
@@ -263,9 +327,16 @@ public function testError()
263327
$error = $message;
264328
});
265329

330+
// invalidate stream resource
331+
fclose($stream);
332+
266333
$buffer->write('Attempting to write to bad stream');
334+
267335
$this->assertInstanceOf('Exception', $error);
268-
$this->assertSame('Tried to write to invalid stream.', $error->getMessage());
336+
337+
// the error messages differ between PHP versions, let's just check substrings
338+
$this->assertContains('Unable to write to stream: ', $error->getMessage());
339+
$this->assertContains(' not a valid stream resource', $error->getMessage(), '', true);
269340
}
270341

271342
public function testWritingToClosedStream()
@@ -290,7 +361,7 @@ public function testWritingToClosedStream()
290361
$buffer->write('bar');
291362

292363
$this->assertInstanceOf('Exception', $error);
293-
$this->assertSame('fwrite(): send of 3 bytes failed with errno=32 Broken pipe', $error->getMessage());
364+
$this->assertSame('Unable to write to stream: fwrite(): send of 3 bytes failed with errno=32 Broken pipe', $error->getMessage());
294365
}
295366

296367
private function createWriteableLoopMock()

tests/StreamTest.php

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,58 @@ public function testDataEvent()
5151
$this->assertSame("foobar\n", $capturedData);
5252
}
5353

54+
/**
55+
* @covers React\Stream\Stream::__construct
56+
* @covers React\Stream\Stream::handleData
57+
*/
58+
public function testDataEventDoesEmitOneChunkMatchingBufferSize()
59+
{
60+
$stream = fopen('php://temp', 'r+');
61+
$loop = $this->createLoopMock();
62+
63+
$capturedData = null;
64+
65+
$conn = new Stream($stream, $loop);
66+
$conn->on('data', function ($data) use (&$capturedData) {
67+
$capturedData = $data;
68+
});
69+
70+
fwrite($stream, str_repeat("a", 100000));
71+
rewind($stream);
72+
73+
$conn->handleData($stream);
74+
75+
$this->assertTrue($conn->isReadable());
76+
$this->assertEquals($conn->bufferSize, strlen($capturedData));
77+
}
78+
79+
/**
80+
* @covers React\Stream\Stream::__construct
81+
* @covers React\Stream\Stream::handleData
82+
*/
83+
public function testDataEventDoesEmitOneChunkUntilStreamEndsWhenBufferSizeIsInfinite()
84+
{
85+
$stream = fopen('php://temp', 'r+');
86+
$loop = $this->createLoopMock();
87+
88+
$capturedData = null;
89+
90+
$conn = new Stream($stream, $loop);
91+
$conn->bufferSize = null;
92+
93+
$conn->on('data', function ($data) use (&$capturedData) {
94+
$capturedData = $data;
95+
});
96+
97+
fwrite($stream, str_repeat("a", 100000));
98+
rewind($stream);
99+
100+
$conn->handleData($stream);
101+
102+
$this->assertFalse($conn->isReadable());
103+
$this->assertEquals(100000, strlen($capturedData));
104+
}
105+
54106
/**
55107
* @covers React\Stream\Stream::handleData
56108
*/

0 commit comments

Comments
 (0)