From 075638c0ae799a60f2b7595ad3c99145b71351e6 Mon Sep 17 00:00:00 2001 From: Michael Date: Sun, 4 Sep 2022 13:54:32 +0000 Subject: [PATCH] Pause the worker execution when the load is too high --- src/Core/System.php | 27 +++++++++ src/Core/Worker.php | 121 +++++++++++++++++++++---------------- static/defaults.config.php | 10 ++- 3 files changed, 105 insertions(+), 53 deletions(-) diff --git a/src/Core/System.php b/src/Core/System.php index dbff4bf3f5..eb27fa3517 100644 --- a/src/Core/System.php +++ b/src/Core/System.php @@ -435,6 +435,33 @@ class System return max($load_arr[0], $load_arr[1]); } + /** + * Fetch the load and number of processes + * + * @return array + */ + public static function getLoadAvg(): array + { + $content = file_get_contents('/proc/loadavg'); + if (empty($content)) { + $content = shell_exec('cat /proc/loadavg'); + } + if (empty($content)) { + return []; + } + + if (!preg_match("#([.\d]+)\s([.\d]+)\s([.\d]+)\s(\d+)/(\d+)#", $content, $matches)) { + return []; + } + return [ + 'average1' => (float)$matches[1], + 'average5' => (float)$matches[2], + 'average15' => (float)$matches[3], + 'runnable' => (float)$matches[4], + 'scheduled' => (float)$matches[5] + ]; + } + /** * Redirects to an external URL (fully qualified URL) * If you want to route relative to the current Friendica base, use App->internalRedirect() diff --git a/src/Core/Worker.php b/src/Core/Worker.php index f7ab2bc7c8..830e029832 100644 --- a/src/Core/Worker.php +++ b/src/Core/Worker.php @@ -291,7 +291,7 @@ class Worker return false; } - $file = str_replace(getcwd() . "/", "", $file, $count); + $file = str_replace(getcwd() . '/', '', $file, $count); if ($count != 1) { return false; } @@ -301,11 +301,11 @@ class Worker } $valid = false; - if (strpos($file, "include/") === 0) { + if (strpos($file, 'include/') === 0) { $valid = true; } - if (strpos($file, "addon/") === 0) { + if (strpos($file, 'addon/') === 0) { $valid = true; } @@ -327,19 +327,19 @@ class Worker // Quit when in maintenance if (DI::config()->get('system', 'maintenance', false, true)) { - Logger::notice("Maintenance mode - quit process", ['pid' => $mypid]); + Logger::notice('Maintenance mode - quit process', ['pid' => $mypid]); return false; } // Constantly check the number of parallel database processes if (DI::system()->isMaxProcessesReached()) { - Logger::warning("Max processes reached for process", ['pid' => $mypid]); + Logger::warning('Max processes reached for process', ['pid' => $mypid]); return false; } // Constantly check the number of available database connections to let the frontend be accessible at any time if (self::maxConnectionsReached()) { - Logger::warning("Max connection reached for process", ['pid' => $mypid]); + Logger::warning('Max connection reached for process', ['pid' => $mypid]); return false; } @@ -363,7 +363,7 @@ class Worker if (method_exists(sprintf('Friendica\Worker\%s', $include), 'execute')) { // We constantly update the "executed" date every minute to avoid being killed too soon if (!isset(self::$last_update)) { - self::$last_update = strtotime($queue["executed"]); + self::$last_update = strtotime($queue['executed']); } $age = (time() - self::$last_update) / 60; @@ -393,13 +393,13 @@ class Worker // The script could be provided as full path or only with the function name if ($include == basename($include)) { - $include = "include/".$include.".php"; + $include = 'include/' . $include . '.php'; } if (!self::validateInclude($include)) { - Logger::warning("Include file is not valid", ['file' => $argv[0]]); + Logger::warning('Include file is not valid', ['file' => $argv[0]]); $stamp = (float)microtime(true); - DBA::delete('workerqueue', ['id' => $queue["id"]]); + DBA::delete('workerqueue', ['id' => $queue['id']]); self::$db_duration = (microtime(true) - $stamp); self::$db_duration_write += (microtime(true) - $stamp); return true; @@ -407,12 +407,12 @@ class Worker require_once $include; - $funcname = str_replace(".php", "", basename($argv[0]))."_run"; + $funcname = str_replace('.php', '', basename($argv[0])) .'_run'; if (function_exists($funcname)) { // We constantly update the "executed" date every minute to avoid being killed too soon if (!isset(self::$last_update)) { - self::$last_update = strtotime($queue["executed"]); + self::$last_update = strtotime($queue['executed']); } $age = (time() - self::$last_update) / 60; @@ -428,15 +428,15 @@ class Worker self::execFunction($queue, $funcname, $argv, false); $stamp = (float)microtime(true); - if (DBA::update('workerqueue', ['done' => true], ['id' => $queue["id"]])) { + if (DBA::update('workerqueue', ['done' => true], ['id' => $queue['id']])) { DI::config()->set('system', 'last_worker_execution', DateTimeFormat::utcNow()); } self::$db_duration = (microtime(true) - $stamp); self::$db_duration_write += (microtime(true) - $stamp); } else { - Logger::warning("Function does not exist", ['function' => $funcname]); + Logger::warning('Function does not exist', ['function' => $funcname]); $stamp = (float)microtime(true); - DBA::delete('workerqueue', ['id' => $queue["id"]]); + DBA::delete('workerqueue', ['id' => $queue['id']]); self::$db_duration = (microtime(true) - $stamp); self::$db_duration_write += (microtime(true) - $stamp); } @@ -458,15 +458,32 @@ class Worker { $a = DI::app(); - $cooldown = DI::config()->get("system", "worker_cooldown", 0); + $cooldown = DI::config()->get('system', 'worker_cooldown', 0); if ($cooldown > 0) { - Logger::info('Pre execution cooldown.', ['priority' => $queue['priority'], 'id' => $queue["id"], 'cooldown' => $cooldown]); + Logger::debug('Pre execution cooldown.', ['cooldown' => $cooldown, 'id' => $queue['id'], 'priority' => $queue['priority'], 'command' => $queue['command']]); sleep($cooldown); } + $load_cooldown = DI::config()->get('system', 'worker_load_cooldown'); + $processes_cooldown = DI::config()->get('system', 'worker_processes_cooldown'); + + while ((($load_cooldown > 0) || ($processes_cooldown > 0)) && ($load = System::getLoadAvg())) { + if (($load_cooldown > 0) && ($load['average1'] > $load_cooldown)) { + Logger::debug('Load induced pre execution cooldown.', ['max' => $load_cooldown, 'load' => $load, 'id' => $queue['id'], 'priority' => $queue['priority'], 'command' => $queue['command']]); + sleep(1); + continue; + } + if (($processes_cooldown > 0) && ($load['scheduled'] > $processes_cooldown)) { + Logger::debug('Process induced pre execution cooldown.', ['max' => $processes_cooldown, 'load' => $load, 'id' => $queue['id'], 'priority' => $queue['priority'], 'command' => $queue['command']]); + sleep(1); + continue; + } + break; + } + Logger::enableWorker($funcname); - Logger::info("Process start.", ['priority' => $queue['priority'], 'id' => $queue["id"]]); + Logger::info('Process start.', ['priority' => $queue['priority'], 'id' => $queue['id']]); $stamp = (float)microtime(true); @@ -518,21 +535,21 @@ class Worker self::$lock_duration = 0; if ($duration > 3600) { - Logger::info('Longer than 1 hour.', ['priority' => $queue['priority'], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]); + Logger::info('Longer than 1 hour.', ['priority' => $queue['priority'], 'id' => $queue['id'], 'duration' => round($duration/60, 3)]); } elseif ($duration > 600) { - Logger::info('Longer than 10 minutes.', ['priority' => $queue['priority'], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]); + Logger::info('Longer than 10 minutes.', ['priority' => $queue['priority'], 'id' => $queue['id'], 'duration' => round($duration/60, 3)]); } elseif ($duration > 300) { - Logger::info('Longer than 5 minutes.', ['priority' => $queue['priority'], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]); + Logger::info('Longer than 5 minutes.', ['priority' => $queue['priority'], 'id' => $queue['id'], 'duration' => round($duration/60, 3)]); } elseif ($duration > 120) { - Logger::info('Longer than 2 minutes.', ['priority' => $queue['priority'], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]); + Logger::info('Longer than 2 minutes.', ['priority' => $queue['priority'], 'id' => $queue['id'], 'duration' => round($duration/60, 3)]); } - Logger::info('Process done.', ['priority' => $queue['priority'], 'id' => $queue["id"], 'duration' => round($duration, 3)]); + Logger::info('Process done.', ['priority' => $queue['priority'], 'id' => $queue['id'], 'duration' => round($duration, 3)]); - DI::profiler()->saveLog(DI::logger(), "ID " . $queue["id"] . ": " . $funcname); + DI::profiler()->saveLog(DI::logger(), 'ID ' . $queue['id'] . ': ' . $funcname); if ($cooldown > 0) { - Logger::info('Post execution cooldown.', ['priority' => $queue['priority'], 'id' => $queue["id"], 'cooldown' => $cooldown]); + Logger::info('Post execution cooldown.', ['priority' => $queue['priority'], 'id' => $queue['id'], 'cooldown' => $cooldown]); sleep($cooldown); } } @@ -546,16 +563,16 @@ class Worker private static function maxConnectionsReached(): bool { // Fetch the max value from the config. This is needed when the system cannot detect the correct value by itself. - $max = DI::config()->get("system", "max_connections"); + $max = DI::config()->get('system', 'max_connections'); // Fetch the percentage level where the worker will get active - $maxlevel = DI::config()->get("system", "max_connections_level", 75); + $maxlevel = DI::config()->get('system', 'max_connections_level', 75); if ($max == 0) { // the maximum number of possible user connections can be a system variable $r = DBA::fetchFirst("SHOW VARIABLES WHERE `variable_name` = 'max_user_connections'"); if (DBA::isResult($r)) { - $max = $r["Value"]; + $max = $r['Value']; } // Or it can be granted. This overrides the system variable $stamp = (float)microtime(true); @@ -581,12 +598,12 @@ class Worker $used = DBA::numRows($r); DBA::close($r); - Logger::info("Connection usage (user values)", ['usage' => $used, 'max' => $max]); + Logger::info('Connection usage (user values)', ['usage' => $used, 'max' => $max]); $level = ($used / $max) * 100; if ($level >= $maxlevel) { - Logger::warning("Maximum level (".$maxlevel."%) of user connections reached: ".$used."/".$max); + Logger::warning('Maximum level (' . $maxlevel . '%) of user connections reached: ' . $used .'/' . $max); return true; } } @@ -597,7 +614,7 @@ class Worker if (!DBA::isResult($r)) { return false; } - $max = intval($r["Value"]); + $max = intval($r['Value']); if ($max == 0) { return false; } @@ -605,18 +622,18 @@ class Worker if (!DBA::isResult($r)) { return false; } - $used = intval($r["Value"]); + $used = intval($r['Value']); if ($used == 0) { return false; } - Logger::info("Connection usage (system values)", ['used' => $used, 'max' => $max]); + Logger::info('Connection usage (system values)', ['used' => $used, 'max' => $max]); $level = $used / $max * 100; if ($level < $maxlevel) { return false; } - Logger::warning("Maximum level (".$level."%) of system connections reached: ".$used."/".$max); + Logger::warning('Maximum level (' . $level . '%) of system connections reached: ' . $used . '/' . $max); return true; } @@ -629,7 +646,7 @@ class Worker */ private static function tooMuchWorkers(): bool { - $queues = DI::config()->get("system", "worker_queues", 10); + $queues = DI::config()->get('system', 'worker_queues', 10); $maxqueues = $queues; @@ -638,7 +655,7 @@ class Worker // Decrease the number of workers at higher load $load = System::currentLoad(); if ($load) { - $maxsysload = intval(DI::config()->get("system", "maxloadavg", 20)); + $maxsysload = intval(DI::config()->get('system', 'maxloadavg', 20)); /* Default exponent 3 causes queues to rapidly decrease as load increases. * If you have 20 max queues at idle, then you get only 5 queues at 37.1% of $maxsysload. @@ -690,8 +707,8 @@ class Worker self::$db_duration += (microtime(true) - $stamp); self::$db_duration_stat += (microtime(true) - $stamp); $idle_workers -= $running; - $waiting_processes += $entry["entries"]; - $listitem[$entry['priority']] = $entry['priority'] . ":" . $running . "/" . $entry["entries"]; + $waiting_processes += $entry['entries']; + $listitem[$entry['priority']] = $entry['priority'] . ':' . $running . '/' . $entry['entries']; } DBA::close($jobs); } else { @@ -702,33 +719,33 @@ class Worker self::$db_duration_stat += (microtime(true) - $stamp); while ($entry = DBA::fetch($jobs)) { - $idle_workers -= $entry["running"]; - $listitem[$entry['priority']] = $entry['priority'].":".$entry["running"]; + $idle_workers -= $entry['running']; + $listitem[$entry['priority']] = $entry['priority'] . ':' . $entry['running']; } DBA::close($jobs); } $waiting_processes -= $deferred; - $listitem[0] = "0:" . max(0, $idle_workers); + $listitem[0] = '0:' . max(0, $idle_workers); $processlist .= ' ('.implode(', ', $listitem).')'; - if (DI::config()->get("system", "worker_fastlane", false) && ($queues > 0) && ($active >= $queues) && self::entriesExists()) { + if (DI::config()->get('system', 'worker_fastlane', false) && ($queues > 0) && ($active >= $queues) && self::entriesExists()) { $top_priority = self::highestPriority(); $high_running = self::processWithPriorityActive($top_priority); if (!$high_running && ($top_priority > PRIORITY_UNDEFINED) && ($top_priority < PRIORITY_NEGLIGIBLE)) { - Logger::info("Jobs with a higher priority are waiting but none is executed. Open a fastlane.", ['priority' => $top_priority]); + Logger::info('Jobs with a higher priority are waiting but none is executed. Open a fastlane.', ['priority' => $top_priority]); $queues = $active + 1; } } - Logger::notice("Load: " . $load ."/" . $maxsysload . " - processes: " . $deferred . "/" . $active . "/" . $waiting_processes . $processlist . " - maximum: " . $queues . "/" . $maxqueues); + Logger::notice('Load: ' . $load . '/' . $maxsysload . ' - processes: ' . $deferred . '/' . $active . '/' . $waiting_processes . $processlist . ' - maximum: ' . $queues . '/' . $maxqueues); // Are there fewer workers running as possible? Then fork a new one. - if (!DI::config()->get("system", "worker_dont_fork", false) && ($queues > ($active + 1)) && self::entriesExists()) { - Logger::info("There are fewer workers as possible, fork a new worker.", ['active' => $active, 'queues' => $queues]); + if (!DI::config()->get('system', 'worker_dont_fork', false) && ($queues > ($active + 1)) && self::entriesExists()) { + Logger::info('There are fewer workers as possible, fork a new worker.', ['active' => $active, 'queues' => $queues]); if (Worker\Daemon::isMode()) { Worker\IPC::SetJobState(true); } else { @@ -1117,12 +1134,12 @@ class Worker * @param (integer|array) priority or parameter array, strings are deprecated and are ignored * * next args are passed as $cmd command line - * or: Worker::add(PRIORITY_HIGH, "Notifier", Delivery::DELETION, $drop_id); - * or: Worker::add(array('priority' => PRIORITY_HIGH, 'dont_fork' => true), "Delivery", $post_id); + * or: Worker::add(PRIORITY_HIGH, 'Notifier', Delivery::DELETION, $drop_id); + * or: Worker::add(array('priority' => PRIORITY_HIGH, 'dont_fork' => true), 'Delivery', $post_id); * - * @return int "0" if worker queue entry already existed or there had been an error, otherwise the ID of the worker task + * @return int '0' if worker queue entry already existed or there had been an error, otherwise the ID of the worker task * @throws \Friendica\Network\HTTPException\InternalServerErrorException - * @note $cmd and string args are surrounded with "" + * @note $cmd and string args are surrounded with '' * * @hooks 'proc_run' * array $arr @@ -1136,14 +1153,14 @@ class Worker $arr = ['args' => $args, 'run_cmd' => true]; - Hook::callAll("proc_run", $arr); + Hook::callAll('proc_run', $arr); if (!$arr['run_cmd'] || !count($args)) { return 1; } $priority = PRIORITY_MEDIUM; // Don't fork from frontend tasks by default - $dont_fork = DI::config()->get("system", "worker_dont_fork", false) || !DI::mode()->isBackend(); + $dont_fork = DI::config()->get('system', 'worker_dont_fork', false) || !DI::mode()->isBackend(); $created = DateTimeFormat::utcNow(); $delayed = DBA::NULL_DATETIME; $force_priority = false; diff --git a/static/defaults.config.php b/static/defaults.config.php index 70b0af7810..926cdd7d39 100644 --- a/static/defaults.config.php +++ b/static/defaults.config.php @@ -608,7 +608,7 @@ return [ 'username_max_length' => 48, // worker_cooldown (Integer) - // Cooldown period in seconds after each worker function call. + // Cooldown period in seconds before each worker function call. 'worker_cooldown' => 0, // worker_debug (Boolean) @@ -632,12 +632,20 @@ return [ // List of minutes for the jobs per minute (JPM) calculation 'worker_jpm_range' => '1, 10, 60', + // worker_load_cooldown (Integer) + // Maximum load that causes a cooldown before each worker function call. + 'worker_load_cooldown' => 0, + // worker_load_exponent (Integer) // Default 3, which allows only 25% of the maximum worker queues when server load reaches around 37% of maximum load. // For a linear response where 25% of worker queues are allowed at 75% of maximum load, set this to 1. // Setting 0 would allow maximum worker queues at all times, which is not recommended. 'worker_load_exponent' => 3, + // worker_processes_cooldown (Integer) + // Maximum number pro processes that causes a cooldown before each worker function call. + 'worker_processes_cooldown' => 0, + // worker_multiple_fetch (Boolean) // When activated, the worker fetches jobs for multiple workers (not only for itself). // This is an experimental setting without knowing the performance impact. -- 2.39.5