*/
class Worker
{
+ /**
+ * @name Priority
+ *
+ * Process priority for the worker
+ * @{
+ */
+ const PRIORITY_UNDEFINED = 0;
+ const PRIORITY_CRITICAL = 10;
+ const PRIORITY_HIGH = 20;
+ const PRIORITY_MEDIUM = 30;
+ const PRIORITY_LOW = 40;
+ const PRIORITY_NEGLIGIBLE = 50;
+ const PRIORITIES = [self::PRIORITY_CRITICAL, self::PRIORITY_HIGH, self::PRIORITY_MEDIUM, self::PRIORITY_LOW, self::PRIORITY_NEGLIGIBLE];
+ /* @}*/
+
const STATE_STARTUP = 1; // Worker is in startup. This takes most time.
const STATE_LONG_LOOP = 2; // Worker is processing the whole - long - loop.
const STATE_REFETCH = 3; // Worker had refetched jobs in the execution loop.
}
// Quit the worker once every cron interval
- if (time() > ($starttime + (DI::config()->get('system', 'cron_interval') * 60))) {
+ if (time() > ($starttime + (DI::config()->get('system', 'cron_interval') * 60)) && !self::systemLimitReached()) {
Logger::info('Process lifetime reached, respawning.');
self::unclaimProcess($process);
if (Worker\Daemon::isMode()) {
return false;
}
- $valid = false;
- if (strpos($file, 'include/') === 0) {
- $valid = true;
- }
-
- if (strpos($file, 'addon/') === 0) {
- $valid = true;
- }
-
- // Simply return flag
- return $valid;
+ return (strpos($file, 'addon/') === 0);
}
/**
return true;
}
- // The script could be provided as full path or only with the function name
- if ($include == basename($include)) {
- $include = 'include/' . $include . '.php';
- }
-
if (!self::validateInclude($include)) {
Logger::warning('Include file is not valid', ['file' => $argv[0]]);
$stamp = (float)microtime(true);
}
/**
- * Execute a function from the queue
+ * Checks if system limits are reached.
*
- * @param array $queue Workerqueue entry
- * @param string $funcname name of the function
- * @param array $argv Array of values to be passed to the function
- * @param boolean $method_call boolean
- * @return void
- * @throws \Friendica\Network\HTTPException\InternalServerErrorException
+ * @return boolean
*/
- private static function execFunction(array $queue, string $funcname, array $argv, bool $method_call)
+ private static function systemLimitReached(): bool
{
- $a = DI::app();
+ $load_cooldown = DI::config()->get('system', 'worker_load_cooldown');
+ $processes_cooldown = DI::config()->get('system', 'worker_processes_cooldown');
+
+ if ($load_cooldown == 0) {
+ $load_cooldown = DI::config()->get('system', 'maxloadavg');
+ }
+
+ if (($load_cooldown == 0) && ($processes_cooldown == 0)) {
+ return false;
+ }
+
+ $load = System::getLoadAvg();
+ if (empty($load)) {
+ return false;
+ }
+
+ if (($load_cooldown > 0) && ($load['average1'] > $load_cooldown)) {
+ return true;
+ }
+
+ if (($processes_cooldown > 0) && ($load['scheduled'] > $processes_cooldown)) {
+ return true;
+ }
+
+ return false;
+ }
+ /**
+ * Slow the execution down if the system load is too high
+ *
+ * @return void
+ */
+ public static function coolDown()
+ {
$cooldown = DI::config()->get('system', 'worker_cooldown', 0);
if ($cooldown > 0) {
- Logger::debug('Pre execution cooldown.', ['cooldown' => $cooldown, 'id' => $queue['id'], 'priority' => $queue['priority'], 'command' => $queue['command']]);
- sleep($cooldown);
+ Logger::debug('Wait for cooldown.', ['cooldown' => $cooldown]);
+ if ($cooldown < 1) {
+ usleep($cooldown * 1000000);
+ } else {
+ sleep($cooldown);
+ }
}
$load_cooldown = DI::config()->get('system', 'worker_load_cooldown');
$processes_cooldown = DI::config()->get('system', 'worker_processes_cooldown');
- while ((($load_cooldown > 0) || ($processes_cooldown > 0)) && ($load = System::getLoadAvg())) {
+ if ($load_cooldown == 0) {
+ $load_cooldown = DI::config()->get('system', 'maxloadavg');
+ }
+
+ if (($load_cooldown == 0) && ($processes_cooldown == 0)) {
+ return;
+ }
+
+ $sleeping = false;
+
+ while ($load = System::getLoadAvg()) {
if (($load_cooldown > 0) && ($load['average1'] > $load_cooldown)) {
- Logger::debug('Load induced pre execution cooldown.', ['max' => $load_cooldown, 'load' => $load, 'id' => $queue['id'], 'priority' => $queue['priority'], 'command' => $queue['command']]);
+ if (!$sleeping) {
+ Logger::notice('Load induced pre execution cooldown.', ['max' => $load_cooldown, 'load' => $load, 'called-by' => System::callstack(1)]);
+ $sleeping = true;
+ }
sleep(1);
continue;
}
if (($processes_cooldown > 0) && ($load['scheduled'] > $processes_cooldown)) {
- Logger::debug('Process induced pre execution cooldown.', ['max' => $processes_cooldown, 'load' => $load, 'id' => $queue['id'], 'priority' => $queue['priority'], 'command' => $queue['command']]);
+ if (!$sleeping) {
+ Logger::notice('Process induced pre execution cooldown.', ['max' => $processes_cooldown, 'load' => $load, 'called-by' => System::callstack(1)]);
+ $sleeping = true;
+ }
sleep(1);
continue;
}
break;
}
+ if ($sleeping) {
+ Logger::notice('Cooldown ended.', ['max-load' => $load_cooldown, 'max-processes' => $processes_cooldown, 'load' => $load, 'called-by' => System::callstack(1)]);
+ }
+ }
+
+ /**
+ * Execute a function from the queue
+ *
+ * @param array $queue Workerqueue entry
+ * @param string $funcname name of the function
+ * @param array $argv Array of values to be passed to the function
+ * @param boolean $method_call boolean
+ * @return void
+ * @throws \Friendica\Network\HTTPException\InternalServerErrorException
+ */
+ private static function execFunction(array $queue, string $funcname, array $argv, bool $method_call)
+ {
+ $a = DI::app();
+
+ self::coolDown();
+
Logger::enableWorker($funcname);
Logger::info('Process start.', ['priority' => $queue['priority'], 'id' => $queue['id']]);
Logger::info('Performance:', ['state' => self::$state, 'count' => $dbcount, 'stat' => $dbstat, 'write' => $dbwrite, 'lock' => $dblock, 'total' => $dbtotal, 'rest' => $rest, 'exec' => $exec]);
+ self::coolDown();
+
self::$up_start = microtime(true);
self::$db_duration = 0;
self::$db_duration_count = 0;
Logger::info('Process done.', ['priority' => $queue['priority'], 'id' => $queue['id'], 'duration' => round($duration, 3)]);
DI::profiler()->saveLog(DI::logger(), 'ID ' . $queue['id'] . ': ' . $funcname);
-
- if ($cooldown > 0) {
- Logger::info('Post execution cooldown.', ['priority' => $queue['priority'], 'id' => $queue['id'], 'cooldown' => $cooldown]);
- sleep($cooldown);
- }
}
/**
$top_priority = self::highestPriority();
$high_running = self::processWithPriorityActive($top_priority);
- if (!$high_running && ($top_priority > PRIORITY_UNDEFINED) && ($top_priority < PRIORITY_NEGLIGIBLE)) {
+ if (!$high_running && ($top_priority > self::PRIORITY_UNDEFINED) && ($top_priority < self::PRIORITY_NEGLIGIBLE)) {
Logger::info('Jobs with a higher priority are waiting but none is executed. Open a fastlane.', ['priority' => $top_priority]);
$queues = $active + 1;
}
Logger::notice('Load: ' . $load . '/' . $maxsysload . ' - processes: ' . $deferred . '/' . $active . '/' . $waiting_processes . $processlist . ' - maximum: ' . $queues . '/' . $maxqueues);
// Are there fewer workers running as possible? Then fork a new one.
- if (!DI::config()->get('system', 'worker_dont_fork', false) && ($queues > ($active + 1)) && self::entriesExists()) {
+ if (!DI::config()->get('system', 'worker_dont_fork', false) && ($queues > ($active + 1)) && self::entriesExists() && !self::systemLimitReached()) {
Logger::info('There are fewer workers as possible, fork a new worker.', ['active' => $active, 'queues' => $queues]);
if (Worker\Daemon::isMode()) {
Worker\IPC::SetJobState(true);
private static function nextPriority()
{
$waiting = [];
- $priorities = [PRIORITY_CRITICAL, PRIORITY_HIGH, PRIORITY_MEDIUM, PRIORITY_LOW, PRIORITY_NEGLIGIBLE];
+ $priorities = [self::PRIORITY_CRITICAL, self::PRIORITY_HIGH, self::PRIORITY_MEDIUM, self::PRIORITY_LOW, self::PRIORITY_NEGLIGIBLE];
foreach ($priorities as $priority) {
$stamp = (float)microtime(true);
if (DBA::exists('workerqueue', ["`priority` = ? AND `pid` = 0 AND NOT `done` AND `next_try` < ?", $priority, DateTimeFormat::utcNow()])) {
self::$db_duration += (microtime(true) - $stamp);
}
- if (!empty($waiting[PRIORITY_CRITICAL])) {
- return PRIORITY_CRITICAL;
+ if (!empty($waiting[self::PRIORITY_CRITICAL])) {
+ return self::PRIORITY_CRITICAL;
}
$running = [];
* @param (integer|array) priority or parameter array, strings are deprecated and are ignored
*
* next args are passed as $cmd command line
- * or: Worker::add(PRIORITY_HIGH, 'Notifier', Delivery::DELETION, $drop_id);
- * or: Worker::add(array('priority' => PRIORITY_HIGH, 'dont_fork' => true), 'Delivery', $post_id);
+ * or: Worker::add(Worker::PRIORITY_HIGH, 'Notifier', Delivery::DELETION, $drop_id);
+ * or: Worker::add(array('priority' => Worker::PRIORITY_HIGH, 'dont_fork' => true), 'Delivery', $post_id);
*
* @return int '0' if worker queue entry already existed or there had been an error, otherwise the ID of the worker task
* @throws \Friendica\Network\HTTPException\InternalServerErrorException
return 1;
}
- $priority = PRIORITY_MEDIUM;
+ $priority = self::PRIORITY_MEDIUM;
// Don't fork from frontend tasks by default
$dont_fork = DI::config()->get('system', 'worker_dont_fork', false) || !DI::mode()->isBackend();
$created = DateTimeFormat::utcNow();
$found = DBA::exists('workerqueue', ['command' => $command, 'parameter' => $parameters, 'done' => false]);
$added = 0;
- if (!is_int($priority) || !in_array($priority, PRIORITIES)) {
+ if (!is_int($priority) || !in_array($priority, self::PRIORITIES)) {
Logger::warning('Invalid priority', ['priority' => $priority, 'command' => $command, 'callstack' => System::callstack(20)]);
- $priority = PRIORITY_MEDIUM;
+ $priority = self::PRIORITY_MEDIUM;
}
// Quit if there was a database error - a precaution for the update process to 3.5.3
Worker\Daemon::checkState();
// Should we quit and wait for the worker to be called as a cronjob?
- if ($dont_fork) {
+ if ($dont_fork || self::systemLimitReached()) {
return $added;
}
$delay = (($new_retrial + 2) ** 4) + (rand(1, 30) * ($new_retrial));
$next = DateTimeFormat::utc('now + ' . $delay . ' seconds');
- if (($priority < PRIORITY_MEDIUM) && ($new_retrial > 3)) {
- $priority = PRIORITY_MEDIUM;
- } elseif (($priority < PRIORITY_LOW) && ($new_retrial > 6)) {
- $priority = PRIORITY_LOW;
- } elseif (($priority < PRIORITY_NEGLIGIBLE) && ($new_retrial > 8)) {
- $priority = PRIORITY_NEGLIGIBLE;
+ if (($priority < self::PRIORITY_MEDIUM) && ($new_retrial > 3)) {
+ $priority = self::PRIORITY_MEDIUM;
+ } elseif (($priority < self::PRIORITY_LOW) && ($new_retrial > 6)) {
+ $priority = self::PRIORITY_LOW;
+ } elseif (($priority < self::PRIORITY_NEGLIGIBLE) && ($new_retrial > 8)) {
+ $priority = self::PRIORITY_NEGLIGIBLE;
}
Logger::info('Deferred task', ['id' => $id, 'retrial' => $new_retrial, 'created' => $queue['created'], 'next_execution' => $next, 'old_prio' => $queue['priority'], 'new_prio' => $priority]);