]> git.mxchange.org Git - friendica.git/commitdiff
Pause the worker execution when the load is too high
authorMichael <heluecht@pirati.ca>
Sun, 4 Sep 2022 13:54:32 +0000 (13:54 +0000)
committerMichael <heluecht@pirati.ca>
Sun, 4 Sep 2022 13:54:32 +0000 (13:54 +0000)
src/Core/System.php
src/Core/Worker.php
static/defaults.config.php

index dbff4bf3f5cb5f5d21c2bd537a230b315c30bbfe..eb27fa351775b3e90ec6be9d230d42701f2acee0 100644 (file)
@@ -435,6 +435,33 @@ class System
                return max($load_arr[0], $load_arr[1]);
        }
 
+       /**
+        * Fetch the load and number of processes
+        *
+        * @return array
+        */
+       public static function getLoadAvg(): array
+       {
+               $content = file_get_contents('/proc/loadavg');
+               if (empty($content)) {
+                       $content = shell_exec('cat /proc/loadavg');
+               }
+               if (empty($content)) {
+                       return [];
+               }
+
+               if (!preg_match("#([.\d]+)\s([.\d]+)\s([.\d]+)\s(\d+)/(\d+)#", $content, $matches)) {
+                       return [];
+               }
+               return [
+                       'average1'  => (float)$matches[1],
+                       'average5'  => (float)$matches[2],
+                       'average15' => (float)$matches[3],
+                       'runnable'  => (float)$matches[4],
+                       'scheduled' => (float)$matches[5]
+               ];
+       }
+
        /**
         * Redirects to an external URL (fully qualified URL)
         * If you want to route relative to the current Friendica base, use App->internalRedirect()
index f7ab2bc7c81e1e560f0b7ebf357aab6ac803e19e..830e02983210d208f55db7e83520f943a5bcc0c6 100644 (file)
@@ -291,7 +291,7 @@ class Worker
                        return false;
                }
 
-               $file = str_replace(getcwd() . "/", "", $file, $count);
+               $file = str_replace(getcwd() . '/', '', $file, $count);
                if ($count != 1) {
                        return false;
                }
@@ -301,11 +301,11 @@ class Worker
                }
 
                $valid = false;
-               if (strpos($file, "include/") === 0) {
+               if (strpos($file, 'include/') === 0) {
                        $valid = true;
                }
 
-               if (strpos($file, "addon/") === 0) {
+               if (strpos($file, 'addon/') === 0) {
                        $valid = true;
                }
 
@@ -327,19 +327,19 @@ class Worker
 
                // Quit when in maintenance
                if (DI::config()->get('system', 'maintenance', false, true)) {
-                       Logger::notice("Maintenance mode - quit process", ['pid' => $mypid]);
+                       Logger::notice('Maintenance mode - quit process', ['pid' => $mypid]);
                        return false;
                }
 
                // Constantly check the number of parallel database processes
                if (DI::system()->isMaxProcessesReached()) {
-                       Logger::warning("Max processes reached for process", ['pid' => $mypid]);
+                       Logger::warning('Max processes reached for process', ['pid' => $mypid]);
                        return false;
                }
 
                // Constantly check the number of available database connections to let the frontend be accessible at any time
                if (self::maxConnectionsReached()) {
-                       Logger::warning("Max connection reached for process", ['pid' => $mypid]);
+                       Logger::warning('Max connection reached for process', ['pid' => $mypid]);
                        return false;
                }
 
@@ -363,7 +363,7 @@ class Worker
                if (method_exists(sprintf('Friendica\Worker\%s', $include), 'execute')) {
                        // We constantly update the "executed" date every minute to avoid being killed too soon
                        if (!isset(self::$last_update)) {
-                               self::$last_update = strtotime($queue["executed"]);
+                               self::$last_update = strtotime($queue['executed']);
                        }
 
                        $age = (time() - self::$last_update) / 60;
@@ -393,13 +393,13 @@ class Worker
 
                // The script could be provided as full path or only with the function name
                if ($include == basename($include)) {
-                       $include = "include/".$include.".php";
+                       $include = 'include/' . $include . '.php';
                }
 
                if (!self::validateInclude($include)) {
-                       Logger::warning("Include file is not valid", ['file' => $argv[0]]);
+                       Logger::warning('Include file is not valid', ['file' => $argv[0]]);
                        $stamp = (float)microtime(true);
-                       DBA::delete('workerqueue', ['id' => $queue["id"]]);
+                       DBA::delete('workerqueue', ['id' => $queue['id']]);
                        self::$db_duration = (microtime(true) - $stamp);
                        self::$db_duration_write += (microtime(true) - $stamp);
                        return true;
@@ -407,12 +407,12 @@ class Worker
 
                require_once $include;
 
-               $funcname = str_replace(".php", "", basename($argv[0]))."_run";
+               $funcname = str_replace('.php', '', basename($argv[0])) .'_run';
 
                if (function_exists($funcname)) {
                        // We constantly update the "executed" date every minute to avoid being killed too soon
                        if (!isset(self::$last_update)) {
-                               self::$last_update = strtotime($queue["executed"]);
+                               self::$last_update = strtotime($queue['executed']);
                        }
 
                        $age = (time() - self::$last_update) / 60;
@@ -428,15 +428,15 @@ class Worker
                        self::execFunction($queue, $funcname, $argv, false);
 
                        $stamp = (float)microtime(true);
-                       if (DBA::update('workerqueue', ['done' => true], ['id' => $queue["id"]])) {
+                       if (DBA::update('workerqueue', ['done' => true], ['id' => $queue['id']])) {
                                DI::config()->set('system', 'last_worker_execution', DateTimeFormat::utcNow());
                        }
                        self::$db_duration = (microtime(true) - $stamp);
                        self::$db_duration_write += (microtime(true) - $stamp);
                } else {
-                       Logger::warning("Function does not exist", ['function' => $funcname]);
+                       Logger::warning('Function does not exist', ['function' => $funcname]);
                        $stamp = (float)microtime(true);
-                       DBA::delete('workerqueue', ['id' => $queue["id"]]);
+                       DBA::delete('workerqueue', ['id' => $queue['id']]);
                        self::$db_duration = (microtime(true) - $stamp);
                        self::$db_duration_write += (microtime(true) - $stamp);
                }
@@ -458,15 +458,32 @@ class Worker
        {
                $a = DI::app();
 
-               $cooldown = DI::config()->get("system", "worker_cooldown", 0);
+               $cooldown = DI::config()->get('system', 'worker_cooldown', 0);
                if ($cooldown > 0) {
-                       Logger::info('Pre execution cooldown.', ['priority' => $queue['priority'], 'id' => $queue["id"], 'cooldown' => $cooldown]);
+                       Logger::debug('Pre execution cooldown.', ['cooldown' => $cooldown, 'id' => $queue['id'], 'priority' => $queue['priority'], 'command' => $queue['command']]);
                        sleep($cooldown);
                }
 
+               $load_cooldown      = DI::config()->get('system', 'worker_load_cooldown');
+               $processes_cooldown = DI::config()->get('system', 'worker_processes_cooldown');
+
+               while ((($load_cooldown > 0) || ($processes_cooldown > 0)) && ($load = System::getLoadAvg())) {
+                       if (($load_cooldown > 0) && ($load['average1'] > $load_cooldown)) {
+                               Logger::debug('Load induced pre execution cooldown.', ['max' => $load_cooldown, 'load' => $load, 'id' => $queue['id'], 'priority' => $queue['priority'], 'command' => $queue['command']]);
+                               sleep(1);
+                               continue;
+                       }
+                       if (($processes_cooldown > 0) && ($load['scheduled'] > $processes_cooldown)) {
+                               Logger::debug('Process induced pre execution cooldown.', ['max' => $processes_cooldown, 'load' => $load, 'id' => $queue['id'], 'priority' => $queue['priority'], 'command' => $queue['command']]);
+                               sleep(1);
+                               continue;
+                       }
+                       break;
+               }
+
                Logger::enableWorker($funcname);
 
-               Logger::info("Process start.", ['priority' => $queue['priority'], 'id' => $queue["id"]]);
+               Logger::info('Process start.', ['priority' => $queue['priority'], 'id' => $queue['id']]);
 
                $stamp = (float)microtime(true);
 
@@ -518,21 +535,21 @@ class Worker
                self::$lock_duration = 0;
 
                if ($duration > 3600) {
-                       Logger::info('Longer than 1 hour.', ['priority' => $queue['priority'], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]);
+                       Logger::info('Longer than 1 hour.', ['priority' => $queue['priority'], 'id' => $queue['id'], 'duration' => round($duration/60, 3)]);
                } elseif ($duration > 600) {
-                       Logger::info('Longer than 10 minutes.', ['priority' => $queue['priority'], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]);
+                       Logger::info('Longer than 10 minutes.', ['priority' => $queue['priority'], 'id' => $queue['id'], 'duration' => round($duration/60, 3)]);
                } elseif ($duration > 300) {
-                       Logger::info('Longer than 5 minutes.', ['priority' => $queue['priority'], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]);
+                       Logger::info('Longer than 5 minutes.', ['priority' => $queue['priority'], 'id' => $queue['id'], 'duration' => round($duration/60, 3)]);
                } elseif ($duration > 120) {
-                       Logger::info('Longer than 2 minutes.', ['priority' => $queue['priority'], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]);
+                       Logger::info('Longer than 2 minutes.', ['priority' => $queue['priority'], 'id' => $queue['id'], 'duration' => round($duration/60, 3)]);
                }
 
-               Logger::info('Process done.', ['priority' => $queue['priority'], 'id' => $queue["id"], 'duration' => round($duration, 3)]);
+               Logger::info('Process done.', ['priority' => $queue['priority'], 'id' => $queue['id'], 'duration' => round($duration, 3)]);
 
-               DI::profiler()->saveLog(DI::logger(), "ID " . $queue["id"] . ": " . $funcname);
+               DI::profiler()->saveLog(DI::logger(), 'ID ' . $queue['id'] . ': ' . $funcname);
 
                if ($cooldown > 0) {
-                       Logger::info('Post execution cooldown.', ['priority' => $queue['priority'], 'id' => $queue["id"], 'cooldown' => $cooldown]);
+                       Logger::info('Post execution cooldown.', ['priority' => $queue['priority'], 'id' => $queue['id'], 'cooldown' => $cooldown]);
                        sleep($cooldown);
                }
        }
@@ -546,16 +563,16 @@ class Worker
        private static function maxConnectionsReached(): bool
        {
                // Fetch the max value from the config. This is needed when the system cannot detect the correct value by itself.
-               $max = DI::config()->get("system", "max_connections");
+               $max = DI::config()->get('system', 'max_connections');
 
                // Fetch the percentage level where the worker will get active
-               $maxlevel = DI::config()->get("system", "max_connections_level", 75);
+               $maxlevel = DI::config()->get('system', 'max_connections_level', 75);
 
                if ($max == 0) {
                        // the maximum number of possible user connections can be a system variable
                        $r = DBA::fetchFirst("SHOW VARIABLES WHERE `variable_name` = 'max_user_connections'");
                        if (DBA::isResult($r)) {
-                               $max = $r["Value"];
+                               $max = $r['Value'];
                        }
                        // Or it can be granted. This overrides the system variable
                        $stamp = (float)microtime(true);
@@ -581,12 +598,12 @@ class Worker
                        $used = DBA::numRows($r);
                        DBA::close($r);
 
-                       Logger::info("Connection usage (user values)", ['usage' => $used, 'max' => $max]);
+                       Logger::info('Connection usage (user values)', ['usage' => $used, 'max' => $max]);
 
                        $level = ($used / $max) * 100;
 
                        if ($level >= $maxlevel) {
-                               Logger::warning("Maximum level (".$maxlevel."%) of user connections reached: ".$used."/".$max);
+                               Logger::warning('Maximum level (' . $maxlevel . '%) of user connections reached: ' . $used .'/' . $max);
                                return true;
                        }
                }
@@ -597,7 +614,7 @@ class Worker
                if (!DBA::isResult($r)) {
                        return false;
                }
-               $max = intval($r["Value"]);
+               $max = intval($r['Value']);
                if ($max == 0) {
                        return false;
                }
@@ -605,18 +622,18 @@ class Worker
                if (!DBA::isResult($r)) {
                        return false;
                }
-               $used = intval($r["Value"]);
+               $used = intval($r['Value']);
                if ($used == 0) {
                        return false;
                }
-               Logger::info("Connection usage (system values)", ['used' => $used, 'max' => $max]);
+               Logger::info('Connection usage (system values)', ['used' => $used, 'max' => $max]);
 
                $level = $used / $max * 100;
 
                if ($level < $maxlevel) {
                        return false;
                }
-               Logger::warning("Maximum level (".$level."%) of system connections reached: ".$used."/".$max);
+               Logger::warning('Maximum level (' . $level . '%) of system connections reached: ' . $used . '/' . $max);
                return true;
        }
 
@@ -629,7 +646,7 @@ class Worker
         */
        private static function tooMuchWorkers(): bool
        {
-               $queues = DI::config()->get("system", "worker_queues", 10);
+               $queues = DI::config()->get('system', 'worker_queues', 10);
 
                $maxqueues = $queues;
 
@@ -638,7 +655,7 @@ class Worker
                // Decrease the number of workers at higher load
                $load = System::currentLoad();
                if ($load) {
-                       $maxsysload = intval(DI::config()->get("system", "maxloadavg", 20));
+                       $maxsysload = intval(DI::config()->get('system', 'maxloadavg', 20));
 
                        /* Default exponent 3 causes queues to rapidly decrease as load increases.
                         * If you have 20 max queues at idle, then you get only 5 queues at 37.1% of $maxsysload.
@@ -690,8 +707,8 @@ class Worker
                                        self::$db_duration += (microtime(true) - $stamp);
                                        self::$db_duration_stat += (microtime(true) - $stamp);
                                        $idle_workers -= $running;
-                                       $waiting_processes += $entry["entries"];
-                                       $listitem[$entry['priority']] = $entry['priority'] . ":" . $running . "/" . $entry["entries"];
+                                       $waiting_processes += $entry['entries'];
+                                       $listitem[$entry['priority']] = $entry['priority'] . ':' . $running . '/' . $entry['entries'];
                                }
                                DBA::close($jobs);
                        } else {
@@ -702,33 +719,33 @@ class Worker
                                self::$db_duration_stat += (microtime(true) - $stamp);
 
                                while ($entry = DBA::fetch($jobs)) {
-                                       $idle_workers -= $entry["running"];
-                                       $listitem[$entry['priority']] = $entry['priority'].":".$entry["running"];
+                                       $idle_workers -= $entry['running'];
+                                       $listitem[$entry['priority']] = $entry['priority'] . ':' . $entry['running'];
                                }
                                DBA::close($jobs);
                        }
 
                        $waiting_processes -= $deferred;
 
-                       $listitem[0] = "0:" . max(0, $idle_workers);
+                       $listitem[0] = '0:' . max(0, $idle_workers);
 
                        $processlist .= ' ('.implode(', ', $listitem).')';
 
-                       if (DI::config()->get("system", "worker_fastlane", false) && ($queues > 0) && ($active >= $queues) && self::entriesExists()) {
+                       if (DI::config()->get('system', 'worker_fastlane', false) && ($queues > 0) && ($active >= $queues) && self::entriesExists()) {
                                $top_priority = self::highestPriority();
                                $high_running = self::processWithPriorityActive($top_priority);
 
                                if (!$high_running && ($top_priority > PRIORITY_UNDEFINED) && ($top_priority < PRIORITY_NEGLIGIBLE)) {
-                                       Logger::info("Jobs with a higher priority are waiting but none is executed. Open a fastlane.", ['priority' => $top_priority]);
+                                       Logger::info('Jobs with a higher priority are waiting but none is executed. Open a fastlane.', ['priority' => $top_priority]);
                                        $queues = $active + 1;
                                }
                        }
 
-                       Logger::notice("Load: " . $load ."/" . $maxsysload . " - processes: " . $deferred . "/" . $active . "/" . $waiting_processes . $processlist . " - maximum: " . $queues . "/" . $maxqueues);
+                       Logger::notice('Load: ' . $load . '/' . $maxsysload . ' - processes: ' . $deferred . '/' . $active . '/' . $waiting_processes . $processlist . ' - maximum: ' . $queues . '/' . $maxqueues);
 
                        // Are there fewer workers running as possible? Then fork a new one.
-                       if (!DI::config()->get("system", "worker_dont_fork", false) && ($queues > ($active + 1)) && self::entriesExists()) {
-                               Logger::info("There are fewer workers as possible, fork a new worker.", ['active' => $active, 'queues' => $queues]);
+                       if (!DI::config()->get('system', 'worker_dont_fork', false) && ($queues > ($active + 1)) && self::entriesExists()) {
+                               Logger::info('There are fewer workers as possible, fork a new worker.', ['active' => $active, 'queues' => $queues]);
                                if (Worker\Daemon::isMode()) {
                                        Worker\IPC::SetJobState(true);
                                } else {
@@ -1117,12 +1134,12 @@ class Worker
         * @param (integer|array) priority or parameter array, strings are deprecated and are ignored
         *
         * next args are passed as $cmd command line
-        * or: Worker::add(PRIORITY_HIGH, "Notifier", Delivery::DELETION, $drop_id);
-        * or: Worker::add(array('priority' => PRIORITY_HIGH, 'dont_fork' => true), "Delivery", $post_id);
+        * or: Worker::add(PRIORITY_HIGH, 'Notifier', Delivery::DELETION, $drop_id);
+        * or: Worker::add(array('priority' => PRIORITY_HIGH, 'dont_fork' => true), 'Delivery', $post_id);
         *
-        * @return int "0" if worker queue entry already existed or there had been an error, otherwise the ID of the worker task
+        * @return int '0' if worker queue entry already existed or there had been an error, otherwise the ID of the worker task
         * @throws \Friendica\Network\HTTPException\InternalServerErrorException
-        * @note $cmd and string args are surrounded with ""
+        * @note $cmd and string args are surrounded with ''
         *
         * @hooks 'proc_run'
         *    array $arr
@@ -1136,14 +1153,14 @@ class Worker
 
                $arr = ['args' => $args, 'run_cmd' => true];
 
-               Hook::callAll("proc_run", $arr);
+               Hook::callAll('proc_run', $arr);
                if (!$arr['run_cmd'] || !count($args)) {
                        return 1;
                }
 
                $priority = PRIORITY_MEDIUM;
                // Don't fork from frontend tasks by default
-               $dont_fork = DI::config()->get("system", "worker_dont_fork", false) || !DI::mode()->isBackend();
+               $dont_fork = DI::config()->get('system', 'worker_dont_fork', false) || !DI::mode()->isBackend();
                $created = DateTimeFormat::utcNow();
                $delayed = DBA::NULL_DATETIME;
                $force_priority = false;
index 70b0af78105b424816bfc3b9a9b5a70fd037fc86..926cdd7d397803726465f754638f499383a3baa1 100644 (file)
@@ -608,7 +608,7 @@ return [
                'username_max_length' => 48,
 
                // worker_cooldown (Integer)
-               // Cooldown period in seconds after each worker function call.
+               // Cooldown period in seconds before each worker function call.
                'worker_cooldown' => 0,
 
                // worker_debug (Boolean)
@@ -632,12 +632,20 @@ return [
                // List of minutes for the jobs per minute (JPM) calculation
                'worker_jpm_range' => '1, 10, 60',
 
+               // worker_load_cooldown (Integer)
+               // Maximum load that causes a cooldown before each worker function call.
+               'worker_load_cooldown' => 0,
+
                // worker_load_exponent (Integer)
                // Default 3, which allows only 25% of the maximum worker queues when server load reaches around 37% of maximum load.
                // For a linear response where 25% of worker queues are allowed at 75% of maximum load, set this to 1.
                // Setting 0 would allow maximum worker queues at all times, which is not recommended.
                'worker_load_exponent' => 3,
 
+               // worker_processes_cooldown (Integer)
+               // Maximum number pro processes that causes a cooldown before each worker function call.
+               'worker_processes_cooldown' => 0,
+
                // worker_multiple_fetch (Boolean)
                // When activated, the worker fetches jobs for multiple workers (not only for itself).
                // This is an experimental setting without knowing the performance impact.