<?php
/**
- * @copyright Copyright (C) 2010-2021, the Friendica project
+ * @copyright Copyright (C) 2010-2022, the Friendica project
*
* @license GNU AGPL version 3 or any later version
*
namespace Friendica\Core;
use Friendica\App\Mode;
-use Friendica\Core;
use Friendica\Core\Worker\Entity\Process;
use Friendica\Database\DBA;
use Friendica\DI;
private static $last_update;
private static $state;
private static $daemon_mode = null;
- /** @var Worker\Entity\Process */
+ /** @var Process */
private static $process;
/**
// Don't refetch when a worker fetches tasks for multiple workers
$refetched = DI::config()->get('system', 'worker_multiple_fetch');
foreach ($r as $entry) {
- // Assure that the priority is an integer value
- $entry['priority'] = (int)$entry['priority'];
+ $entry = self::checkPriority($entry);
// The work will be done
if (!self::execute($entry)) {
Logger::info("Couldn't select a workerqueue entry, quitting process", ['pid' => getmypid()]);
}
+ /**
+ * Check and fix the priority of a worker task
+ * @param array $entry
+ * @return array
+ */
+ private static function checkPriority(array $entry)
+ {
+ $entry['priority'] = (int)$entry['priority'];
+
+ if (!in_array($entry['priority'], PRIORITIES)) {
+ Logger::warning('Invalid priority', ['entry' => $entry, 'callstack' => System::callstack(20)]);
+ DBA::update('workerqueue', ['priority' => PRIORITY_MEDIUM], ['id' => $entry['id']]);
+ $entry['priority'] = PRIORITY_MEDIUM;
+ }
+
+ return $entry;
+ }
+
/**
* Checks if the system is ready.
*
$workerqueue = DBA::selectFirst('workerqueue', ['priority'], $condition, ['order' => ['priority']]);
self::$db_duration += (microtime(true) - $stamp);
if (DBA::isResult($workerqueue)) {
- return $workerqueue["priority"];
+ return $workerqueue['priority'];
} else {
return 0;
}
/**
* Checks if the given file is valid to be included
*
- * @param mixed $file
- * @return bool
+ * @param mixed $file
+ * @return bool
*/
private static function validateInclude(&$file)
{
$orig_file = $file;
-
+
$file = realpath($file);
-
+
if (strpos($file, getcwd()) !== 0) {
return false;
}
-
+
$file = str_replace(getcwd() . "/", "", $file, $count);
if ($count != 1) {
return false;
}
-
+
if ($orig_file !== $file) {
return false;
}
-
+
$valid = false;
if (strpos($file, "include/") === 0) {
$valid = true;
}
-
+
if (strpos($file, "addon/") === 0) {
$valid = true;
}
-
+
// Simply return flag
return $valid;
}
-
+
/**
* Execute a worker entry
*
$cooldown = DI::config()->get("system", "worker_cooldown", 0);
if ($cooldown > 0) {
- Logger::info('Pre execution cooldown.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'cooldown' => $cooldown]);
+ Logger::info('Pre execution cooldown.', ['priority' => $queue['priority'], 'id' => $queue["id"], 'cooldown' => $cooldown]);
sleep($cooldown);
}
Logger::enableWorker($funcname);
- Logger::info("Process start.", ['priority' => $queue["priority"], 'id' => $queue["id"]]);
+ Logger::info("Process start.", ['priority' => $queue['priority'], 'id' => $queue["id"]]);
$stamp = (float)microtime(true);
// For this reason the variables have to be initialized.
DI::profiler()->reset();
- if (!in_array($queue['priority'], PRIORITIES)) {
- Logger::warning('Invalid priority', ['queue' => $queue, 'callstack' => System::callstack(20)]);
- $queue['priority'] = PRIORITY_MEDIUM;
- }
-
$a->setQueue($queue);
$up_duration = microtime(true) - self::$up_start;
self::$lock_duration = 0;
if ($duration > 3600) {
- Logger::info('Longer than 1 hour.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]);
+ Logger::info('Longer than 1 hour.', ['priority' => $queue['priority'], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]);
} elseif ($duration > 600) {
- Logger::info('Longer than 10 minutes.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]);
+ Logger::info('Longer than 10 minutes.', ['priority' => $queue['priority'], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]);
} elseif ($duration > 300) {
- Logger::info('Longer than 5 minutes.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]);
+ Logger::info('Longer than 5 minutes.', ['priority' => $queue['priority'], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]);
} elseif ($duration > 120) {
- Logger::info('Longer than 2 minutes.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]);
+ Logger::info('Longer than 2 minutes.', ['priority' => $queue['priority'], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]);
}
- Logger::info('Process done.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' => round($duration, 3)]);
+ Logger::info('Process done.', ['priority' => $queue['priority'], 'id' => $queue["id"], 'duration' => round($duration, 3)]);
DI::profiler()->saveLog(DI::logger(), "ID " . $queue["id"] . ": " . $funcname);
if ($cooldown > 0) {
- Logger::info('Post execution cooldown.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'cooldown' => $cooldown]);
+ Logger::info('Post execution cooldown.', ['priority' => $queue['priority'], 'id' => $queue["id"], 'cooldown' => $cooldown]);
sleep($cooldown);
}
}
self::$db_duration += (microtime(true) - $stamp);
while ($entry = DBA::fetch($entries)) {
+ $entry = self::checkPriority($entry);
+
if (!posix_kill($entry["pid"], 0)) {
$stamp = (float)microtime(true);
DBA::update(
self::$db_duration_write += (microtime(true) - $stamp);
} else {
// Kill long running processes
- // Check if the priority is in a valid range
- if (!in_array($entry["priority"], [PRIORITY_CRITICAL, PRIORITY_HIGH, PRIORITY_MEDIUM, PRIORITY_LOW, PRIORITY_NEGLIGIBLE])) {
- $entry["priority"] = PRIORITY_MEDIUM;
- }
// Define the maximum durations
$max_duration_defaults = [PRIORITY_CRITICAL => 720, PRIORITY_HIGH => 10, PRIORITY_MEDIUM => 60, PRIORITY_LOW => 180, PRIORITY_NEGLIGIBLE => 720];
- $max_duration = $max_duration_defaults[$entry["priority"]];
+ $max_duration = $max_duration_defaults[$entry['priority']];
$argv = json_decode($entry['parameter'], true);
if (!empty($entry['command'])) {
// We killed the stale process.
// To avoid a blocking situation we reschedule the process at the beginning of the queue.
// Additionally we are lowering the priority. (But not PRIORITY_CRITICAL)
- $new_priority = $entry["priority"];
- if ($entry["priority"] == PRIORITY_HIGH) {
+ $new_priority = $entry['priority'];
+ if ($entry['priority'] == PRIORITY_HIGH) {
$new_priority = PRIORITY_MEDIUM;
- } elseif ($entry["priority"] == PRIORITY_MEDIUM) {
+ } elseif ($entry['priority'] == PRIORITY_MEDIUM) {
$new_priority = PRIORITY_LOW;
- } elseif ($entry["priority"] != PRIORITY_CRITICAL) {
+ } elseif ($entry['priority'] != PRIORITY_CRITICAL) {
$new_priority = PRIORITY_NEGLIGIBLE;
}
$stamp = (float)microtime(true);
}
$stamp = (float)microtime(true);
- $jobs = DBA::count('workerqueue', ["`done` AND `executed` > UTC_TIMESTAMP() - INTERVAL ? MINUTE", $interval]);
+ $jobs = DBA::count('workerqueue', ["`done` AND `executed` > ?", DateTimeFormat::utc('now - ' . $interval . ' minute')]);
self::$db_duration += (microtime(true) - $stamp);
self::$db_duration_stat += (microtime(true) - $stamp);
$jobs_per_minute[$interval] = number_format($jobs / $interval, 0);
self::$db_duration_stat += (microtime(true) - $stamp);
while ($entry = DBA::fetch($jobs)) {
$stamp = (float)microtime(true);
- $running = DBA::count('workerqueue-view', ['priority' => $entry["priority"]]);
+ $running = DBA::count('workerqueue-view', ['priority' => $entry['priority']]);
self::$db_duration += (microtime(true) - $stamp);
self::$db_duration_stat += (microtime(true) - $stamp);
$idle_workers -= $running;
$waiting_processes += $entry["entries"];
- $listitem[$entry["priority"]] = $entry["priority"] . ":" . $running . "/" . $entry["entries"];
+ $listitem[$entry['priority']] = $entry['priority'] . ":" . $running . "/" . $entry["entries"];
}
DBA::close($jobs);
} else {
while ($entry = DBA::fetch($jobs)) {
$idle_workers -= $entry["running"];
- $listitem[$entry["priority"]] = $entry["priority"].":".$entry["running"];
+ $listitem[$entry['priority']] = $entry['priority'].":".$entry["running"];
}
DBA::close($jobs);
}
// Cleaning dead processes
self::killStaleWorkers();
+
+ // Remove old entries from the workerqueue
+ self::cleanWorkerQueue();
+ }
+
+ /**
+ * Remove old entries from the workerqueue
+ *
+ * @return void
+ */
+ private static function cleanWorkerQueue()
+ {
+ DBA::delete('workerqueue', ["`done` AND `executed` < ?", DateTimeFormat::utc('now - 1 hour')]);
+
+ // Optimizing this table only last seconds
+ if (DI::config()->get('system', 'optimize_tables')) {
+ // We are acquiring the two locks from the worker to avoid locking problems
+ if (DI::lock()->acquire(Worker::LOCK_PROCESS, 10)) {
+ if (DI::lock()->acquire(Worker::LOCK_WORKER, 10)) {
+ DBA::e("OPTIMIZE TABLE `workerqueue`");
+ DBA::e("OPTIMIZE TABLE `process`");
+ DI::lock()->release(Worker::LOCK_WORKER);
+ }
+ DI::lock()->release(Worker::LOCK_PROCESS);
+ }
+ }
}
/**
DBA::connect();
DI::flushLogger();
- $process = DI::process()->create($pid);
+ $process = DI::process()->create(getmypid(), basename(__FILE__));
$cycles = 0;
- while (!self::IPCJobsExists($pid) && (++$cycles < 100)) {
+ while (!self::IPCJobsExists($process->pid) && (++$cycles < 100)) {
usleep(10000);
}
- Logger::info('Worker spawned', ['pid' => $pid, 'wait_cycles' => $cycles]);
+ Logger::info('Worker spawned', ['pid' => $process->pid, 'wait_cycles' => $cycles]);
self::processQueue($do_cron, $process);
self::unclaimProcess($process);
- self::IPCSetJobState(false, $pid);
+ self::IPCSetJobState(false, $process->pid);
DI::process()->delete($process);
- Logger::info('Worker ended', ['pid' => $pid]);
+ Logger::info('Worker ended', ['pid' => $process->pid]);
exit();
}
$found = DBA::exists('workerqueue', ['command' => $command, 'parameter' => $parameters, 'done' => false]);
$added = 0;
- if (!in_array($priority, PRIORITIES)) {
+ if (!is_int($priority) || !in_array($priority, PRIORITIES)) {
Logger::warning('Invalid priority', ['priority' => $priority, 'command' => $command, 'callstack' => System::callstack(20)]);
$priority = PRIORITY_MEDIUM;
}
* Defers the current worker entry
*
* @return boolean had the entry been deferred?
+ * @throws \Exception
*/
- public static function defer()
+ public static function defer(): bool
{
$queue = DI::app()->getQueue();
return false;
}
- $retrial = $queue['retrial'];
+ $queue = self::checkPriority($queue);
+
$id = $queue['id'];
$priority = $queue['priority'];