Skip to content

Commit 0fd93fe

Browse files
author
RJ Garcia
committed
Fixing issue where transport didn't ack if message has additional stamps
- If an envelope was wrapped with sendable stamps from the moment they were received till acked/received, then the system would be unable to remove the item from the processing queue - Added docker support for php to make testing easier Signed-off-by: RJ Garcia <[email protected]>
1 parent b834ae1 commit 0fd93fe

File tree

6 files changed

+43
-13
lines changed

6 files changed

+43
-13
lines changed

Dockerfile

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
FROM php:7.2-cli
2+
3+
RUN apt-get update && apt-get install -y git zip
4+
5+
COPY --from=mlocati/php-extension-installer /usr/bin/install-php-extensions /usr/bin/
6+
RUN install-php-extensions redis pcntl
7+
8+
COPY --from=composer:1.9.1 /usr/bin/composer /usr/bin/composer

docker-compose.yml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,12 @@
11
version: '3'
22

33
services:
4+
php:
5+
build: .
6+
command: "tail -f /dev/null"
7+
working_dir: /var/www/html
8+
volumes:
9+
- ./:/var/www/html
410
redis:
511
image: redis
612
environment: { TERM: xterm }

phpunit.xml.dist

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
<php>
1010
<ini name="error_reporting" value="-1" />
1111
<env name="SHELL_VERBOSITY" value="-1" />
12-
<env name="REDIS_DSN" value="redis://localhost:6379?queue=messenger"/>
12+
<env name="REDIS_DSN" value="redis://redis:6379?queue=messenger"/>
1313
</php>
1414
<testsuites>
1515
<testsuite name="feature">

src/Transport/RedisTransport.php

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
use Redis;
77
use Symfony\Component\Messenger\Envelope;
88
use Symfony\Component\Messenger\Stamp\DelayStamp;
9+
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
910
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
1011
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
1112
use Symfony\Component\Messenger\Transport\TransportInterface;
@@ -65,10 +66,11 @@ function(Redis $conn) use ($query, $parsedUrl, $options) {
6566
);
6667
}
6768

69+
/** @return Envelope[] */
6870
public function get(): iterable {
6971
$this->connect();
7072
$this->redis->clearLastError();
71-
$message = $this->redis->eval($this->popLuaScript(), [
73+
$encodedMessage = $this->redis->eval($this->popLuaScript(), [
7274
$this->getUniqueSetName(),
7375
$this->getDelayedSetName(),
7476
$this->queue,
@@ -80,24 +82,26 @@ public function get(): iterable {
8082
throw new TransportException('Failed to retrieve message from queue. Redis Error: ' . $this->redis->getLastError());
8183
}
8284

83-
if (!$message) {
85+
if (!$encodedMessage) {
8486
return [];
8587
}
8688

87-
$res = json_decode($message, true);
89+
$res = json_decode($encodedMessage, true);
8890
$message = isset($res[0], $res[1]) ? ['body' => $res[0], 'headers' => $res[1]] : $res;
8991
$envelope = $this->serializer->decode($message);
90-
return [$envelope];
92+
return [$envelope->with(new TransportMessageIdStamp($encodedMessage))];
9193
}
9294

9395
public function ack(Envelope $env): void {
9496
$this->connect();
95-
$this->clearMessageFromProcessingQueue($this->redis, $this->encodeEnvelope($env));
97+
$transportIdStamp = $env->last(TransportMessageIdStamp::class);
98+
$this->clearMessageFromProcessingQueue($this->redis, $transportIdStamp ? $transportIdStamp->getId() : $this->encodeEnvelope($env));
9699
}
97100

98101
public function reject(Envelope $env): void {
99102
$this->connect();
100-
$this->clearMessageFromProcessingQueue($this->redis, $this->encodeEnvelope($env));
103+
$transportIdStamp = $env->last(TransportMessageIdStamp::class);
104+
$this->clearMessageFromProcessingQueue($this->redis, $transportIdStamp ? $transportIdStamp->getId() : $this->encodeEnvelope($env));
101105
}
102106

103107
public function send(Envelope $env): Envelope {

tests/Feature/Fixtures/redis-config.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@ framework:
33
enabled: true
44
messenger:
55
transports:
6-
krak_redis: 'redis://localhost:6379?queue=messenger'
6+
krak_redis: 'redis://redis:6379?queue=messenger'
77
sf_redis:
8-
dsn: 'redis://localhost:6379'
8+
dsn: 'redis://redis:6379'
99
options: { use_krak_redis: false }
1010
routing:
1111
'Krak\SymfonyMessengerRedis\Tests\Feature\Fixtures\KrakRedisMessage': krak_redis

tests/Feature/TransportTest.php

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@
66
use Krak\SymfonyMessengerRedis\Transport\RedisTransport;
77
use PHPUnit\Framework\TestCase;
88
use Symfony\Component\Messenger\Envelope;
9+
use Symfony\Component\Messenger\Stamp\BusNameStamp;
910
use Symfony\Component\Messenger\Stamp\DelayStamp;
11+
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
1012
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
1113
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
1214

@@ -22,7 +24,7 @@ protected function setUp() {
2224
parent::setUp();
2325
$this->transport = RedisTransport::fromDsn(Serializer::create(), getenv('REDIS_DSN'));
2426
$this->redis = new \Redis();
25-
$this->redis->connect('127.0.0.1');
27+
$this->redis->connect('redis');
2628
$this->redis->flushAll();
2729
}
2830

@@ -40,6 +42,15 @@ public function can_ack_a_message() {
4042
$this->then_the_queues_are_empty();
4143
}
4244

45+
/** @test */
46+
public function can_ack_a_message_with_new_stamps() {
47+
$this->given_there_is_a_wrapped_message();
48+
$this->when_the_message_is_sent_received_and_acked(function(Envelope $env) {
49+
return $env->with(new BusNameStamp('bus'));
50+
});
51+
$this->then_the_queues_are_empty();
52+
}
53+
4354
/** @test */
4455
public function can_reject_a_message() {
4556
$this->given_there_is_a_wrapped_message();
@@ -116,9 +127,10 @@ private function when_the_message_is_sent_on_the_transport(int $numberOfTimes =
116127
}
117128
}
118129

119-
private function when_the_message_is_sent_received_and_acked() {
130+
private function when_the_message_is_sent_received_and_acked(callable $stampEnv = null) {
120131
$this->transport->send($this->envelope);
121132
[$env] = $this->transport->get();
133+
$env = $stampEnv ? $stampEnv($env) : $env;
122134
$this->transport->ack($env);
123135
}
124136

@@ -140,8 +152,8 @@ private function then_the_queue_contains_the_message() {
140152
$this->assertEquals(
141153
<<<'CONTENT'
142154
{"body":"{\"id\":null}","headers":{"type":"Krak\\SymfonyMessengerRedis\\Tests\\Feature\\Fixtures\\KrakRedisMessage","Content-Type":"application\/json"},"uniqueId":null}
143-
CONTENT,
144-
$encodedMessage
155+
CONTENT
156+
, $encodedMessage
145157
);
146158
}
147159

0 commit comments

Comments
 (0)