X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=src%2FCore%2FWorker.php;h=00885f69200a04f59c8a601020031ebc7a5d8a0c;hb=46cd39fb34613f4b331793f19c0e562f93125066;hp=830e02983210d208f55db7e83520f943a5bcc0c6;hpb=075638c0ae799a60f2b7595ad3c99145b71351e6;p=friendica.git diff --git a/src/Core/Worker.php b/src/Core/Worker.php index 830e029832..00885f6920 100644 --- a/src/Core/Worker.php +++ b/src/Core/Worker.php @@ -1,6 +1,6 @@ get('system', 'worker_last_cleaned', 0); + $last_cleanup = DI::keyValue()->get('worker_last_cleaned') ?? 0; if (time() > ($last_cleanup + 300)) { - DI::config()->set('system', 'worker_last_cleaned', time()); + DI::keyValue()->set( 'worker_last_cleaned', time()); Worker\Cron::killStaleWorkers(); } @@ -126,7 +141,7 @@ class Worker if (DI::lock()->acquire(self::LOCK_WORKER, 0)) { // Count active workers and compare them with a maximum value that depends on the load if (self::tooMuchWorkers()) { - Logger::notice('Active worker limit reached, quitting.'); + Logger::info('Active worker limit reached, quitting.'); DI::lock()->release(self::LOCK_WORKER); return; } @@ -143,7 +158,7 @@ class Worker } // Quit the worker once every cron interval - if (time() > ($starttime + (DI::config()->get('system', 'cron_interval') * 60))) { + if (time() > ($starttime + (DI::config()->get('system', 'cron_interval') * 60)) && !self::systemLimitReached()) { Logger::info('Process lifetime reached, respawning.'); self::unclaimProcess($process); if (Worker\Daemon::isMode()) { @@ -173,7 +188,7 @@ class Worker { // Count active workers and compare them with a maximum value that depends on the load if (self::tooMuchWorkers()) { - Logger::notice('Active worker limit reached, quitting.'); + Logger::info('Active worker limit reached, quitting.'); return false; } @@ -300,17 +315,7 @@ class Worker return false; } - $valid = false; - if (strpos($file, 'include/') === 0) { - $valid = true; - } - - if (strpos($file, 'addon/') === 0) { - $valid = true; - } - - // Simply return flag - return $valid; + return (strpos($file, 'addon/') === 0); } /** @@ -326,7 +331,7 @@ class Worker $mypid = getmypid(); // Quit when in maintenance - if (DI::config()->get('system', 'maintenance', false, true)) { + if (DI::config()->get('system', 'maintenance', false)) { Logger::notice('Maintenance mode - quit process', ['pid' => $mypid]); return false; } @@ -357,7 +362,7 @@ class Worker return false; } - // Check for existance and validity of the include file + // Check for existence and validity of the include file $include = $argv[0]; if (method_exists(sprintf('Friendica\Worker\%s', $include), 'execute')) { @@ -383,7 +388,7 @@ class Worker $stamp = (float)microtime(true); $condition = ["`id` = ? AND `next_try` < ?", $queue['id'], DateTimeFormat::utcNow()]; if (DBA::update('workerqueue', ['done' => true], $condition)) { - DI::config()->set('system', 'last_worker_execution', DateTimeFormat::utcNow()); + DI::keyValue()->set('last_worker_execution', DateTimeFormat::utcNow()); } self::$db_duration = (microtime(true) - $stamp); self::$db_duration_write += (microtime(true) - $stamp); @@ -391,11 +396,6 @@ class Worker return true; } - // The script could be provided as full path or only with the function name - if ($include == basename($include)) { - $include = 'include/' . $include . '.php'; - } - if (!self::validateInclude($include)) { Logger::warning('Include file is not valid', ['file' => $argv[0]]); $stamp = (float)microtime(true); @@ -429,7 +429,7 @@ class Worker $stamp = (float)microtime(true); if (DBA::update('workerqueue', ['done' => true], ['id' => $queue['id']])) { - DI::config()->set('system', 'last_worker_execution', DateTimeFormat::utcNow()); + DI::keyValue()->set('last_worker_execution', DateTimeFormat::utcNow()); } self::$db_duration = (microtime(true) - $stamp); self::$db_duration_write += (microtime(true) - $stamp); @@ -445,42 +445,110 @@ class Worker } /** - * Execute a function from the queue + * Checks if system limits are reached. * - * @param array $queue Workerqueue entry - * @param string $funcname name of the function - * @param array $argv Array of values to be passed to the function - * @param boolean $method_call boolean - * @return void - * @throws \Friendica\Network\HTTPException\InternalServerErrorException + * @return boolean */ - private static function execFunction(array $queue, string $funcname, array $argv, bool $method_call) + private static function systemLimitReached(): bool { - $a = DI::app(); + $load_cooldown = DI::config()->get('system', 'worker_load_cooldown'); + $processes_cooldown = DI::config()->get('system', 'worker_processes_cooldown'); + if ($load_cooldown == 0) { + $load_cooldown = DI::config()->get('system', 'maxloadavg'); + } + + if (($load_cooldown == 0) && ($processes_cooldown == 0)) { + return false; + } + + $load = System::getLoadAvg($processes_cooldown != 0); + if (empty($load)) { + return false; + } + + if (($load_cooldown > 0) && ($load['average1'] > $load_cooldown)) { + return true; + } + + if (($processes_cooldown > 0) && ($load['scheduled'] > $processes_cooldown)) { + return true; + } + + return false; + } + + /** + * Slow the execution down if the system load is too high + * + * @return void + */ + public static function coolDown() + { $cooldown = DI::config()->get('system', 'worker_cooldown', 0); if ($cooldown > 0) { - Logger::debug('Pre execution cooldown.', ['cooldown' => $cooldown, 'id' => $queue['id'], 'priority' => $queue['priority'], 'command' => $queue['command']]); - sleep($cooldown); + Logger::debug('Wait for cooldown.', ['cooldown' => $cooldown]); + if ($cooldown < 1) { + usleep($cooldown * 1000000); + } else { + sleep($cooldown); + } } $load_cooldown = DI::config()->get('system', 'worker_load_cooldown'); $processes_cooldown = DI::config()->get('system', 'worker_processes_cooldown'); - while ((($load_cooldown > 0) || ($processes_cooldown > 0)) && ($load = System::getLoadAvg())) { + if ($load_cooldown == 0) { + $load_cooldown = DI::config()->get('system', 'maxloadavg'); + } + + if (($load_cooldown == 0) && ($processes_cooldown == 0)) { + return; + } + + $sleeping = false; + + while ($load = System::getLoadAvg($processes_cooldown != 0)) { if (($load_cooldown > 0) && ($load['average1'] > $load_cooldown)) { - Logger::debug('Load induced pre execution cooldown.', ['max' => $load_cooldown, 'load' => $load, 'id' => $queue['id'], 'priority' => $queue['priority'], 'command' => $queue['command']]); + if (!$sleeping) { + Logger::info('Load induced pre execution cooldown.', ['max' => $load_cooldown, 'load' => $load, 'called-by' => System::callstack(1)]); + $sleeping = true; + } sleep(1); continue; } if (($processes_cooldown > 0) && ($load['scheduled'] > $processes_cooldown)) { - Logger::debug('Process induced pre execution cooldown.', ['max' => $processes_cooldown, 'load' => $load, 'id' => $queue['id'], 'priority' => $queue['priority'], 'command' => $queue['command']]); + if (!$sleeping) { + Logger::info('Process induced pre execution cooldown.', ['max' => $processes_cooldown, 'load' => $load, 'called-by' => System::callstack(1)]); + $sleeping = true; + } sleep(1); continue; } break; } + if ($sleeping) { + Logger::info('Cooldown ended.', ['max-load' => $load_cooldown, 'max-processes' => $processes_cooldown, 'load' => $load, 'called-by' => System::callstack(1)]); + } + } + + /** + * Execute a function from the queue + * + * @param array $queue Workerqueue entry + * @param string $funcname name of the function + * @param array $argv Array of values to be passed to the function + * @param boolean $method_call boolean + * @return void + * @throws \Friendica\Network\HTTPException\InternalServerErrorException + */ + private static function execFunction(array $queue, string $funcname, array $argv, bool $method_call) + { + $a = DI::app(); + + self::coolDown(); + Logger::enableWorker($funcname); Logger::info('Process start.', ['priority' => $queue['priority'], 'id' => $queue['id']]); @@ -500,7 +568,15 @@ class Worker // Set the workerLogger as new default logger if ($method_call) { - call_user_func_array(sprintf('Friendica\Worker\%s::execute', $funcname), $argv); + try { + call_user_func_array(sprintf('Friendica\Worker\%s::execute', $funcname), $argv); + } catch (\TypeError $e) { + // No need to defer a worker queue entry if the arguments are invalid + Logger::notice('Wrong worker arguments', ['class' => $funcname, 'argv' => $argv, 'queue' => $queue, 'message' => $e->getMessage()]); + } catch (\Throwable $e) { + Logger::error('Uncaught exception in worker execution', ['class' => get_class($e), 'message' => $e->getMessage(), 'code' => $e->getCode(), 'file' => $e->getFile() . ':' . $e->getLine(), 'trace' => $e->getTraceAsString(), 'previous' => $e->getPrevious()]); + Worker::defer(); + } } else { $funcname($argv, count($argv)); } @@ -514,7 +590,7 @@ class Worker /* With these values we can analyze how effective the worker is. * The database and rest time should be low since this is the unproductive time. * The execution time is the productive time. - * By changing parameters like the maximum number of workers we can check the effectivness. + * By changing parameters like the maximum number of workers we can check the effectiveness. */ $dbtotal = round(self::$db_duration, 2); $dbread = round(self::$db_duration - (self::$db_duration_count + self::$db_duration_write + self::$db_duration_stat), 2); @@ -527,6 +603,8 @@ class Worker Logger::info('Performance:', ['state' => self::$state, 'count' => $dbcount, 'stat' => $dbstat, 'write' => $dbwrite, 'lock' => $dblock, 'total' => $dbtotal, 'rest' => $rest, 'exec' => $exec]); + self::coolDown(); + self::$up_start = microtime(true); self::$db_duration = 0; self::$db_duration_count = 0; @@ -547,11 +625,6 @@ class Worker Logger::info('Process done.', ['priority' => $queue['priority'], 'id' => $queue['id'], 'duration' => round($duration, 3)]); DI::profiler()->saveLog(DI::logger(), 'ID ' . $queue['id'] . ': ' . $funcname); - - if ($cooldown > 0) { - Logger::info('Post execution cooldown.', ['priority' => $queue['priority'], 'id' => $queue['id'], 'cooldown' => $cooldown]); - sleep($cooldown); - } } /** @@ -589,16 +662,24 @@ class Worker DBA::close($r); } + $stamp = (float)microtime(true); + $used = 0; + $sleep = 0; + $data = DBA::p("SHOW PROCESSLIST"); + while ($row = DBA::fetch($data)) { + if ($row['Command'] != 'Sleep') { + ++$used; + } else { + ++$sleep; + } + } + DBA::close($data); + self::$db_duration += (microtime(true) - $stamp); + // If $max is set we will use the processlist to determine the current number of connections // The processlist only shows entries of the current user if ($max != 0) { - $stamp = (float)microtime(true); - $r = DBA::p('SHOW PROCESSLIST'); - self::$db_duration += (microtime(true) - $stamp); - $used = DBA::numRows($r); - DBA::close($r); - - Logger::info('Connection usage (user values)', ['usage' => $used, 'max' => $max]); + Logger::info('Connection usage (user values)', ['working' => $used, 'sleeping' => $sleep, 'max' => $max]); $level = ($used / $max) * 100; @@ -622,11 +703,11 @@ class Worker if (!DBA::isResult($r)) { return false; } - $used = intval($r['Value']); + $used = max($used, intval($r['Value'])) - $sleep; if ($used == 0) { return false; } - Logger::info('Connection usage (system values)', ['used' => $used, 'max' => $max]); + Logger::info('Connection usage (system values)', ['working' => $used, 'sleeping' => $sleep, 'max' => $max]); $level = $used / $max * 100; @@ -735,16 +816,16 @@ class Worker $top_priority = self::highestPriority(); $high_running = self::processWithPriorityActive($top_priority); - if (!$high_running && ($top_priority > PRIORITY_UNDEFINED) && ($top_priority < PRIORITY_NEGLIGIBLE)) { + if (!$high_running && ($top_priority > self::PRIORITY_UNDEFINED) && ($top_priority < self::PRIORITY_NEGLIGIBLE)) { Logger::info('Jobs with a higher priority are waiting but none is executed. Open a fastlane.', ['priority' => $top_priority]); $queues = $active + 1; } } - Logger::notice('Load: ' . $load . '/' . $maxsysload . ' - processes: ' . $deferred . '/' . $active . '/' . $waiting_processes . $processlist . ' - maximum: ' . $queues . '/' . $maxqueues); + Logger::info('Load: ' . $load . '/' . $maxsysload . ' - processes: ' . $deferred . '/' . $active . '/' . $waiting_processes . $processlist . ' - maximum: ' . $queues . '/' . $maxqueues); // Are there fewer workers running as possible? Then fork a new one. - if (!DI::config()->get('system', 'worker_dont_fork', false) && ($queues > ($active + 1)) && self::entriesExists()) { + if (!DI::config()->get('system', 'worker_dont_fork', false) && ($queues > ($active + 1)) && self::entriesExists() && !self::systemLimitReached()) { Logger::info('There are fewer workers as possible, fork a new worker.', ['active' => $active, 'queues' => $queues]); if (Worker\Daemon::isMode()) { Worker\IPC::SetJobState(true); @@ -804,7 +885,7 @@ class Worker /** * Returns waiting jobs for the current process id * - * @return array|bool waiting workerqueue jobs or FALSE on failture + * @return array|bool waiting workerqueue jobs or FALSE on failure * @throws \Exception */ private static function getWaitingJobForPID() @@ -867,7 +948,7 @@ class Worker private static function nextPriority() { $waiting = []; - $priorities = [PRIORITY_CRITICAL, PRIORITY_HIGH, PRIORITY_MEDIUM, PRIORITY_LOW, PRIORITY_NEGLIGIBLE]; + $priorities = [self::PRIORITY_CRITICAL, self::PRIORITY_HIGH, self::PRIORITY_MEDIUM, self::PRIORITY_LOW, self::PRIORITY_NEGLIGIBLE]; foreach ($priorities as $priority) { $stamp = (float)microtime(true); if (DBA::exists('workerqueue', ["`priority` = ? AND `pid` = 0 AND NOT `done` AND `next_try` < ?", $priority, DateTimeFormat::utcNow()])) { @@ -876,8 +957,8 @@ class Worker self::$db_duration += (microtime(true) - $stamp); } - if (!empty($waiting[PRIORITY_CRITICAL])) { - return PRIORITY_CRITICAL; + if (!empty($waiting[self::PRIORITY_CRITICAL])) { + return self::PRIORITY_CRITICAL; } $running = []; @@ -1134,8 +1215,8 @@ class Worker * @param (integer|array) priority or parameter array, strings are deprecated and are ignored * * next args are passed as $cmd command line - * or: Worker::add(PRIORITY_HIGH, 'Notifier', Delivery::DELETION, $drop_id); - * or: Worker::add(array('priority' => PRIORITY_HIGH, 'dont_fork' => true), 'Delivery', $post_id); + * or: Worker::add(Worker::PRIORITY_HIGH, 'Notifier', Delivery::DELETION, $drop_id); + * or: Worker::add(array('priority' => Worker::PRIORITY_HIGH, 'dont_fork' => true), 'Delivery', $post_id); * * @return int '0' if worker queue entry already existed or there had been an error, otherwise the ID of the worker task * @throws \Friendica\Network\HTTPException\InternalServerErrorException @@ -1158,7 +1239,7 @@ class Worker return 1; } - $priority = PRIORITY_MEDIUM; + $priority = self::PRIORITY_MEDIUM; // Don't fork from frontend tasks by default $dont_fork = DI::config()->get('system', 'worker_dont_fork', false) || !DI::mode()->isBackend(); $created = DateTimeFormat::utcNow(); @@ -1191,12 +1272,12 @@ class Worker $command = array_shift($args); $parameters = json_encode($args); - $found = DBA::exists('workerqueue', ['command' => $command, 'parameter' => $parameters, 'done' => false]); + $queue = DBA::selectFirst('workerqueue', ['id', 'priority'], ['command' => $command, 'parameter' => $parameters, 'done' => false]); $added = 0; - if (!is_int($priority) || !in_array($priority, PRIORITIES)) { + if (!is_int($priority) || !in_array($priority, self::PRIORITIES)) { Logger::warning('Invalid priority', ['priority' => $priority, 'command' => $command, 'callstack' => System::callstack(20)]); - $priority = PRIORITY_MEDIUM; + $priority = self::PRIORITY_MEDIUM; } // Quit if there was a database error - a precaution for the update process to 3.5.3 @@ -1204,14 +1285,17 @@ class Worker return 0; } - if (!$found) { + if (empty($queue)) { if (!DBA::insert('workerqueue', ['command' => $command, 'parameter' => $parameters, 'created' => $created, 'priority' => $priority, 'next_try' => $delayed])) { return 0; } $added = DBA::lastInsertId(); } elseif ($force_priority) { - DBA::update('workerqueue', ['priority' => $priority], ['command' => $command, 'parameter' => $parameters, 'done' => false, 'pid' => 0]); + $ret = DBA::update('workerqueue', ['priority' => $priority], ['command' => $command, 'parameter' => $parameters, 'done' => false, 'pid' => 0]); + if ($ret && ($priority != $queue['priority'])) { + $added = $queue['id']; + } } // Set the IPC flag to ensure an immediate process execution via daemon @@ -1222,7 +1306,7 @@ class Worker Worker\Daemon::checkState(); // Should we quit and wait for the worker to be called as a cronjob? - if ($dont_fork) { + if ($dont_fork || self::systemLimitReached()) { return $added; } @@ -1239,8 +1323,8 @@ class Worker return $added; } - // Quit on daemon mode - if (Worker\Daemon::isMode()) { + // Quit on daemon mode, except the priority is critical (like for db updates) + if (Worker\Daemon::isMode() && $priority !== self::PRIORITY_CRITICAL) { return $added; } @@ -1311,12 +1395,12 @@ class Worker $delay = (($new_retrial + 2) ** 4) + (rand(1, 30) * ($new_retrial)); $next = DateTimeFormat::utc('now + ' . $delay . ' seconds'); - if (($priority < PRIORITY_MEDIUM) && ($new_retrial > 3)) { - $priority = PRIORITY_MEDIUM; - } elseif (($priority < PRIORITY_LOW) && ($new_retrial > 6)) { - $priority = PRIORITY_LOW; - } elseif (($priority < PRIORITY_NEGLIGIBLE) && ($new_retrial > 8)) { - $priority = PRIORITY_NEGLIGIBLE; + if (($priority < self::PRIORITY_MEDIUM) && ($new_retrial > 3)) { + $priority = self::PRIORITY_MEDIUM; + } elseif (($priority < self::PRIORITY_LOW) && ($new_retrial > 6)) { + $priority = self::PRIORITY_LOW; + } elseif (($priority < self::PRIORITY_NEGLIGIBLE) && ($new_retrial > 8)) { + $priority = self::PRIORITY_NEGLIGIBLE; } Logger::info('Deferred task', ['id' => $id, 'retrial' => $new_retrial, 'created' => $queue['created'], 'next_execution' => $next, 'old_prio' => $queue['priority'], 'new_prio' => $priority]); @@ -1338,7 +1422,7 @@ class Worker */ public static function isInMaintenanceWindow(bool $check_last_execution = false): bool { - // Calculate the seconds of the start end end of the maintenance window + // Calculate the seconds of the start and end of the maintenance window $start = strtotime(DI::config()->get('system', 'maintenance_start')) % 86400; $end = strtotime(DI::config()->get('system', 'maintenance_end')) % 86400; @@ -1349,7 +1433,7 @@ class Worker $duration = max($start, $end) - min($start, $end); // Quit when the last cron execution had been after the previous window - $last_cron = DI::config()->get('system', 'last_cron_daily'); + $last_cron = DI::keyValue()->get('last_cron_daily'); if ($last_cron + $duration > time()) { Logger::info('The Daily cron had been executed recently', ['last' => date(DateTimeFormat::MYSQL, $last_cron), 'start' => date('H:i:s', $start), 'end' => date('H:i:s', $end)]); return false;