Skip to content

Commit 0374a08

Browse files
committed
Add CompositeStream, make ThroughStream extend it
1 parent c9cabf9 commit 0374a08

File tree

3 files changed

+102
-51
lines changed

3 files changed

+102
-51
lines changed

CompositeStream.php

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
<?php
2+
3+
namespace React\Stream;
4+
5+
use Evenement\EventEmitter;
6+
7+
class CompositeStream extends EventEmitter implements ReadableStreamInterface, WritableStreamInterface
8+
{
9+
protected $readable;
10+
protected $writable;
11+
protected $pipeSource;
12+
13+
public function __construct(ReadableStreamInterface $readable, WritableStreamInterface $writable)
14+
{
15+
$this->readable = $readable;
16+
$this->writable = $writable;
17+
18+
$this->forwardEvents($this->readable, ['data', 'end', 'error', 'close']);
19+
$this->forwardEvents($this->writable, ['drain', 'error', 'close', 'pipe']);
20+
21+
$this->readable->on('close', array($this, 'close'));
22+
$this->writable->on('close', array($this, 'close'));
23+
24+
$this->on('pipe', array($this, 'handlePipeEvent'));
25+
}
26+
27+
public function handlePipeEvent($source)
28+
{
29+
$this->pipeSource = $source;
30+
}
31+
32+
public function isReadable()
33+
{
34+
return $this->readable->isReadable();
35+
}
36+
37+
public function pause()
38+
{
39+
if ($this->pipeSource) {
40+
$this->pipeSource->pause();
41+
}
42+
43+
$this->readable->pause();
44+
}
45+
46+
public function resume()
47+
{
48+
if ($this->pipeSource) {
49+
$this->pipeSource->resume();
50+
}
51+
52+
$this->readable->resume();
53+
}
54+
55+
public function pipe(WritableStreamInterface $dest, array $options = array())
56+
{
57+
Util::pipe($this, $dest, $options);
58+
59+
return $dest;
60+
}
61+
62+
public function isWritable()
63+
{
64+
return $this->writable->isWritable();
65+
}
66+
67+
public function write($data)
68+
{
69+
return $this->writable->write($data);
70+
}
71+
72+
public function end($data = null)
73+
{
74+
$this->writable->end($data);
75+
}
76+
77+
public function close()
78+
{
79+
$this->pipeSource = true;
80+
81+
$this->readable->close();
82+
$this->writable->close();
83+
}
84+
85+
protected function forwardEvents($stream, array $events)
86+
{
87+
$that = $this;
88+
89+
foreach ($events as $event) {
90+
$stream->on($event, function () use ($event, $that) {
91+
$that->emit($event, func_get_args());
92+
});
93+
}
94+
}
95+
}

ThroughStream.php

Lines changed: 7 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,14 @@
22

33
namespace React\Stream;
44

5-
class ThroughStream extends ReadableStream implements WritableStreamInterface
5+
class ThroughStream extends CompositeStream
66
{
7-
private $pipeSource;
8-
97
public function __construct()
108
{
11-
$this->on('pipe', array($this, 'handlePipeEvent'));
12-
}
9+
$readable = new ReadableStream();
10+
$writable = new WritableStream();
1311

14-
public function handlePipeEvent($source)
15-
{
16-
$this->pipeSource = $source;
12+
parent::__construct($readable, $writable);
1713
}
1814

1915
public function filter($data)
@@ -23,44 +19,15 @@ public function filter($data)
2319

2420
public function write($data)
2521
{
26-
$this->emit('data', array($this->filter($data)));
22+
$this->readable->emit('data', array($this->filter($data)));
2723
}
2824

2925
public function end($data = null)
3026
{
3127
if (null !== $data) {
32-
$this->write($data);
33-
}
34-
35-
$this->close();
36-
}
37-
38-
public function isWritable()
39-
{
40-
return !$this->closed;
41-
}
42-
43-
public function pause()
44-
{
45-
if ($this->pipeSource) {
46-
$this->pipeSource->pause();
47-
}
48-
}
49-
50-
public function resume()
51-
{
52-
if ($this->pipeSource) {
53-
$this->pipeSource->resume();
54-
}
55-
}
56-
57-
public function close()
58-
{
59-
if ($this->closed) {
60-
return;
28+
$this->readable->emit('data', array($this->filter($data)));
6129
}
6230

63-
parent::close();
64-
$this->pipeSource = null;
31+
$this->writable->end($data);
6532
}
6633
}

WritableStream.php

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,6 @@
77
class WritableStream extends EventEmitter implements WritableStreamInterface
88
{
99
public $closed = false;
10-
private $pipeSource;
11-
12-
public function __construct()
13-
{
14-
$this->on('pipe', array($this, 'handlePipeEvent'));
15-
}
16-
17-
public function handlePipeEvent($source)
18-
{
19-
$this->pipeSource = $source;
20-
}
2110

2211
public function write($data)
2312
{

0 commit comments

Comments
 (0)