return $count;
}
+ /**
+ * Returns the number of active worker processes
+ *
+ * @return array List of worker process ids
+ * @throws \Exception
+ */
+ private static function getWorkerPIDList()
+ {
+ $ids = [];
+ $stamp = (float)microtime(true);
+
+ $queues = DBA::p("SELECT `process`.`pid`, COUNT(`workerqueue`.`pid`) AS `entries` FROM `process`
+ LEFT JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid` AND NOT `workerqueue`.`done`
+ GROUP BY `process`.`pid`");
+ while ($queue = DBA::fetch($queues)) {
+ $ids[$queue['pid']] = $queue['entries'];
+ }
+ DBA::close($queues);
+
+ self::$db_duration += (microtime(true) - $stamp);
+ return $ids;
+ }
+
/**
* Returns waiting jobs for the current process id
*
/**
* Returns the next jobs that should be executed
- *
+ * @param int $limit
* @return array array with next jobs
* @throws \Exception
*/
- private static function nextProcess()
+ private static function nextProcess(int $limit)
{
$priority = self::nextPriority();
if (empty($priority)) {
return [];
}
- $limit = DI::config()->get('system', 'worker_fetch_limit', 1);
-
$ids = [];
$stamp = (float)microtime(true);
$condition = ["`priority` = ? AND `pid` = 0 AND NOT `done` AND `next_try` < ?", $priority, DateTimeFormat::utcNow()];
*/
private static function findWorkerProcesses()
{
- $mypid = getmypid();
-
- $ids = self::nextProcess();
+ $fetch_limit = DI::config()->get('system', 'worker_fetch_limit', 1);
+
+ if (DI::config()->get('system', 'worker_multiple_fetch')) {
+ $pids = [];
+ $worker_pids = self::getWorkerPIDList();
+ foreach ($worker_pids as $pid => $count) {
+ if ($count <= $fetch_limit) {
+ $pids[] = $pid;
+ }
+ }
+ if (empty($pids)) {
+ return;
+ }
+ $limit = $fetch_limit * count($pids);
+ } else {
+ $pids = [getmypid()];
+ $limit = $fetch_limit;
+ }
- // If there is no result we check without priority limit
- if (empty($ids)) {
- $limit = DI::config()->get('system', 'worker_fetch_limit', 1);
+ $ids = self::nextProcess($limit);
+ $limit -= count($ids);
+ // If there is not enough results we check without priority limit
+ if ($limit > 0) {
$stamp = (float)microtime(true);
$condition = ["`pid` = 0 AND NOT `done` AND `next_try` < ?", DateTimeFormat::utcNow()];
$tasks = DBA::select('workerqueue', ['id', 'parameter'], $condition, ['limit' => $limit, 'order' => ['priority', 'created']]);
}
if (!empty($ids)) {
+ $worker = [];
+ foreach (array_unique($ids) as $id) {
+ $pid = next($pids);
+ if (!$pid) {
+ $pid = reset($pids);
+ }
+ $worker[$pid][] = $id;
+ }
+
$stamp = (float)microtime(true);
- $condition = ['id' => $ids, 'done' => false, 'pid' => 0];
- DBA::update('workerqueue', ['executed' => DateTimeFormat::utcNow(), 'pid' => $mypid], $condition);
+ foreach ($worker as $worker_pid => $worker_ids) {
+ Logger::info('Set queue entry', ['pid' => $worker_pid, 'ids' => $worker_ids]);
+ DBA::update('workerqueue', ['executed' => DateTimeFormat::utcNow(), 'pid' => $worker_pid],
+ ['id' => $worker_ids, 'done' => false, 'pid' => 0]);
+ }
self::$db_duration += (microtime(true) - $stamp);
self::$db_duration_write += (microtime(true) - $stamp);
}
// Setting 0 would allow maximum worker queues at all times, which is not recommended.
'worker_load_exponent' => 3,
+ // worker_multiple_fetch (Boolean)
+ // When activated, the worker fetches jobs for multiple workers (not only for itself).
+ // This is an experimental setting without knowing the performance impact.
+ 'worker_multiple_fetch' => false,
+
// worker_defer_limit (Integer)
// Per default the systems tries delivering for 15 times before dropping it.
'worker_defer_limit' => 15,