]> git.mxchange.org Git - friendica.git/blobdiff - src/Core/Worker.php
Merge pull request #11506 from annando/featured-worker
[friendica.git] / src / Core / Worker.php
index faa29717c145b05ae521d25bff671672524ffe63..e29cf765a4fa38c5885798ef396cc0c6a9b8f89a 100644 (file)
@@ -1,6 +1,6 @@
 <?php
 /**
- * @copyright Copyright (C) 2010-2021, the Friendica project
+ * @copyright Copyright (C) 2010-2022, the Friendica project
  *
  * @license GNU AGPL version 3 or any later version
  *
@@ -22,7 +22,6 @@
 namespace Friendica\Core;
 
 use Friendica\App\Mode;
-use Friendica\Core;
 use Friendica\Core\Worker\Entity\Process;
 use Friendica\Database\DBA;
 use Friendica\DI;
@@ -52,7 +51,7 @@ class Worker
        private static $last_update;
        private static $state;
        private static $daemon_mode = null;
-       /** @var Worker\Entity\Process */
+       /** @var Process */
        private static $process;
 
        /**
@@ -105,8 +104,7 @@ class Worker
                        // Don't refetch when a worker fetches tasks for multiple workers
                        $refetched = DI::config()->get('system', 'worker_multiple_fetch');
                        foreach ($r as $entry) {
-                               // Assure that the priority is an integer value
-                               $entry['priority'] = (int)$entry['priority'];
+                               $entry = self::checkPriority($entry);
 
                                // The work will be done
                                if (!self::execute($entry)) {
@@ -168,6 +166,24 @@ class Worker
                Logger::info("Couldn't select a workerqueue entry, quitting process", ['pid' => getmypid()]);
        }
 
+       /**
+        * Check and fix the priority of a worker task
+        * @param array $entry
+        * @return array
+        */
+       private static function checkPriority(array $entry)
+       {
+               $entry['priority'] = (int)$entry['priority'];
+
+               if (!in_array($entry['priority'], PRIORITIES)) {
+                       Logger::warning('Invalid priority', ['entry' => $entry, 'callstack' => System::callstack(20)]);
+                       DBA::update('workerqueue', ['priority' => PRIORITY_MEDIUM], ['id' => $entry['id']]);
+                       $entry['priority'] = PRIORITY_MEDIUM;
+               }
+
+               return $entry;
+       }
+
        /**
         * Checks if the system is ready.
         *
@@ -261,7 +277,7 @@ class Worker
                $workerqueue = DBA::selectFirst('workerqueue', ['priority'], $condition, ['order' => ['priority']]);
                self::$db_duration += (microtime(true) - $stamp);
                if (DBA::isResult($workerqueue)) {
-                       return $workerqueue["priority"];
+                       return $workerqueue['priority'];
                } else {
                        return 0;
                }
@@ -284,41 +300,41 @@ class Worker
        /**
         * Checks if the given file is valid to be included
         *
-        * @param mixed $file 
-        * @return bool 
+        * @param mixed $file
+        * @return bool
         */
        private static function validateInclude(&$file)
        {
                $orig_file = $file;
-       
+
                $file = realpath($file);
-       
+
                if (strpos($file, getcwd()) !== 0) {
                        return false;
                }
-       
+
                $file = str_replace(getcwd() . "/", "", $file, $count);
                if ($count != 1) {
                        return false;
                }
-       
+
                if ($orig_file !== $file) {
                        return false;
                }
-       
+
                $valid = false;
                if (strpos($file, "include/") === 0) {
                        $valid = true;
                }
-       
+
                if (strpos($file, "addon/") === 0) {
                        $valid = true;
                }
-       
+
                // Simply return flag
                return $valid;
        }
