X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=src%2FCore%2FWorker.php;h=00885f69200a04f59c8a601020031ebc7a5d8a0c;hb=46cd39fb34613f4b331793f19c0e562f93125066;hp=7758e06a90b9e838a225bd1ceda277087fe1ed1a;hpb=d8d2cdc6ef64daf83a23ee1eabe95e27e6bb051b;p=friendica.git diff --git a/src/Core/Worker.php b/src/Core/Worker.php index 7758e06a90..00885f6920 100644 --- a/src/Core/Worker.php +++ b/src/Core/Worker.php @@ -1,6 +1,6 @@ isMaxLoadReached()) { - Logger::info('Pre check: maximum load reached, quitting.'); + if (DI::system()->isMaxLoadReached()) { + Logger::notice('Pre check: maximum load reached, quitting.'); return; } // We now start the process. This is done after the load check since this could increase the load. - self::startProcess(); + self::$process = $process; // Kill stale processes every 5 minutes - $last_cleanup = DI::config()->get('system', 'worker_last_cleaned', 0); + $last_cleanup = DI::keyValue()->get('worker_last_cleaned') ?? 0; if (time() > ($last_cleanup + 300)) { - DI::config()->set('system', 'worker_last_cleaned', time()); - self::killStaleWorkers(); + DI::keyValue()->set( 'worker_last_cleaned', time()); + Worker\Cron::killStaleWorkers(); } // Check if the system is ready @@ -88,7 +102,7 @@ class Worker // Now we start additional cron processes if we should do so if ($run_cron) { - self::runCron(); + Worker\Cron::run(); } $last_check = $starttime = time(); @@ -96,15 +110,16 @@ class Worker // We fetch the next queue entry that is about to be executed while ($r = self::workerProcess()) { + if (Worker\IPC::JobsExists(getmypid())) { + Worker\IPC::DeleteJobState(getmypid()); + } + // Don't refetch when a worker fetches tasks for multiple workers $refetched = DI::config()->get('system', 'worker_multiple_fetch'); foreach ($r as $entry) { - // Assure that the priority is an integer value - $entry['priority'] = (int)$entry['priority']; - // The work will be done if (!self::execute($entry)) { - Logger::info('Process execution failed, quitting.'); + Logger::warning('Process execution failed, quitting.', ['entry' => $entry]); return; } @@ -132,8 +147,8 @@ class Worker } // Check free memory - if (DI::process()->isMinMemoryReached()) { - Logger::info('Memory limit reached, quitting.'); + if (DI::system()->isMinMemoryReached()) { + Logger::warning('Memory limit reached, quitting.'); DI::lock()->release(self::LOCK_WORKER); return; } @@ -143,17 +158,21 @@ class Worker } // Quit the worker once every cron interval - if (time() > ($starttime + (DI::config()->get('system', 'cron_interval') * 60))) { + if (time() > ($starttime + (DI::config()->get('system', 'cron_interval') * 60)) && !self::systemLimitReached()) { Logger::info('Process lifetime reached, respawning.'); - self::unclaimProcess(); - self::spawnWorker(); + self::unclaimProcess($process); + if (Worker\Daemon::isMode()) { + Worker\IPC::SetJobState(true); + } else { + self::spawnWorker(); + } return; } } // Cleaning up. Possibly not needed, but it doesn't harm anything. - if (DI::config()->get('system', 'worker_daemon_mode', false)) { - self::IPCSetJobState(false); + if (Worker\Daemon::isMode()) { + Worker\IPC::SetJobState(false); } Logger::info("Couldn't select a workerqueue entry, quitting process", ['pid' => getmypid()]); } @@ -165,7 +184,7 @@ class Worker * * @return boolean */ - public static function isReady() + public static function isReady(): bool { // Count active workers and compare them with a maximum value that depends on the load if (self::tooMuchWorkers()) { @@ -174,23 +193,23 @@ class Worker } // Do we have too few memory? - if (DI::process()->isMinMemoryReached()) { - Logger::info('Memory limit reached, quitting.'); + if (DI::system()->isMinMemoryReached()) { + Logger::warning('Memory limit reached, quitting.'); return false; } // Possibly there are too much database connections if (self::maxConnectionsReached()) { - Logger::info('Maximum connections reached, quitting.'); + Logger::warning('Maximum connections reached, quitting.'); return false; } // Possibly there are too much database processes that block the system - if (DI::process()->isMaxProcessesReached()) { - Logger::info('Maximum processes reached, quitting.'); + if (DI::system()->isMaxProcessesReached()) { + Logger::warning('Maximum processes reached, quitting.'); return false; } - + return true; } @@ -200,7 +219,7 @@ class Worker * @return boolean Returns "true" if tasks are existing * @throws \Exception */ - public static function entriesExists() + public static function entriesExists(): bool { $stamp = (float)microtime(true); $exists = DBA::exists('workerqueue', ["NOT `done` AND `pid` = 0 AND `next_try` < ?", DateTimeFormat::utcNow()]); @@ -214,7 +233,7 @@ class Worker * @return integer Number of deferred entries in the worker queue * @throws \Exception */ - private static function deferredEntries() + private static function deferredEntries(): int { $stamp = (float)microtime(true); $count = DBA::count('workerqueue', ["NOT `done` AND `pid` = 0 AND `retrial` > ?", 0]); @@ -229,7 +248,7 @@ class Worker * @return integer Number of non executed entries in the worker queue * @throws \Exception */ - private static function totalEntries() + private static function totalEntries(): int { $stamp = (float)microtime(true); $count = DBA::count('workerqueue', ['done' => false, 'pid' => 0]); @@ -244,14 +263,14 @@ class Worker * @return integer Number of active worker processes * @throws \Exception */ - private static function highestPriority() + private static function highestPriority(): int { $stamp = (float)microtime(true); $condition = ["`pid` = 0 AND NOT `done` AND `next_try` < ?", DateTimeFormat::utcNow()]; $workerqueue = DBA::selectFirst('workerqueue', ['priority'], $condition, ['order' => ['priority']]); self::$db_duration += (microtime(true) - $stamp); if (DBA::isResult($workerqueue)) { - return $workerqueue["priority"]; + return $workerqueue['priority']; } else { return 0; } @@ -265,12 +284,40 @@ class Worker * @return integer Is there a process running with that priority? * @throws \Exception */ - private static function processWithPriorityActive($priority) + private static function processWithPriorityActive(int $priority): int { $condition = ["`priority` <= ? AND `pid` != 0 AND NOT `done`", $priority]; return DBA::exists('workerqueue', $condition); } + /** + * Checks if the given file is valid to be included + * + * @param mixed $file + * @return bool + */ + private static function validateInclude(&$file): bool + { + $orig_file = $file; + + $file = realpath($file); + + if (strpos($file, getcwd()) !== 0) { + return false; + } + + $file = str_replace(getcwd() . '/', '', $file, $count); + if ($count != 1) { + return false; + } + + if ($orig_file !== $file) { + return false; + } + + return (strpos($file, 'addon/') === 0); + } + /** * Execute a worker entry * @@ -279,41 +326,49 @@ class Worker * @return boolean "true" if further processing should be stopped * @throws \Friendica\Network\HTTPException\InternalServerErrorException */ - public static function execute($queue) + public static function execute(array $queue): bool { $mypid = getmypid(); // Quit when in maintenance - if (DI::config()->get('system', 'maintenance', false, true)) { - Logger::info("Maintenance mode - quit process", ['pid' => $mypid]); + if (DI::config()->get('system', 'maintenance', false)) { + Logger::notice('Maintenance mode - quit process', ['pid' => $mypid]); return false; } // Constantly check the number of parallel database processes - if (DI::process()->isMaxProcessesReached()) { - Logger::info("Max processes reached for process", ['pid' => $mypid]); + if (DI::system()->isMaxProcessesReached()) { + Logger::warning('Max processes reached for process', ['pid' => $mypid]); return false; } // Constantly check the number of available database connections to let the frontend be accessible at any time if (self::maxConnectionsReached()) { - Logger::info("Max connection reached for process", ['pid' => $mypid]); + Logger::warning('Max connection reached for process', ['pid' => $mypid]); return false; } - $argv = json_decode($queue["parameter"], true); + $argv = json_decode($queue['parameter'], true); + if (!is_array($argv)) { + $argv = []; + } + + if (!empty($queue['command'])) { + array_unshift($argv, $queue['command']); + } + if (empty($argv)) { - Logger::error('Parameter is empty', ['queue' => $queue]); + Logger::warning('Parameter is empty', ['queue' => $queue]); return false; } - // Check for existance and validity of the include file + // Check for existence and validity of the include file $include = $argv[0]; if (method_exists(sprintf('Friendica\Worker\%s', $include), 'execute')) { // We constantly update the "executed" date every minute to avoid being killed too soon if (!isset(self::$last_update)) { - self::$last_update = strtotime($queue["executed"]); + self::$last_update = strtotime($queue['executed']); } $age = (time() - self::$last_update) / 60; @@ -333,7 +388,7 @@ class Worker $stamp = (float)microtime(true); $condition = ["`id` = ? AND `next_try` < ?", $queue['id'], DateTimeFormat::utcNow()]; if (DBA::update('workerqueue', ['done' => true], $condition)) { - DI::config()->set('system', 'last_worker_execution', DateTimeFormat::utcNow()); + DI::keyValue()->set('last_worker_execution', DateTimeFormat::utcNow()); } self::$db_duration = (microtime(true) - $stamp); self::$db_duration_write += (microtime(true) - $stamp); @@ -341,15 +396,10 @@ class Worker return true; } - // The script could be provided as full path or only with the function name - if ($include == basename($include)) { - $include = "include/".$include.".php"; - } - - if (!validate_include($include)) { - Logger::log("Include file ".$argv[0]." is not valid!"); + if (!self::validateInclude($include)) { + Logger::warning('Include file is not valid', ['file' => $argv[0]]); $stamp = (float)microtime(true); - DBA::delete('workerqueue', ['id' => $queue["id"]]); + DBA::delete('workerqueue', ['id' => $queue['id']]); self::$db_duration = (microtime(true) - $stamp); self::$db_duration_write += (microtime(true) - $stamp); return true; @@ -357,12 +407,12 @@ class Worker require_once $include; - $funcname = str_replace(".php", "", basename($argv[0]))."_run"; + $funcname = str_replace('.php', '', basename($argv[0])) .'_run'; if (function_exists($funcname)) { // We constantly update the "executed" date every minute to avoid being killed too soon if (!isset(self::$last_update)) { - self::$last_update = strtotime($queue["executed"]); + self::$last_update = strtotime($queue['executed']); } $age = (time() - self::$last_update) / 60; @@ -378,15 +428,15 @@ class Worker self::execFunction($queue, $funcname, $argv, false); $stamp = (float)microtime(true); - if (DBA::update('workerqueue', ['done' => true], ['id' => $queue["id"]])) { - DI::config()->set('system', 'last_worker_execution', DateTimeFormat::utcNow()); + if (DBA::update('workerqueue', ['done' => true], ['id' => $queue['id']])) { + DI::keyValue()->set('last_worker_execution', DateTimeFormat::utcNow()); } self::$db_duration = (microtime(true) - $stamp); self::$db_duration_write += (microtime(true) - $stamp); } else { - Logger::log("Function ".$funcname." does not exist"); + Logger::warning('Function does not exist', ['function' => $funcname]); $stamp = (float)microtime(true); - DBA::delete('workerqueue', ['id' => $queue["id"]]); + DBA::delete('workerqueue', ['id' => $queue['id']]); self::$db_duration = (microtime(true) - $stamp); self::$db_duration_write += (microtime(true) - $stamp); } @@ -394,6 +444,95 @@ class Worker return true; } + /** + * Checks if system limits are reached. + * + * @return boolean + */ + private static function systemLimitReached(): bool + { + $load_cooldown = DI::config()->get('system', 'worker_load_cooldown'); + $processes_cooldown = DI::config()->get('system', 'worker_processes_cooldown'); + + if ($load_cooldown == 0) { + $load_cooldown = DI::config()->get('system', 'maxloadavg'); + } + + if (($load_cooldown == 0) && ($processes_cooldown == 0)) { + return false; + } + + $load = System::getLoadAvg($processes_cooldown != 0); + if (empty($load)) { + return false; + } + + if (($load_cooldown > 0) && ($load['average1'] > $load_cooldown)) { + return true; + } + + if (($processes_cooldown > 0) && ($load['scheduled'] > $processes_cooldown)) { + return true; + } + + return false; + } + + /** + * Slow the execution down if the system load is too high + * + * @return void + */ + public static function coolDown() + { + $cooldown = DI::config()->get('system', 'worker_cooldown', 0); + if ($cooldown > 0) { + Logger::debug('Wait for cooldown.', ['cooldown' => $cooldown]); + if ($cooldown < 1) { + usleep($cooldown * 1000000); + } else { + sleep($cooldown); + } + } + + $load_cooldown = DI::config()->get('system', 'worker_load_cooldown'); + $processes_cooldown = DI::config()->get('system', 'worker_processes_cooldown'); + + if ($load_cooldown == 0) { + $load_cooldown = DI::config()->get('system', 'maxloadavg'); + } + + if (($load_cooldown == 0) && ($processes_cooldown == 0)) { + return; + } + + $sleeping = false; + + while ($load = System::getLoadAvg($processes_cooldown != 0)) { + if (($load_cooldown > 0) && ($load['average1'] > $load_cooldown)) { + if (!$sleeping) { + Logger::info('Load induced pre execution cooldown.', ['max' => $load_cooldown, 'load' => $load, 'called-by' => System::callstack(1)]); + $sleeping = true; + } + sleep(1); + continue; + } + if (($processes_cooldown > 0) && ($load['scheduled'] > $processes_cooldown)) { + if (!$sleeping) { + Logger::info('Process induced pre execution cooldown.', ['max' => $processes_cooldown, 'load' => $load, 'called-by' => System::callstack(1)]); + $sleeping = true; + } + sleep(1); + continue; + } + break; + } + + if ($sleeping) { + Logger::info('Cooldown ended.', ['max-load' => $load_cooldown, 'max-processes' => $processes_cooldown, 'load' => $load, 'called-by' => System::callstack(1)]); + } + } + /** * Execute a function from the queue * @@ -404,13 +543,15 @@ class Worker * @return void * @throws \Friendica\Network\HTTPException\InternalServerErrorException */ - private static function execFunction($queue, $funcname, $argv, $method_call) + private static function execFunction(array $queue, string $funcname, array $argv, bool $method_call) { $a = DI::app(); + self::coolDown(); + Logger::enableWorker($funcname); - Logger::info("Process start.", ['priority' => $queue["priority"], 'id' => $queue["id"]]); + Logger::info('Process start.', ['priority' => $queue['priority'], 'id' => $queue['id']]); $stamp = (float)microtime(true); @@ -418,7 +559,7 @@ class Worker // For this reason the variables have to be initialized. DI::profiler()->reset(); - $a->queue = $queue; + $a->setQueue($queue); $up_duration = microtime(true) - self::$up_start; @@ -427,21 +568,29 @@ class Worker // Set the workerLogger as new default logger if ($method_call) { - call_user_func_array(sprintf('Friendica\Worker\%s::execute', $funcname), $argv); + try { + call_user_func_array(sprintf('Friendica\Worker\%s::execute', $funcname), $argv); + } catch (\TypeError $e) { + // No need to defer a worker queue entry if the arguments are invalid + Logger::notice('Wrong worker arguments', ['class' => $funcname, 'argv' => $argv, 'queue' => $queue, 'message' => $e->getMessage()]); + } catch (\Throwable $e) { + Logger::error('Uncaught exception in worker execution', ['class' => get_class($e), 'message' => $e->getMessage(), 'code' => $e->getCode(), 'file' => $e->getFile() . ':' . $e->getLine(), 'trace' => $e->getTraceAsString(), 'previous' => $e->getPrevious()]); + Worker::defer(); + } } else { $funcname($argv, count($argv)); } Logger::disableWorker(); - unset($a->queue); + $a->setQueue([]); $duration = (microtime(true) - $stamp); /* With these values we can analyze how effective the worker is. * The database and rest time should be low since this is the unproductive time. * The execution time is the productive time. - * By changing parameters like the maximum number of workers we can check the effectivness. + * By changing parameters like the maximum number of workers we can check the effectiveness. */ $dbtotal = round(self::$db_duration, 2); $dbread = round(self::$db_duration - (self::$db_duration_count + self::$db_duration_write + self::$db_duration_stat), 2); @@ -454,6 +603,8 @@ class Worker Logger::info('Performance:', ['state' => self::$state, 'count' => $dbcount, 'stat' => $dbstat, 'write' => $dbwrite, 'lock' => $dblock, 'total' => $dbtotal, 'rest' => $rest, 'exec' => $exec]); + self::coolDown(); + self::$up_start = microtime(true); self::$db_duration = 0; self::$db_duration_count = 0; @@ -462,25 +613,18 @@ class Worker self::$lock_duration = 0; if ($duration > 3600) { - Logger::info('Longer than 1 hour.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]); + Logger::info('Longer than 1 hour.', ['priority' => $queue['priority'], 'id' => $queue['id'], 'duration' => round($duration/60, 3)]); } elseif ($duration > 600) { - Logger::info('Longer than 10 minutes.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]); + Logger::info('Longer than 10 minutes.', ['priority' => $queue['priority'], 'id' => $queue['id'], 'duration' => round($duration/60, 3)]); } elseif ($duration > 300) { - Logger::info('Longer than 5 minutes.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]); + Logger::info('Longer than 5 minutes.', ['priority' => $queue['priority'], 'id' => $queue['id'], 'duration' => round($duration/60, 3)]); } elseif ($duration > 120) { - Logger::info('Longer than 2 minutes.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]); + Logger::info('Longer than 2 minutes.', ['priority' => $queue['priority'], 'id' => $queue['id'], 'duration' => round($duration/60, 3)]); } - Logger::info('Process done.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' => round($duration, 3)]); + Logger::info('Process done.', ['priority' => $queue['priority'], 'id' => $queue['id'], 'duration' => round($duration, 3)]); - DI::profiler()->saveLog(DI::logger(), "ID " . $queue["id"] . ": " . $funcname); - - $cooldown = DI::config()->get("system", "worker_cooldown", 0); - - if ($cooldown > 0) { - Logger::info('Cooldown.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'cooldown' => $cooldown]); - sleep($cooldown); - } + DI::profiler()->saveLog(DI::logger(), 'ID ' . $queue['id'] . ': ' . $funcname); } /** @@ -489,19 +633,19 @@ class Worker * @return bool Are more than 3/4 of the maximum connections used? * @throws \Friendica\Network\HTTPException\InternalServerErrorException */ - private static function maxConnectionsReached() + private static function maxConnectionsReached(): bool { // Fetch the max value from the config. This is needed when the system cannot detect the correct value by itself. - $max = DI::config()->get("system", "max_connections"); + $max = DI::config()->get('system', 'max_connections'); // Fetch the percentage level where the worker will get active - $maxlevel = DI::config()->get("system", "max_connections_level", 75); + $maxlevel = DI::config()->get('system', 'max_connections_level', 75); if ($max == 0) { // the maximum number of possible user connections can be a system variable $r = DBA::fetchFirst("SHOW VARIABLES WHERE `variable_name` = 'max_user_connections'"); if (DBA::isResult($r)) { - $max = $r["Value"]; + $max = $r['Value']; } // Or it can be granted. This overrides the system variable $stamp = (float)microtime(true); @@ -518,21 +662,29 @@ class Worker DBA::close($r); } + $stamp = (float)microtime(true); + $used = 0; + $sleep = 0; + $data = DBA::p("SHOW PROCESSLIST"); + while ($row = DBA::fetch($data)) { + if ($row['Command'] != 'Sleep') { + ++$used; + } else { + ++$sleep; + } + } + DBA::close($data); + self::$db_duration += (microtime(true) - $stamp); + // If $max is set we will use the processlist to determine the current number of connections // The processlist only shows entries of the current user if ($max != 0) { - $stamp = (float)microtime(true); - $r = DBA::p('SHOW PROCESSLIST'); - self::$db_duration += (microtime(true) - $stamp); - $used = DBA::numRows($r); - DBA::close($r); - - Logger::info("Connection usage (user values)", ['usage' => $used, 'max' => $max]); + Logger::info('Connection usage (user values)', ['working' => $used, 'sleeping' => $sleep, 'max' => $max]); $level = ($used / $max) * 100; if ($level >= $maxlevel) { - Logger::log("Maximum level (".$maxlevel."%) of user connections reached: ".$used."/".$max); + Logger::warning('Maximum level (' . $maxlevel . '%) of user connections reached: ' . $used .'/' . $max); return true; } } @@ -543,7 +695,7 @@ class Worker if (!DBA::isResult($r)) { return false; } - $max = intval($r["Value"]); + $max = intval($r['Value']); if ($max == 0) { return false; } @@ -551,98 +703,21 @@ class Worker if (!DBA::isResult($r)) { return false; } - $used = intval($r["Value"]); + $used = max($used, intval($r['Value'])) - $sleep; if ($used == 0) { return false; } - Logger::info("Connection usage (system values)", ['used' => $used, 'max' => $max]); + Logger::info('Connection usage (system values)', ['working' => $used, 'sleeping' => $sleep, 'max' => $max]); $level = $used / $max * 100; if ($level < $maxlevel) { return false; } - Logger::log("Maximum level (".$level."%) of system connections reached: ".$used."/".$max); + Logger::warning('Maximum level (' . $level . '%) of system connections reached: ' . $used . '/' . $max); return true; } - /** - * fix the queue entry if the worker process died - * - * @return void - * @throws \Exception - */ - private static function killStaleWorkers() - { - $stamp = (float)microtime(true); - $entries = DBA::select( - 'workerqueue', - ['id', 'pid', 'executed', 'priority', 'parameter'], - ['NOT `done` AND `pid` != 0'], - ['order' => ['priority', 'created']] - ); - self::$db_duration += (microtime(true) - $stamp); - - while ($entry = DBA::fetch($entries)) { - if (!posix_kill($entry["pid"], 0)) { - $stamp = (float)microtime(true); - DBA::update( - 'workerqueue', - ['executed' => DBA::NULL_DATETIME, 'pid' => 0], - ['id' => $entry["id"]] - ); - self::$db_duration += (microtime(true) - $stamp); - self::$db_duration_write += (microtime(true) - $stamp); - } else { - // Kill long running processes - // Check if the priority is in a valid range - if (!in_array($entry["priority"], [PRIORITY_CRITICAL, PRIORITY_HIGH, PRIORITY_MEDIUM, PRIORITY_LOW, PRIORITY_NEGLIGIBLE])) { - $entry["priority"] = PRIORITY_MEDIUM; - } - - // Define the maximum durations - $max_duration_defaults = [PRIORITY_CRITICAL => 720, PRIORITY_HIGH => 10, PRIORITY_MEDIUM => 60, PRIORITY_LOW => 180, PRIORITY_NEGLIGIBLE => 720]; - $max_duration = $max_duration_defaults[$entry["priority"]]; - - $argv = json_decode($entry["parameter"], true); - if (empty($argv)) { - return; - } - - $argv[0] = basename($argv[0]); - - // How long is the process already running? - $duration = (time() - strtotime($entry["executed"])) / 60; - if ($duration > $max_duration) { - Logger::log("Worker process ".$entry["pid"]." (".substr(json_encode($argv), 0, 50).") took more than ".$max_duration." minutes. It will be killed now."); - posix_kill($entry["pid"], SIGTERM); - - // We killed the stale process. - // To avoid a blocking situation we reschedule the process at the beginning of the queue. - // Additionally we are lowering the priority. (But not PRIORITY_CRITICAL) - $new_priority = $entry["priority"]; - if ($entry["priority"] == PRIORITY_HIGH) { - $new_priority = PRIORITY_MEDIUM; - } elseif ($entry["priority"] == PRIORITY_MEDIUM) { - $new_priority = PRIORITY_LOW; - } elseif ($entry["priority"] != PRIORITY_CRITICAL) { - $new_priority = PRIORITY_NEGLIGIBLE; - } - $stamp = (float)microtime(true); - DBA::update( - 'workerqueue', - ['executed' => DBA::NULL_DATETIME, 'created' => DateTimeFormat::utcNow(), 'priority' => $new_priority, 'pid' => 0], - ['id' => $entry["id"]] - ); - self::$db_duration += (microtime(true) - $stamp); - self::$db_duration_write += (microtime(true) - $stamp); - } else { - Logger::info('Process runtime is okay', ['pid' => $entry["pid"], 'duration' => $duration, 'max' => $max_duration, 'command' => substr(json_encode($argv), 0, 50)]); - } - } - } - DBA::close($entries); - } /** * Checks if the number of active workers exceeds the given limits @@ -650,9 +725,9 @@ class Worker * @return bool Are there too much workers running? * @throws \Friendica\Network\HTTPException\InternalServerErrorException */ - private static function tooMuchWorkers() + private static function tooMuchWorkers(): bool { - $queues = DI::config()->get("system", "worker_queues", 10); + $queues = DI::config()->get('system', 'worker_queues', 10); $maxqueues = $queues; @@ -661,7 +736,7 @@ class Worker // Decrease the number of workers at higher load $load = System::currentLoad(); if ($load) { - $maxsysload = intval(DI::config()->get("system", "maxloadavg", 20)); + $maxsysload = intval(DI::config()->get('system', 'maxloadavg', 20)); /* Default exponent 3 causes queues to rapidly decrease as load increases. * If you have 20 max queues at idle, then you get only 5 queues at 37.1% of $maxsysload. @@ -685,13 +760,10 @@ class Worker } $stamp = (float)microtime(true); - $jobs = DBA::p("SELECT COUNT(*) AS `jobs` FROM `workerqueue` WHERE `done` AND `executed` > UTC_TIMESTAMP() - INTERVAL ? MINUTE", $interval); + $jobs = DBA::count('workerqueue', ["`done` AND `executed` > ?", DateTimeFormat::utc('now - ' . $interval . ' minute')]); self::$db_duration += (microtime(true) - $stamp); self::$db_duration_stat += (microtime(true) - $stamp); - if ($job = DBA::fetch($jobs)) { - $jobs_per_minute[$interval] = number_format($job['jobs'] / $interval, 0); - } - DBA::close($jobs); + $jobs_per_minute[$interval] = number_format($jobs / $interval, 0); } $processlist = ' - jpm: '.implode('/', $jobs_per_minute); } @@ -712,15 +784,12 @@ class Worker self::$db_duration_stat += (microtime(true) - $stamp); while ($entry = DBA::fetch($jobs)) { $stamp = (float)microtime(true); - $processes = DBA::p("SELECT COUNT(*) AS `running` FROM `workerqueue-view` WHERE `priority` = ?", $entry["priority"]); + $running = DBA::count('workerqueue-view', ['priority' => $entry['priority']]); self::$db_duration += (microtime(true) - $stamp); self::$db_duration_stat += (microtime(true) - $stamp); - if ($process = DBA::fetch($processes)) { - $idle_workers -= $process["running"]; - $waiting_processes += $entry["entries"]; - $listitem[$entry["priority"]] = $entry["priority"].":".$process["running"]."/".$entry["entries"]; - } - DBA::close($processes); + $idle_workers -= $running; + $waiting_processes += $entry['entries']; + $listitem[$entry['priority']] = $entry['priority'] . ':' . $running . '/' . $entry['entries']; } DBA::close($jobs); } else { @@ -731,35 +800,35 @@ class Worker self::$db_duration_stat += (microtime(true) - $stamp); while ($entry = DBA::fetch($jobs)) { - $idle_workers -= $entry["running"]; - $listitem[$entry["priority"]] = $entry["priority"].":".$entry["running"]; + $idle_workers -= $entry['running']; + $listitem[$entry['priority']] = $entry['priority'] . ':' . $entry['running']; } DBA::close($jobs); } $waiting_processes -= $deferred; - $listitem[0] = "0:" . max(0, $idle_workers); + $listitem[0] = '0:' . max(0, $idle_workers); $processlist .= ' ('.implode(', ', $listitem).')'; - if (DI::config()->get("system", "worker_fastlane", false) && ($queues > 0) && ($active >= $queues) && self::entriesExists()) { + if (DI::config()->get('system', 'worker_fastlane', false) && ($queues > 0) && ($active >= $queues) && self::entriesExists()) { $top_priority = self::highestPriority(); $high_running = self::processWithPriorityActive($top_priority); - if (!$high_running && ($top_priority > PRIORITY_UNDEFINED) && ($top_priority < PRIORITY_NEGLIGIBLE)) { - Logger::info("Jobs with a higher priority are waiting but none is executed. Open a fastlane.", ['priority' => $top_priority]); + if (!$high_running && ($top_priority > self::PRIORITY_UNDEFINED) && ($top_priority < self::PRIORITY_NEGLIGIBLE)) { + Logger::info('Jobs with a higher priority are waiting but none is executed. Open a fastlane.', ['priority' => $top_priority]); $queues = $active + 1; } } - Logger::notice("Load: " . $load ."/" . $maxsysload . " - processes: " . $deferred . "/" . $active . "/" . $waiting_processes . $processlist . " - maximum: " . $queues . "/" . $maxqueues); + Logger::info('Load: ' . $load . '/' . $maxsysload . ' - processes: ' . $deferred . '/' . $active . '/' . $waiting_processes . $processlist . ' - maximum: ' . $queues . '/' . $maxqueues); // Are there fewer workers running as possible? Then fork a new one. - if (!DI::config()->get("system", "worker_dont_fork", false) && ($queues > ($active + 1)) && self::entriesExists()) { - Logger::info("There are fewer workers as possible, fork a new worker.", ['active' => $active, 'queues' => $queues]); - if (DI::config()->get('system', 'worker_daemon_mode', false)) { - self::IPCSetJobState(true); + if (!DI::config()->get('system', 'worker_dont_fork', false) && ($queues > ($active + 1)) && self::entriesExists() && !self::systemLimitReached()) { + Logger::info('There are fewer workers as possible, fork a new worker.', ['active' => $active, 'queues' => $queues]); + if (Worker\Daemon::isMode()) { + Worker\IPC::SetJobState(true); } else { self::spawnWorker(); } @@ -767,8 +836,8 @@ class Worker } // if there are too much worker, we don't spawn a new one. - if (DI::config()->get('system', 'worker_daemon_mode', false) && ($active > $queues)) { - self::IPCSetJobState(false); + if (Worker\Daemon::isMode() && ($active > $queues)) { + Worker\IPC::SetJobState(false); } return $active > $queues; @@ -780,10 +849,10 @@ class Worker * @return integer Number of active worker processes * @throws \Exception */ - private static function activeWorkers() + private static function activeWorkers(): int { $stamp = (float)microtime(true); - $count = DBA::count('process', ['command' => 'Worker.php']); + $count = DI::process()->countCommand('Worker.php'); self::$db_duration += (microtime(true) - $stamp); self::$db_duration_count += (microtime(true) - $stamp); return $count; @@ -795,13 +864,13 @@ class Worker * @return array List of worker process ids * @throws \Exception */ - private static function getWorkerPIDList() + private static function getWorkerPIDList(): array { $ids = []; $stamp = (float)microtime(true); $queues = DBA::p("SELECT `process`.`pid`, COUNT(`workerqueue`.`pid`) AS `entries` FROM `process` - LEFT JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid` AND NOT `workerqueue`.`done` + LEFT JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid` AND NOT `workerqueue`.`done` GROUP BY `process`.`pid`"); while ($queue = DBA::fetch($queues)) { $ids[$queue['pid']] = $queue['entries']; @@ -816,7 +885,7 @@ class Worker /** * Returns waiting jobs for the current process id * - * @return array waiting workerqueue jobs + * @return array|bool waiting workerqueue jobs or FALSE on failure * @throws \Exception */ private static function getWaitingJobForPID() @@ -838,7 +907,7 @@ class Worker * @return array array with next jobs * @throws \Exception */ - private static function nextProcess(int $limit) + private static function nextProcess(int $limit): array { $priority = self::nextPriority(); if (empty($priority)) { @@ -849,12 +918,17 @@ class Worker $ids = []; $stamp = (float)microtime(true); $condition = ["`priority` = ? AND `pid` = 0 AND NOT `done` AND `next_try` < ?", $priority, DateTimeFormat::utcNow()]; - $tasks = DBA::select('workerqueue', ['id', 'parameter'], $condition, ['limit' => $limit, 'order' => ['created']]); + $tasks = DBA::select('workerqueue', ['id', 'command', 'parameter'], $condition, ['limit' => $limit, 'order' => ['retrial', 'created']]); self::$db_duration += (microtime(true) - $stamp); while ($task = DBA::fetch($tasks)) { $ids[] = $task['id']; // Only continue that loop while we are storing commands that can be processed quickly - $command = json_decode($task['parameter'])[0]; + if (!empty($task['command'])) { + $command = $task['command']; + } else { + $command = json_decode($task['parameter'])[0]; + } + if (!in_array($command, self::FAST_COMMANDS)) { break; } @@ -868,13 +942,13 @@ class Worker /** * Returns the priority of the next workerqueue job * - * @return string priority + * @return string|bool priority or FALSE on failure * @throws \Exception */ private static function nextPriority() { $waiting = []; - $priorities = [PRIORITY_CRITICAL, PRIORITY_HIGH, PRIORITY_MEDIUM, PRIORITY_LOW, PRIORITY_NEGLIGIBLE]; + $priorities = [self::PRIORITY_CRITICAL, self::PRIORITY_HIGH, self::PRIORITY_MEDIUM, self::PRIORITY_LOW, self::PRIORITY_NEGLIGIBLE]; foreach ($priorities as $priority) { $stamp = (float)microtime(true); if (DBA::exists('workerqueue', ["`priority` = ? AND `pid` = 0 AND NOT `done` AND `next_try` < ?", $priority, DateTimeFormat::utcNow()])) { @@ -883,8 +957,8 @@ class Worker self::$db_duration += (microtime(true) - $stamp); } - if (!empty($waiting[PRIORITY_CRITICAL])) { - return PRIORITY_CRITICAL; + if (!empty($waiting[self::PRIORITY_CRITICAL])) { + return self::PRIORITY_CRITICAL; } $running = []; @@ -939,7 +1013,7 @@ class Worker /** * Find and claim the next worker process for us * - * @return boolean Have we found something? + * @return void * @throws \Friendica\Network\HTTPException\InternalServerErrorException */ private static function findWorkerProcesses() @@ -969,13 +1043,17 @@ class Worker if ($limit > 0) { $stamp = (float)microtime(true); $condition = ["`pid` = 0 AND NOT `done` AND `next_try` < ?", DateTimeFormat::utcNow()]; - $tasks = DBA::select('workerqueue', ['id', 'parameter'], $condition, ['limit' => $limit, 'order' => ['priority', 'created']]); + $tasks = DBA::select('workerqueue', ['id', 'command', 'parameter'], $condition, ['limit' => $limit, 'order' => ['priority', 'retrial', 'created']]); self::$db_duration += (microtime(true) - $stamp); while ($task = DBA::fetch($tasks)) { $ids[] = $task['id']; // Only continue that loop while we are storing commands that can be processed quickly - $command = json_decode($task['parameter'])[0]; + if (!empty($task['command'])) { + $command = $task['command']; + } else { + $command = json_decode($task['parameter'])[0]; + } if (!in_array($command, self::FAST_COMMANDS)) { break; } @@ -1013,7 +1091,7 @@ class Worker * @return array worker processes * @throws \Friendica\Network\HTTPException\InternalServerErrorException */ - public static function workerProcess() + public static function workerProcess(): array { // There can already be jobs for us in the queue. $waiting = self::getWaitingJobForPID(); @@ -1023,7 +1101,7 @@ class Worker $stamp = (float)microtime(true); if (!DI::lock()->acquire(self::LOCK_PROCESS)) { - return false; + return []; } self::$lock_duration += (microtime(true) - $stamp); @@ -1031,130 +1109,85 @@ class Worker DI::lock()->release(self::LOCK_PROCESS); - return self::getWaitingJobForPID(); + // Prevents "Return value of Friendica\Core\Worker::workerProcess() must be of the type array, bool returned" + $process = self::getWaitingJobForPID(); + return (is_array($process) ? $process : []); } /** * Removes a workerqueue entry from the current process * + * @param Process $process the process behind the workerqueue + * * @return void * @throws \Exception */ - public static function unclaimProcess() + public static function unclaimProcess(Process $process) { - $mypid = getmypid(); - $stamp = (float)microtime(true); - DBA::update('workerqueue', ['executed' => DBA::NULL_DATETIME, 'pid' => 0], ['pid' => $mypid, 'done' => false]); + DBA::update('workerqueue', ['executed' => DBA::NULL_DATETIME, 'pid' => 0], ['pid' => $process->pid, 'done' => false]); self::$db_duration += (microtime(true) - $stamp); self::$db_duration_write += (microtime(true) - $stamp); } /** - * Call the front end worker + * Fork a child process * + * @param boolean $do_cron * @return void - * @throws \Friendica\Network\HTTPException\InternalServerErrorException */ - public static function callWorker() + private static function forkProcess(bool $do_cron) { - if (!DI::config()->get("system", "frontend_worker")) { + if (DI::system()->isMinMemoryReached()) { + Logger::warning('Memory limit reached - quitting'); return; } - $url = DI::baseUrl() . '/worker'; - DI::httpRequest()->fetch($url, false, 1); - } - - /** - * Call the front end worker if there aren't any active - * - * @return void - * @throws \Friendica\Network\HTTPException\InternalServerErrorException - */ - public static function executeIfIdle() - { - if (!DI::config()->get("system", "frontend_worker")) { + // Children inherit their parent's database connection. + // To avoid problems we disconnect and connect both parent and child + DBA::disconnect(); + $pid = pcntl_fork(); + if ($pid == -1) { + DBA::connect(); + Logger::warning('Could not spawn worker'); return; - } - - // Do we have "proc_open"? Then we can fork the worker - if (function_exists("proc_open")) { - // When was the last time that we called the worker? - // Less than one minute? Then we quit - if ((time() - DI::config()->get("system", "worker_started")) < 60) { - return; - } + } elseif ($pid) { + // The parent process continues here + DBA::connect(); - DI::config()->set("system", "worker_started", time()); + Worker\IPC::SetJobState(true, $pid); + Logger::info('Spawned new worker', ['pid' => $pid]); - // Do we have enough running workers? Then we quit here. - if (self::tooMuchWorkers()) { - // Cleaning dead processes - self::killStaleWorkers(); - Process::deleteInactive(); - - return; + $cycles = 0; + while (Worker\IPC::JobsExists($pid) && (++$cycles < 100)) { + usleep(10000); } - self::runCron(); - - Logger::info('Call worker'); - self::spawnWorker(); + Logger::info('Spawned worker is ready', ['pid' => $pid, 'wait_cycles' => $cycles]); return; } - // We cannot execute background processes. - // We now run the processes from the frontend. - // This won't work with long running processes. - self::runCron(); - - self::clearProcesses(); + // We now are in the new worker + DBA::connect(); - $workers = self::activeWorkers(); + DI::flushLogger(); + $process = DI::process()->create(getmypid(), basename(__FILE__)); - if ($workers == 0) { - self::callWorker(); + $cycles = 0; + while (!Worker\IPC::JobsExists($process->pid) && (++$cycles < 100)) { + usleep(10000); } - } - - /** - * Removes long running worker processes - * - * @return void - * @throws \Friendica\Network\HTTPException\InternalServerErrorException - */ - public static function clearProcesses() - { - $timeout = DI::config()->get("system", "frontend_worker_timeout", 10); - /// @todo We should clean up the corresponding workerqueue entries as well - $stamp = (float)microtime(true); - $condition = ["`created` < ? AND `command` = 'worker.php'", - DateTimeFormat::utc("now - ".$timeout." minutes")]; - DBA::delete('process', $condition); - self::$db_duration = (microtime(true) - $stamp); - self::$db_duration_write += (microtime(true) - $stamp); - } + Logger::info('Worker spawned', ['pid' => $process->pid, 'wait_cycles' => $cycles]); - /** - * Runs the cron processes - * - * @return void - * @throws \Friendica\Network\HTTPException\InternalServerErrorException - */ - private static function runCron() - { - Logger::info('Add cron entries'); + self::processQueue($do_cron, $process); - // Check for spooled items - self::add(['priority' => PRIORITY_HIGH, 'force_priority' => true], 'SpoolPost'); + self::unclaimProcess($process); - // Run the cron job that calls all other jobs - self::add(['priority' => PRIORITY_MEDIUM, 'force_priority' => true], 'Cron'); - - // Cleaning dead processes - self::killStaleWorkers(); + Worker\IPC::SetJobState(false, $process->pid); + DI::process()->delete($process); + Logger::info('Worker ended', ['pid' => $process->pid]); + exit(); } /** @@ -1164,19 +1197,15 @@ class Worker * @return void * @throws \Friendica\Network\HTTPException\InternalServerErrorException */ - public static function spawnWorker($do_cron = false) + public static function spawnWorker(bool $do_cron = false) { - $command = 'bin/worker.php'; - - $args = ['no_cron' => !$do_cron]; - - $a = DI::app(); - $process = new Core\Process(DI::logger(), DI::mode(), DI::config(), $a->getBasePath()); - $process->run($command, $args); - - // after spawning we have to remove the flag. - if (DI::config()->get('system', 'worker_daemon_mode', false)) { - self::IPCSetJobState(false); + if (Worker\Daemon::isMode() && DI::config()->get('system', 'worker_fork')) { + self::forkProcess($do_cron); + } else { + DI::system()->run('bin/worker.php', ['no_cron' => !$do_cron]); + } + if (Worker\Daemon::isMode()) { + Worker\IPC::SetJobState(false); } } @@ -1186,36 +1215,35 @@ class Worker * @param (integer|array) priority or parameter array, strings are deprecated and are ignored * * next args are passed as $cmd command line - * or: Worker::add(PRIORITY_HIGH, "Notifier", Delivery::DELETION, $drop_id); - * or: Worker::add(array('priority' => PRIORITY_HIGH, 'dont_fork' => true), "CreateShadowEntry", $post_id); + * or: Worker::add(Worker::PRIORITY_HIGH, 'Notifier', Delivery::DELETION, $drop_id); + * or: Worker::add(array('priority' => Worker::PRIORITY_HIGH, 'dont_fork' => true), 'Delivery', $post_id); * - * @return boolean "false" if worker queue entry already existed or there had been an error + * @return int '0' if worker queue entry already existed or there had been an error, otherwise the ID of the worker task * @throws \Friendica\Network\HTTPException\InternalServerErrorException - * @note $cmd and string args are surrounded with "" + * @note $cmd and string args are surrounded with '' * * @hooks 'proc_run' * array $arr * */ - public static function add($cmd) + public static function add(...$args) { - $args = func_get_args(); - if (!count($args)) { - return false; + return 0; } $arr = ['args' => $args, 'run_cmd' => true]; - Hook::callAll("proc_run", $arr); + Hook::callAll('proc_run', $arr); if (!$arr['run_cmd'] || !count($args)) { - return true; + return 1; } - $priority = PRIORITY_MEDIUM; + $priority = self::PRIORITY_MEDIUM; // Don't fork from frontend tasks by default - $dont_fork = DI::config()->get("system", "worker_dont_fork", false) || !DI::mode()->isBackend(); + $dont_fork = DI::config()->get('system', 'worker_dont_fork', false) || !DI::mode()->isBackend(); $created = DateTimeFormat::utcNow(); + $delayed = DBA::NULL_DATETIME; $force_priority = false; $run_parameter = array_shift($args); @@ -1223,6 +1251,9 @@ class Worker if (is_int($run_parameter)) { $priority = $run_parameter; } elseif (is_array($run_parameter)) { + if (isset($run_parameter['delayed'])) { + $delayed = $run_parameter['delayed']; + } if (isset($run_parameter['priority'])) { $priority = $run_parameter['priority']; } @@ -1235,28 +1266,47 @@ class Worker if (isset($run_parameter['force_priority'])) { $force_priority = $run_parameter['force_priority']; } + } else { + throw new \InvalidArgumentException('Priority number or task parameter array expected as first argument'); } + $command = array_shift($args); $parameters = json_encode($args); - $found = DBA::exists('workerqueue', ['parameter' => $parameters, 'done' => false]); - $added = false; + $queue = DBA::selectFirst('workerqueue', ['id', 'priority'], ['command' => $command, 'parameter' => $parameters, 'done' => false]); + $added = 0; + + if (!is_int($priority) || !in_array($priority, self::PRIORITIES)) { + Logger::warning('Invalid priority', ['priority' => $priority, 'command' => $command, 'callstack' => System::callstack(20)]); + $priority = self::PRIORITY_MEDIUM; + } // Quit if there was a database error - a precaution for the update process to 3.5.3 if (DBA::errorNo() != 0) { - return false; + return 0; } - if (!$found) { - $added = DBA::insert('workerqueue', ['parameter' => $parameters, 'created' => $created, 'priority' => $priority]); - if (!$added) { - return false; + if (empty($queue)) { + if (!DBA::insert('workerqueue', ['command' => $command, 'parameter' => $parameters, 'created' => $created, + 'priority' => $priority, 'next_try' => $delayed])) { + return 0; } + $added = DBA::lastInsertId(); } elseif ($force_priority) { - DBA::update('workerqueue', ['priority' => $priority], ['parameter' => $parameters, 'done' => false, 'pid' => 0]); + $ret = DBA::update('workerqueue', ['priority' => $priority], ['command' => $command, 'parameter' => $parameters, 'done' => false, 'pid' => 0]); + if ($ret && ($priority != $queue['priority'])) { + $added = $queue['id']; + } + } + + // Set the IPC flag to ensure an immediate process execution via daemon + if (Worker\Daemon::isMode()) { + Worker\IPC::SetJobState(true); } + Worker\Daemon::checkState(); + // Should we quit and wait for the worker to be called as a cronjob? - if ($dont_fork) { + if ($dont_fork || self::systemLimitReached()) { return $added; } @@ -1273,9 +1323,8 @@ class Worker return $added; } - // We tell the daemon that a new job entry exists - if (DI::config()->get('system', 'worker_daemon_mode', false)) { - // We don't have to set the IPC flag - this is done in "tooMuchWorkers" + // Quit on daemon mode, except the priority is critical (like for db updates) + if (Worker\Daemon::isMode() && $priority !== self::PRIORITY_CRITICAL) { return $added; } @@ -1285,6 +1334,11 @@ class Worker return $added; } + public static function countWorkersByCommand(string $command): int + { + return DBA::count('workerqueue', ['done' => false, 'pid' => 0, 'command' => $command]); + } + /** * Returns the next retrial level for worker jobs. * This function will skip levels when jobs are older. @@ -1293,7 +1347,7 @@ class Worker * @param integer $max_level maximum retrial level * @return integer the next retrial level value */ - private static function getNextRetrial($queue, $max_level) + private static function getNextRetrial(array $queue, int $max_level): int { $created = strtotime($queue['created']); $retrial_time = time() - $created; @@ -1307,7 +1361,7 @@ class Worker $new_retrial = $retrial; } } - Logger::info('New retrial for task', ['id' => $queue['id'], 'created' => $queue['created'], 'old' => $queue['retrial'], 'new' => $new_retrial]); + Logger::notice('New retrial for task', ['id' => $queue['id'], 'created' => $queue['created'], 'old' => $queue['retrial'], 'new' => $new_retrial]); return $new_retrial; } @@ -1315,16 +1369,16 @@ class Worker * Defers the current worker entry * * @return boolean had the entry been deferred? + * @throws \Exception */ - public static function defer() + public static function defer(): bool { - if (empty(DI::app()->queue)) { + $queue = DI::app()->getQueue(); + + if (empty($queue)) { return false; } - $queue = DI::app()->queue; - - $retrial = $queue['retrial']; $id = $queue['id']; $priority = $queue['priority']; @@ -1333,7 +1387,7 @@ class Worker $new_retrial = self::getNextRetrial($queue, $max_level); if ($new_retrial > $max_level) { - Logger::info('The task exceeded the maximum retry count', ['id' => $id, 'created' => $queue['created'], 'old_prio' => $queue['priority'], 'old_retrial' => $queue['retrial'], 'max_level' => $max_level, 'retrial' => $new_retrial]); + Logger::notice('The task exceeded the maximum retry count', ['id' => $id, 'created' => $queue['created'], 'old_prio' => $queue['priority'], 'old_retrial' => $queue['retrial'], 'max_level' => $max_level, 'retrial' => $new_retrial]); return false; } @@ -1341,12 +1395,12 @@ class Worker $delay = (($new_retrial + 2) ** 4) + (rand(1, 30) * ($new_retrial)); $next = DateTimeFormat::utc('now + ' . $delay . ' seconds'); - if (($priority < PRIORITY_MEDIUM) && ($new_retrial > 3)) { - $priority = PRIORITY_MEDIUM; - } elseif (($priority < PRIORITY_LOW) && ($new_retrial > 6)) { - $priority = PRIORITY_LOW; - } elseif (($priority < PRIORITY_NEGLIGIBLE) && ($new_retrial > 8)) { - $priority = PRIORITY_NEGLIGIBLE; + if (($priority < self::PRIORITY_MEDIUM) && ($new_retrial > 3)) { + $priority = self::PRIORITY_MEDIUM; + } elseif (($priority < self::PRIORITY_LOW) && ($new_retrial > 6)) { + $priority = self::PRIORITY_LOW; + } elseif (($priority < self::PRIORITY_NEGLIGIBLE) && ($new_retrial > 8)) { + $priority = self::PRIORITY_NEGLIGIBLE; } Logger::info('Deferred task', ['id' => $id, 'retrial' => $new_retrial, 'created' => $queue['created'], 'next_execution' => $next, 'old_prio' => $queue['priority'], 'new_prio' => $priority]); @@ -1361,61 +1415,47 @@ class Worker } /** - * Log active processes into the "process" table + * Check if the system is inside the defined maintenance window + * + * @param bool $check_last_execution Whether check last execution + * @return boolean */ - public static function startProcess() + public static function isInMaintenanceWindow(bool $check_last_execution = false): bool { - $trace = debug_backtrace(DEBUG_BACKTRACE_IGNORE_ARGS, 1); + // Calculate the seconds of the start and end of the maintenance window + $start = strtotime(DI::config()->get('system', 'maintenance_start')) % 86400; + $end = strtotime(DI::config()->get('system', 'maintenance_end')) % 86400; - $command = basename($trace[0]['file']); + Logger::info('Maintenance window', ['start' => date('H:i:s', $start), 'end' => date('H:i:s', $end)]); - Process::deleteInactive(); + if ($check_last_execution) { + // Calculate the window duration + $duration = max($start, $end) - min($start, $end); - Process::insert($command); - } - - /** - * Remove the active process from the "process" table - * - * @return bool - * @throws \Exception - */ - public static function endProcess() - { - return Process::deleteByPid(); - } + // Quit when the last cron execution had been after the previous window + $last_cron = DI::keyValue()->get('last_cron_daily'); + if ($last_cron + $duration > time()) { + Logger::info('The Daily cron had been executed recently', ['last' => date(DateTimeFormat::MYSQL, $last_cron), 'start' => date('H:i:s', $start), 'end' => date('H:i:s', $end)]); + return false; + } + } - /** - * Set the flag if some job is waiting - * - * @param boolean $jobs Is there a waiting job? - * @throws \Exception - */ - public static function IPCSetJobState($jobs) - { - $stamp = (float)microtime(true); - DBA::update('worker-ipc', ['jobs' => $jobs], ['key' => 1], true); - self::$db_duration += (microtime(true) - $stamp); - self::$db_duration_write += (microtime(true) - $stamp); - } + $current = time() % 86400; - /** - * Checks if some worker job waits to be executed - * - * @return bool - * @throws \Exception - */ - public static function IPCJobsExists() - { - $stamp = (float)microtime(true); - $row = DBA::selectFirst('worker-ipc', ['jobs'], ['key' => 1]); - self::$db_duration += (microtime(true) - $stamp); + if ($start < $end) { + // Execute if we are inside the window + $execute = ($current >= $start) && ($current <= $end); + } else { + // Don't execute if we are outside the window + $execute = !(($current > $end) && ($current < $start)); + } - // When we don't have a row, no job is running - if (!DBA::isResult($row)) { - return false; + if ($execute) { + Logger::info('We are inside the maintenance window', ['current' => date('H:i:s', $current), 'start' => date('H:i:s', $start), 'end' => date('H:i:s', $end)]); + } else { + Logger::info('We are outside the maintenance window', ['current' => date('H:i:s', $current), 'start' => date('H:i:s', $start), 'end' => date('H:i:s', $end)]); } - return (bool)$row['jobs']; + return $execute; } }