]> git.mxchange.org Git - friendica.git/blobdiff - src/Core/Worker.php
Merge remote-tracking branch 'upstream/develop' into user-defined-channels
[friendica.git] / src / Core / Worker.php
index 6bf6168b2fa2835913f6ac6e0645d525fda37bff..c84b59e2b7324c5ec6838c3962706e3f306c54c9 100644 (file)
@@ -1,6 +1,6 @@
 <?php
 /**
- * @copyright Copyright (C) 2010-2022, the Friendica project
+ * @copyright Copyright (C) 2010-2023, the Friendica project
  *
  * @license GNU AGPL version 3 or any later version
  *
@@ -31,6 +31,21 @@ use Friendica\Util\DateTimeFormat;
  */
 class Worker
 {
+       /**
+        * @name Priority
+        *
+        * Process priority for the worker
+        * @{
+        */
+       const PRIORITY_UNDEFINED  = 0;
+       const PRIORITY_CRITICAL   = 10;
+       const PRIORITY_HIGH       = 20;
+       const PRIORITY_MEDIUM     = 30;
+       const PRIORITY_LOW        = 40;
+       const PRIORITY_NEGLIGIBLE = 50;
+       const PRIORITIES          = [self::PRIORITY_CRITICAL, self::PRIORITY_HIGH, self::PRIORITY_MEDIUM, self::PRIORITY_LOW, self::PRIORITY_NEGLIGIBLE];
+       /* @}*/
+
        const STATE_STARTUP    = 1; // Worker is in startup. This takes most time.
        const STATE_LONG_LOOP  = 2; // Worker is processing the whole - long - loop.
        const STATE_REFETCH    = 3; // Worker had refetched jobs in the execution loop.
@@ -74,9 +89,9 @@ class Worker
                self::$process = $process;
 
                // Kill stale processes every 5 minutes
-               $last_cleanup = DI::config()->get('system', 'worker_last_cleaned', 0);
+               $last_cleanup = DI::keyValue()->get('worker_last_cleaned') ?? 0;
                if (time() > ($last_cleanup + 300)) {
-                       DI::config()->set('system', 'worker_last_cleaned', time());
+                       DI::keyValue()->set( 'worker_last_cleaned', time());
                        Worker\Cron::killStaleWorkers();
                }
 
@@ -126,7 +141,7 @@ class Worker
                                if (DI::lock()->acquire(self::LOCK_WORKER, 0)) {
                                // Count active workers and compare them with a maximum value that depends on the load
                                        if (self::tooMuchWorkers()) {
-                                               Logger::notice('Active worker limit reached, quitting.');
+                                               Logger::info('Active worker limit reached, quitting.');
                                                DI::lock()->release(self::LOCK_WORKER);
                                                return;
                                        }
@@ -173,7 +188,7 @@ class Worker
        {
                // Count active workers and compare them with a maximum value that depends on the load
                if (self::tooMuchWorkers()) {
-                       Logger::notice('Active worker limit reached, quitting.');
+                       Logger::info('Active worker limit reached, quitting.');
                        return false;
                }
 
@@ -300,17 +315,7 @@ class Worker
                        return false;
                }
 
-               $valid = false;
-               if (strpos($file, 'include/') === 0) {
-                       $valid = true;
-               }
-
-               if (strpos($file, 'addon/') === 0) {
-                       $valid = true;
-               }
-
-               // Simply return flag
-               return $valid;
+               return (strpos($file, 'addon/') === 0);
        }
 
        /**
@@ -326,7 +331,7 @@ class Worker
                $mypid = getmypid();
 
                // Quit when in maintenance
-               if (DI::config()->get('system', 'maintenance', false, true)) {
+               if (DI::config()->get('system', 'maintenance', false)) {
                        Logger::notice('Maintenance mode - quit process', ['pid' => $mypid]);
                        return false;
                }
@@ -357,7 +362,7 @@ class Worker
                        return false;
                }
 
-               // Check for existance and validity of the include file
+               // Check for existence and validity of the include file
                $include = $argv[0];
 
                if (method_exists(sprintf('Friendica\Worker\%s', $include), 'execute')) {
@@ -383,7 +388,7 @@ class Worker
                        $stamp = (float)microtime(true);
                        $condition = ["`id` = ? AND `next_try` < ?", $queue['id'], DateTimeFormat::utcNow()];
                        if (DBA::update('workerqueue', ['done' => true], $condition)) {
-                               DI::config()->set('system', 'last_worker_execution', DateTimeFormat::utcNow());
+                               DI::keyValue()->set('last_worker_execution', DateTimeFormat::utcNow());
                        }
                        self::$db_duration = (microtime(true) - $stamp);
                        self::$db_duration_write += (microtime(true) - $stamp);
@@ -391,11 +396,6 @@ class Worker
                        return true;
                }
 
-               // The script could be provided as full path or only with the function name
-               if ($include == basename($include)) {
-                       $include = 'include/' . $include . '.php';
-               }
-
                if (!self::validateInclude($include)) {
                        Logger::warning('Include file is not valid', ['file' => $argv[0]]);
                        $stamp = (float)microtime(true);
@@ -429,7 +429,7 @@ class Worker
 
                        $stamp = (float)microtime(true);
                        if (DBA::update('workerqueue', ['done' => true], ['id' => $queue['id']])) {
-                               DI::config()->set('system', 'last_worker_execution', DateTimeFormat::utcNow());
+                               DI::keyValue()->set('last_worker_execution', DateTimeFormat::utcNow());
                        }
                        self::$db_duration = (microtime(true) - $stamp);
                        self::$db_duration_write += (microtime(true) - $stamp);
@@ -454,11 +454,15 @@ class Worker
                $load_cooldown      = DI::config()->get('system', 'worker_load_cooldown');
                $processes_cooldown = DI::config()->get('system', 'worker_processes_cooldown');
 
+               if ($load_cooldown == 0) {
+                       $load_cooldown = DI::config()->get('system', 'maxloadavg');
+               }
+
                if (($load_cooldown == 0) && ($processes_cooldown == 0)) {
                        return false;
                }
 
-               $load = System::getLoadAvg();
+               $load = System::getLoadAvg($processes_cooldown != 0);
                if (empty($load)) {
                        return false;
                }
@@ -481,19 +485,33 @@ class Worker
         */
        public static function coolDown()
        {
+               $cooldown = DI::config()->get('system', 'worker_cooldown', 0);
+               if ($cooldown > 0) {
+                       Logger::debug('Wait for cooldown.', ['cooldown' => $cooldown]);
+                       if ($cooldown < 1) {
+                               usleep($cooldown * 1000000);
+                       } else {
+                               sleep($cooldown);
+                       }
+               }
+
                $load_cooldown      = DI::config()->get('system', 'worker_load_cooldown');
                $processes_cooldown = DI::config()->get('system', 'worker_processes_cooldown');
 
+               if ($load_cooldown == 0) {
+                       $load_cooldown = DI::config()->get('system', 'maxloadavg');
+               }
+
                if (($load_cooldown == 0) && ($processes_cooldown == 0)) {
                        return;
                }
 
                $sleeping = false;
 
-               while ($load = System::getLoadAvg()) {
+               while ($load = System::getLoadAvg($processes_cooldown != 0)) {
                        if (($load_cooldown > 0) && ($load['average1'] > $load_cooldown)) {
                                if (!$sleeping) {
-                                       Logger::notice('Load induced pre execution cooldown.', ['max' => $load_cooldown, 'load' => $load, 'called-by' => System::callstack(1)]);
+                                       Logger::info('Load induced pre execution cooldown.', ['max' => $load_cooldown, 'load' => $load, 'called-by' => System::callstack(1)]);
                                        $sleeping = true;
                                }
                                sleep(1);
@@ -501,7 +519,7 @@ class Worker
                        }
                        if (($processes_cooldown > 0) && ($load['scheduled'] > $processes_cooldown)) {
                                if (!$sleeping) {
-                                       Logger::notice('Process induced pre execution cooldown.', ['max' => $processes_cooldown, 'load' => $load, 'called-by' => System::callstack(1)]);
+                                       Logger::info('Process induced pre execution cooldown.', ['max' => $processes_cooldown, 'load' => $load, 'called-by' => System::callstack(1)]);
                                        $sleeping = true;
                                }
                                sleep(1);
@@ -511,7 +529,7 @@ class Worker
                }
 
                if ($sleeping) {
-                       Logger::notice('Cooldown ended.', ['max-load' => $load_cooldown, 'max-processes' => $processes_cooldown, 'load' => $load, 'called-by' => System::callstack(1)]);
+                       Logger::info('Cooldown ended.', ['max-load' => $load_cooldown, 'max-processes' => $processes_cooldown, 'load' => $load, 'called-by' => System::callstack(1)]);
                }
        }
 
@@ -529,12 +547,6 @@ class Worker
        {
                $a = DI::app();
 
-               $cooldown = DI::config()->get('system', 'worker_cooldown', 0);
-               if ($cooldown > 0) {
-                       Logger::notice('Pre execution cooldown.', ['cooldown' => $cooldown, 'id' => $queue['id'], 'priority' => $queue['priority'], 'command' => $queue['command']]);
-                       sleep($cooldown);
-               }
-
                self::coolDown();
 
                Logger::enableWorker($funcname);
@@ -556,7 +568,15 @@ class Worker
 
                // Set the workerLogger as new default logger
                if ($method_call) {
-                       call_user_func_array(sprintf('Friendica\Worker\%s::execute', $funcname), $argv);
+                       try {
+                               call_user_func_array(sprintf('Friendica\Worker\%s::execute', $funcname), $argv);
+                       } catch (\TypeError $e) {
+                               // No need to defer a worker queue entry if the arguments are invalid
+                               Logger::notice('Wrong worker arguments', ['class' => $funcname, 'argv' => $argv, 'queue' => $queue, 'message' => $e->getMessage()]);
+                       } catch (\Throwable $e) {
+                               Logger::error('Uncaught exception in worker execution', ['class' => get_class($e), 'message' => $e->getMessage(), 'code' => $e->getCode(), 'file' => $e->getFile() . ':' . $e->getLine(), 'trace' => $e->getTraceAsString(), 'previous' => $e->getPrevious()]);
+                               Worker::defer();
+                       }
                } else {
                        $funcname($argv, count($argv));
                }
@@ -570,7 +590,7 @@ class Worker
                /* With these values we can analyze how effective the worker is.
                 * The database and rest time should be low since this is the unproductive time.
                 * The execution time is the productive time.
-                * By changing parameters like the maximum number of workers we can check the effectivness.
+                * By changing parameters like the maximum number of workers we can check the effectiveness.
                */
                $dbtotal = round(self::$db_duration, 2);
                $dbread  = round(self::$db_duration - (self::$db_duration_count + self::$db_duration_write + self::$db_duration_stat), 2);
@@ -581,7 +601,7 @@ class Worker
                $rest    = round(max(0, $up_duration - (self::$db_duration + self::$lock_duration)), 2);
                $exec    = round($duration, 2);
 
-               Logger::info('Performance:', ['state' => self::$state, 'count' => $dbcount, 'stat' => $dbstat, 'write' => $dbwrite, 'lock' => $dblock, 'total' => $dbtotal, 'rest' => $rest, 'exec' => $exec]);
+               Logger::info('Performance:', ['function' => $funcname, 'state' => self::$state, 'count' => $dbcount, 'stat' => $dbstat, 'write' => $dbwrite, 'lock' => $dblock, 'total' => $dbtotal, 'rest' => $rest, 'exec' => $exec]);
 
                self::coolDown();
 
@@ -602,14 +622,9 @@ class Worker
                        Logger::info('Longer than 2 minutes.', ['priority' => $queue['priority'], 'id' => $queue['id'], 'duration' => round($duration/60, 3)]);
                }
 
-               Logger::info('Process done.', ['priority' => $queue['priority'], 'id' => $queue['id'], 'duration' => round($duration, 3)]);
+               Logger::info('Process done.', ['function' => $funcname, 'priority' => $queue['priority'], 'retrial' => $queue['retrial'], 'id' => $queue['id'], 'duration' => round($duration, 3)]);
 
                DI::profiler()->saveLog(DI::logger(), 'ID ' . $queue['id'] . ': ' . $funcname);
-
-               if ($cooldown > 0) {
-                       Logger::info('Post execution cooldown.', ['priority' => $queue['priority'], 'id' => $queue['id'], 'cooldown' => $cooldown]);
-                       sleep($cooldown);
-               }
        }
 
        /**
@@ -647,16 +662,24 @@ class Worker
                        DBA::close($r);
                }
 
+               $stamp = (float)microtime(true);
+               $used  = 0;
+               $sleep = 0;
+               $data = DBA::p("SHOW PROCESSLIST");
+               while ($row = DBA::fetch($data)) {
+                       if ($row['Command'] != 'Sleep') {
+                               ++$used;
+                       } else {
+                               ++$sleep;
+                       }
+               }
+               DBA::close($data);
+               self::$db_duration += (microtime(true) - $stamp);
+
                // If $max is set we will use the processlist to determine the current number of connections
                // The processlist only shows entries of the current user
                if ($max != 0) {
-                       $stamp = (float)microtime(true);
-                       $r = DBA::p('SHOW PROCESSLIST');
-                       self::$db_duration += (microtime(true) - $stamp);
-                       $used = DBA::numRows($r);
-                       DBA::close($r);
-
-                       Logger::info('Connection usage (user values)', ['usage' => $used, 'max' => $max]);
+                       Logger::info('Connection usage (user values)', ['working' => $used, 'sleeping' => $sleep, 'max' => $max]);
 
                        $level = ($used / $max) * 100;
 
@@ -680,11 +703,11 @@ class Worker
                if (!DBA::isResult($r)) {
                        return false;
                }
-               $used = intval($r['Value']);
+               $used = max($used, intval($r['Value'])) - $sleep;
                if ($used == 0) {
                        return false;
                }
-               Logger::info('Connection usage (system values)', ['used' => $used, 'max' => $max]);
+               Logger::info('Connection usage (system values)', ['working' => $used, 'sleeping' => $sleep, 'max' => $max]);
 
                $level = $used / $max * 100;
 
@@ -793,13 +816,13 @@ class Worker
                                $top_priority = self::highestPriority();
                                $high_running = self::processWithPriorityActive($top_priority);
 
-                               if (!$high_running && ($top_priority > PRIORITY_UNDEFINED) && ($top_priority < PRIORITY_NEGLIGIBLE)) {
+                               if (!$high_running && ($top_priority > self::PRIORITY_UNDEFINED) && ($top_priority < self::PRIORITY_NEGLIGIBLE)) {
                                        Logger::info('Jobs with a higher priority are waiting but none is executed. Open a fastlane.', ['priority' => $top_priority]);
                                        $queues = $active + 1;
                                }
                        }
 
-                       Logger::notice('Load: ' . $load . '/' . $maxsysload . ' - processes: ' . $deferred . '/' . $active . '/' . $waiting_processes . $processlist . ' - maximum: ' . $queues . '/' . $maxqueues);
+                       Logger::info('Load: ' . $load . '/' . $maxsysload . ' - processes: ' . $deferred . '/' . $active . '/' . $waiting_processes . $processlist . ' - maximum: ' . $queues . '/' . $maxqueues);
 
                        // Are there fewer workers running as possible? Then fork a new one.
                        if (!DI::config()->get('system', 'worker_dont_fork', false) && ($queues > ($active + 1)) && self::entriesExists() && !self::systemLimitReached()) {
@@ -862,7 +885,7 @@ class Worker
        /**
         * Returns waiting jobs for the current process id
         *
-        * @return array|bool waiting workerqueue jobs or FALSE on failture
+        * @return array|bool waiting workerqueue jobs or FALSE on failure
         * @throws \Exception
         */
        private static function getWaitingJobForPID()
@@ -925,7 +948,7 @@ class Worker
        private static function nextPriority()
        {
                $waiting = [];
-               $priorities = [PRIORITY_CRITICAL, PRIORITY_HIGH, PRIORITY_MEDIUM, PRIORITY_LOW, PRIORITY_NEGLIGIBLE];
+               $priorities = [self::PRIORITY_CRITICAL, self::PRIORITY_HIGH, self::PRIORITY_MEDIUM, self::PRIORITY_LOW, self::PRIORITY_NEGLIGIBLE];
                foreach ($priorities as $priority) {
                        $stamp = (float)microtime(true);
                        if (DBA::exists('workerqueue', ["`priority` = ? AND `pid` = 0 AND NOT `done` AND `next_try` < ?", $priority, DateTimeFormat::utcNow()])) {
@@ -934,8 +957,8 @@ class Worker
                        self::$db_duration += (microtime(true) - $stamp);
                }
 
-               if (!empty($waiting[PRIORITY_CRITICAL])) {
-                       return PRIORITY_CRITICAL;
+               if (!empty($waiting[self::PRIORITY_CRITICAL])) {
+                       return self::PRIORITY_CRITICAL;
                }
 
                $running = [];
@@ -1192,8 +1215,8 @@ class Worker
         * @param (integer|array) priority or parameter array, strings are deprecated and are ignored
         *
         * next args are passed as $cmd command line
-        * or: Worker::add(PRIORITY_HIGH, 'Notifier', Delivery::DELETION, $drop_id);
-        * or: Worker::add(array('priority' => PRIORITY_HIGH, 'dont_fork' => true), 'Delivery', $post_id);
+        * or: Worker::add(Worker::PRIORITY_HIGH, 'Notifier', Delivery::DELETION, $drop_id);
+        * or: Worker::add(array('priority' => Worker::PRIORITY_HIGH, 'dont_fork' => true), 'Delivery', $post_id);
         *
         * @return int '0' if worker queue entry already existed or there had been an error, otherwise the ID of the worker task
         * @throws \Friendica\Network\HTTPException\InternalServerErrorException
@@ -1216,7 +1239,7 @@ class Worker
                        return 1;
                }
 
-               $priority = PRIORITY_MEDIUM;
+               $priority = self::PRIORITY_MEDIUM;
                // Don't fork from frontend tasks by default
                $dont_fork = DI::config()->get('system', 'worker_dont_fork', false) || !DI::mode()->isBackend();
                $created = DateTimeFormat::utcNow();
@@ -1249,12 +1272,12 @@ class Worker
 
                $command = array_shift($args);
                $parameters = json_encode($args);
-               $found = DBA::exists('workerqueue', ['command' => $command, 'parameter' => $parameters, 'done' => false]);
+               $queue = DBA::selectFirst('workerqueue', ['id', 'priority'], ['command' => $command, 'parameter' => $parameters, 'done' => false]);
                $added = 0;
 
-               if (!is_int($priority) || !in_array($priority, PRIORITIES)) {
+               if (!is_int($priority) || !in_array($priority, self::PRIORITIES)) {
                        Logger::warning('Invalid priority', ['priority' => $priority, 'command' => $command, 'callstack' => System::callstack(20)]);
-                       $priority = PRIORITY_MEDIUM;
+                       $priority = self::PRIORITY_MEDIUM;
                }
 
                // Quit if there was a database error - a precaution for the update process to 3.5.3
@@ -1262,14 +1285,17 @@ class Worker
                        return 0;
                }
 
-               if (!$found) {
+               if (empty($queue)) {
                        if (!DBA::insert('workerqueue', ['command' => $command, 'parameter' => $parameters, 'created' => $created,
                                'priority' => $priority, 'next_try' => $delayed])) {
                                return 0;
                        }
                        $added = DBA::lastInsertId();
                } elseif ($force_priority) {
-                       DBA::update('workerqueue', ['priority' => $priority], ['command' => $command, 'parameter' => $parameters, 'done' => false, 'pid' => 0]);
+                       $ret = DBA::update('workerqueue', ['priority' => $priority], ['command' => $command, 'parameter' => $parameters, 'done' => false, 'pid' => 0]);
+                       if ($ret && ($priority != $queue['priority'])) {
+                               $added = $queue['id'];
+                       }
                }
 
                // Set the IPC flag to ensure an immediate process execution via daemon
@@ -1297,8 +1323,8 @@ class Worker
                        return $added;
                }
 
-               // Quit on daemon mode
-               if (Worker\Daemon::isMode()) {
+               // Quit on daemon mode, except the priority is critical (like for db updates)
+               if (Worker\Daemon::isMode() && $priority !== self::PRIORITY_CRITICAL) {
                        return $added;
                }
 
@@ -1339,6 +1365,17 @@ class Worker
                return $new_retrial;
        }
 
+       /**
+        * Get the number of retrials for the current worker task
+        *
+        * @return integer
+        */
+       public static function getRetrial(): int
+       {
+               $queue = DI::app()->getQueue();
+               return $queue['retrial'] ?? 0;
+       }
+
        /**
         * Defers the current worker entry
         *
@@ -1369,12 +1406,12 @@ class Worker
                $delay = (($new_retrial + 2) ** 4) + (rand(1, 30) * ($new_retrial));
                $next = DateTimeFormat::utc('now + ' . $delay . ' seconds');
 
-               if (($priority < PRIORITY_MEDIUM) && ($new_retrial > 3)) {
-                       $priority = PRIORITY_MEDIUM;
-               } elseif (($priority < PRIORITY_LOW) && ($new_retrial > 6)) {
-                       $priority = PRIORITY_LOW;
-               } elseif (($priority < PRIORITY_NEGLIGIBLE) && ($new_retrial > 8)) {
-                       $priority = PRIORITY_NEGLIGIBLE;
+               if (($priority < self::PRIORITY_MEDIUM) && ($new_retrial > 3)) {
+                       $priority = self::PRIORITY_MEDIUM;
+               } elseif (($priority < self::PRIORITY_LOW) && ($new_retrial > 6)) {
+                       $priority = self::PRIORITY_LOW;
+               } elseif (($priority < self::PRIORITY_NEGLIGIBLE) && ($new_retrial > 8)) {
+                       $priority = self::PRIORITY_NEGLIGIBLE;
                }
 
                Logger::info('Deferred task', ['id' => $id, 'retrial' => $new_retrial, 'created' => $queue['created'], 'next_execution' => $next, 'old_prio' => $queue['priority'], 'new_prio' => $priority]);
@@ -1396,7 +1433,7 @@ class Worker
         */
        public static function isInMaintenanceWindow(bool $check_last_execution = false): bool
        {
-               // Calculate the seconds of the start end end of the maintenance window
+               // Calculate the seconds of the start and end of the maintenance window
                $start = strtotime(DI::config()->get('system', 'maintenance_start')) % 86400;
                $end = strtotime(DI::config()->get('system', 'maintenance_end')) % 86400;
 
@@ -1407,7 +1444,7 @@ class Worker
                        $duration = max($start, $end) - min($start, $end);
 
                        // Quit when the last cron execution had been after the previous window
-                       $last_cron = DI::config()->get('system', 'last_cron_daily');
+                       $last_cron = DI::keyValue()->get('last_cron_daily');
                        if ($last_cron + $duration > time()) {
                                Logger::info('The Daily cron had been executed recently', ['last' => date(DateTimeFormat::MYSQL, $last_cron), 'start' => date('H:i:s', $start), 'end' => date('H:i:s', $end)]);
                                return false;