*/
class Worker
{
- const STATE_STARTUP = 1;
- const STATE_SHORT_LOOP = 2;
- const STATE_REFETCH = 3;
- const STATE_LONG_LOOP = 4;
+ const STATE_STARTUP = 1; // Worker is in startup. This takes most time.
+ const STATE_LONG_LOOP = 2; // Worker is processing the whole - long - loop.
+ const STATE_REFETCH = 3; // Worker had refetched jobs in the execution loop.
+ const STATE_SHORT_LOOP = 4; // Worker is processing preassigned jobs, thus saving much time.
+
+ const FAST_COMMANDS = ['APDelivery', 'Delivery', 'CreateShadowEntry'];
+
private static $up_start;
private static $db_duration = 0;
// We fetch the next queue entry that is about to be executed
while ($r = self::workerProcess()) {
+ $refetched = false;
foreach ($r as $entry) {
// Assure that the priority is an integer value
$entry['priority'] = (int)$entry['priority'];
return;
}
- // If possible we will fetch new jobs for this worker
- if (!self::getWaitingJobForPID() && Lock::acquire('worker_process', 0)) {
+ // Trying to fetch new processes - but only once when successful
+ if (!$refetched && Lock::acquire('worker_process', 0)) {
self::findWorkerProcesses();
Lock::release('worker_process');
self::$state = self::STATE_REFETCH;
+ $refetched = true;
+ } else {
+ self::$state = self::STATE_SHORT_LOOP;
}
}
- if (self::$state != self::STATE_REFETCH) {
+ // To avoid the quitting of multiple workers only one worker at a time will execute the check
+ if (!self::getWaitingJobForPID()) {
self::$state = self::STATE_LONG_LOOP;
- }
- // To avoid the quitting of multiple workers only one worker at a time will execute the check
- if (Lock::acquire('worker', 0)) {
+ if (Lock::acquire('worker', 0)) {
// Count active workers and compare them with a maximum value that depends on the load
- if (self::tooMuchWorkers()) {
- Logger::log('Active worker limit reached, quitting.', Logger::DEBUG);
- Lock::release('worker');
- return;
- }
+ if (self::tooMuchWorkers()) {
+ Logger::log('Active worker limit reached, quitting.', Logger::DEBUG);
+ Lock::release('worker');
+ return;
+ }
- // Check free memory
- if ($a->isMinMemoryReached()) {
- Logger::log('Memory limit reached, quitting.', Logger::DEBUG);
+ // Check free memory
+ if ($a->isMinMemoryReached()) {
+ Logger::log('Memory limit reached, quitting.', Logger::DEBUG);
+ Lock::release('worker');
+ return;
+ }
Lock::release('worker');
- return;
}
- Lock::release('worker');
}
// Quit the worker once every cron interval
if (time() > ($starttime + (Config::get('system', 'cron_interval') * 60))) {
- Logger::log('Process lifetime reached, respawning.', Logger::DEBUG);
+ Logger::info('Process lifetime reached, respawning.');
self::spawnWorker();
return;
}
self::$db_duration_stat = 0;
self::$db_duration_write = 0;
self::$lock_duration = 0;
- self::$state = self::STATE_SHORT_LOOP;
if ($duration > 3600) {
- $logger->info('Longer than 1 hour.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]);
+ $logger->info('Longer than 1 hour.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]);
} elseif ($duration > 600) {
- $logger->info('Longer than 10 minutes.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]);
+ $logger->info('Longer than 10 minutes.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]);
} elseif ($duration > 300) {
- $logger->info('Longer than 5 minutes.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]);
+ $logger->info('Longer than 5 minutes.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]);
} elseif ($duration > 120) {
- $logger->info('Longer than 2 minutes.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]);
+ $logger->info('Longer than 2 minutes.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]);
}
- $workerLogger->info('Process done. ', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' => number_format($duration, 4)]);
+ $workerLogger->info('Process done.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' => round($duration, 3)]);
$a->getProfiler()->saveLog($a->getLogger(), "ID " . $queue["id"] . ": " . $funcname);
*/
private static function tooMuchWorkers()
{
- $queues = Config::get("system", "worker_queues", 4);
+ $queues = Config::get("system", "worker_queues", 10);
$maxqueues = $queues;
// Decrease the number of workers at higher load
$load = System::currentLoad();
if ($load) {
- $maxsysload = intval(Config::get("system", "maxloadavg", 50));
+ $maxsysload = intval(Config::get("system", "maxloadavg", 20));
/* Default exponent 3 causes queues to rapidly decrease as load increases.
* If you have 20 max queues at idle, then you get only 5 queues at 37.1% of $maxsysload.
$processlist .= ' ('.implode(', ', $listitem).')';
- if (Config::get("system", "worker_fastlane", false) && ($queues > 0) && self::entriesExists() && ($active >= $queues)) {
+ if (Config::get("system", "worker_fastlane", false) && ($queues > 0) && ($active >= $queues) && self::entriesExists()) {
$top_priority = self::highestPriority();
$high_running = self::processWithPriorityActive($top_priority);
Logger::log("Load: " . $load ."/" . $maxsysload . " - processes: " . $deferred . "/" . $active . "/" . $waiting_processes . $processlist . " - maximum: " . $queues . "/" . $maxqueues, Logger::DEBUG);
// Are there fewer workers running as possible? Then fork a new one.
- if (!Config::get("system", "worker_dont_fork", false) && ($queues > ($active + 1)) && ($entries > 1)) {
+ if (!Config::get("system", "worker_dont_fork", false) && ($queues > ($active + 1)) && self::entriesExists()) {
Logger::log("Active workers: ".$active."/".$queues." Fork a new worker.", Logger::DEBUG);
if (Config::get('system', 'worker_daemon_mode', false)) {
self::IPCSetJobState(true);
return [];
}
- if ($priority <= PRIORITY_MEDIUM) {
- $limit = Config::get('system', 'worker_fetch_limit', 1);
- } else {
- $limit = 1;
- }
+ $limit = Config::get('system', 'worker_fetch_limit', 1);
$ids = [];
$stamp = (float)microtime(true);
$condition = ["`priority` = ? AND `pid` = 0 AND NOT `done` AND `next_try` < ?", $priority, DateTimeFormat::utcNow()];
- $tasks = DBA::select('workerqueue', ['id'], $condition, ['limit' => $limit, 'order' => ['created']]);
+ $tasks = DBA::select('workerqueue', ['id', 'parameter'], $condition, ['limit' => $limit, 'order' => ['created']]);
self::$db_duration += (microtime(true) - $stamp);
while ($task = DBA::fetch($tasks)) {
$ids[] = $task['id'];
+ // Only continue that loop while we are storing commands that can be processed quickly
+ $command = json_decode($task['parameter'])[0];
+ if (!in_array($command, self::FAST_COMMANDS)) {
+ break;
+ }
}
DBA::close($tasks);
- Logger::info('Found:', ['id' => $ids, 'priority' => $priority]);
+ Logger::info('Found:', ['priority' => $priority, 'id' => $ids]);
return $ids;
}
// If there is no result we check without priority limit
if (empty($ids)) {
+ $limit = Config::get('system', 'worker_fetch_limit', 1);
+
$stamp = (float)microtime(true);
$condition = ["`pid` = 0 AND NOT `done` AND `next_try` < ?", DateTimeFormat::utcNow()];
- $result = DBA::select('workerqueue', ['id'], $condition, ['limit' => 1, 'order' => ['priority', 'created']]);
+ $tasks = DBA::select('workerqueue', ['id', 'parameter'], $condition, ['limit' => $limit, 'order' => ['priority', 'created']]);
self::$db_duration += (microtime(true) - $stamp);
- while ($id = DBA::fetch($result)) {
- $ids[] = $id["id"];
+ while ($task = DBA::fetch($tasks)) {
+ $ids[] = $task['id'];
+ // Only continue that loop while we are storing commands that can be processed quickly
+ $command = json_decode($task['parameter'])[0];
+ if (!in_array($command, self::FAST_COMMANDS)) {
+ break;
+ }
}
- DBA::close($result);
+ DBA::close($tasks);
}
if (!empty($ids)) {