X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=src%2FCore%2FWorker.php;h=e29cf765a4fa38c5885798ef396cc0c6a9b8f89a;hb=44291a465bb9b9c54b8781d6d6f13e1c3f317c1b;hp=3443e6608e98248e4177f4fe3ffa4900963e910e;hpb=f7b6507438a551975016076784ef90079d1662fa;p=friendica.git diff --git a/src/Core/Worker.php b/src/Core/Worker.php index 3443e6608e..e29cf765a4 100644 --- a/src/Core/Worker.php +++ b/src/Core/Worker.php @@ -22,7 +22,6 @@ namespace Friendica\Core; use Friendica\App\Mode; -use Friendica\Core; use Friendica\Core\Worker\Entity\Process; use Friendica\Database\DBA; use Friendica\DI; @@ -105,12 +104,7 @@ class Worker // Don't refetch when a worker fetches tasks for multiple workers $refetched = DI::config()->get('system', 'worker_multiple_fetch'); foreach ($r as $entry) { - // Assure that the priority is an integer value - $entry['priority'] = (int)$entry['priority']; - if (!in_array($entry['priority'], PRIORITIES)) { - Logger::warning('Invalid priority', ['entry' => $entry, 'callstack' => System::callstack(20)]); - $entry['priority'] = PRIORITY_MEDIUM; - } + $entry = self::checkPriority($entry); // The work will be done if (!self::execute($entry)) { @@ -172,6 +166,24 @@ class Worker Logger::info("Couldn't select a workerqueue entry, quitting process", ['pid' => getmypid()]); } + /** + * Check and fix the priority of a worker task + * @param array $entry + * @return array + */ + private static function checkPriority(array $entry) + { + $entry['priority'] = (int)$entry['priority']; + + if (!in_array($entry['priority'], PRIORITIES)) { + Logger::warning('Invalid priority', ['entry' => $entry, 'callstack' => System::callstack(20)]); + DBA::update('workerqueue', ['priority' => PRIORITY_MEDIUM], ['id' => $entry['id']]); + $entry['priority'] = PRIORITY_MEDIUM; + } + + return $entry; + } + /** * Checks if the system is ready. * @@ -288,41 +300,41 @@ class Worker /** * Checks if the given file is valid to be included * - * @param mixed $file - * @return bool + * @param mixed $file + * @return bool */ private static function validateInclude(&$file) { $orig_file = $file; - + $file = realpath($file); - + if (strpos($file, getcwd()) !== 0) { return false; } - + $file = str_replace(getcwd() . "/", "", $file, $count); if ($count != 1) { return false; } - + if ($orig_file !== $file) { return false; } - + $valid = false; if (strpos($file, "include/") === 0) { $valid = true; } - + if (strpos($file, "addon/") === 0) { $valid = true; } - + // Simply return flag return $valid; } - + /** * Execute a worker entry * @@ -484,11 +496,6 @@ class Worker // For this reason the variables have to be initialized. DI::profiler()->reset(); - if (!in_array($queue['priority'], PRIORITIES)) { - Logger::warning('Invalid priority', ['queue' => $queue, 'callstack' => System::callstack(20)]); - $queue['priority'] = PRIORITY_MEDIUM; - } - $a->setQueue($queue); $up_duration = microtime(true) - self::$up_start; @@ -653,6 +660,8 @@ class Worker self::$db_duration += (microtime(true) - $stamp); while ($entry = DBA::fetch($entries)) { + $entry = self::checkPriority($entry); + if (!posix_kill($entry["pid"], 0)) { $stamp = (float)microtime(true); DBA::update( @@ -664,11 +673,6 @@ class Worker self::$db_duration_write += (microtime(true) - $stamp); } else { // Kill long running processes - // Check if the priority is in a valid range - if (!in_array($entry['priority'], PRIORITIES)) { - Logger::warning('Invalid priority', ['entry' => $entry, 'callstack' => System::callstack(20)]); - $entry['priority'] = PRIORITY_MEDIUM; - } // Define the maximum durations $max_duration_defaults = [PRIORITY_CRITICAL => 720, PRIORITY_HIGH => 10, PRIORITY_MEDIUM => 60, PRIORITY_LOW => 180, PRIORITY_NEGLIGIBLE => 720]; @@ -1145,6 +1149,32 @@ class Worker // Cleaning dead processes self::killStaleWorkers(); + + // Remove old entries from the workerqueue + self::cleanWorkerQueue(); + } + + /** + * Remove old entries from the workerqueue + * + * @return void + */ + private static function cleanWorkerQueue() + { + DBA::delete('workerqueue', ["`done` AND `executed` < ?", DateTimeFormat::utc('now - 1 hour')]); + + // Optimizing this table only last seconds + if (DI::config()->get('system', 'optimize_tables')) { + // We are acquiring the two locks from the worker to avoid locking problems + if (DI::lock()->acquire(Worker::LOCK_PROCESS, 10)) { + if (DI::lock()->acquire(Worker::LOCK_WORKER, 10)) { + DBA::e("OPTIMIZE TABLE `workerqueue`"); + DBA::e("OPTIMIZE TABLE `process`"); + DI::lock()->release(Worker::LOCK_WORKER); + } + DI::lock()->release(Worker::LOCK_PROCESS); + } + } } /** @@ -1292,7 +1322,7 @@ class Worker $found = DBA::exists('workerqueue', ['command' => $command, 'parameter' => $parameters, 'done' => false]); $added = 0; - if (!in_array($priority, PRIORITIES)) { + if (!is_int($priority) || !in_array($priority, PRIORITIES)) { Logger::warning('Invalid priority', ['priority' => $priority, 'command' => $command, 'callstack' => System::callstack(20)]); $priority = PRIORITY_MEDIUM; } @@ -1393,14 +1423,11 @@ class Worker return false; } + $queue = self::checkPriority($queue); + $id = $queue['id']; $priority = $queue['priority']; - if (!in_array($priority, PRIORITIES)) { - Logger::warning('Invalid priority', ['queue' => $queue, 'callstack' => System::callstack(20)]); - $priority = PRIORITY_MEDIUM; - } - $max_level = DI::config()->get('system', 'worker_defer_limit'); $new_retrial = self::getNextRetrial($queue, $max_level);