namespace Friendica\Core;
use Friendica\App\Mode;
-use Friendica\Core;
use Friendica\Core\Worker\Entity\Process;
use Friendica\Database\DBA;
use Friendica\DI;
// Don't refetch when a worker fetches tasks for multiple workers
$refetched = DI::config()->get('system', 'worker_multiple_fetch');
foreach ($r as $entry) {
- // Assure that the priority is an integer value
- $entry['priority'] = (int)$entry['priority'];
- if (!in_array($entry['priority'], PRIORITIES)) {
- Logger::warning('Invalid priority', ['entry' => $entry, 'callstack' => System::callstack(20)]);
- $entry['priority'] = PRIORITY_MEDIUM;
- }
+ $entry = self::checkPriority($entry);
// The work will be done
if (!self::execute($entry)) {
Logger::info("Couldn't select a workerqueue entry, quitting process", ['pid' => getmypid()]);
}
+ /**
+ * Check and fix the priority of a worker task
+ * @param array $entry
+ * @return array
+ */
+ private static function checkPriority(array $entry)
+ {
+ $entry['priority'] = (int)$entry['priority'];
+
+ if (!in_array($entry['priority'], PRIORITIES)) {
+ Logger::warning('Invalid priority', ['entry' => $entry, 'callstack' => System::callstack(20)]);
+ DBA::update('workerqueue', ['priority' => PRIORITY_MEDIUM], ['id' => $entry['id']]);
+ $entry['priority'] = PRIORITY_MEDIUM;
+ }
+
+ return $entry;
+ }
+
/**
* Checks if the system is ready.
*
/**
* Checks if the given file is valid to be included
*
- * @param mixed $file
- * @return bool
+ * @param mixed $file
+ * @return bool
*/
private static function validateInclude(&$file)
{
$orig_file = $file;
-
+
$file = realpath($file);
-
+
if (strpos($file, getcwd()) !== 0) {
return false;
}
-
+
$file = str_replace(getcwd() . "/", "", $file, $count);
if ($count != 1) {
return false;
}
-
+
if ($orig_file !== $file) {
return false;
}
-
+
$valid = false;
if (strpos($file, "include/") === 0) {
$valid = true;
}
-
+
if (strpos($file, "addon/") === 0) {
$valid = true;
}
-
+
// Simply return flag
return $valid;
}
-
+
/**
* Execute a worker entry
*
// For this reason the variables have to be initialized.
DI::profiler()->reset();
- if (!in_array($queue['priority'], PRIORITIES)) {
- Logger::warning('Invalid priority', ['queue' => $queue, 'callstack' => System::callstack(20)]);
- $queue['priority'] = PRIORITY_MEDIUM;
- }
-
$a->setQueue($queue);
$up_duration = microtime(true) - self::$up_start;
self::$db_duration += (microtime(true) - $stamp);
while ($entry = DBA::fetch($entries)) {
+ $entry = self::checkPriority($entry);
+
if (!posix_kill($entry["pid"], 0)) {
$stamp = (float)microtime(true);
DBA::update(
self::$db_duration_write += (microtime(true) - $stamp);
} else {
// Kill long running processes
- // Check if the priority is in a valid range
- if (!in_array($entry['priority'], PRIORITIES)) {
- Logger::warning('Invalid priority', ['entry' => $entry, 'callstack' => System::callstack(20)]);
- $entry['priority'] = PRIORITY_MEDIUM;
- }
// Define the maximum durations
$max_duration_defaults = [PRIORITY_CRITICAL => 720, PRIORITY_HIGH => 10, PRIORITY_MEDIUM => 60, PRIORITY_LOW => 180, PRIORITY_NEGLIGIBLE => 720];
// Cleaning dead processes
self::killStaleWorkers();
+
+ // Remove old entries from the workerqueue
+ self::cleanWorkerQueue();
+ }
+
+ /**
+ * Remove old entries from the workerqueue
+ *
+ * @return void
+ */
+ private static function cleanWorkerQueue()
+ {
+ DBA::delete('workerqueue', ["`done` AND `executed` < ?", DateTimeFormat::utc('now - 1 hour')]);
+
+ // Optimizing this table only last seconds
+ if (DI::config()->get('system', 'optimize_tables')) {
+ // We are acquiring the two locks from the worker to avoid locking problems
+ if (DI::lock()->acquire(Worker::LOCK_PROCESS, 10)) {
+ if (DI::lock()->acquire(Worker::LOCK_WORKER, 10)) {
+ DBA::e("OPTIMIZE TABLE `workerqueue`");
+ DBA::e("OPTIMIZE TABLE `process`");
+ DI::lock()->release(Worker::LOCK_WORKER);
+ }
+ DI::lock()->release(Worker::LOCK_PROCESS);
+ }
+ }
}
/**
$found = DBA::exists('workerqueue', ['command' => $command, 'parameter' => $parameters, 'done' => false]);
$added = 0;
- if (!in_array($priority, PRIORITIES)) {
+ if (!is_int($priority) || !in_array($priority, PRIORITIES)) {
Logger::warning('Invalid priority', ['priority' => $priority, 'command' => $command, 'callstack' => System::callstack(20)]);
$priority = PRIORITY_MEDIUM;
}
return false;
}
+ $queue = self::checkPriority($queue);
+
$id = $queue['id'];
$priority = $queue['priority'];
- if (!in_array($priority, PRIORITIES)) {
- Logger::warning('Invalid priority', ['queue' => $queue, 'callstack' => System::callstack(20)]);
- $priority = PRIORITY_MEDIUM;
- }
-
$max_level = DI::config()->get('system', 'worker_defer_limit');
$new_retrial = self::getNextRetrial($queue, $max_level);