<?php
/**
- * @copyright Copyright (C) 2010-2022, the Friendica project
+ * @copyright Copyright (C) 2010-2023, the Friendica project
*
* @license GNU AGPL version 3 or any later version
*
*/
class Worker
{
+ /**
+ * @name Priority
+ *
+ * Process priority for the worker
+ * @{
+ */
+ const PRIORITY_UNDEFINED = 0;
+ const PRIORITY_CRITICAL = 10;
+ const PRIORITY_HIGH = 20;
+ const PRIORITY_MEDIUM = 30;
+ const PRIORITY_LOW = 40;
+ const PRIORITY_NEGLIGIBLE = 50;
+ const PRIORITIES = [self::PRIORITY_CRITICAL, self::PRIORITY_HIGH, self::PRIORITY_MEDIUM, self::PRIORITY_LOW, self::PRIORITY_NEGLIGIBLE];
+ /* @}*/
+
const STATE_STARTUP = 1; // Worker is in startup. This takes most time.
const STATE_LONG_LOOP = 2; // Worker is processing the whole - long - loop.
const STATE_REFETCH = 3; // Worker had refetched jobs in the execution loop.
self::$process = $process;
// Kill stale processes every 5 minutes
- $last_cleanup = DI::config()->get('system', 'worker_last_cleaned', 0);
+ $last_cleanup = DI::keyValue()->get('worker_last_cleaned') ?? 0;
if (time() > ($last_cleanup + 300)) {
- DI::config()->set('system', 'worker_last_cleaned', time());
+ DI::keyValue()->set( 'worker_last_cleaned', time());
Worker\Cron::killStaleWorkers();
}
if (DI::lock()->acquire(self::LOCK_WORKER, 0)) {
// Count active workers and compare them with a maximum value that depends on the load
if (self::tooMuchWorkers()) {
- Logger::notice('Active worker limit reached, quitting.');
+ Logger::info('Active worker limit reached, quitting.');
DI::lock()->release(self::LOCK_WORKER);
return;
}
{
// Count active workers and compare them with a maximum value that depends on the load
if (self::tooMuchWorkers()) {
- Logger::notice('Active worker limit reached, quitting.');
+ Logger::info('Active worker limit reached, quitting.');
return false;
}
return false;
}
- $valid = false;
- if (strpos($file, 'include/') === 0) {
- $valid = true;
- }
-
- if (strpos($file, 'addon/') === 0) {
- $valid = true;
- }
-
- // Simply return flag
- return $valid;
+ return (strpos($file, 'addon/') === 0);
}
/**
$mypid = getmypid();
// Quit when in maintenance
- if (DI::config()->get('system', 'maintenance', false, true)) {
+ if (DI::config()->get('system', 'maintenance', false)) {
Logger::notice('Maintenance mode - quit process', ['pid' => $mypid]);
return false;
}
return false;
}
- // Check for existance and validity of the include file
+ // Check for existence and validity of the include file
$include = $argv[0];
if (method_exists(sprintf('Friendica\Worker\%s', $include), 'execute')) {
$stamp = (float)microtime(true);
$condition = ["`id` = ? AND `next_try` < ?", $queue['id'], DateTimeFormat::utcNow()];
if (DBA::update('workerqueue', ['done' => true], $condition)) {
- DI::config()->set('system', 'last_worker_execution', DateTimeFormat::utcNow());
+ DI::keyValue()->set('last_worker_execution', DateTimeFormat::utcNow());
}
self::$db_duration = (microtime(true) - $stamp);
self::$db_duration_write += (microtime(true) - $stamp);
return true;
}
- // The script could be provided as full path or only with the function name
- if ($include == basename($include)) {
- $include = 'include/' . $include . '.php';
- }
-
if (!self::validateInclude($include)) {
Logger::warning('Include file is not valid', ['file' => $argv[0]]);
$stamp = (float)microtime(true);
$stamp = (float)microtime(true);
if (DBA::update('workerqueue', ['done' => true], ['id' => $queue['id']])) {
- DI::config()->set('system', 'last_worker_execution', DateTimeFormat::utcNow());
+ DI::keyValue()->set('last_worker_execution', DateTimeFormat::utcNow());
}
self::$db_duration = (microtime(true) - $stamp);
self::$db_duration_write += (microtime(true) - $stamp);
$load_cooldown = DI::config()->get('system', 'worker_load_cooldown');
$processes_cooldown = DI::config()->get('system', 'worker_processes_cooldown');
+ if ($load_cooldown == 0) {
+ $load_cooldown = DI::config()->get('system', 'maxloadavg');
+ }
+
if (($load_cooldown == 0) && ($processes_cooldown == 0)) {
return false;
}
- $load = System::getLoadAvg();
+ $load = System::getLoadAvg($processes_cooldown != 0);
if (empty($load)) {
return false;
}
$load_cooldown = DI::config()->get('system', 'worker_load_cooldown');
$processes_cooldown = DI::config()->get('system', 'worker_processes_cooldown');
+ if ($load_cooldown == 0) {
+ $load_cooldown = DI::config()->get('system', 'maxloadavg');
+ }
+
if (($load_cooldown == 0) && ($processes_cooldown == 0)) {
return;
}
$sleeping = false;
- while ($load = System::getLoadAvg()) {
+ while ($load = System::getLoadAvg($processes_cooldown != 0)) {
if (($load_cooldown > 0) && ($load['average1'] > $load_cooldown)) {
if (!$sleeping) {
- Logger::notice('Load induced pre execution cooldown.', ['max' => $load_cooldown, 'load' => $load, 'called-by' => System::callstack(1)]);
+ Logger::info('Load induced pre execution cooldown.', ['max' => $load_cooldown, 'load' => $load, 'called-by' => System::callstack(1)]);
$sleeping = true;
}
sleep(1);
}
if (($processes_cooldown > 0) && ($load['scheduled'] > $processes_cooldown)) {
if (!$sleeping) {
- Logger::notice('Process induced pre execution cooldown.', ['max' => $processes_cooldown, 'load' => $load, 'called-by' => System::callstack(1)]);
+ Logger::info('Process induced pre execution cooldown.', ['max' => $processes_cooldown, 'load' => $load, 'called-by' => System::callstack(1)]);
$sleeping = true;
}
sleep(1);
}
if ($sleeping) {
- Logger::notice('Cooldown ended.', ['max-load' => $load_cooldown, 'max-processes' => $processes_cooldown, 'load' => $load, 'called-by' => System::callstack(1)]);
+ Logger::info('Cooldown ended.', ['max-load' => $load_cooldown, 'max-processes' => $processes_cooldown, 'load' => $load, 'called-by' => System::callstack(1)]);
}
}
// Set the workerLogger as new default logger
if ($method_call) {
- call_user_func_array(sprintf('Friendica\Worker\%s::execute', $funcname), $argv);
+ try {
+ call_user_func_array(sprintf('Friendica\Worker\%s::execute', $funcname), $argv);
+ } catch (\TypeError $e) {
+ // No need to defer a worker queue entry if the arguments are invalid
+ Logger::notice('Wrong worker arguments', ['class' => $funcname, 'argv' => $argv, 'queue' => $queue, 'message' => $e->getMessage()]);
+ } catch (\Throwable $e) {
+ Logger::error('Uncaught exception in worker execution', ['class' => get_class($e), 'message' => $e->getMessage(), 'code' => $e->getCode(), 'file' => $e->getFile() . ':' . $e->getLine(), 'trace' => $e->getTraceAsString(), 'previous' => $e->getPrevious()]);
+ Worker::defer();
+ }
} else {
$funcname($argv, count($argv));
}
/* With these values we can analyze how effective the worker is.
* The database and rest time should be low since this is the unproductive time.
* The execution time is the productive time.
- * By changing parameters like the maximum number of workers we can check the effectivness.
+ * By changing parameters like the maximum number of workers we can check the effectiveness.
*/
$dbtotal = round(self::$db_duration, 2);
$dbread = round(self::$db_duration - (self::$db_duration_count + self::$db_duration_write + self::$db_duration_stat), 2);
$rest = round(max(0, $up_duration - (self::$db_duration + self::$lock_duration)), 2);
$exec = round($duration, 2);
- Logger::info('Performance:', ['state' => self::$state, 'count' => $dbcount, 'stat' => $dbstat, 'write' => $dbwrite, 'lock' => $dblock, 'total' => $dbtotal, 'rest' => $rest, 'exec' => $exec]);
+ Logger::info('Performance:', ['function' => $funcname, 'state' => self::$state, 'count' => $dbcount, 'stat' => $dbstat, 'write' => $dbwrite, 'lock' => $dblock, 'total' => $dbtotal, 'rest' => $rest, 'exec' => $exec]);
self::coolDown();
Logger::info('Longer than 2 minutes.', ['priority' => $queue['priority'], 'id' => $queue['id'], 'duration' => round($duration/60, 3)]);
}
- Logger::info('Process done.', ['priority' => $queue['priority'], 'id' => $queue['id'], 'duration' => round($duration, 3)]);
+ Logger::info('Process done.', ['function' => $funcname, 'priority' => $queue['priority'], 'retrial' => $queue['retrial'], 'id' => $queue['id'], 'duration' => round($duration, 3)]);
DI::profiler()->saveLog(DI::logger(), 'ID ' . $queue['id'] . ': ' . $funcname);
}
DBA::close($r);
}
+ $stamp = (float)microtime(true);
+ $used = 0;
+ $sleep = 0;
+ $data = DBA::p("SHOW PROCESSLIST");
+ while ($row = DBA::fetch($data)) {
+ if ($row['Command'] != 'Sleep') {
+ ++$used;
+ } else {
+ ++$sleep;
+ }
+ }
+ DBA::close($data);
+ self::$db_duration += (microtime(true) - $stamp);
+
// If $max is set we will use the processlist to determine the current number of connections
// The processlist only shows entries of the current user
if ($max != 0) {
- $stamp = (float)microtime(true);
- $r = DBA::p('SHOW PROCESSLIST');
- self::$db_duration += (microtime(true) - $stamp);
- $used = DBA::numRows($r);
- DBA::close($r);
-
- Logger::info('Connection usage (user values)', ['usage' => $used, 'max' => $max]);
+ Logger::info('Connection usage (user values)', ['working' => $used, 'sleeping' => $sleep, 'max' => $max]);
$level = ($used / $max) * 100;
if (!DBA::isResult($r)) {
return false;
}
- $used = intval($r['Value']);
+ $used = max($used, intval($r['Value'])) - $sleep;
if ($used == 0) {
return false;
}
- Logger::info('Connection usage (system values)', ['used' => $used, 'max' => $max]);
+ Logger::info('Connection usage (system values)', ['working' => $used, 'sleeping' => $sleep, 'max' => $max]);
$level = $used / $max * 100;
$top_priority = self::highestPriority();
$high_running = self::processWithPriorityActive($top_priority);
- if (!$high_running && ($top_priority > PRIORITY_UNDEFINED) && ($top_priority < PRIORITY_NEGLIGIBLE)) {
+ if (!$high_running && ($top_priority > self::PRIORITY_UNDEFINED) && ($top_priority < self::PRIORITY_NEGLIGIBLE)) {
Logger::info('Jobs with a higher priority are waiting but none is executed. Open a fastlane.', ['priority' => $top_priority]);
$queues = $active + 1;
}
}
- Logger::notice('Load: ' . $load . '/' . $maxsysload . ' - processes: ' . $deferred . '/' . $active . '/' . $waiting_processes . $processlist . ' - maximum: ' . $queues . '/' . $maxqueues);
+ Logger::info('Load: ' . $load . '/' . $maxsysload . ' - processes: ' . $deferred . '/' . $active . '/' . $waiting_processes . $processlist . ' - maximum: ' . $queues . '/' . $maxqueues);
// Are there fewer workers running as possible? Then fork a new one.
if (!DI::config()->get('system', 'worker_dont_fork', false) && ($queues > ($active + 1)) && self::entriesExists() && !self::systemLimitReached()) {
/**
* Returns waiting jobs for the current process id
*
- * @return array|bool waiting workerqueue jobs or FALSE on failture
+ * @return array|bool waiting workerqueue jobs or FALSE on failure
* @throws \Exception
*/
private static function getWaitingJobForPID()
private static function nextPriority()
{
$waiting = [];
- $priorities = [PRIORITY_CRITICAL, PRIORITY_HIGH, PRIORITY_MEDIUM, PRIORITY_LOW, PRIORITY_NEGLIGIBLE];
+ $priorities = [self::PRIORITY_CRITICAL, self::PRIORITY_HIGH, self::PRIORITY_MEDIUM, self::PRIORITY_LOW, self::PRIORITY_NEGLIGIBLE];
foreach ($priorities as $priority) {
$stamp = (float)microtime(true);
if (DBA::exists('workerqueue', ["`priority` = ? AND `pid` = 0 AND NOT `done` AND `next_try` < ?", $priority, DateTimeFormat::utcNow()])) {
self::$db_duration += (microtime(true) - $stamp);
}
- if (!empty($waiting[PRIORITY_CRITICAL])) {
- return PRIORITY_CRITICAL;
+ if (!empty($waiting[self::PRIORITY_CRITICAL])) {
+ return self::PRIORITY_CRITICAL;
}
$running = [];
* @param (integer|array) priority or parameter array, strings are deprecated and are ignored
*
* next args are passed as $cmd command line
- * or: Worker::add(PRIORITY_HIGH, 'Notifier', Delivery::DELETION, $drop_id);
- * or: Worker::add(array('priority' => PRIORITY_HIGH, 'dont_fork' => true), 'Delivery', $post_id);
+ * or: Worker::add(Worker::PRIORITY_HIGH, 'Notifier', Delivery::DELETION, $drop_id);
+ * or: Worker::add(array('priority' => Worker::PRIORITY_HIGH, 'dont_fork' => true), 'Delivery', $post_id);
*
* @return int '0' if worker queue entry already existed or there had been an error, otherwise the ID of the worker task
* @throws \Friendica\Network\HTTPException\InternalServerErrorException
return 1;
}
- $priority = PRIORITY_MEDIUM;
+ $priority = self::PRIORITY_MEDIUM;
// Don't fork from frontend tasks by default
$dont_fork = DI::config()->get('system', 'worker_dont_fork', false) || !DI::mode()->isBackend();
$created = DateTimeFormat::utcNow();
$command = array_shift($args);
$parameters = json_encode($args);
- $found = DBA::exists('workerqueue', ['command' => $command, 'parameter' => $parameters, 'done' => false]);
+ $queue = DBA::selectFirst('workerqueue', ['id', 'priority'], ['command' => $command, 'parameter' => $parameters, 'done' => false]);
$added = 0;
- if (!is_int($priority) || !in_array($priority, PRIORITIES)) {
+ if (!is_int($priority) || !in_array($priority, self::PRIORITIES)) {
Logger::warning('Invalid priority', ['priority' => $priority, 'command' => $command, 'callstack' => System::callstack(20)]);
- $priority = PRIORITY_MEDIUM;
+ $priority = self::PRIORITY_MEDIUM;
}
// Quit if there was a database error - a precaution for the update process to 3.5.3
return 0;
}
- if (!$found) {
+ if (empty($queue)) {
if (!DBA::insert('workerqueue', ['command' => $command, 'parameter' => $parameters, 'created' => $created,
'priority' => $priority, 'next_try' => $delayed])) {
return 0;
}
$added = DBA::lastInsertId();
} elseif ($force_priority) {
- DBA::update('workerqueue', ['priority' => $priority], ['command' => $command, 'parameter' => $parameters, 'done' => false, 'pid' => 0]);
+ $ret = DBA::update('workerqueue', ['priority' => $priority], ['command' => $command, 'parameter' => $parameters, 'done' => false, 'pid' => 0]);
+ if ($ret && ($priority != $queue['priority'])) {
+ $added = $queue['id'];
+ }
}
// Set the IPC flag to ensure an immediate process execution via daemon
return $added;
}
- // Quit on daemon mode
- if (Worker\Daemon::isMode()) {
+ // Quit on daemon mode, except the priority is critical (like for db updates)
+ if (Worker\Daemon::isMode() && $priority !== self::PRIORITY_CRITICAL) {
return $added;
}
return $new_retrial;
}
+ /**
+ * Get the number of retrials for the current worker task
+ *
+ * @return integer
+ */
+ public static function getRetrial(): int
+ {
+ $queue = DI::app()->getQueue();
+ return $queue['retrial'] ?? 0;
+ }
+
/**
* Defers the current worker entry
*
$delay = (($new_retrial + 2) ** 4) + (rand(1, 30) * ($new_retrial));
$next = DateTimeFormat::utc('now + ' . $delay . ' seconds');
- if (($priority < PRIORITY_MEDIUM) && ($new_retrial > 3)) {
- $priority = PRIORITY_MEDIUM;
- } elseif (($priority < PRIORITY_LOW) && ($new_retrial > 6)) {
- $priority = PRIORITY_LOW;
- } elseif (($priority < PRIORITY_NEGLIGIBLE) && ($new_retrial > 8)) {
- $priority = PRIORITY_NEGLIGIBLE;
+ if (($priority < self::PRIORITY_MEDIUM) && ($new_retrial > 3)) {
+ $priority = self::PRIORITY_MEDIUM;
+ } elseif (($priority < self::PRIORITY_LOW) && ($new_retrial > 6)) {
+ $priority = self::PRIORITY_LOW;
+ } elseif (($priority < self::PRIORITY_NEGLIGIBLE) && ($new_retrial > 8)) {
+ $priority = self::PRIORITY_NEGLIGIBLE;
}
Logger::info('Deferred task', ['id' => $id, 'retrial' => $new_retrial, 'created' => $queue['created'], 'next_execution' => $next, 'old_prio' => $queue['priority'], 'new_prio' => $priority]);
*/
public static function isInMaintenanceWindow(bool $check_last_execution = false): bool
{
- // Calculate the seconds of the start end end of the maintenance window
+ // Calculate the seconds of the start and end of the maintenance window
$start = strtotime(DI::config()->get('system', 'maintenance_start')) % 86400;
$end = strtotime(DI::config()->get('system', 'maintenance_end')) % 86400;
$duration = max($start, $end) - min($start, $end);
// Quit when the last cron execution had been after the previous window
- $last_cron = DI::config()->get('system', 'last_cron_daily');
+ $last_cron = DI::keyValue()->get('last_cron_daily');
if ($last_cron + $duration > time()) {
Logger::info('The Daily cron had been executed recently', ['last' => date(DateTimeFormat::MYSQL, $last_cron), 'start' => date('H:i:s', $start), 'end' => date('H:i:s', $end)]);
return false;