Skip to content

Commit 950d5ac

Browse files
authored
Merge pull request #15 from xp-forge/feature/watch
Change streams: Watch databases and collections for changes
2 parents 55f9125 + 1279d41 commit 950d5ac

File tree

4 files changed

+74
-2
lines changed

4 files changed

+74
-2
lines changed

src/main/php/com/mongodb/Collection.class.php

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
<?php namespace com\mongodb;
22

33
use com\mongodb\io\Protocol;
4-
use com\mongodb\result\{Insert, Update, Delete, Cursor};
4+
use com\mongodb\result\{Insert, Update, Delete, Cursor, ChangeStream};
55

66
/**
77
* A collection inside a database.
@@ -195,4 +195,24 @@ public function aggregate(array $pipeline= [], Session $session= null): Cursor {
195195

196196
return new Cursor($this->proto, $session, $result['body']['cursor']);
197197
}
198+
199+
/**
200+
* Watch for changes in this collection
201+
*
202+
* @param [:var][] $pipeline
203+
* @param [:var] $options
204+
* @param ?com.mongodb.Session $session
205+
* @return com.mongodb.result.ChangeStream
206+
* @throws com.mongodb.Error
207+
*/
208+
public function watch(array $pipeline= [], array $options= [], Session $session= null): ChangeStream {
209+
array_unshift($pipeline, ['$changeStream' => (object)$options]);
210+
$result= $this->proto->read($session, [
211+
'aggregate' => $this->name,
212+
'pipeline' => $pipeline,
213+
'cursor' => (object)[],
214+
'$db' => $this->database,
215+
]);
216+
return new ChangeStream($this->proto, $session, $result['body']['cursor']);
217+
}
198218
}

src/main/php/com/mongodb/Database.class.php

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
<?php namespace com\mongodb;
22

33
use com\mongodb\io\Protocol;
4-
use com\mongodb\result\Cursor;
4+
use com\mongodb\result\{Cursor, ChangeStream};
55

66
class Database {
77
private $proto, $name;
@@ -34,4 +34,24 @@ public function collections($session= null) {
3434
]);
3535
return new Cursor($this->proto, $session, $result['body']['cursor']);
3636
}
37+
38+
/**
39+
* Watch for changes in this database
40+
*
41+
* @param [:var][] $pipeline
42+
* @param [:var] $options
43+
* @param ?com.mongodb.Session $session
44+
* @return com.mongodb.result.ChangeStream
45+
* @throws com.mongodb.Error
46+
*/
47+
public function watch(array $pipeline= [], array $options= [], Session $session= null): ChangeStream {
48+
array_unshift($pipeline, ['$changeStream' => (object)$options]);
49+
$result= $this->proto->read($session, [
50+
'aggregate' => 1,
51+
'pipeline' => $pipeline,
52+
'cursor' => (object)[],
53+
'$db' => $this->name,
54+
]);
55+
return new ChangeStream($this->proto, $session, $result['body']['cursor']);
56+
}
3757
}

src/main/php/com/mongodb/MongoConnection.class.php

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
<?php namespace com\mongodb;
22

33
use com\mongodb\io\Protocol;
4+
use com\mongodb\result\ChangeStream;
45
use lang\{IllegalArgumentException, Value};
56
use peer\AuthenticationException;
67
use util\{Bytes, UUID, Objects};
@@ -106,6 +107,28 @@ public function databases($filter= null, $session= null) {
106107
}
107108
}
108109

110+
/**
111+
* Watch for changes in all databases.
112+
*
113+
* @param [:var][] $pipeline
114+
* @param [:var] $options
115+
* @param ?com.mongodb.Session $session
116+
* @return com.mongodb.result.ChangeStream
117+
* @throws com.mongodb.Error
118+
*/
119+
public function watch(array $pipeline= [], array $options= [], Session $session= null): ChangeStream {
120+
$this->proto->connect();
121+
122+
array_unshift($pipeline, ['$changeStream' => ['allChangesForCluster' => true] + $options]);
123+
$result= $this->proto->read($session, [
124+
'aggregate' => 1,
125+
'pipeline' => $pipeline,
126+
'cursor' => (object)[],
127+
'$db' => 'admin',
128+
]);
129+
return new ChangeStream($this->proto, $session, $result['body']['cursor']);
130+
}
131+
109132
/**
110133
* Close connection
111134
*
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
<?php namespace com\mongodb\result;
2+
3+
class ChangeStream extends Cursor {
4+
5+
/** @return ?[:var] */
6+
public function resumeToken() {
7+
return $this->current['postBatchResumeToken'] ?? null;
8+
}
9+
}

0 commit comments

Comments
 (0)