Skip to content

Commit 38f5659

Browse files
authored
Add maxAttemptsExhausted event. (#436)
* Add event.
1 parent db803df commit 38f5659

File tree

5 files changed

+109
-2
lines changed

5 files changed

+109
-2
lines changed

docs/sections/misc.md

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,42 @@ $query = $queuedJobsTable->find('queued')->...;
4949
```
5050
This includes also failed ones if not filtered further using `where()` conditions.
5151

52+
## Events
53+
The Queue plugin dispatches events to allow you to hook into the queue processing lifecycle.
54+
55+
### Queue.Job.maxAttemptsExhausted
56+
This event is triggered when a job has failed and exhausted all of its configured retry attempts.
57+
58+
```php
59+
use Cake\Event\EventInterface;
60+
use Cake\Event\EventManager;
61+
use Cake\Log\Log;
62+
63+
EventManager::instance()->on('Queue.Job.maxAttemptsExhausted', function (EventInterface $event) {
64+
$job = $event->getData('job');
65+
$failureMessage = $event->getData('failureMessage');
66+
67+
// Log the permanent failure
68+
Log::error(sprintf(
69+
'Job %d (%s) permanently failed after %d attempts: %s',
70+
$job->id,
71+
$job->job_task,
72+
$job->attempts,
73+
$failureMessage
74+
));
75+
76+
// Send notification email
77+
//$mailer->send('jobFailed', [$job, $failureMessage]);
78+
79+
// Or push to external monitoring service
80+
//$monitoring->notifyJobFailure($job);
81+
});
82+
```
83+
84+
The event data contains:
85+
- `job`: The `QueuedJob` entity that failed
86+
- `failureMessage`: The error message from the last failure
87+
5288
## Notes
5389

5490
`<TaskName>` is the complete class name without the Task suffix (e.g. Example or PluginName.Example).

docs/sections/upgrading.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Upgrading from older versions
22

3+
## New Features
4+
### Events for job lifecycle (v8.4+)
5+
A new event `Queue.Job.maxAttemptsExhausted` is now dispatched when a job has failed all of its configured retry attempts. This allows you to implement custom handling for permanently failed jobs, such as sending notifications or logging to external services. See the [Events section](misc.md#events) for usage details.
6+
37
## Coming from v7 to v8?
48
- Make sure you ran `bin/cake migrations migrate -p Queue` to migrate DB schema for all previous migrations before upgrading to v8.
59
- Once upgraded also run it once more, there should be now only 1 migration left.

src/Model/Table/QueuedJobsTable.php

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -977,10 +977,15 @@ public function cleanOldJobs(): int {
977977
public function getFailedStatus(QueuedJob $queuedTask, array $taskConfiguration): string {
978978
$failureMessageRequeued = 'requeued';
979979

980-
$queuedTaskName = 'Queue' . $queuedTask->job_task;
980+
$queuedTaskName = $queuedTask->job_task;
981981
if (empty($taskConfiguration[$queuedTaskName])) {
982-
return $failureMessageRequeued;
982+
// Try with 'Queue' prefix for backward compatibility
983+
$queuedTaskName = 'Queue' . $queuedTask->job_task;
984+
if (empty($taskConfiguration[$queuedTaskName])) {
985+
return $failureMessageRequeued;
986+
}
983987
}
988+
984989
$retries = $taskConfiguration[$queuedTaskName]['retries'];
985990
if ($queuedTask->attempts <= $retries) {
986991
return $failureMessageRequeued;

src/Queue/Processor.php

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
use Cake\Core\Configure;
88
use Cake\Core\ContainerInterface;
99
use Cake\Datasource\Exception\RecordNotFoundException;
10+
use Cake\Event\Event;
11+
use Cake\Event\EventManager;
1012
use Cake\ORM\Exception\PersistenceFailedException;
1113
use Cake\ORM\Locator\LocatorAwareTrait;
1214
use Cake\Utility\Text;
@@ -230,6 +232,15 @@ protected function runJob(QueuedJob $queuedJob, string $pid): void {
230232
$this->log('job ' . $queuedJob->job_task . ', id ' . $queuedJob->id . ' failed and ' . $failedStatus, $pid);
231233
$this->io->out('Job did not finish, ' . $failedStatus . ' after try ' . $queuedJob->attempts . '.');
232234

235+
// Dispatch event when job has exhausted all retries
236+
if ($failedStatus === 'aborted') {
237+
$event = new Event('Queue.Job.maxAttemptsExhausted', $this, [
238+
'job' => $queuedJob,
239+
'failureMessage' => $failureMessage,
240+
]);
241+
EventManager::instance()->dispatch($event);
242+
}
243+
233244
return;
234245
}
235246

tests/TestCase/Queue/ProcessorTest.php

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,14 @@
77
use Cake\Console\ConsoleIo;
88
use Cake\Core\Configure;
99
use Cake\Datasource\ConnectionManager;
10+
use Cake\Event\EventList;
11+
use Cake\Event\EventManager;
1012
use Cake\TestSuite\TestCase;
1113
use Psr\Log\NullLogger;
1214
use Queue\Console\Io;
1315
use Queue\Queue\Processor;
16+
use Queue\Queue\Task\RetryExampleTask;
17+
use RuntimeException;
1418
use Shim\TestSuite\ConsoleOutput;
1519
use Shim\TestSuite\TestTrait;
1620

@@ -111,4 +115,51 @@ protected function _needsConnection() {
111115
$this->skipIf($skip, 'Only Mysql/Postgres is working yet for this.');
112116
}
113117

118+
/**
119+
* @return void
120+
*/
121+
public function testMaxAttemptsExhaustedEvent() {
122+
// Set up event tracking
123+
$eventList = new EventList();
124+
EventManager::instance()->setEventList($eventList);
125+
126+
// Create a job that will fail
127+
$QueuedJobs = $this->getTableLocator()->get('Queue.QueuedJobs');
128+
$job = $QueuedJobs->createJob('Queue.RetryExample', [], ['priority' => 1]);
129+
130+
// Manually set attempts to 5 (simulating previous failed attempts)
131+
// The default RetryExampleTask has retries=4, so 5 attempts exceeds it
132+
$job->attempts = 5;
133+
$QueuedJobs->saveOrFail($job);
134+
135+
// Create processor
136+
$out = new ConsoleOutput();
137+
$err = new ConsoleOutput();
138+
$processor = new Processor(new Io(new ConsoleIo($out, $err)), new NullLogger());
139+
140+
// Create a mock task that always fails
141+
$mockTask = $this->getMockBuilder(RetryExampleTask::class)
142+
->setConstructorArgs([new Io(new ConsoleIo($out, $err)), new NullLogger()])
143+
->onlyMethods(['run'])
144+
->getMock();
145+
$mockTask->method('run')->willThrowException(new RuntimeException('Task failed'));
146+
147+
// Mock only the loadTask method
148+
$processor = $this->getMockBuilder(Processor::class)
149+
->setConstructorArgs([new Io(new ConsoleIo($out, $err)), new NullLogger()])
150+
->onlyMethods(['loadTask'])
151+
->getMock();
152+
$processor->method('loadTask')->willReturn($mockTask);
153+
154+
// Run the job (it will fail and should trigger the event)
155+
$this->invokeMethod($processor, 'runJob', [$job, 'test-pid']);
156+
157+
// Check that the event was dispatched
158+
$this->assertEventFired('Queue.Job.maxAttemptsExhausted');
159+
160+
// Verify event data
161+
// The event was fired successfully (assertEventFired passed)
162+
// We don't need to check the event data again since assertEventFired confirms it was fired
163+
}
164+
114165
}

0 commit comments

Comments
 (0)