-       
+
        /**
         * Execute a worker entry
         *
@@ -466,13 +482,13 @@ class Worker
 
                $cooldown = DI::config()->get("system", "worker_cooldown", 0);
                if ($cooldown > 0) {
-                       Logger::info('Pre execution cooldown.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'cooldown' => $cooldown]);
+                       Logger::info('Pre execution cooldown.', ['priority' => $queue['priority'], 'id' => $queue["id"], 'cooldown' => $cooldown]);
                        sleep($cooldown);
                }
 
                Logger::enableWorker($funcname);
 
-               Logger::info("Process start.", ['priority' => $queue["priority"], 'id' => $queue["id"]]);
+               Logger::info("Process start.", ['priority' => $queue['priority'], 'id' => $queue["id"]]);
 
                $stamp = (float)microtime(true);
 
@@ -480,11 +496,6 @@ class Worker
                // For this reason the variables have to be initialized.
                DI::profiler()->reset();
 
-               if (!in_array($queue['priority'], PRIORITIES)) {
-                       Logger::warning('Invalid priority', ['queue' => $queue, 'callstack' => System::callstack(20)]);
-                       $queue['priority'] = PRIORITY_MEDIUM;
-               }
-
                $a->setQueue($queue);
 
                $up_duration = microtime(true) - self::$up_start;
@@ -529,21 +540,21 @@ class Worker
                self::$lock_duration = 0;
 
                if ($duration > 3600) {
-                       Logger::info('Longer than 1 hour.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]);
+                       Logger::info('Longer than 1 hour.', ['priority' => $queue['priority'], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]);
                } elseif ($duration > 600) {
-                       Logger::info('Longer than 10 minutes.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]);
+                       Logger::info('Longer than 10 minutes.', ['priority' => $queue['priority'], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]);
                } elseif ($duration > 300) {
-                       Logger::info('Longer than 5 minutes.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]);
+                       Logger::info('Longer than 5 minutes.', ['priority' => $queue['priority'], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]);
                } elseif ($duration > 120) {
-                       Logger::info('Longer than 2 minutes.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]);
+                       Logger::info('Longer than 2 minutes.', ['priority' => $queue['priority'], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]);
                }
 
-               Logger::info('Process done.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' => round($duration, 3)]);
+               Logger::info('Process done.', ['priority' => $queue['priority'], 'id' => $queue["id"], 'duration' => round($duration, 3)]);
 
                DI::profiler()->saveLog(DI::logger(), "ID " . $queue["id"] . ": " . $funcname);
 
                if ($cooldown > 0) {
-                       Logger::info('Post execution cooldown.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'cooldown' => $cooldown]);
+                       Logger::info('Post execution cooldown.', ['priority' => $queue['priority'], 'id' => $queue["id"], 'cooldown' => $cooldown]);
                        sleep($cooldown);
                }
        }
@@ -649,6 +660,8 @@ class Worker
                self::$db_duration += (microtime(true) - $stamp);
 
                while ($entry = DBA::fetch($entries)) {
+                       $entry = self::checkPriority($entry);
+
                        if (!posix_kill($entry["pid"], 0)) {
                                $stamp = (float)microtime(true);
                                DBA::update(
@@ -660,14 +673,10 @@ class Worker
                                self::$db_duration_write += (microtime(true) - $stamp);
                        } else {
                                // Kill long running processes
-                               // Check if the priority is in a valid range
-                               if (!in_array($entry["priority"], [PRIORITY_CRITICAL, PRIORITY_HIGH, PRIORITY_MEDIUM, PRIORITY_LOW, PRIORITY_NEGLIGIBLE])) {
-                                       $entry["priority"] = PRIORITY_MEDIUM;
-                               }
 
                                // Define the maximum durations
                                $max_duration_defaults = [PRIORITY_CRITICAL => 720, PRIORITY_HIGH => 10, PRIORITY_MEDIUM => 60, PRIORITY_LOW => 180, PRIORITY_NEGLIGIBLE => 720];
-                               $max_duration = $max_duration_defaults[$entry["priority"]];
+                               $max_duration = $max_duration_defaults[$entry['priority']];
 
                                $argv = json_decode($entry['parameter'], true);
                                if (!empty($entry['command'])) {
@@ -689,12 +698,12 @@ class Worker
                                        // We killed the stale process.
                                        // To avoid a blocking situation we reschedule the process at the beginning of the queue.
                                        // Additionally we are lowering the priority. (But not PRIORITY_CRITICAL)
-                                       $new_priority = $entry["priority"];
-                                       if ($entry["priority"] == PRIORITY_HIGH) {
+                                       $new_priority = $entry['priority'];
+                                       if ($entry['priority'] == PRIORITY_HIGH) {
                                                $new_priority = PRIORITY_MEDIUM;
-                                       } elseif ($entry["priority"] == PRIORITY_MEDIUM) {
+                                       } elseif ($entry['priority'] == PRIORITY_MEDIUM) {
                                                $new_priority = PRIORITY_LOW;
-                                       } elseif ($entry["priority"] != PRIORITY_CRITICAL) {
+                                       } elseif ($entry['priority'] != PRIORITY_CRITICAL) {
                                                $new_priority = PRIORITY_NEGLIGIBLE;
                                        }
                                        $stamp = (float)microtime(true);
@@ -754,7 +763,7 @@ class Worker
                                        }
 
                                        $stamp = (float)microtime(true);
-                                       $jobs = DBA::count('workerqueue', ["`done` AND `executed` > UTC_TIMESTAMP() - INTERVAL ? MINUTE", $interval]);
+                                       $jobs = DBA::count('workerqueue', ["`done` AND `executed` > ?", DateTimeFormat::utc('now - ' . $interval . ' minute')]);
                                        self::$db_duration += (microtime(true) - $stamp);
                                        self::$db_duration_stat += (microtime(true) - $stamp);
                                        $jobs_per_minute[$interval] = number_format($jobs / $interval, 0);
@@ -778,12 +787,12 @@ class Worker
                                self::$db_duration_stat += (microtime(true) - $stamp);
                                while ($entry = DBA::fetch($jobs)) {
                                        $stamp = (float)microtime(true);
-                                       $running = DBA::count('workerqueue-view', ['priority' => $entry["priority"]]);
+                                       $running = DBA::count('workerqueue-view', ['priority' => $entry['priority']]);
                                        self::$db_duration += (microtime(true) - $stamp);
                                        self::$db_duration_stat += (microtime(true) - $stamp);
                                        $idle_workers -= $running;
                                        $waiting_processes += $entry["entries"];
-                                       $listitem[$entry["priority"]] = $entry["priority"] . ":" . $running . "/" . $entry["entries"];
+                                       $listitem[$entry['priority']] = $entry['priority'] . ":" . $running . "/" . $entry["entries"];
                                }
                                DBA::close($jobs);
                        } else {
@@ -795,7 +804,7 @@ class Worker
 
                                while ($entry = DBA::fetch($jobs)) {
                                        $idle_workers -= $entry["running"];
-                                       $listitem[$entry["priority"]] = $entry["priority"].":".$entry["running"];
+                                       $listitem[$entry['priority']] = $entry['priority'].":".$entry["running"];
                                }
                                DBA::close($jobs);
                        }
@@ -1140,6 +1149,32 @@ class Worker
 
                // Cleaning dead processes
                self::killStaleWorkers();
+
+               // Remove old entries from the workerqueue
+               self::cleanWorkerQueue();
+       }
+
+       /**
+        * Remove old entries from the workerqueue
+        *
+        * @return void
+        */
+       private static function cleanWorkerQueue()
+       {
+               DBA::delete('workerqueue', ["`done` AND `executed` < ?", DateTimeFormat::utc('now - 1 hour')]);
+
+               // Optimizing this table only last seconds
+               if (DI::config()->get('system', 'optimize_tables')) {
+                       // We are acquiring the two locks from the worker to avoid locking problems
+                       if (DI::lock()->acquire(Worker::LOCK_PROCESS, 10)) {
+                               if (DI::lock()->acquire(Worker::LOCK_WORKER, 10)) {
+                                       DBA::e("OPTIMIZE TABLE `workerqueue`");
+                                       DBA::e("OPTIMIZE TABLE `process`");
+                                       DI::lock()->release(Worker::LOCK_WORKER);
+                               }
+                               DI::lock()->release(Worker::LOCK_PROCESS);
+                       }
+               }
        }
 
        /**
@@ -1183,22 +1218,22 @@ class Worker
                DBA::connect();
 
                DI::flushLogger();
-               $process = DI::process()->create($pid);
+               $process = DI::process()->create(getmypid(), basename(__FILE__));
 
                $cycles = 0;
-               while (!self::IPCJobsExists($pid) && (++$cycles < 100)) {
+               while (!self::IPCJobsExists($process->pid) && (++$cycles < 100)) {
                        usleep(10000);
                }
 
-               Logger::info('Worker spawned', ['pid' => $pid, 'wait_cycles' => $cycles]);
+               Logger::info('Worker spawned', ['pid' => $process->pid, 'wait_cycles' => $cycles]);
 
                self::processQueue($do_cron, $process);
 
                self::unclaimProcess($process);
 
-               self::IPCSetJobState(false, $pid);
+               self::IPCSetJobState(false, $process->pid);
                DI::process()->delete($process);
-               Logger::info('Worker ended', ['pid' => $pid]);
+               Logger::info('Worker ended', ['pid' => $process->pid]);
                exit();
        }
 
@@ -1287,7 +1322,7 @@ class Worker
                $found = DBA::exists('workerqueue', ['command' => $command, 'parameter' => $parameters, 'done' => false]);
                $added = 0;
 
-               if (!in_array($priority, PRIORITIES)) {
+               if (!is_int($priority) || !in_array($priority, PRIORITIES)) {
                        Logger::warning('Invalid priority', ['priority' => $priority, 'command' => $command, 'callstack' => System::callstack(20)]);
                        $priority = PRIORITY_MEDIUM;
                }
@@ -1378,8 +1413,9 @@ class Worker
         * Defers the current worker entry
         *
         * @return boolean had the entry been deferred?
+        * @throws \Exception
         */
-       public static function defer()
+       public static function defer(): bool
        {
                $queue = DI::app()->getQueue();
 
@@ -1387,7 +1423,8 @@ class Worker
                        return false;
                }
 
-               $retrial = $queue['retrial'];
+               $queue = self::checkPriority($queue);
+
                $id = $queue['id'];
                $priority = $queue['priority'];