*/
class Worker
{
+ const PRIORITY_UNDEFINED = PRIORITY_UNDEFINED;
+ const PRIORITY_CRITICAL = PRIORITY_CRITICAL;
+ const PRIORITY_HIGH = PRIORITY_HIGH;
+ const PRIORITY_MEDIUM = PRIORITY_MEDIUM;
+ const PRIORITY_LOW = PRIORITY_LOW;
+ const PRIORITY_NEGLIGIBLE = PRIORITY_NEGLIGIBLE;
+
const STATE_STARTUP = 1; // Worker is in startup. This takes most time.
const STATE_LONG_LOOP = 2; // Worker is processing the whole - long - loop.
const STATE_REFETCH = 3; // Worker had refetched jobs in the execution loop.
}
// Quit the worker once every cron interval
- if (time() > ($starttime + (DI::config()->get('system', 'cron_interval') * 60))) {
+ if (time() > ($starttime + (DI::config()->get('system', 'cron_interval') * 60)) && !self::systemLimitReached()) {
Logger::info('Process lifetime reached, respawning.');
self::unclaimProcess($process);
if (Worker\Daemon::isMode()) {
return true;
}
+ /**
+ * Checks if system limits are reached.
+ *
+ * @return boolean
+ */
+ private static function systemLimitReached(): bool
+ {
+ $load_cooldown = DI::config()->get('system', 'worker_load_cooldown');
+ $processes_cooldown = DI::config()->get('system', 'worker_processes_cooldown');
+
+ if ($load_cooldown == 0) {
+ $load_cooldown = DI::config()->get('system', 'maxloadavg');
+ }
+
+ if (($load_cooldown == 0) && ($processes_cooldown == 0)) {
+ return false;
+ }
+
+ $load = System::getLoadAvg();
+ if (empty($load)) {
+ return false;
+ }
+
+ if (($load_cooldown > 0) && ($load['average1'] > $load_cooldown)) {
+ return true;
+ }
+
+ if (($processes_cooldown > 0) && ($load['scheduled'] > $processes_cooldown)) {
+ return true;
+ }
+
+ return false;
+ }
+
/**
* Slow the execution down if the system load is too high
*
*/
public static function coolDown()
{
+ $cooldown = DI::config()->get('system', 'worker_cooldown', 0);
+ if ($cooldown > 0) {
+ Logger::debug('Wait for cooldown.', ['cooldown' => $cooldown]);
+ if ($cooldown < 1) {
+ usleep($cooldown * 1000000);
+ } else {
+ sleep($cooldown);
+ }
+ }
+
$load_cooldown = DI::config()->get('system', 'worker_load_cooldown');
$processes_cooldown = DI::config()->get('system', 'worker_processes_cooldown');
+ if ($load_cooldown == 0) {
+ $load_cooldown = DI::config()->get('system', 'maxloadavg');
+ }
+
if (($load_cooldown == 0) && ($processes_cooldown == 0)) {
return;
}
{
$a = DI::app();
- $cooldown = DI::config()->get('system', 'worker_cooldown', 0);
- if ($cooldown > 0) {
- Logger::notice('Pre execution cooldown.', ['cooldown' => $cooldown, 'id' => $queue['id'], 'priority' => $queue['priority'], 'command' => $queue['command']]);
- sleep($cooldown);
- }
-
self::coolDown();
Logger::enableWorker($funcname);
Logger::info('Process done.', ['priority' => $queue['priority'], 'id' => $queue['id'], 'duration' => round($duration, 3)]);
DI::profiler()->saveLog(DI::logger(), 'ID ' . $queue['id'] . ': ' . $funcname);
-
- if ($cooldown > 0) {
- Logger::info('Post execution cooldown.', ['priority' => $queue['priority'], 'id' => $queue['id'], 'cooldown' => $cooldown]);
- sleep($cooldown);
- }
}
/**
Logger::notice('Load: ' . $load . '/' . $maxsysload . ' - processes: ' . $deferred . '/' . $active . '/' . $waiting_processes . $processlist . ' - maximum: ' . $queues . '/' . $maxqueues);
// Are there fewer workers running as possible? Then fork a new one.
- if (!DI::config()->get('system', 'worker_dont_fork', false) && ($queues > ($active + 1)) && self::entriesExists()) {
+ if (!DI::config()->get('system', 'worker_dont_fork', false) && ($queues > ($active + 1)) && self::entriesExists() && !self::systemLimitReached()) {
Logger::info('There are fewer workers as possible, fork a new worker.', ['active' => $active, 'queues' => $queues]);
if (Worker\Daemon::isMode()) {
Worker\IPC::SetJobState(true);
Worker\Daemon::checkState();
// Should we quit and wait for the worker to be called as a cronjob?
- if ($dont_fork) {
+ if ($dont_fork || self::systemLimitReached()) {
return $added;
}