Skip to content

Commit 0994794

Browse files
committed
Fix getMessageCount to also count delayed queue messages
- adding up messages queued in delay set with main queue in getMessageCount() method.
1 parent f0d03fc commit 0994794

File tree

2 files changed

+36
-4
lines changed

2 files changed

+36
-4
lines changed

src/Transport/RedisTransport.php

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,10 @@ private function isDebounceStampExist(Envelope $env): bool
162162

163163
public function getMessageCount(): int {
164164
$this->connect();
165-
return (int) $this->redis->lLen($this->queue);
165+
$pipe = $this->redis->multi(Redis::PIPELINE);
166+
$pipe->lLen($this->queue);
167+
$pipe->zCount($this->getDelayedSetName(), '-inf', '+inf');
168+
return array_sum(array_map('intval', $pipe->exec()));
166169
}
167170

168171
private function connect(): void {

tests/Feature/TransportTest.php

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
use Symfony\Component\Messenger\Stamp\BusNameStamp;
1212
use Symfony\Component\Messenger\Stamp\DelayStamp;
1313
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
14+
use Symfony\Component\Messenger\Stamp\StampInterface;
1415
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
1516
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
1617
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
@@ -153,6 +154,24 @@ public function can_debounce_messages_and_wait_for_receiving() {
153154
$this->then_the_queue_has_size(0);
154155
}
155156

157+
/**
158+
* @test
159+
* @dataProvider provide_get_message_count_for_stamps
160+
*/
161+
public function get_message_count_for_stamps(StampInterface $stamp, int $count) {
162+
$this->given_there_is_a_wrapped_message();
163+
$this->given_there_is_a_stamp_on_the_message($stamp);
164+
$this->when_the_message_is_sent_on_the_transport();
165+
$this->then_message_count_in_the_queue($count);
166+
}
167+
168+
public function provide_get_message_count_for_stamps()
169+
{
170+
yield 'unique stamp' => [new UniqueStamp(1), 1];
171+
yield 'delay stamp' => [new DelayStamp(100), 1];
172+
yield 'debounce stamp' => [new DebounceStamp(100, 1), 1];
173+
}
174+
156175
private function given_there_is_a_message_on_the_queue_with_legacy_serialization() {
157176
$this->redis->lPush('messenger', json_encode([
158177
json_encode(['id' => null]),
@@ -165,15 +184,20 @@ private function given_there_is_a_wrapped_message() {
165184
}
166185

167186
private function given_there_is_a_unique_stamp_on_the_message(?string $id = null) {
168-
$this->envelope = $this->envelope->with(new UniqueStamp($id));
187+
$this->given_there_is_a_stamp_on_the_message(new UniqueStamp($id));
169188
}
170189

171190
private function given_there_is_a_delay_stamp_on_the_message(int $delayMs) {
172-
$this->envelope = $this->envelope->with(new DelayStamp($delayMs));
191+
$this->given_there_is_a_stamp_on_the_message(new DelayStamp($delayMs));
173192
}
174193

175194
private function given_there_is_a_debounce_stamp_on_the_message(int $delay, ?string $id = null): void {
176-
$this->envelope = $this->envelope->with(new DebounceStamp($delay, $id));
195+
$this->given_there_is_a_stamp_on_the_message(new DebounceStamp($delay, $id));
196+
}
197+
198+
private function given_there_is_a_stamp_on_the_message(StampInterface $stamp): void
199+
{
200+
$this->envelope = $this->envelope->with($stamp);
177201
}
178202

179203
public function given_there_is_a_wrapped_message_in_the_queue() {
@@ -282,4 +306,9 @@ private function then_the_redis_transport_connect_params_use_tls() {
282306
TransportTest::assertEquals(['tls://redis', 6379], $this->connectParams);
283307
}, $this->transport, RedisTransport::class)();
284308
}
309+
310+
private function then_message_count_in_the_queue(int $count): void
311+
{
312+
$this->assertEquals($count, $this->transport->getMessageCount());
313+
}
285314
}

0 commit comments

Comments
 (0)