<?php
/**
- * @copyright Copyright (C) 2010-2022, the Friendica project
+ * @copyright Copyright (C) 2010-2023, the Friendica project
*
* @license GNU AGPL version 3 or any later version
*
namespace Friendica\Core;
-use Friendica\App\Mode;
use Friendica\Core\Worker\Entity\Process;
use Friendica\Database\DBA;
use Friendica\DI;
*/
class Worker
{
+ /**
+ * @name Priority
+ *
+ * Process priority for the worker
+ * @{
+ */
+ const PRIORITY_UNDEFINED = 0;
+ const PRIORITY_CRITICAL = 10;
+ const PRIORITY_HIGH = 20;
+ const PRIORITY_MEDIUM = 30;
+ const PRIORITY_LOW = 40;
+ const PRIORITY_NEGLIGIBLE = 50;
+ const PRIORITIES = [self::PRIORITY_CRITICAL, self::PRIORITY_HIGH, self::PRIORITY_MEDIUM, self::PRIORITY_LOW, self::PRIORITY_NEGLIGIBLE];
+ /* @}*/
+
const STATE_STARTUP = 1; // Worker is in startup. This takes most time.
const STATE_LONG_LOOP = 2; // Worker is processing the whole - long - loop.
const STATE_REFETCH = 3; // Worker had refetched jobs in the execution loop.
private static $lock_duration = 0;
private static $last_update;
private static $state;
- private static $daemon_mode = null;
/** @var Process */
private static $process;
* @return void
* @throws \Friendica\Network\HTTPException\InternalServerErrorException
*/
- public static function processQueue($run_cron, Process $process)
+ public static function processQueue(bool $run_cron, Process $process)
{
self::$up_start = microtime(true);
self::$process = $process;
// Kill stale processes every 5 minutes
- $last_cleanup = DI::config()->get('system', 'worker_last_cleaned', 0);
+ $last_cleanup = DI::keyValue()->get('worker_last_cleaned') ?? 0;
if (time() > ($last_cleanup + 300)) {
- DI::config()->set('system', 'worker_last_cleaned', time());
- self::killStaleWorkers();
+ DI::keyValue()->set( 'worker_last_cleaned', time());
+ Worker\Cron::killStaleWorkers();
}
// Check if the system is ready
// Now we start additional cron processes if we should do so
if ($run_cron) {
- self::runCron();
+ Worker\Cron::run();
}
$last_check = $starttime = time();
// We fetch the next queue entry that is about to be executed
while ($r = self::workerProcess()) {
- if (self::IPCJobsExists(getmypid())) {
- self::IPCDeleteJobState(getmypid());
+ if (Worker\IPC::JobsExists(getmypid())) {
+ Worker\IPC::DeleteJobState(getmypid());
}
// Don't refetch when a worker fetches tasks for multiple workers
$refetched = DI::config()->get('system', 'worker_multiple_fetch');
foreach ($r as $entry) {
- $entry = self::checkPriority($entry);
-
// The work will be done
if (!self::execute($entry)) {
- Logger::notice('Process execution failed, quitting.');
+ Logger::warning('Process execution failed, quitting.', ['entry' => $entry]);
return;
}
if (DI::lock()->acquire(self::LOCK_WORKER, 0)) {
// Count active workers and compare them with a maximum value that depends on the load
if (self::tooMuchWorkers()) {
- Logger::notice('Active worker limit reached, quitting.');
+ Logger::info('Active worker limit reached, quitting.');
DI::lock()->release(self::LOCK_WORKER);
return;
}
}
// Quit the worker once every cron interval
- if (time() > ($starttime + (DI::config()->get('system', 'cron_interval') * 60))) {
+ if (time() > ($starttime + (DI::config()->get('system', 'cron_interval') * 60)) && !self::systemLimitReached()) {
Logger::info('Process lifetime reached, respawning.');
self::unclaimProcess($process);
- if (self::isDaemonMode()) {
- self::IPCSetJobState(true);
+ if (Worker\Daemon::isMode()) {
+ Worker\IPC::SetJobState(true);
} else {
self::spawnWorker();
}
}
// Cleaning up. Possibly not needed, but it doesn't harm anything.
- if (self::isDaemonMode()) {
- self::IPCSetJobState(false);
+ if (Worker\Daemon::isMode()) {
+ Worker\IPC::SetJobState(false);
}
Logger::info("Couldn't select a workerqueue entry, quitting process", ['pid' => getmypid()]);
}
- /**
- * Check and fix the priority of a worker task
- * @param array $entry
- * @return array
- */
- private static function checkPriority(array $entry)
- {
- $entry['priority'] = (int)$entry['priority'];
-
- if (!in_array($entry['priority'], PRIORITIES)) {
- Logger::warning('Invalid priority', ['entry' => $entry, 'callstack' => System::callstack(20)]);
- DBA::update('workerqueue', ['priority' => PRIORITY_MEDIUM], ['id' => $entry['id']]);
- $entry['priority'] = PRIORITY_MEDIUM;
- }
-
- return $entry;
- }
-
/**
* Checks if the system is ready.
*
*
* @return boolean
*/
- public static function isReady()
+ public static function isReady(): bool
{
// Count active workers and compare them with a maximum value that depends on the load
if (self::tooMuchWorkers()) {
- Logger::notice('Active worker limit reached, quitting.');
+ Logger::info('Active worker limit reached, quitting.');
return false;
}
* @return boolean Returns "true" if tasks are existing
* @throws \Exception
*/
- public static function entriesExists()
+ public static function entriesExists(): bool
{
$stamp = (float)microtime(true);
$exists = DBA::exists('workerqueue', ["NOT `done` AND `pid` = 0 AND `next_try` < ?", DateTimeFormat::utcNow()]);
* @return integer Number of deferred entries in the worker queue
* @throws \Exception
*/
- private static function deferredEntries()
+ private static function deferredEntries(): int
{
$stamp = (float)microtime(true);
$count = DBA::count('workerqueue', ["NOT `done` AND `pid` = 0 AND `retrial` > ?", 0]);
* @return integer Number of non executed entries in the worker queue
* @throws \Exception
*/
- private static function totalEntries()
+ private static function totalEntries(): int
{
$stamp = (float)microtime(true);
$count = DBA::count('workerqueue', ['done' => false, 'pid' => 0]);
* @return integer Number of active worker processes
* @throws \Exception
*/
- private static function highestPriority()
+ private static function highestPriority(): int
{
$stamp = (float)microtime(true);
$condition = ["`pid` = 0 AND NOT `done` AND `next_try` < ?", DateTimeFormat::utcNow()];
* @return integer Is there a process running with that priority?
* @throws \Exception
*/
- private static function processWithPriorityActive($priority)
+ private static function processWithPriorityActive(int $priority): int
{
$condition = ["`priority` <= ? AND `pid` != 0 AND NOT `done`", $priority];
return DBA::exists('workerqueue', $condition);
* @param mixed $file
* @return bool
*/
- private static function validateInclude(&$file)
+ private static function validateInclude(&$file): bool
{
$orig_file = $file;
return false;
}
- $file = str_replace(getcwd() . "/", "", $file, $count);
+ $file = str_replace(getcwd() . '/', '', $file, $count);
if ($count != 1) {
return false;
}
return false;
}
- $valid = false;
- if (strpos($file, "include/") === 0) {
- $valid = true;
- }
-
- if (strpos($file, "addon/") === 0) {
- $valid = true;
- }
-
- // Simply return flag
- return $valid;
+ return (strpos($file, 'addon/') === 0);
}
/**
* @return boolean "true" if further processing should be stopped
* @throws \Friendica\Network\HTTPException\InternalServerErrorException
*/
- public static function execute($queue)
+ public static function execute(array $queue): bool
{
$mypid = getmypid();
// Quit when in maintenance
- if (DI::config()->get('system', 'maintenance', false, true)) {
- Logger::notice("Maintenance mode - quit process", ['pid' => $mypid]);
+ if (DI::config()->get('system', 'maintenance', false)) {
+ Logger::notice('Maintenance mode - quit process', ['pid' => $mypid]);
return false;
}
// Constantly check the number of parallel database processes
if (DI::system()->isMaxProcessesReached()) {
- Logger::warning("Max processes reached for process", ['pid' => $mypid]);
+ Logger::warning('Max processes reached for process', ['pid' => $mypid]);
return false;
}
// Constantly check the number of available database connections to let the frontend be accessible at any time
if (self::maxConnectionsReached()) {
- Logger::warning("Max connection reached for process", ['pid' => $mypid]);
+ Logger::warning('Max connection reached for process', ['pid' => $mypid]);
return false;
}
return false;
}
- // Check for existance and validity of the include file
+ // Check for existence and validity of the include file
$include = $argv[0];
if (method_exists(sprintf('Friendica\Worker\%s', $include), 'execute')) {
// We constantly update the "executed" date every minute to avoid being killed too soon
if (!isset(self::$last_update)) {
- self::$last_update = strtotime($queue["executed"]);
+ self::$last_update = strtotime($queue['executed']);
}
$age = (time() - self::$last_update) / 60;
$stamp = (float)microtime(true);
$condition = ["`id` = ? AND `next_try` < ?", $queue['id'], DateTimeFormat::utcNow()];
if (DBA::update('workerqueue', ['done' => true], $condition)) {
- DI::config()->set('system', 'last_worker_execution', DateTimeFormat::utcNow());
+ DI::keyValue()->set('last_worker_execution', DateTimeFormat::utcNow());
}
self::$db_duration = (microtime(true) - $stamp);
self::$db_duration_write += (microtime(true) - $stamp);
return true;
}
- // The script could be provided as full path or only with the function name
- if ($include == basename($include)) {
- $include = "include/".$include.".php";
- }
-
if (!self::validateInclude($include)) {
- Logger::warning("Include file is not valid", ['file' => $argv[0]]);
+ Logger::warning('Include file is not valid', ['file' => $argv[0]]);
$stamp = (float)microtime(true);
- DBA::delete('workerqueue', ['id' => $queue["id"]]);
+ DBA::delete('workerqueue', ['id' => $queue['id']]);
self::$db_duration = (microtime(true) - $stamp);
self::$db_duration_write += (microtime(true) - $stamp);
return true;
require_once $include;
- $funcname = str_replace(".php", "", basename($argv[0]))."_run";
+ $funcname = str_replace('.php', '', basename($argv[0])) .'_run';
if (function_exists($funcname)) {
// We constantly update the "executed" date every minute to avoid being killed too soon
if (!isset(self::$last_update)) {
- self::$last_update = strtotime($queue["executed"]);
+ self::$last_update = strtotime($queue['executed']);
}
$age = (time() - self::$last_update) / 60;
self::execFunction($queue, $funcname, $argv, false);
$stamp = (float)microtime(true);
- if (DBA::update('workerqueue', ['done' => true], ['id' => $queue["id"]])) {
- DI::config()->set('system', 'last_worker_execution', DateTimeFormat::utcNow());
+ if (DBA::update('workerqueue', ['done' => true], ['id' => $queue['id']])) {
+ DI::keyValue()->set('last_worker_execution', DateTimeFormat::utcNow());
}
self::$db_duration = (microtime(true) - $stamp);
self::$db_duration_write += (microtime(true) - $stamp);
} else {
- Logger::warning("Function does not exist", ['function' => $funcname]);
+ Logger::warning('Function does not exist', ['function' => $funcname]);
$stamp = (float)microtime(true);
- DBA::delete('workerqueue', ['id' => $queue["id"]]);
+ DBA::delete('workerqueue', ['id' => $queue['id']]);
self::$db_duration = (microtime(true) - $stamp);
self::$db_duration_write += (microtime(true) - $stamp);
}
return true;
}
+ /**
+ * Checks if system limits are reached.
+ *
+ * @return boolean
+ */
+ private static function systemLimitReached(): bool
+ {
+ $load_cooldown = DI::config()->get('system', 'worker_load_cooldown');
+ $processes_cooldown = DI::config()->get('system', 'worker_processes_cooldown');
+
+ if ($load_cooldown == 0) {
+ $load_cooldown = DI::config()->get('system', 'maxloadavg');
+ }
+
+ if (($load_cooldown == 0) && ($processes_cooldown == 0)) {
+ return false;
+ }
+
+ $load = System::getLoadAvg($processes_cooldown != 0);
+ if (empty($load)) {
+ return false;
+ }
+
+ if (($load_cooldown > 0) && ($load['average1'] > $load_cooldown)) {
+ return true;
+ }
+
+ if (($processes_cooldown > 0) && ($load['scheduled'] > $processes_cooldown)) {
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * Slow the execution down if the system load is too high
+ *
+ * @return void
+ */
+ public static function coolDown()
+ {
+ $cooldown = DI::config()->get('system', 'worker_cooldown', 0);
+ if ($cooldown > 0) {
+ Logger::debug('Wait for cooldown.', ['cooldown' => $cooldown]);
+ if ($cooldown < 1) {
+ usleep($cooldown * 1000000);
+ } else {
+ sleep($cooldown);
+ }
+ }
+
+ $load_cooldown = DI::config()->get('system', 'worker_load_cooldown');
+ $processes_cooldown = DI::config()->get('system', 'worker_processes_cooldown');
+
+ if ($load_cooldown == 0) {
+ $load_cooldown = DI::config()->get('system', 'maxloadavg');
+ }
+
+ if (($load_cooldown == 0) && ($processes_cooldown == 0)) {
+ return;
+ }
+
+ $sleeping = false;
+
+ while ($load = System::getLoadAvg($processes_cooldown != 0)) {
+ if (($load_cooldown > 0) && ($load['average1'] > $load_cooldown)) {
+ if (!$sleeping) {
+ Logger::info('Load induced pre execution cooldown.', ['max' => $load_cooldown, 'load' => $load, 'called-by' => System::callstack(1)]);
+ $sleeping = true;
+ }
+ sleep(1);
+ continue;
+ }
+ if (($processes_cooldown > 0) && ($load['scheduled'] > $processes_cooldown)) {
+ if (!$sleeping) {
+ Logger::info('Process induced pre execution cooldown.', ['max' => $processes_cooldown, 'load' => $load, 'called-by' => System::callstack(1)]);
+ $sleeping = true;
+ }
+ sleep(1);
+ continue;
+ }
+ break;
+ }
+
+ if ($sleeping) {
+ Logger::info('Cooldown ended.', ['max-load' => $load_cooldown, 'max-processes' => $processes_cooldown, 'load' => $load, 'called-by' => System::callstack(1)]);
+ }
+ }
+
/**
* Execute a function from the queue
*
* @return void
* @throws \Friendica\Network\HTTPException\InternalServerErrorException
*/
- private static function execFunction($queue, $funcname, $argv, $method_call)
+ private static function execFunction(array $queue, string $funcname, array $argv, bool $method_call)
{
$a = DI::app();
- $cooldown = DI::config()->get("system", "worker_cooldown", 0);
- if ($cooldown > 0) {
- Logger::info('Pre execution cooldown.', ['priority' => $queue['priority'], 'id' => $queue["id"], 'cooldown' => $cooldown]);
- sleep($cooldown);
- }
+ self::coolDown();
Logger::enableWorker($funcname);
- Logger::info("Process start.", ['priority' => $queue['priority'], 'id' => $queue["id"]]);
+ Logger::info('Process start.', ['priority' => $queue['priority'], 'id' => $queue['id']]);
$stamp = (float)microtime(true);
// Set the workerLogger as new default logger
if ($method_call) {
- call_user_func_array(sprintf('Friendica\Worker\%s::execute', $funcname), $argv);
+ try {
+ call_user_func_array(sprintf('Friendica\Worker\%s::execute', $funcname), $argv);
+ } catch (\TypeError $e) {
+ // No need to defer a worker queue entry if the arguments are invalid
+ Logger::notice('Wrong worker arguments', ['class' => $funcname, 'argv' => $argv, 'queue' => $queue, 'message' => $e->getMessage()]);
+ } catch (\Throwable $e) {
+ Logger::error('Uncaught exception in worker execution', ['class' => get_class($e), 'message' => $e->getMessage(), 'code' => $e->getCode(), 'file' => $e->getFile() . ':' . $e->getLine(), 'trace' => $e->getTraceAsString(), 'previous' => $e->getPrevious()]);
+ Worker::defer();
+ }
} else {
$funcname($argv, count($argv));
}
/* With these values we can analyze how effective the worker is.
* The database and rest time should be low since this is the unproductive time.
* The execution time is the productive time.
- * By changing parameters like the maximum number of workers we can check the effectivness.
+ * By changing parameters like the maximum number of workers we can check the effectiveness.
*/
$dbtotal = round(self::$db_duration, 2);
$dbread = round(self::$db_duration - (self::$db_duration_count + self::$db_duration_write + self::$db_duration_stat), 2);
Logger::info('Performance:', ['state' => self::$state, 'count' => $dbcount, 'stat' => $dbstat, 'write' => $dbwrite, 'lock' => $dblock, 'total' => $dbtotal, 'rest' => $rest, 'exec' => $exec]);
+ self::coolDown();
+
self::$up_start = microtime(true);
self::$db_duration = 0;
self::$db_duration_count = 0;
self::$lock_duration = 0;
if ($duration > 3600) {
- Logger::info('Longer than 1 hour.', ['priority' => $queue['priority'], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]);
+ Logger::info('Longer than 1 hour.', ['priority' => $queue['priority'], 'id' => $queue['id'], 'duration' => round($duration/60, 3)]);
} elseif ($duration > 600) {
- Logger::info('Longer than 10 minutes.', ['priority' => $queue['priority'], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]);
+ Logger::info('Longer than 10 minutes.', ['priority' => $queue['priority'], 'id' => $queue['id'], 'duration' => round($duration/60, 3)]);
} elseif ($duration > 300) {
- Logger::info('Longer than 5 minutes.', ['priority' => $queue['priority'], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]);
+ Logger::info('Longer than 5 minutes.', ['priority' => $queue['priority'], 'id' => $queue['id'], 'duration' => round($duration/60, 3)]);
} elseif ($duration > 120) {
- Logger::info('Longer than 2 minutes.', ['priority' => $queue['priority'], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]);
+ Logger::info('Longer than 2 minutes.', ['priority' => $queue['priority'], 'id' => $queue['id'], 'duration' => round($duration/60, 3)]);
}
- Logger::info('Process done.', ['priority' => $queue['priority'], 'id' => $queue["id"], 'duration' => round($duration, 3)]);
-
- DI::profiler()->saveLog(DI::logger(), "ID " . $queue["id"] . ": " . $funcname);
+ Logger::info('Process done.', ['priority' => $queue['priority'], 'id' => $queue['id'], 'duration' => round($duration, 3)]);
- if ($cooldown > 0) {
- Logger::info('Post execution cooldown.', ['priority' => $queue['priority'], 'id' => $queue["id"], 'cooldown' => $cooldown]);
- sleep($cooldown);
- }
+ DI::profiler()->saveLog(DI::logger(), 'ID ' . $queue['id'] . ': ' . $funcname);
}
/**
* @return bool Are more than 3/4 of the maximum connections used?
* @throws \Friendica\Network\HTTPException\InternalServerErrorException
*/
- private static function maxConnectionsReached()
+ private static function maxConnectionsReached(): bool
{
// Fetch the max value from the config. This is needed when the system cannot detect the correct value by itself.
- $max = DI::config()->get("system", "max_connections");
+ $max = DI::config()->get('system', 'max_connections');
// Fetch the percentage level where the worker will get active
- $maxlevel = DI::config()->get("system", "max_connections_level", 75);
+ $maxlevel = DI::config()->get('system', 'max_connections_level', 75);
if ($max == 0) {
// the maximum number of possible user connections can be a system variable
$r = DBA::fetchFirst("SHOW VARIABLES WHERE `variable_name` = 'max_user_connections'");
if (DBA::isResult($r)) {
- $max = $r["Value"];
+ $max = $r['Value'];
}
// Or it can be granted. This overrides the system variable
$stamp = (float)microtime(true);
DBA::close($r);
}
+ $stamp = (float)microtime(true);
+ $used = 0;
+ $sleep = 0;
+ $data = DBA::p("SHOW PROCESSLIST");
+ while ($row = DBA::fetch($data)) {
+ if ($row['Command'] != 'Sleep') {
+ ++$used;
+ } else {
+ ++$sleep;
+ }
+ }
+ DBA::close($data);
+ self::$db_duration += (microtime(true) - $stamp);
+
// If $max is set we will use the processlist to determine the current number of connections
// The processlist only shows entries of the current user
if ($max != 0) {
- $stamp = (float)microtime(true);
- $r = DBA::p('SHOW PROCESSLIST');
- self::$db_duration += (microtime(true) - $stamp);
- $used = DBA::numRows($r);
- DBA::close($r);
-
- Logger::info("Connection usage (user values)", ['usage' => $used, 'max' => $max]);
+ Logger::info('Connection usage (user values)', ['working' => $used, 'sleeping' => $sleep, 'max' => $max]);
$level = ($used / $max) * 100;
if ($level >= $maxlevel) {
- Logger::warning("Maximum level (".$maxlevel."%) of user connections reached: ".$used."/".$max);
+ Logger::warning('Maximum level (' . $maxlevel . '%) of user connections reached: ' . $used .'/' . $max);
return true;
}
}
if (!DBA::isResult($r)) {
return false;
}
- $max = intval($r["Value"]);
+ $max = intval($r['Value']);
if ($max == 0) {
return false;
}
if (!DBA::isResult($r)) {
return false;
}
- $used = intval($r["Value"]);
+ $used = max($used, intval($r['Value'])) - $sleep;
if ($used == 0) {
return false;
}
- Logger::info("Connection usage (system values)", ['used' => $used, 'max' => $max]);
+ Logger::info('Connection usage (system values)', ['working' => $used, 'sleeping' => $sleep, 'max' => $max]);
$level = $used / $max * 100;
if ($level < $maxlevel) {
return false;
}
- Logger::warning("Maximum level (".$level."%) of system connections reached: ".$used."/".$max);
+ Logger::warning('Maximum level (' . $level . '%) of system connections reached: ' . $used . '/' . $max);
return true;
}
- /**
- * fix the queue entry if the worker process died
- *
- * @return void
- * @throws \Exception
- */
- private static function killStaleWorkers()
- {
- $stamp = (float)microtime(true);
- $entries = DBA::select(
- 'workerqueue',
- ['id', 'pid', 'executed', 'priority', 'command', 'parameter'],
- ['NOT `done` AND `pid` != 0'],
- ['order' => ['priority', 'retrial', 'created']]
- );
- self::$db_duration += (microtime(true) - $stamp);
-
- while ($entry = DBA::fetch($entries)) {
- $entry = self::checkPriority($entry);
-
- if (!posix_kill($entry["pid"], 0)) {
- $stamp = (float)microtime(true);
- DBA::update(
- 'workerqueue',
- ['executed' => DBA::NULL_DATETIME, 'pid' => 0],
- ['id' => $entry["id"]]
- );
- self::$db_duration += (microtime(true) - $stamp);
- self::$db_duration_write += (microtime(true) - $stamp);
- } else {
- // Kill long running processes
-
- // Define the maximum durations
- $max_duration_defaults = [PRIORITY_CRITICAL => 720, PRIORITY_HIGH => 10, PRIORITY_MEDIUM => 60, PRIORITY_LOW => 180, PRIORITY_NEGLIGIBLE => 720];
- $max_duration = $max_duration_defaults[$entry['priority']];
-
- $argv = json_decode($entry['parameter'], true);
- if (!empty($entry['command'])) {
- $command = $entry['command'];
- } elseif (!empty($argv)) {
- $command = array_shift($argv);
- } else {
- return;
- }
-
- $command = basename($command);
-
- // How long is the process already running?
- $duration = (time() - strtotime($entry["executed"])) / 60;
- if ($duration > $max_duration) {
- Logger::notice('Worker process took too much time - killed', ['duration' => number_format($duration, 3), 'max' => $max_duration, 'id' => $entry["id"], 'pid' => $entry["pid"], 'command' => $command]);
- posix_kill($entry["pid"], SIGTERM);
-
- // We killed the stale process.
- // To avoid a blocking situation we reschedule the process at the beginning of the queue.
- // Additionally we are lowering the priority. (But not PRIORITY_CRITICAL)
- $new_priority = $entry['priority'];
- if ($entry['priority'] == PRIORITY_HIGH) {
- $new_priority = PRIORITY_MEDIUM;
- } elseif ($entry['priority'] == PRIORITY_MEDIUM) {
- $new_priority = PRIORITY_LOW;
- } elseif ($entry['priority'] != PRIORITY_CRITICAL) {
- $new_priority = PRIORITY_NEGLIGIBLE;
- }
- $stamp = (float)microtime(true);
- DBA::update(
- 'workerqueue',
- ['executed' => DBA::NULL_DATETIME, 'created' => DateTimeFormat::utcNow(), 'priority' => $new_priority, 'pid' => 0],
- ['id' => $entry["id"]]
- );
- self::$db_duration += (microtime(true) - $stamp);
- self::$db_duration_write += (microtime(true) - $stamp);
- } else {
- Logger::info('Process runtime is okay', ['duration' => number_format($duration, 3), 'max' => $max_duration, 'id' => $entry["id"], 'pid' => $entry["pid"], 'command' => $command]);
- }
- }
- }
- DBA::close($entries);
- }
/**
* Checks if the number of active workers exceeds the given limits
* @return bool Are there too much workers running?
* @throws \Friendica\Network\HTTPException\InternalServerErrorException
*/
- private static function tooMuchWorkers()
+ private static function tooMuchWorkers(): bool
{
- $queues = DI::config()->get("system", "worker_queues", 10);
+ $queues = DI::config()->get('system', 'worker_queues', 10);
$maxqueues = $queues;
// Decrease the number of workers at higher load
$load = System::currentLoad();
if ($load) {
- $maxsysload = intval(DI::config()->get("system", "maxloadavg", 20));
+ $maxsysload = intval(DI::config()->get('system', 'maxloadavg', 20));
/* Default exponent 3 causes queues to rapidly decrease as load increases.
* If you have 20 max queues at idle, then you get only 5 queues at 37.1% of $maxsysload.
self::$db_duration += (microtime(true) - $stamp);
self::$db_duration_stat += (microtime(true) - $stamp);
$idle_workers -= $running;
- $waiting_processes += $entry["entries"];
- $listitem[$entry['priority']] = $entry['priority'] . ":" . $running . "/" . $entry["entries"];
+ $waiting_processes += $entry['entries'];
+ $listitem[$entry['priority']] = $entry['priority'] . ':' . $running . '/' . $entry['entries'];
}
DBA::close($jobs);
} else {
self::$db_duration_stat += (microtime(true) - $stamp);
while ($entry = DBA::fetch($jobs)) {
- $idle_workers -= $entry["running"];
- $listitem[$entry['priority']] = $entry['priority'].":".$entry["running"];
+ $idle_workers -= $entry['running'];
+ $listitem[$entry['priority']] = $entry['priority'] . ':' . $entry['running'];
}
DBA::close($jobs);
}
$waiting_processes -= $deferred;
- $listitem[0] = "0:" . max(0, $idle_workers);
+ $listitem[0] = '0:' . max(0, $idle_workers);
$processlist .= ' ('.implode(', ', $listitem).')';
- if (DI::config()->get("system", "worker_fastlane", false) && ($queues > 0) && ($active >= $queues) && self::entriesExists()) {
+ if (DI::config()->get('system', 'worker_fastlane', false) && ($queues > 0) && ($active >= $queues) && self::entriesExists()) {
$top_priority = self::highestPriority();
$high_running = self::processWithPriorityActive($top_priority);
- if (!$high_running && ($top_priority > PRIORITY_UNDEFINED) && ($top_priority < PRIORITY_NEGLIGIBLE)) {
- Logger::info("Jobs with a higher priority are waiting but none is executed. Open a fastlane.", ['priority' => $top_priority]);
+ if (!$high_running && ($top_priority > self::PRIORITY_UNDEFINED) && ($top_priority < self::PRIORITY_NEGLIGIBLE)) {
+ Logger::info('Jobs with a higher priority are waiting but none is executed. Open a fastlane.', ['priority' => $top_priority]);
$queues = $active + 1;
}
}
- Logger::notice("Load: " . $load ."/" . $maxsysload . " - processes: " . $deferred . "/" . $active . "/" . $waiting_processes . $processlist . " - maximum: " . $queues . "/" . $maxqueues);
+ Logger::info('Load: ' . $load . '/' . $maxsysload . ' - processes: ' . $deferred . '/' . $active . '/' . $waiting_processes . $processlist . ' - maximum: ' . $queues . '/' . $maxqueues);
// Are there fewer workers running as possible? Then fork a new one.
- if (!DI::config()->get("system", "worker_dont_fork", false) && ($queues > ($active + 1)) && self::entriesExists()) {
- Logger::info("There are fewer workers as possible, fork a new worker.", ['active' => $active, 'queues' => $queues]);
- if (self::isDaemonMode()) {
- self::IPCSetJobState(true);
+ if (!DI::config()->get('system', 'worker_dont_fork', false) && ($queues > ($active + 1)) && self::entriesExists() && !self::systemLimitReached()) {
+ Logger::info('There are fewer workers as possible, fork a new worker.', ['active' => $active, 'queues' => $queues]);
+ if (Worker\Daemon::isMode()) {
+ Worker\IPC::SetJobState(true);
} else {
self::spawnWorker();
}
}
// if there are too much worker, we don't spawn a new one.
- if (self::isDaemonMode() && ($active > $queues)) {
- self::IPCSetJobState(false);
+ if (Worker\Daemon::isMode() && ($active > $queues)) {
+ Worker\IPC::SetJobState(false);
}
return $active > $queues;
* @return integer Number of active worker processes
* @throws \Exception
*/
- private static function activeWorkers()
+ private static function activeWorkers(): int
{
$stamp = (float)microtime(true);
$count = DI::process()->countCommand('Worker.php');
* @return array List of worker process ids
* @throws \Exception
*/
- private static function getWorkerPIDList()
+ private static function getWorkerPIDList(): array
{
$ids = [];
$stamp = (float)microtime(true);
/**
* Returns waiting jobs for the current process id
*
- * @return array waiting workerqueue jobs
+ * @return array|bool waiting workerqueue jobs or FALSE on failure
* @throws \Exception
*/
private static function getWaitingJobForPID()
* @return array array with next jobs
* @throws \Exception
*/
- private static function nextProcess(int $limit)
+ private static function nextProcess(int $limit): array
{
$priority = self::nextPriority();
if (empty($priority)) {
/**
* Returns the priority of the next workerqueue job
*
- * @return string priority
+ * @return string|bool priority or FALSE on failure
* @throws \Exception
*/
private static function nextPriority()
{
$waiting = [];
- $priorities = [PRIORITY_CRITICAL, PRIORITY_HIGH, PRIORITY_MEDIUM, PRIORITY_LOW, PRIORITY_NEGLIGIBLE];
+ $priorities = [self::PRIORITY_CRITICAL, self::PRIORITY_HIGH, self::PRIORITY_MEDIUM, self::PRIORITY_LOW, self::PRIORITY_NEGLIGIBLE];
foreach ($priorities as $priority) {
$stamp = (float)microtime(true);
if (DBA::exists('workerqueue', ["`priority` = ? AND `pid` = 0 AND NOT `done` AND `next_try` < ?", $priority, DateTimeFormat::utcNow()])) {
self::$db_duration += (microtime(true) - $stamp);
}
- if (!empty($waiting[PRIORITY_CRITICAL])) {
- return PRIORITY_CRITICAL;
+ if (!empty($waiting[self::PRIORITY_CRITICAL])) {
+ return self::PRIORITY_CRITICAL;
}
$running = [];
/**
* Find and claim the next worker process for us
*
- * @return boolean Have we found something?
+ * @return void
* @throws \Friendica\Network\HTTPException\InternalServerErrorException
*/
private static function findWorkerProcesses()
* @return array worker processes
* @throws \Friendica\Network\HTTPException\InternalServerErrorException
*/
- public static function workerProcess()
+ public static function workerProcess(): array
{
// There can already be jobs for us in the queue.
$waiting = self::getWaitingJobForPID();
$stamp = (float)microtime(true);
if (!DI::lock()->acquire(self::LOCK_PROCESS)) {
- return false;
+ return [];
}
self::$lock_duration += (microtime(true) - $stamp);
DI::lock()->release(self::LOCK_PROCESS);
- return self::getWaitingJobForPID();
+ // Prevents "Return value of Friendica\Core\Worker::workerProcess() must be of the type array, bool returned"
+ $process = self::getWaitingJobForPID();
+ return (is_array($process) ? $process : []);
}
/**
self::$db_duration_write += (microtime(true) - $stamp);
}
- /**
- * Runs the cron processes
- *
- * @return void
- * @throws \Friendica\Network\HTTPException\InternalServerErrorException
- */
- private static function runCron()
- {
- Logger::info('Add cron entries');
-
- // Check for spooled items
- self::add(['priority' => PRIORITY_HIGH, 'force_priority' => true], 'SpoolPost');
-
- // Run the cron job that calls all other jobs
- self::add(['priority' => PRIORITY_MEDIUM, 'force_priority' => true], 'Cron');
-
- // Cleaning dead processes
- self::killStaleWorkers();
-
- // Remove old entries from the workerqueue
- self::cleanWorkerQueue();
- }
-
- /**
- * Remove old entries from the workerqueue
- *
- * @return void
- */
- private static function cleanWorkerQueue()
- {
- DBA::delete('workerqueue', ["`done` AND `executed` < ?", DateTimeFormat::utc('now - 1 hour')]);
-
- // Optimizing this table only last seconds
- if (DI::config()->get('system', 'optimize_tables')) {
- // We are acquiring the two locks from the worker to avoid locking problems
- if (DI::lock()->acquire(Worker::LOCK_PROCESS, 10)) {
- if (DI::lock()->acquire(Worker::LOCK_WORKER, 10)) {
- DBA::e("OPTIMIZE TABLE `workerqueue`");
- DBA::e("OPTIMIZE TABLE `process`");
- DI::lock()->release(Worker::LOCK_WORKER);
- }
- DI::lock()->release(Worker::LOCK_PROCESS);
- }
- }
- }
-
/**
* Fork a child process
*
// The parent process continues here
DBA::connect();
- self::IPCSetJobState(true, $pid);
+ Worker\IPC::SetJobState(true, $pid);
Logger::info('Spawned new worker', ['pid' => $pid]);
$cycles = 0;
- while (self::IPCJobsExists($pid) && (++$cycles < 100)) {
+ while (Worker\IPC::JobsExists($pid) && (++$cycles < 100)) {
usleep(10000);
}
$process = DI::process()->create(getmypid(), basename(__FILE__));
$cycles = 0;
- while (!self::IPCJobsExists($process->pid) && (++$cycles < 100)) {
+ while (!Worker\IPC::JobsExists($process->pid) && (++$cycles < 100)) {
usleep(10000);
}
self::unclaimProcess($process);
- self::IPCSetJobState(false, $process->pid);
+ Worker\IPC::SetJobState(false, $process->pid);
DI::process()->delete($process);
Logger::info('Worker ended', ['pid' => $process->pid]);
exit();
* @return void
* @throws \Friendica\Network\HTTPException\InternalServerErrorException
*/
- public static function spawnWorker($do_cron = false)
+ public static function spawnWorker(bool $do_cron = false)
{
- if (self::isDaemonMode() && DI::config()->get('system', 'worker_fork')) {
+ if (Worker\Daemon::isMode() && DI::config()->get('system', 'worker_fork')) {
self::forkProcess($do_cron);
} else {
DI::system()->run('bin/worker.php', ['no_cron' => !$do_cron]);
}
- if (self::isDaemonMode()) {
- self::IPCSetJobState(false);
+ if (Worker\Daemon::isMode()) {
+ Worker\IPC::SetJobState(false);
}
}
* @param (integer|array) priority or parameter array, strings are deprecated and are ignored
*
* next args are passed as $cmd command line
- * or: Worker::add(PRIORITY_HIGH, "Notifier", Delivery::DELETION, $drop_id);
- * or: Worker::add(array('priority' => PRIORITY_HIGH, 'dont_fork' => true), "Delivery", $post_id);
+ * or: Worker::add(Worker::PRIORITY_HIGH, 'Notifier', Delivery::DELETION, $drop_id);
+ * or: Worker::add(array('priority' => Worker::PRIORITY_HIGH, 'dont_fork' => true), 'Delivery', $post_id);
*
- * @return int "0" if worker queue entry already existed or there had been an error, otherwise the ID of the worker task
+ * @return int '0' if worker queue entry already existed or there had been an error, otherwise the ID of the worker task
* @throws \Friendica\Network\HTTPException\InternalServerErrorException
- * @note $cmd and string args are surrounded with ""
+ * @note $cmd and string args are surrounded with ''
*
* @hooks 'proc_run'
* array $arr
$arr = ['args' => $args, 'run_cmd' => true];
- Hook::callAll("proc_run", $arr);
+ Hook::callAll('proc_run', $arr);
if (!$arr['run_cmd'] || !count($args)) {
return 1;
}
- $priority = PRIORITY_MEDIUM;
+ $priority = self::PRIORITY_MEDIUM;
// Don't fork from frontend tasks by default
- $dont_fork = DI::config()->get("system", "worker_dont_fork", false) || !DI::mode()->isBackend();
+ $dont_fork = DI::config()->get('system', 'worker_dont_fork', false) || !DI::mode()->isBackend();
$created = DateTimeFormat::utcNow();
$delayed = DBA::NULL_DATETIME;
$force_priority = false;
$command = array_shift($args);
$parameters = json_encode($args);
- $found = DBA::exists('workerqueue', ['command' => $command, 'parameter' => $parameters, 'done' => false]);
+ $queue = DBA::selectFirst('workerqueue', ['id', 'priority'], ['command' => $command, 'parameter' => $parameters, 'done' => false]);
$added = 0;
- if (!is_int($priority) || !in_array($priority, PRIORITIES)) {
+ if (!is_int($priority) || !in_array($priority, self::PRIORITIES)) {
Logger::warning('Invalid priority', ['priority' => $priority, 'command' => $command, 'callstack' => System::callstack(20)]);
- $priority = PRIORITY_MEDIUM;
+ $priority = self::PRIORITY_MEDIUM;
}
// Quit if there was a database error - a precaution for the update process to 3.5.3
return 0;
}
- if (!$found) {
+ if (empty($queue)) {
if (!DBA::insert('workerqueue', ['command' => $command, 'parameter' => $parameters, 'created' => $created,
'priority' => $priority, 'next_try' => $delayed])) {
return 0;
}
$added = DBA::lastInsertId();
} elseif ($force_priority) {
- DBA::update('workerqueue', ['priority' => $priority], ['command' => $command, 'parameter' => $parameters, 'done' => false, 'pid' => 0]);
+ $ret = DBA::update('workerqueue', ['priority' => $priority], ['command' => $command, 'parameter' => $parameters, 'done' => false, 'pid' => 0]);
+ if ($ret && ($priority != $queue['priority'])) {
+ $added = $queue['id'];
+ }
}
// Set the IPC flag to ensure an immediate process execution via daemon
- if (self::isDaemonMode()) {
- self::IPCSetJobState(true);
+ if (Worker\Daemon::isMode()) {
+ Worker\IPC::SetJobState(true);
}
- self::checkDaemonState();
+ Worker\Daemon::checkState();
// Should we quit and wait for the worker to be called as a cronjob?
- if ($dont_fork) {
+ if ($dont_fork || self::systemLimitReached()) {
return $added;
}
return $added;
}
- // Quit on daemon mode
- if (self::isDaemonMode()) {
+ // Quit on daemon mode, except the priority is critical (like for db updates)
+ if (Worker\Daemon::isMode() && $priority !== self::PRIORITY_CRITICAL) {
return $added;
}
return $added;
}
- public static function countWorkersByCommand(string $command)
+ public static function countWorkersByCommand(string $command): int
{
return DBA::count('workerqueue', ['done' => false, 'pid' => 0, 'command' => $command]);
}
* @param integer $max_level maximum retrial level
* @return integer the next retrial level value
*/
- private static function getNextRetrial($queue, $max_level)
+ private static function getNextRetrial(array $queue, int $max_level): int
{
$created = strtotime($queue['created']);
$retrial_time = time() - $created;
return false;
}
- $queue = self::checkPriority($queue);
-
$id = $queue['id'];
$priority = $queue['priority'];
$delay = (($new_retrial + 2) ** 4) + (rand(1, 30) * ($new_retrial));
$next = DateTimeFormat::utc('now + ' . $delay . ' seconds');
- if (($priority < PRIORITY_MEDIUM) && ($new_retrial > 3)) {
- $priority = PRIORITY_MEDIUM;
- } elseif (($priority < PRIORITY_LOW) && ($new_retrial > 6)) {
- $priority = PRIORITY_LOW;
- } elseif (($priority < PRIORITY_NEGLIGIBLE) && ($new_retrial > 8)) {
- $priority = PRIORITY_NEGLIGIBLE;
+ if (($priority < self::PRIORITY_MEDIUM) && ($new_retrial > 3)) {
+ $priority = self::PRIORITY_MEDIUM;
+ } elseif (($priority < self::PRIORITY_LOW) && ($new_retrial > 6)) {
+ $priority = self::PRIORITY_LOW;
+ } elseif (($priority < self::PRIORITY_NEGLIGIBLE) && ($new_retrial > 8)) {
+ $priority = self::PRIORITY_NEGLIGIBLE;
}
Logger::info('Deferred task', ['id' => $id, 'retrial' => $new_retrial, 'created' => $queue['created'], 'next_execution' => $next, 'old_prio' => $queue['priority'], 'new_prio' => $priority]);
return true;
}
- /**
- * Set the flag if some job is waiting
- *
- * @param boolean $jobs Is there a waiting job?
- * @param int $key Key number
- * @throws \Exception
- */
- public static function IPCSetJobState(bool $jobs, int $key = 0)
- {
- $stamp = (float)microtime(true);
- DBA::replace('worker-ipc', ['jobs' => $jobs, 'key' => $key]);
- self::$db_duration += (microtime(true) - $stamp);
- self::$db_duration_write += (microtime(true) - $stamp);
- }
-
- /**
- * Delete a key entry
- *
- * @param int $key Key number
- * @throws \Exception
- */
- public static function IPCDeleteJobState(int $key)
- {
- $stamp = (float)microtime(true);
- DBA::delete('worker-ipc', ['key' => $key]);
- self::$db_duration += (microtime(true) - $stamp);
- self::$db_duration_write += (microtime(true) - $stamp);
- }
-
- /**
- * Checks if some worker job waits to be executed
- *
- * @param int $key Key number
- * @return bool
- * @throws \Exception
- */
- public static function IPCJobsExists(int $key = 0)
- {
- $stamp = (float)microtime(true);
- $row = DBA::selectFirst('worker-ipc', ['jobs'], ['key' => $key]);
- self::$db_duration += (microtime(true) - $stamp);
-
- // When we don't have a row, no job is running
- if (!DBA::isResult($row)) {
- return false;
- }
-
- return (bool)$row['jobs'];
- }
-
- /**
- * Checks if the worker is running in the daemon mode.
- *
- * @return boolean
- */
- public static function isDaemonMode()
- {
- if (!is_null(self::$daemon_mode)) {
- return self::$daemon_mode;
- }
-
- if (DI::mode()->getExecutor() == Mode::DAEMON) {
- return true;
- }
-
- $daemon_mode = DI::config()->get('system', 'worker_daemon_mode', false, true);
- if ($daemon_mode) {
- return $daemon_mode;
- }
-
- if (!function_exists('pcntl_fork')) {
- self::$daemon_mode = false;
- return false;
- }
-
- $pidfile = DI::config()->get('system', 'pidfile');
- if (empty($pidfile)) {
- // No pid file, no daemon
- self::$daemon_mode = false;
- return false;
- }
-
- if (!is_readable($pidfile)) {
- // No pid file. We assume that the daemon had been intentionally stopped.
- self::$daemon_mode = false;
- return false;
- }
-
- $pid = intval(file_get_contents($pidfile));
- $running = posix_kill($pid, 0);
-
- self::$daemon_mode = $running;
- return $running;
- }
-
- /**
- * Test if the daemon is running. If not, it will be started
- *
- * @return void
- */
- private static function checkDaemonState()
- {
- if (!DI::config()->get('system', 'daemon_watchdog', false)) {
- return;
- }
-
- if (!DI::mode()->isNormal()) {
- return;
- }
-
- // Check every minute if the daemon is running
- if (DI::config()->get('system', 'last_daemon_check', 0) + 60 > time()) {
- return;
- }
-
- DI::config()->set('system', 'last_daemon_check', time());
-
- $pidfile = DI::config()->get('system', 'pidfile');
- if (empty($pidfile)) {
- // No pid file, no daemon
- return;
- }
-
- if (!is_readable($pidfile)) {
- // No pid file. We assume that the daemon had been intentionally stopped.
- return;
- }
-
- $pid = intval(file_get_contents($pidfile));
- if (posix_kill($pid, 0)) {
- Logger::info('Daemon process is running', ['pid' => $pid]);
- return;
- }
-
- Logger::warning('Daemon process is not running', ['pid' => $pid]);
-
- self::spawnDaemon();
- }
-
- /**
- * Spawn a new daemon process
- *
- * @return void
- */
- private static function spawnDaemon()
- {
- Logger::notice('Starting new daemon process');
- $command = 'bin/daemon.php';
- $a = DI::app();
- DI::system()->run($command, ['start']);
- Logger::notice('New daemon process started');
- }
-
/**
* Check if the system is inside the defined maintenance window
*
+ * @param bool $check_last_execution Whether check last execution
* @return boolean
*/
- public static function isInMaintenanceWindow(bool $check_last_execution = false)
+ public static function isInMaintenanceWindow(bool $check_last_execution = false): bool
{
- // Calculate the seconds of the start end end of the maintenance window
+ // Calculate the seconds of the start and end of the maintenance window
$start = strtotime(DI::config()->get('system', 'maintenance_start')) % 86400;
$end = strtotime(DI::config()->get('system', 'maintenance_end')) % 86400;
$duration = max($start, $end) - min($start, $end);
// Quit when the last cron execution had been after the previous window
- $last_cron = DI::config()->get('system', 'last_cron_daily');
+ $last_cron = DI::keyValue()->get('last_cron_daily');
if ($last_cron + $duration > time()) {
Logger::info('The Daily cron had been executed recently', ['last' => date(DateTimeFormat::MYSQL, $last_cron), 'start' => date('H:i:s', $start), 'end' => date('H:i:s', $end)]);
return false;