]> git.mxchange.org Git - friendica.git/blobdiff - src/Core/Worker.php
DDEV and some PHP8.1 fixes.
[friendica.git] / src / Core / Worker.php
index 830e02983210d208f55db7e83520f943a5bcc0c6..96f3e7ae042e30fae459cff6ca272d6f1bdba261 100644 (file)
@@ -31,6 +31,21 @@ use Friendica\Util\DateTimeFormat;
  */
 class Worker
 {
+       /**
+        * @name Priority
+        *
+        * Process priority for the worker
+        * @{
+        */
+       const PRIORITY_UNDEFINED  = 0;
+       const PRIORITY_CRITICAL   = 10;
+       const PRIORITY_HIGH       = 20;
+       const PRIORITY_MEDIUM     = 30;
+       const PRIORITY_LOW        = 40;
+       const PRIORITY_NEGLIGIBLE = 50;
+       const PRIORITIES          = [self::PRIORITY_CRITICAL, self::PRIORITY_HIGH, self::PRIORITY_MEDIUM, self::PRIORITY_LOW, self::PRIORITY_NEGLIGIBLE];
+       /* @}*/
+
        const STATE_STARTUP    = 1; // Worker is in startup. This takes most time.
        const STATE_LONG_LOOP  = 2; // Worker is processing the whole - long - loop.
        const STATE_REFETCH    = 3; // Worker had refetched jobs in the execution loop.
@@ -143,7 +158,7 @@ class Worker
                        }
 
                        // Quit the worker once every cron interval
