const STATE_REFETCH = 3; // Worker had refetched jobs in the execution loop.
const STATE_SHORT_LOOP = 4; // Worker is processing preassigned jobs, thus saving much time.
+ const FAST_COMMANDS = ['APDelivery', 'Delivery', 'CreateShadowEntry'];
+
+
private static $up_start;
private static $db_duration = 0;
private static $db_duration_count = 0;
return [];
}
- if ($priority <= PRIORITY_MEDIUM) {
- $limit = Config::get('system', 'worker_fetch_limit', 1);
- } else {
- $limit = 1;
- }
+ $limit = Config::get('system', 'worker_fetch_limit', 1);
$ids = [];
$stamp = (float)microtime(true);
$condition = ["`priority` = ? AND `pid` = 0 AND NOT `done` AND `next_try` < ?", $priority, DateTimeFormat::utcNow()];
- $tasks = DBA::select('workerqueue', ['id'], $condition, ['limit' => $limit, 'order' => ['created']]);
+ $tasks = DBA::select('workerqueue', ['id', 'parameter'], $condition, ['limit' => $limit, 'order' => ['created']]);
self::$db_duration += (microtime(true) - $stamp);
while ($task = DBA::fetch($tasks)) {
$ids[] = $task['id'];
+ // Only continue that loop while we are storing commands that can be processed quickly
+ $command = json_decode($task['parameter'])[0];
+ if (!in_array($command, self::FAST_COMMANDS)) {
+ break;
+ }
}
DBA::close($tasks);
- Logger::info('Found:', ['id' => $ids, 'priority' => $priority]);
+ Logger::info('Found:', ['priority' => $priority, 'id' => $ids]);
return $ids;
}
// If there is no result we check without priority limit
if (empty($ids)) {
+ $limit = Config::get('system', 'worker_fetch_limit', 1);
+
$stamp = (float)microtime(true);
$condition = ["`pid` = 0 AND NOT `done` AND `next_try` < ?", DateTimeFormat::utcNow()];
- $result = DBA::select('workerqueue', ['id'], $condition, ['limit' => 1, 'order' => ['priority', 'created']]);
+ $tasks = DBA::select('workerqueue', ['id', 'parameter'], $condition, ['limit' => $limit, 'order' => ['priority', 'created']]);
self::$db_duration += (microtime(true) - $stamp);
- while ($id = DBA::fetch($result)) {
- $ids[] = $id["id"];
+ while ($task = DBA::fetch($tasks)) {
+ $ids[] = $task['id'];
+ // Only continue that loop while we are storing commands that can be processed quickly
+ $command = json_decode($task['parameter'])[0];
+ if (!in_array($command, self::FAST_COMMANDS)) {
+ break;
+ }
}
- DBA::close($result);
+ DBA::close($tasks);
}
if (!empty($ids)) {