if (DI::config()->get('system', 'worker_multiple_fetch')) {
$pids = [];
- $worker_pids = self::getWorkerPIDList();
- foreach ($worker_pids as $pid => $count) {
+ foreach (self::getWorkerPIDList() as $pid => $count) {
if ($count <= $fetch_limit) {
$pids[] = $pid;
}
DBA::close($tasks);
}
- if (!empty($ids)) {
- $worker = [];
- foreach (array_unique($ids) as $id) {
- $pid = next($pids);
- if (!$pid) {
- $pid = reset($pids);
- }
- $worker[$pid][] = $id;
- }
+ if (empty($ids)) {
+ return;
+ }
- $stamp = (float)microtime(true);
- foreach ($worker as $worker_pid => $worker_ids) {
- Logger::info('Set queue entry', ['pid' => $worker_pid, 'ids' => $worker_ids]);
- DBA::update('workerqueue', ['executed' => DateTimeFormat::utcNow(), 'pid' => $worker_pid],
- ['id' => $worker_ids, 'done' => false, 'pid' => 0]);
+ // Assign the task ids to the workers
+ $worker = [];
+ foreach (array_unique($ids) as $id) {
+ $pid = next($pids);
+ if (!$pid) {
+ $pid = reset($pids);
}
- self::$db_duration += (microtime(true) - $stamp);
- self::$db_duration_write += (microtime(true) - $stamp);
+ $worker[$pid][] = $id;
}
- return !empty($ids);
+ $stamp = (float)microtime(true);
+ foreach ($worker as $worker_pid => $worker_ids) {
+ Logger::info('Set queue entry', ['pid' => $worker_pid, 'ids' => $worker_ids]);
+ DBA::update('workerqueue', ['executed' => DateTimeFormat::utcNow(), 'pid' => $worker_pid],
+ ['id' => $worker_ids, 'done' => false, 'pid' => 0]);
+ }
+ self::$db_duration += (microtime(true) - $stamp);
+ self::$db_duration_write += (microtime(true) - $stamp);
}
/**
}
self::$lock_duration += (microtime(true) - $stamp);
- $found = self::findWorkerProcesses();
+ self::findWorkerProcesses();
DI::lock()->release(self::LOCK_PROCESS);
- if ($found) {
- $stamp = (float)microtime(true);
- $r = DBA::select('workerqueue', [], ['pid' => getmypid(), 'done' => false]);
- self::$db_duration += (microtime(true) - $stamp);
- return DBA::toArray($r);
- }
- return false;
+ return self::getWaitingJobForPID();
}
/**