Skip to content

Commit 0d61cb1

Browse files
GustavoPeixotoGustavo
andauthored
fixed runningJobs modified condition (#433)
* fixed runningJos modified condition * fixed worker timeout * fixed cs errors * fixed cleanEndedProcesses * fixed cleanEndedProcesses * fixed cleanEndedProcesses * fixed cleanEndedProcesses * fixed cleanEndedProcesses * fixed cleanEndedProcesses --------- Co-authored-by: Gustavo <[email protected]>
1 parent 1f74a03 commit 0d61cb1

File tree

3 files changed

+13
-5
lines changed

3 files changed

+13
-5
lines changed

src/Model/Table/QueueProcessesTable.php

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
namespace Queue\Model\Table;
44

55
use Cake\Core\Configure;
6+
use Cake\Database\Expression\QueryExpression;
67
use Cake\I18n\FrozenTime;
78
use Cake\ORM\Table;
89
use Cake\ORM\TableRegistry;
@@ -189,10 +190,12 @@ public function remove(string $pid): void {
189190
* @return int
190191
*/
191192
public function cleanEndedProcesses(): int {
192-
$timeout = Config::defaultworkertimeout();
193-
$thresholdTime = (new FrozenTime())->subSeconds($timeout);
193+
$activeProcesses = $this->findActive()->select(['id']);
194+
$ids = $activeProcesses->all()->extract('id')->toArray();
194195

195-
return $this->deleteAll(['modified <' => $thresholdTime]);
196+
return $this->deleteAll(function (QueryExpression $exp) use ($ids): QueryExpression {
197+
return $exp->notIn('id', count($ids) > 0 ? $ids : [-1]);
198+
});
196199
}
197200

198201
/**

src/Model/Table/QueuedJobsTable.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -521,7 +521,7 @@ public function requestJob(array $tasks, array $groups = [], array $types = [])
521521
$constraintJobs = array_keys($costConstraints + $uniqueConstraints);
522522
$runningJobs = $this->find('queued')
523523
->contain(['WorkerProcesses'])
524-
->where(['QueuedJobs.job_task IN' => $constraintJobs, 'QueuedJobs.workerkey IS NOT' => null, 'QueuedJobs.workerkey !=' => $this->_key, 'WorkerProcesses.modified >' => Config::defaultworkertimeout()])
524+
->where(['QueuedJobs.job_task IN' => $constraintJobs, 'QueuedJobs.workerkey IS NOT' => null, 'QueuedJobs.workerkey !=' => $this->_key, 'WorkerProcesses.modified >' => (new FrozenTime())->subSeconds(Config::defaultworkertimeout())])
525525
->all()
526526
->toArray();
527527
}

src/Queue/Config.php

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,12 @@ class Config {
1515
* @return int
1616
*/
1717
public static function defaultworkertimeout() {
18-
return Configure::read('Queue.defaultworkertimeout', 600); // 10min
18+
$timeout = Configure::read('Queue.defaultworkertimeout', 600); // 10min
19+
if ($timeout <= 0) {
20+
throw new InvalidArgumentException('Queue.defaultworkertimeout is less or eqaul than zero. Indefinite running of workers is not supported.');
21+
}
22+
23+
return $timeout;
1924
}
2025

2126
/**

0 commit comments

Comments
 (0)