<?php
/**
- * @copyright Copyright (C) 2010-2022, the Friendica project
+ * @copyright Copyright (C) 2010-2023, the Friendica project
*
* @license GNU AGPL version 3 or any later version
*
self::$process = $process;
// Kill stale processes every 5 minutes
- $last_cleanup = DI::config()->get('system', 'worker_last_cleaned', 0);
+ $last_cleanup = DI::keyValue()->get('worker_last_cleaned') ?? 0;
if (time() > ($last_cleanup + 300)) {
- DI::config()->set('system', 'worker_last_cleaned', time());
+ DI::keyValue()->set( 'worker_last_cleaned', time());
Worker\Cron::killStaleWorkers();
}
if (DI::lock()->acquire(self::LOCK_WORKER, 0)) {
// Count active workers and compare them with a maximum value that depends on the load
if (self::tooMuchWorkers()) {
- Logger::notice('Active worker limit reached, quitting.');
+ Logger::info('Active worker limit reached, quitting.');
DI::lock()->release(self::LOCK_WORKER);
return;
}
{
// Count active workers and compare them with a maximum value that depends on the load
if (self::tooMuchWorkers()) {
- Logger::notice('Active worker limit reached, quitting.');
+ Logger::info('Active worker limit reached, quitting.');
return false;
}
return false;
}
- $valid = false;
- if (strpos($file, 'include/') === 0) {
- $valid = true;
- }
-
- if (strpos($file, 'addon/') === 0) {
- $valid = true;
- }
-
- // Simply return flag
- return $valid;
+ return (strpos($file, 'addon/') === 0);
}
/**
$mypid = getmypid();
// Quit when in maintenance
- if (DI::config()->get('system', 'maintenance', false, true)) {
+ if (DI::config()->get('system', 'maintenance', false)) {
Logger::notice('Maintenance mode - quit process', ['pid' => $mypid]);
return false;
}
$stamp = (float)microtime(true);
$condition = ["`id` = ? AND `next_try` < ?", $queue['id'], DateTimeFormat::utcNow()];
if (DBA::update('workerqueue', ['done' => true], $condition)) {
- DI::config()->set('system', 'last_worker_execution', DateTimeFormat::utcNow());
+ DI::keyValue()->set('last_worker_execution', DateTimeFormat::utcNow());
}
self::$db_duration = (microtime(true) - $stamp);
self::$db_duration_write += (microtime(true) - $stamp);
return true;
}
- // The script could be provided as full path or only with the function name
- if ($include == basename($include)) {
- $include = 'include/' . $include . '.php';
- }
-
if (!self::validateInclude($include)) {
Logger::warning('Include file is not valid', ['file' => $argv[0]]);
$stamp = (float)microtime(true);
$stamp = (float)microtime(true);
if (DBA::update('workerqueue', ['done' => true], ['id' => $queue['id']])) {
- DI::config()->set('system', 'last_worker_execution', DateTimeFormat::utcNow());
+ DI::keyValue()->set('last_worker_execution', DateTimeFormat::utcNow());
}
self::$db_duration = (microtime(true) - $stamp);
self::$db_duration_write += (microtime(true) - $stamp);
return false;
}
- $load = System::getLoadAvg();
+ $load = System::getLoadAvg($processes_cooldown != 0);
if (empty($load)) {
return false;
}
$sleeping = false;
- while ($load = System::getLoadAvg()) {
+ while ($load = System::getLoadAvg($processes_cooldown != 0)) {
if (($load_cooldown > 0) && ($load['average1'] > $load_cooldown)) {
if (!$sleeping) {
- Logger::notice('Load induced pre execution cooldown.', ['max' => $load_cooldown, 'load' => $load, 'called-by' => System::callstack(1)]);
+ Logger::info('Load induced pre execution cooldown.', ['max' => $load_cooldown, 'load' => $load, 'called-by' => System::callstack(1)]);
$sleeping = true;
}
sleep(1);
}
if (($processes_cooldown > 0) && ($load['scheduled'] > $processes_cooldown)) {
if (!$sleeping) {
- Logger::notice('Process induced pre execution cooldown.', ['max' => $processes_cooldown, 'load' => $load, 'called-by' => System::callstack(1)]);
+ Logger::info('Process induced pre execution cooldown.', ['max' => $processes_cooldown, 'load' => $load, 'called-by' => System::callstack(1)]);
$sleeping = true;
}
sleep(1);
}
if ($sleeping) {
- Logger::notice('Cooldown ended.', ['max-load' => $load_cooldown, 'max-processes' => $processes_cooldown, 'load' => $load, 'called-by' => System::callstack(1)]);
+ Logger::info('Cooldown ended.', ['max-load' => $load_cooldown, 'max-processes' => $processes_cooldown, 'load' => $load, 'called-by' => System::callstack(1)]);
}
}
// Set the workerLogger as new default logger
if ($method_call) {
- call_user_func_array(sprintf('Friendica\Worker\%s::execute', $funcname), $argv);
+ try {
+ call_user_func_array(sprintf('Friendica\Worker\%s::execute', $funcname), $argv);
+ } catch (\TypeError $e) {
+ // No need to defer a worker queue entry if the arguments are invalid
+ Logger::notice('Wrong worker arguments', ['class' => $funcname, 'argv' => $argv, 'queue' => $queue, 'message' => $e->getMessage()]);
+ } catch (\Throwable $e) {
+ Logger::error('Uncaught exception in worker execution', ['class' => get_class($e), 'message' => $e->getMessage(), 'code' => $e->getCode(), 'file' => $e->getFile() . ':' . $e->getLine(), 'trace' => $e->getTraceAsString(), 'previous' => $e->getPrevious()]);
+ Worker::defer();
+ }
} else {
$funcname($argv, count($argv));
}
}
}
- Logger::notice('Load: ' . $load . '/' . $maxsysload . ' - processes: ' . $deferred . '/' . $active . '/' . $waiting_processes . $processlist . ' - maximum: ' . $queues . '/' . $maxqueues);
+ Logger::info('Load: ' . $load . '/' . $maxsysload . ' - processes: ' . $deferred . '/' . $active . '/' . $waiting_processes . $processlist . ' - maximum: ' . $queues . '/' . $maxqueues);
// Are there fewer workers running as possible? Then fork a new one.
if (!DI::config()->get('system', 'worker_dont_fork', false) && ($queues > ($active + 1)) && self::entriesExists() && !self::systemLimitReached()) {
$command = array_shift($args);
$parameters = json_encode($args);
- $found = DBA::exists('workerqueue', ['command' => $command, 'parameter' => $parameters, 'done' => false]);
+ $queue = DBA::selectFirst('workerqueue', ['id', 'priority'], ['command' => $command, 'parameter' => $parameters, 'done' => false]);
$added = 0;
if (!is_int($priority) || !in_array($priority, self::PRIORITIES)) {
return 0;
}
- if (!$found) {
+ if (empty($queue)) {
if (!DBA::insert('workerqueue', ['command' => $command, 'parameter' => $parameters, 'created' => $created,
'priority' => $priority, 'next_try' => $delayed])) {
return 0;
}
$added = DBA::lastInsertId();
} elseif ($force_priority) {
- DBA::update('workerqueue', ['priority' => $priority], ['command' => $command, 'parameter' => $parameters, 'done' => false, 'pid' => 0]);
+ $ret = DBA::update('workerqueue', ['priority' => $priority], ['command' => $command, 'parameter' => $parameters, 'done' => false, 'pid' => 0]);
+ if ($ret && ($priority != $queue['priority'])) {
+ $added = $queue['id'];
+ }
}
// Set the IPC flag to ensure an immediate process execution via daemon
return $added;
}
- // Quit on daemon mode
- if (Worker\Daemon::isMode()) {
+ // Quit on daemon mode, except the priority is critical (like for db updates)
+ if (Worker\Daemon::isMode() && $priority !== self::PRIORITY_CRITICAL) {
return $added;
}
$duration = max($start, $end) - min($start, $end);
// Quit when the last cron execution had been after the previous window
- $last_cron = DI::config()->get('system', 'last_cron_daily');
+ $last_cron = DI::keyValue()->get('last_cron_daily');
if ($last_cron + $duration > time()) {
Logger::info('The Daily cron had been executed recently', ['last' => date(DateTimeFormat::MYSQL, $last_cron), 'start' => date('H:i:s', $start), 'end' => date('H:i:s', $end)]);
return false;