]> git.mxchange.org Git - friendica.git/blobdiff - src/Core/Worker.php
Merge pull request #11506 from annando/featured-worker
[friendica.git] / src / Core / Worker.php
index 3443e6608e98248e4177f4fe3ffa4900963e910e..e29cf765a4fa38c5885798ef396cc0c6a9b8f89a 100644 (file)
@@ -22,7 +22,6 @@
 namespace Friendica\Core;
 
 use Friendica\App\Mode;
-use Friendica\Core;
 use Friendica\Core\Worker\Entity\Process;
 use Friendica\Database\DBA;
 use Friendica\DI;
@@ -105,12 +104,7 @@ class Worker
                        // Don't refetch when a worker fetches tasks for multiple workers
                        $refetched = DI::config()->get('system', 'worker_multiple_fetch');
                        foreach ($r as $entry) {
-                               // Assure that the priority is an integer value
-                               $entry['priority'] = (int)$entry['priority'];
-                               if (!in_array($entry['priority'], PRIORITIES)) {
-                                       Logger::warning('Invalid priority', ['entry' => $entry, 'callstack' => System::callstack(20)]);
-                                       $entry['priority'] = PRIORITY_MEDIUM;
-                               }
+                               $entry = self::checkPriority($entry);
 
                                // The work will be done
                                if (!self::execute($entry)) {
@@ -172,6 +166,24 @@ class Worker
                Logger::info("Couldn't select a workerqueue entry, quitting process", ['pid' => getmypid()]);
        }
 
+       /**
+        * Check and fix the priority of a worker task
+        * @param array $entry
+        * @return array
+        */
+       private static function checkPriority(array $entry)
+       {
+               $entry['priority'] = (int)$entry['priority'];
+
+               if (!in_array($entry['priority'], PRIORITIES)) {
+                       Logger::warning('Invalid priority', ['entry' => $entry, 'callstack' => System::callstack(20)]);
+                       DBA::update('workerqueue', ['priority' => PRIORITY_MEDIUM], ['id' => $entry['id']]);
+                       $entry['priority'] = PRIORITY_MEDIUM;
+               }
+
+               return $entry;
+       }
+
        /**
         * Checks if the system is ready.
         *
@@ -288,41 +300,41 @@ class Worker
        /**
         * Checks if the given file is valid to be included
         *
-        * @param mixed $file 
-        * @return bool 
+        * @param mixed $file
+        * @return bool
         */
        private static function validateInclude(&$file)
        {
                $orig_file = $file;
-       
+
                $file = realpath($file);
-       
+
                if (strpos($file, getcwd()) !== 0) {
                        return false;
                }
-       
+
                $file = str_replace(getcwd() . "/", "", $file, $count);
                if ($count != 1) {
                        return false;
                }
-       
+
                if ($orig_file !== $file) {
                        return false;
                }
-       
+
                $valid = false;
                if (strpos($file, "include/") === 0) {
                        $valid = true;
                }
-       
+
                if (strpos($file, "addon/") === 0) {
                        $valid = true;
                }
-       
+
                // Simply return flag
                return $valid;
        }
-       
+
        /**
         * Execute a worker entry
         *
@@ -484,11 +496,6 @@ class Worker
                // For this reason the variables have to be initialized.
                DI::profiler()->reset();
 
-               if (!in_array($queue['priority'], PRIORITIES)) {
-                       Logger::warning('Invalid priority', ['queue' => $queue, 'callstack' => System::callstack(20)]);
-                       $queue['priority'] = PRIORITY_MEDIUM;
-               }
-
                $a->setQueue($queue);
 
                $up_duration = microtime(true) - self::$up_start;
@@ -653,6 +660,8 @@ class Worker
                self::$db_duration += (microtime(true) - $stamp);
 
                while ($entry = DBA::fetch($entries)) {
+                       $entry = self::checkPriority($entry);
+
                        if (!posix_kill($entry["pid"], 0)) {
                                $stamp = (float)microtime(true);
                                DBA::update(
@@ -664,11 +673,6 @@ class Worker
                                self::$db_duration_write += (microtime(true) - $stamp);
                        } else {
                                // Kill long running processes
-                               // Check if the priority is in a valid range
-                               if (!in_array($entry['priority'], PRIORITIES)) {
-                                       Logger::warning('Invalid priority', ['entry' => $entry, 'callstack' => System::callstack(20)]);
-                                       $entry['priority'] = PRIORITY_MEDIUM;
-                               }
 
                                // Define the maximum durations
                                $max_duration_defaults = [PRIORITY_CRITICAL => 720, PRIORITY_HIGH => 10, PRIORITY_MEDIUM => 60, PRIORITY_LOW => 180, PRIORITY_NEGLIGIBLE => 720];
@@ -1145,6 +1149,32 @@ class Worker
 
                // Cleaning dead processes
                self::killStaleWorkers();
+
+               // Remove old entries from the workerqueue
+               self::cleanWorkerQueue();
+       }
+
+       /**
+        * Remove old entries from the workerqueue
+        *
+        * @return void
+        */
+       private static function cleanWorkerQueue()
+       {
+               DBA::delete('workerqueue', ["`done` AND `executed` < ?", DateTimeFormat::utc('now - 1 hour')]);
+
+               // Optimizing this table only last seconds
+               if (DI::config()->get('system', 'optimize_tables')) {
+                       // We are acquiring the two locks from the worker to avoid locking problems
+                       if (DI::lock()->acquire(Worker::LOCK_PROCESS, 10)) {
+                               if (DI::lock()->acquire(Worker::LOCK_WORKER, 10)) {
+                                       DBA::e("OPTIMIZE TABLE `workerqueue`");
+                                       DBA::e("OPTIMIZE TABLE `process`");
+                                       DI::lock()->release(Worker::LOCK_WORKER);
+                               }
+                               DI::lock()->release(Worker::LOCK_PROCESS);
+                       }
+               }
        }
 
        /**
@@ -1292,7 +1322,7 @@ class Worker
                $found = DBA::exists('workerqueue', ['command' => $command, 'parameter' => $parameters, 'done' => false]);
                $added = 0;
 
-               if (!in_array($priority, PRIORITIES)) {
+               if (!is_int($priority) || !in_array($priority, PRIORITIES)) {
                        Logger::warning('Invalid priority', ['priority' => $priority, 'command' => $command, 'callstack' => System::callstack(20)]);
                        $priority = PRIORITY_MEDIUM;
                }
@@ -1393,14 +1423,11 @@ class Worker
                        return false;
                }
 
+               $queue = self::checkPriority($queue);
+
                $id = $queue['id'];
                $priority = $queue['priority'];
 
-               if (!in_array($priority, PRIORITIES)) {
-                       Logger::warning('Invalid priority', ['queue' => $queue, 'callstack' => System::callstack(20)]);
-                       $priority = PRIORITY_MEDIUM;
-               }
-
                $max_level = DI::config()->get('system', 'worker_defer_limit');
 
                $new_retrial = self::getNextRetrial($queue, $max_level);