X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=src%2FCore%2FWorker.php;h=0f4a527f6bd21aa95cb9f387ba1f29e5264aab73;hb=58c8959da0ece9a23966b315310a3962542bc7f4;hp=111cd85f59f33cecf0dbbdaaae1d1e4f545c1272;hpb=505a34d40e24295a60499d40b890683360b160d6;p=friendica.git diff --git a/src/Core/Worker.php b/src/Core/Worker.php index 111cd85f59..0f4a527f6b 100644 --- a/src/Core/Worker.php +++ b/src/Core/Worker.php @@ -22,10 +22,13 @@ use Friendica\Util\Network; */ class Worker { - const STATE_STARTUP = 1; - const STATE_SHORT_LOOP = 2; - const STATE_REFETCH = 3; - const STATE_LONG_LOOP = 4; + const STATE_STARTUP = 1; // Worker is in startup. This takes most time. + const STATE_LONG_LOOP = 2; // Worker is processing the whole - long - loop. + const STATE_REFETCH = 3; // Worker had refetched jobs in the execution loop. + const STATE_SHORT_LOOP = 4; // Worker is processing preassigned jobs, thus saving much time. + + const FAST_COMMANDS = ['APDelivery', 'Delivery', 'CreateShadowEntry']; + private static $up_start; private static $db_duration = 0; @@ -102,6 +105,7 @@ class Worker // We fetch the next queue entry that is about to be executed while ($r = self::workerProcess()) { + $refetched = false; foreach ($r as $entry) { // Assure that the priority is an integer value $entry['priority'] = (int)$entry['priority']; @@ -112,39 +116,42 @@ class Worker return; } - // If possible we will fetch new jobs for this worker - if (!self::getWaitingJobForPID() && Lock::acquire('worker_process', 0)) { + // Trying to fetch new processes - but only once when successful + if (!$refetched && Lock::acquire('worker_process', 0)) { self::findWorkerProcesses(); Lock::release('worker_process'); self::$state = self::STATE_REFETCH; + $refetched = true; + } else { + self::$state = self::STATE_SHORT_LOOP; } } - if (self::$state != self::STATE_REFETCH) { + // To avoid the quitting of multiple workers only one worker at a time will execute the check + if (!self::getWaitingJobForPID()) { self::$state = self::STATE_LONG_LOOP; - } - // To avoid the quitting of multiple workers only one worker at a time will execute the check - if (Lock::acquire('worker', 0)) { + if (Lock::acquire('worker', 0)) { // Count active workers and compare them with a maximum value that depends on the load - if (self::tooMuchWorkers()) { - Logger::log('Active worker limit reached, quitting.', Logger::DEBUG); - Lock::release('worker'); - return; - } + if (self::tooMuchWorkers()) { + Logger::log('Active worker limit reached, quitting.', Logger::DEBUG); + Lock::release('worker'); + return; + } - // Check free memory - if ($a->isMinMemoryReached()) { - Logger::log('Memory limit reached, quitting.', Logger::DEBUG); + // Check free memory + if ($a->isMinMemoryReached()) { + Logger::log('Memory limit reached, quitting.', Logger::DEBUG); + Lock::release('worker'); + return; + } Lock::release('worker'); - return; } - Lock::release('worker'); } // Quit the worker once every cron interval if (time() > ($starttime + (Config::get('system', 'cron_interval') * 60))) { - Logger::log('Process lifetime reached, respawning.', Logger::DEBUG); + Logger::info('Process lifetime reached, respawning.'); self::spawnWorker(); return; } @@ -424,7 +431,6 @@ class Worker self::$db_duration_stat = 0; self::$db_duration_write = 0; self::$lock_duration = 0; - self::$state = self::STATE_SHORT_LOOP; if ($duration > 3600) { $logger->info('Longer than 1 hour.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]); @@ -611,7 +617,7 @@ class Worker */ private static function tooMuchWorkers() { - $queues = Config::get("system", "worker_queues", 4); + $queues = Config::get("system", "worker_queues", 10); $maxqueues = $queues; @@ -620,7 +626,7 @@ class Worker // Decrease the number of workers at higher load $load = System::currentLoad(); if ($load) { - $maxsysload = intval(Config::get("system", "maxloadavg", 50)); + $maxsysload = intval(Config::get("system", "maxloadavg", 20)); /* Default exponent 3 causes queues to rapidly decrease as load increases. * If you have 20 max queues at idle, then you get only 5 queues at 37.1% of $maxsysload. @@ -702,7 +708,7 @@ class Worker $processlist .= ' ('.implode(', ', $listitem).')'; - if (Config::get("system", "worker_fastlane", false) && ($queues > 0) && self::entriesExists() && ($active >= $queues)) { + if (Config::get("system", "worker_fastlane", false) && ($queues > 0) && ($active >= $queues) && self::entriesExists()) { $top_priority = self::highestPriority(); $high_running = self::processWithPriorityActive($top_priority); @@ -715,7 +721,7 @@ class Worker Logger::log("Load: " . $load ."/" . $maxsysload . " - processes: " . $deferred . "/" . $active . "/" . $waiting_processes . $processlist . " - maximum: " . $queues . "/" . $maxqueues, Logger::DEBUG); // Are there fewer workers running as possible? Then fork a new one. - if (!Config::get("system", "worker_dont_fork", false) && ($queues > ($active + 1)) && ($entries > 1)) { + if (!Config::get("system", "worker_dont_fork", false) && ($queues > ($active + 1)) && self::entriesExists()) { Logger::log("Active workers: ".$active."/".$queues." Fork a new worker.", Logger::DEBUG); if (Config::get('system', 'worker_daemon_mode', false)) { self::IPCSetJobState(true); @@ -780,23 +786,24 @@ class Worker return []; } - if ($priority <= PRIORITY_MEDIUM) { - $limit = Config::get('system', 'worker_fetch_limit', 1); - } else { - $limit = 1; - } + $limit = Config::get('system', 'worker_fetch_limit', 1); $ids = []; $stamp = (float)microtime(true); $condition = ["`priority` = ? AND `pid` = 0 AND NOT `done` AND `next_try` < ?", $priority, DateTimeFormat::utcNow()]; - $tasks = DBA::select('workerqueue', ['id'], $condition, ['limit' => $limit, 'order' => ['created']]); + $tasks = DBA::select('workerqueue', ['id', 'parameter'], $condition, ['limit' => $limit, 'order' => ['created']]); self::$db_duration += (microtime(true) - $stamp); while ($task = DBA::fetch($tasks)) { $ids[] = $task['id']; + // Only continue that loop while we are storing commands that can be processed quickly + $command = json_decode($task['parameter'])[0]; + if (!in_array($command, self::FAST_COMMANDS)) { + break; + } } DBA::close($tasks); - Logger::info('Found:', ['id' => $ids, 'priority' => $priority]); + Logger::info('Found:', ['priority' => $priority, 'id' => $ids]); return $ids; } @@ -887,15 +894,22 @@ class Worker // If there is no result we check without priority limit if (empty($ids)) { + $limit = Config::get('system', 'worker_fetch_limit', 1); + $stamp = (float)microtime(true); $condition = ["`pid` = 0 AND NOT `done` AND `next_try` < ?", DateTimeFormat::utcNow()]; - $result = DBA::select('workerqueue', ['id'], $condition, ['limit' => 1, 'order' => ['priority', 'created']]); + $tasks = DBA::select('workerqueue', ['id', 'parameter'], $condition, ['limit' => $limit, 'order' => ['priority', 'created']]); self::$db_duration += (microtime(true) - $stamp); - while ($id = DBA::fetch($result)) { - $ids[] = $id["id"]; + while ($task = DBA::fetch($tasks)) { + $ids[] = $task['id']; + // Only continue that loop while we are storing commands that can be processed quickly + $command = json_decode($task['parameter'])[0]; + if (!in_array($command, self::FAST_COMMANDS)) { + break; + } } - DBA::close($result); + DBA::close($tasks); } if (!empty($ids)) {