-                       if (time() > ($starttime + (DI::config()->get('system', 'cron_interval') * 60))) {
+                       if (time() > ($starttime + (DI::config()->get('system', 'cron_interval') * 60)) && !self::systemLimitReached()) {
                                Logger::info('Process lifetime reached, respawning.');
                                self::unclaimProcess($process);
                                if (Worker\Daemon::isMode()) {
@@ -300,17 +315,7 @@ class Worker
                        return false;
                }
 
-               $valid = false;
-               if (strpos($file, 'include/') === 0) {
-                       $valid = true;
-               }
-
-               if (strpos($file, 'addon/') === 0) {
-                       $valid = true;
-               }
-
-               // Simply return flag
-               return $valid;
+               return (strpos($file, 'addon/') === 0);
        }
 
        /**
@@ -391,11 +396,6 @@ class Worker
                        return true;
                }
 
-               // The script could be provided as full path or only with the function name
-               if ($include == basename($include)) {
-                       $include = 'include/' . $include . '.php';
-               }
-
                if (!self::validateInclude($include)) {
                        Logger::warning('Include file is not valid', ['file' => $argv[0]]);
                        $stamp = (float)microtime(true);
@@ -445,42 +445,110 @@ class Worker
        }
 
        /**
-        * Execute a function from the queue
+        * Checks if system limits are reached.
         *
-        * @param array   $queue       Workerqueue entry
-        * @param string  $funcname    name of the function
-        * @param array   $argv        Array of values to be passed to the function
-        * @param boolean $method_call boolean
-        * @return void
-        * @throws \Friendica\Network\HTTPException\InternalServerErrorException
+        * @return boolean
         */
-       private static function execFunction(array $queue, string $funcname, array $argv, bool $method_call)
+       private static function systemLimitReached(): bool
        {
-               $a = DI::app();
+               $load_cooldown      = DI::config()->get('system', 'worker_load_cooldown');
+               $processes_cooldown = DI::config()->get('system', 'worker_processes_cooldown');
+
+               if ($load_cooldown == 0) {
+                       $load_cooldown = DI::config()->get('system', 'maxloadavg');
+               }
+
+               if (($load_cooldown == 0) && ($processes_cooldown == 0)) {
+                       return false;
+               }
+
+               $load = System::getLoadAvg();
+               if (empty($load)) {
+                       return false;
+               }
+
+               if (($load_cooldown > 0) && ($load['average1'] > $load_cooldown)) {
+                       return true;
+               }
+
+               if (($processes_cooldown > 0) && ($load['scheduled'] > $processes_cooldown)) {
+                       return true;
+               }
+
+               return false;
+       }
 
+       /**
+        * Slow the execution down if the system load is too high
+        *
+        * @return void
+        */
+       public static function coolDown()
+       {
                $cooldown = DI::config()->get('system', 'worker_cooldown', 0);
                if ($cooldown > 0) {
-                       Logger::debug('Pre execution cooldown.', ['cooldown' => $cooldown, 'id' => $queue['id'], 'priority' => $queue['priority'], 'command' => $queue['command']]);
-                       sleep($cooldown);
+                       Logger::debug('Wait for cooldown.', ['cooldown' => $cooldown]);
+                       if ($cooldown < 1) {
+                               usleep($cooldown * 1000000);
+                       } else {
+                               sleep($cooldown);
+                       }
                }
 
                $load_cooldown      = DI::config()->get('system', 'worker_load_cooldown');
                $processes_cooldown = DI::config()->get('system', 'worker_processes_cooldown');
 
-               while ((($load_cooldown > 0) || ($processes_cooldown > 0)) && ($load = System::getLoadAvg())) {
+               if ($load_cooldown == 0) {
+                       $load_cooldown = DI::config()->get('system', 'maxloadavg');
+               }
+
+               if (($load_cooldown == 0) && ($processes_cooldown == 0)) {
+                       return;
+               }
+
+               $sleeping = false;
+
+               while ($load = System::getLoadAvg()) {
                        if (($load_cooldown > 0) && ($load['average1'] > $load_cooldown)) {
-                               Logger::debug('Load induced pre execution cooldown.', ['max' => $load_cooldown, 'load' => $load, 'id' => $queue['id'], 'priority' => $queue['priority'], 'command' => $queue['command']]);
+                               if (!$sleeping) {
+                                       Logger::notice('Load induced pre execution cooldown.', ['max' => $load_cooldown, 'load' => $load, 'called-by' => System::callstack(1)]);
+                                       $sleeping = true;
+                               }
                                sleep(1);
                                continue;
                        }
                        if (($processes_cooldown > 0) && ($load['scheduled'] > $processes_cooldown)) {
-                               Logger::debug('Process induced pre execution cooldown.', ['max' => $processes_cooldown, 'load' => $load, 'id' => $queue['id'], 'priority' => $queue['priority'], 'command' => $queue['command']]);
+                               if (!$sleeping) {
+                                       Logger::notice('Process induced pre execution cooldown.', ['max' => $processes_cooldown, 'load' => $load, 'called-by' => System::callstack(1)]);
+                                       $sleeping = true;
+                               }
                                sleep(1);
                                continue;
                        }
                        break;
                }
 
+               if ($sleeping) {
+                       Logger::notice('Cooldown ended.', ['max-load' => $load_cooldown, 'max-processes' => $processes_cooldown, 'load' => $load, 'called-by' => System::callstack(1)]);
+               }
+       }
+
+       /**
+        * Execute a function from the queue
+        *
+        * @param array   $queue       Workerqueue entry
+        * @param string  $funcname    name of the function
+        * @param array   $argv        Array of values to be passed to the function
+        * @param boolean $method_call boolean
+        * @return void
+        * @throws \Friendica\Network\HTTPException\InternalServerErrorException
+        */
+       private static function execFunction(array $queue, string $funcname, array $argv, bool $method_call)
+       {
+               $a = DI::app();
+
+               self::coolDown();
+
                Logger::enableWorker($funcname);
 
                Logger::info('Process start.', ['priority' => $queue['priority'], 'id' => $queue['id']]);
@@ -527,6 +595,8 @@ class Worker
 
                Logger::info('Performance:', ['state' => self::$state, 'count' => $dbcount, 'stat' => $dbstat, 'write' => $dbwrite, 'lock' => $dblock, 'total' => $dbtotal, 'rest' => $rest, 'exec' => $exec]);
 
+               self::coolDown();
+
                self::$up_start = microtime(true);
                self::$db_duration = 0;
                self::$db_duration_count = 0;
@@ -547,11 +617,6 @@ class Worker
                Logger::info('Process done.', ['priority' => $queue['priority'], 'id' => $queue['id'], 'duration' => round($duration, 3)]);
 
                DI::profiler()->saveLog(DI::logger(), 'ID ' . $queue['id'] . ': ' . $funcname);
-
-               if ($cooldown > 0) {
-                       Logger::info('Post execution cooldown.', ['priority' => $queue['priority'], 'id' => $queue['id'], 'cooldown' => $cooldown]);
-                       sleep($cooldown);
-               }
        }
 
        /**
@@ -735,7 +800,7 @@ class Worker
                                $top_priority = self::highestPriority();
                                $high_running = self::processWithPriorityActive($top_priority);
 
-                               if (!$high_running && ($top_priority > PRIORITY_UNDEFINED) && ($top_priority < PRIORITY_NEGLIGIBLE)) {
+                               if (!$high_running && ($top_priority > self::PRIORITY_UNDEFINED) && ($top_priority < self::PRIORITY_NEGLIGIBLE)) {
                                        Logger::info('Jobs with a higher priority are waiting but none is executed. Open a fastlane.', ['priority' => $top_priority]);
                                        $queues = $active + 1;
                                }
@@ -744,7 +809,7 @@ class Worker
                        Logger::notice('Load: ' . $load . '/' . $maxsysload . ' - processes: ' . $deferred . '/' . $active . '/' . $waiting_processes . $processlist . ' - maximum: ' . $queues . '/' . $maxqueues);
 
                        // Are there fewer workers running as possible? Then fork a new one.
-                       if (!DI::config()->get('system', 'worker_dont_fork', false) && ($queues > ($active + 1)) && self::entriesExists()) {
+                       if (!DI::config()->get('system', 'worker_dont_fork', false) && ($queues > ($active + 1)) && self::entriesExists() && !self::systemLimitReached()) {
                                Logger::info('There are fewer workers as possible, fork a new worker.', ['active' => $active, 'queues' => $queues]);
                                if (Worker\Daemon::isMode()) {
                                        Worker\IPC::SetJobState(true);
@@ -867,7 +932,7 @@ class Worker
        private static function nextPriority()
        {
                $waiting = [];
-               $priorities = [PRIORITY_CRITICAL, PRIORITY_HIGH, PRIORITY_MEDIUM, PRIORITY_LOW, PRIORITY_NEGLIGIBLE];
+               $priorities = [self::PRIORITY_CRITICAL, self::PRIORITY_HIGH, self::PRIORITY_MEDIUM, self::PRIORITY_LOW, self::PRIORITY_NEGLIGIBLE];
                foreach ($priorities as $priority) {
                        $stamp = (float)microtime(true);
                        if (DBA::exists('workerqueue', ["`priority` = ? AND `pid` = 0 AND NOT `done` AND `next_try` < ?", $priority, DateTimeFormat::utcNow()])) {
@@ -876,8 +941,8 @@ class Worker
                        self::$db_duration += (microtime(true) - $stamp);
                }
 
-               if (!empty($waiting[PRIORITY_CRITICAL])) {
-                       return PRIORITY_CRITICAL;
+               if (!empty($waiting[self::PRIORITY_CRITICAL])) {
+                       return self::PRIORITY_CRITICAL;
                }
 
                $running = [];
@@ -1134,8 +1199,8 @@ class Worker
         * @param (integer|array) priority or parameter array, strings are deprecated and are ignored
         *
         * next args are passed as $cmd command line
-        * or: Worker::add(PRIORITY_HIGH, 'Notifier', Delivery::DELETION, $drop_id);
-        * or: Worker::add(array('priority' => PRIORITY_HIGH, 'dont_fork' => true), 'Delivery', $post_id);
+        * or: Worker::add(Worker::PRIORITY_HIGH, 'Notifier', Delivery::DELETION, $drop_id);
+        * or: Worker::add(array('priority' => Worker::PRIORITY_HIGH, 'dont_fork' => true), 'Delivery', $post_id);
         *
         * @return int '0' if worker queue entry already existed or there had been an error, otherwise the ID of the worker task
         * @throws \Friendica\Network\HTTPException\InternalServerErrorException
@@ -1158,7 +1223,7 @@ class Worker
                        return 1;
                }
 
-               $priority = PRIORITY_MEDIUM;
+               $priority = self::PRIORITY_MEDIUM;
                // Don't fork from frontend tasks by default
                $dont_fork = DI::config()->get('system', 'worker_dont_fork', false) || !DI::mode()->isBackend();
                $created = DateTimeFormat::utcNow();
@@ -1194,9 +1259,9 @@ class Worker
                $found = DBA::exists('workerqueue', ['command' => $command, 'parameter' => $parameters, 'done' => false]);
                $added = 0;
 
-               if (!is_int($priority) || !in_array($priority, PRIORITIES)) {
+               if (!is_int($priority) || !in_array($priority, self::PRIORITIES)) {
                        Logger::warning('Invalid priority', ['priority' => $priority, 'command' => $command, 'callstack' => System::callstack(20)]);
-                       $priority = PRIORITY_MEDIUM;
+                       $priority = self::PRIORITY_MEDIUM;
                }
 
                // Quit if there was a database error - a precaution for the update process to 3.5.3
@@ -1222,7 +1287,7 @@ class Worker
                Worker\Daemon::checkState();
 
                // Should we quit and wait for the worker to be called as a cronjob?
-               if ($dont_fork) {
+               if ($dont_fork || self::systemLimitReached()) {
                        return $added;
                }
 
@@ -1311,12 +1376,12 @@ class Worker
                $delay = (($new_retrial + 2) ** 4) + (rand(1, 30) * ($new_retrial));
                $next = DateTimeFormat::utc('now + ' . $delay . ' seconds');
 
-               if (($priority < PRIORITY_MEDIUM) && ($new_retrial > 3)) {
-                       $priority = PRIORITY_MEDIUM;
-               } elseif (($priority < PRIORITY_LOW) && ($new_retrial > 6)) {
-                       $priority = PRIORITY_LOW;
-               } elseif (($priority < PRIORITY_NEGLIGIBLE) && ($new_retrial > 8)) {
-                       $priority = PRIORITY_NEGLIGIBLE;
+               if (($priority < self::PRIORITY_MEDIUM) && ($new_retrial > 3)) {
+                       $priority = self::PRIORITY_MEDIUM;
+               } elseif (($priority < self::PRIORITY_LOW) && ($new_retrial > 6)) {
+                       $priority = self::PRIORITY_LOW;
+               } elseif (($priority < self::PRIORITY_NEGLIGIBLE) && ($new_retrial > 8)) {
+                       $priority = self::PRIORITY_NEGLIGIBLE;
                }
 
                Logger::info('Deferred task', ['id' => $id, 'retrial' => $new_retrial, 'created' => $queue['created'], 'next_execution' => $next, 'old_prio' => $queue['priority'], 'new_prio' => $priority]);