X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=src%2FCore%2FWorker.php;h=e29cf765a4fa38c5885798ef396cc0c6a9b8f89a;hb=44291a465bb9b9c54b8781d6d6f13e1c3f317c1b;hp=7149dd9ce34289545057115702ca4e5fca92e2a2;hpb=80da47921e2faa528bd16e587fe23ec340c847d6;p=friendica.git diff --git a/src/Core/Worker.php b/src/Core/Worker.php index 7149dd9ce3..e29cf765a4 100644 --- a/src/Core/Worker.php +++ b/src/Core/Worker.php @@ -1,6 +1,6 @@ get('system', 'worker_multiple_fetch'); foreach ($r as $entry) { - // Assure that the priority is an integer value - $entry['priority'] = (int)$entry['priority']; + $entry = self::checkPriority($entry); // The work will be done if (!self::execute($entry)) { @@ -168,6 +166,24 @@ class Worker Logger::info("Couldn't select a workerqueue entry, quitting process", ['pid' => getmypid()]); } + /** + * Check and fix the priority of a worker task + * @param array $entry + * @return array + */ + private static function checkPriority(array $entry) + { + $entry['priority'] = (int)$entry['priority']; + + if (!in_array($entry['priority'], PRIORITIES)) { + Logger::warning('Invalid priority', ['entry' => $entry, 'callstack' => System::callstack(20)]); + DBA::update('workerqueue', ['priority' => PRIORITY_MEDIUM], ['id' => $entry['id']]); + $entry['priority'] = PRIORITY_MEDIUM; + } + + return $entry; + } + /** * Checks if the system is ready. * @@ -261,7 +277,7 @@ class Worker $workerqueue = DBA::selectFirst('workerqueue', ['priority'], $condition, ['order' => ['priority']]); self::$db_duration += (microtime(true) - $stamp); if (DBA::isResult($workerqueue)) { - return $workerqueue["priority"]; + return $workerqueue['priority']; } else { return 0; } @@ -284,41 +300,41 @@ class Worker /** * Checks if the given file is valid to be included * - * @param mixed $file - * @return bool + * @param mixed $file + * @return bool */ private static function validateInclude(&$file) { $orig_file = $file; - + $file = realpath($file); - + if (strpos($file, getcwd()) !== 0) { return false; } - + $file = str_replace(getcwd() . "/", "", $file, $count); if ($count != 1) { return false; } - + if ($orig_file !== $file) { return false; } - + $valid = false; if (strpos($file, "include/") === 0) { $valid = true; } - + if (strpos($file, "addon/") === 0) { $valid = true; } - + // Simply return flag return $valid; } - + /** * Execute a worker entry * @@ -466,13 +482,13 @@ class Worker $cooldown = DI::config()->get("system", "worker_cooldown", 0); if ($cooldown > 0) { - Logger::info('Pre execution cooldown.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'cooldown' => $cooldown]); + Logger::info('Pre execution cooldown.', ['priority' => $queue['priority'], 'id' => $queue["id"], 'cooldown' => $cooldown]); sleep($cooldown); } Logger::enableWorker($funcname); - Logger::info("Process start.", ['priority' => $queue["priority"], 'id' => $queue["id"]]); + Logger::info("Process start.", ['priority' => $queue['priority'], 'id' => $queue["id"]]); $stamp = (float)microtime(true); @@ -480,11 +496,6 @@ class Worker // For this reason the variables have to be initialized. DI::profiler()->reset(); - if (!in_array($queue['priority'], PRIORITIES)) { - Logger::warning('Invalid priority', ['queue' => $queue, 'callstack' => System::callstack(20)]); - $queue['priority'] = PRIORITY_MEDIUM; - } - $a->setQueue($queue); $up_duration = microtime(true) - self::$up_start; @@ -529,21 +540,21 @@ class Worker self::$lock_duration = 0; if ($duration > 3600) { - Logger::info('Longer than 1 hour.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]); + Logger::info('Longer than 1 hour.', ['priority' => $queue['priority'], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]); } elseif ($duration > 600) { - Logger::info('Longer than 10 minutes.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]); + Logger::info('Longer than 10 minutes.', ['priority' => $queue['priority'], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]); } elseif ($duration > 300) { - Logger::info('Longer than 5 minutes.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]); + Logger::info('Longer than 5 minutes.', ['priority' => $queue['priority'], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]); } elseif ($duration > 120) { - Logger::info('Longer than 2 minutes.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]); + Logger::info('Longer than 2 minutes.', ['priority' => $queue['priority'], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]); } - Logger::info('Process done.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' => round($duration, 3)]); + Logger::info('Process done.', ['priority' => $queue['priority'], 'id' => $queue["id"], 'duration' => round($duration, 3)]); DI::profiler()->saveLog(DI::logger(), "ID " . $queue["id"] . ": " . $funcname); if ($cooldown > 0) { - Logger::info('Post execution cooldown.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'cooldown' => $cooldown]); + Logger::info('Post execution cooldown.', ['priority' => $queue['priority'], 'id' => $queue["id"], 'cooldown' => $cooldown]); sleep($cooldown); } } @@ -649,6 +660,8 @@ class Worker self::$db_duration += (microtime(true) - $stamp); while ($entry = DBA::fetch($entries)) { + $entry = self::checkPriority($entry); + if (!posix_kill($entry["pid"], 0)) { $stamp = (float)microtime(true); DBA::update( @@ -660,14 +673,10 @@ class Worker self::$db_duration_write += (microtime(true) - $stamp); } else { // Kill long running processes - // Check if the priority is in a valid range - if (!in_array($entry["priority"], [PRIORITY_CRITICAL, PRIORITY_HIGH, PRIORITY_MEDIUM, PRIORITY_LOW, PRIORITY_NEGLIGIBLE])) { - $entry["priority"] = PRIORITY_MEDIUM; - } // Define the maximum durations $max_duration_defaults = [PRIORITY_CRITICAL => 720, PRIORITY_HIGH => 10, PRIORITY_MEDIUM => 60, PRIORITY_LOW => 180, PRIORITY_NEGLIGIBLE => 720]; - $max_duration = $max_duration_defaults[$entry["priority"]]; + $max_duration = $max_duration_defaults[$entry['priority']]; $argv = json_decode($entry['parameter'], true); if (!empty($entry['command'])) { @@ -689,12 +698,12 @@ class Worker // We killed the stale process. // To avoid a blocking situation we reschedule the process at the beginning of the queue. // Additionally we are lowering the priority. (But not PRIORITY_CRITICAL) - $new_priority = $entry["priority"]; - if ($entry["priority"] == PRIORITY_HIGH) { + $new_priority = $entry['priority']; + if ($entry['priority'] == PRIORITY_HIGH) { $new_priority = PRIORITY_MEDIUM; - } elseif ($entry["priority"] == PRIORITY_MEDIUM) { + } elseif ($entry['priority'] == PRIORITY_MEDIUM) { $new_priority = PRIORITY_LOW; - } elseif ($entry["priority"] != PRIORITY_CRITICAL) { + } elseif ($entry['priority'] != PRIORITY_CRITICAL) { $new_priority = PRIORITY_NEGLIGIBLE; } $stamp = (float)microtime(true); @@ -778,12 +787,12 @@ class Worker self::$db_duration_stat += (microtime(true) - $stamp); while ($entry = DBA::fetch($jobs)) { $stamp = (float)microtime(true); - $running = DBA::count('workerqueue-view', ['priority' => $entry["priority"]]); + $running = DBA::count('workerqueue-view', ['priority' => $entry['priority']]); self::$db_duration += (microtime(true) - $stamp); self::$db_duration_stat += (microtime(true) - $stamp); $idle_workers -= $running; $waiting_processes += $entry["entries"]; - $listitem[$entry["priority"]] = $entry["priority"] . ":" . $running . "/" . $entry["entries"]; + $listitem[$entry['priority']] = $entry['priority'] . ":" . $running . "/" . $entry["entries"]; } DBA::close($jobs); } else { @@ -795,7 +804,7 @@ class Worker while ($entry = DBA::fetch($jobs)) { $idle_workers -= $entry["running"]; - $listitem[$entry["priority"]] = $entry["priority"].":".$entry["running"]; + $listitem[$entry['priority']] = $entry['priority'].":".$entry["running"]; } DBA::close($jobs); } @@ -1140,6 +1149,32 @@ class Worker // Cleaning dead processes self::killStaleWorkers(); + + // Remove old entries from the workerqueue + self::cleanWorkerQueue(); + } + + /** + * Remove old entries from the workerqueue + * + * @return void + */ + private static function cleanWorkerQueue() + { + DBA::delete('workerqueue', ["`done` AND `executed` < ?", DateTimeFormat::utc('now - 1 hour')]); + + // Optimizing this table only last seconds + if (DI::config()->get('system', 'optimize_tables')) { + // We are acquiring the two locks from the worker to avoid locking problems + if (DI::lock()->acquire(Worker::LOCK_PROCESS, 10)) { + if (DI::lock()->acquire(Worker::LOCK_WORKER, 10)) { + DBA::e("OPTIMIZE TABLE `workerqueue`"); + DBA::e("OPTIMIZE TABLE `process`"); + DI::lock()->release(Worker::LOCK_WORKER); + } + DI::lock()->release(Worker::LOCK_PROCESS); + } + } } /** @@ -1287,7 +1322,7 @@ class Worker $found = DBA::exists('workerqueue', ['command' => $command, 'parameter' => $parameters, 'done' => false]); $added = 0; - if (!in_array($priority, PRIORITIES)) { + if (!is_int($priority) || !in_array($priority, PRIORITIES)) { Logger::warning('Invalid priority', ['priority' => $priority, 'command' => $command, 'callstack' => System::callstack(20)]); $priority = PRIORITY_MEDIUM; } @@ -1378,8 +1413,9 @@ class Worker * Defers the current worker entry * * @return boolean had the entry been deferred? + * @throws \Exception */ - public static function defer() + public static function defer(): bool { $queue = DI::app()->getQueue(); @@ -1387,7 +1423,8 @@ class Worker return false; } - $retrial = $queue['retrial']; + $queue = self::checkPriority($queue); + $id = $queue['id']; $priority = $queue['priority'];