const LAST_CHECK = 'worker::check';
private static $up_start;
- private static $db_duration = 0;
+ private static $db_duration = 0;
private static $db_duration_count = 0;
private static $db_duration_write = 0;
- private static $db_duration_stat = 0;
- private static $lock_duration = 0;
+ private static $db_duration_stat = 0;
+ private static $lock_duration = 0;
private static $last_update;
private static $state;
/** @var Process */
Worker\Cron::run();
}
- $last_check = $starttime = time();
+ $last_check = $starttime = time();
self::$state = self::STATE_STARTUP;
// We fetch the next queue entry that is about to be executed
self::findWorkerProcesses();
DI::lock()->release(self::LOCK_PROCESS);
self::$state = self::STATE_REFETCH;
- $refetched = true;
+ $refetched = true;
} else {
self::$state = self::STATE_SHORT_LOOP;
}
self::$state = self::STATE_LONG_LOOP;
if (DI::lock()->acquire(self::LOCK_WORKER, 0)) {
- // Count active workers and compare them with a maximum value that depends on the load
+ // Count active workers and compare them with a maximum value that depends on the load
if (self::tooMuchWorkers()) {
Logger::info('Active worker limit reached, quitting.');
DI::lock()->release(self::LOCK_WORKER);
*/
public static function entriesExists(): bool
{
- $stamp = (float)microtime(true);
+ $stamp = (float)microtime(true);
$exists = DBA::exists('workerqueue', ["NOT `done` AND `pid` = 0 AND `next_try` < ?", DateTimeFormat::utcNow()]);
self::$db_duration += (microtime(true) - $stamp);
return $exists;
*/
private static function highestPriority(): int
{
- $stamp = (float)microtime(true);
- $condition = ["`pid` = 0 AND NOT `done` AND `next_try` < ?", DateTimeFormat::utcNow()];
+ $stamp = (float)microtime(true);
+ $condition = ["`pid` = 0 AND NOT `done` AND `next_try` < ?", DateTimeFormat::utcNow()];
$workerqueue = DBA::selectFirst('workerqueue', ['priority'], $condition, ['order' => ['priority']]);
self::$db_duration += (microtime(true) - $stamp);
if (DBA::isResult($workerqueue)) {
self::$last_update = strtotime($queue['executed']);
}
- $age = (time() - self::$last_update) / 60;
+ $age = (time() - self::$last_update) / 60;
self::$last_update = time();
if ($age > 1) {
self::execFunction($queue, $include, $argv, true);
- $stamp = (float)microtime(true);
+ $stamp = (float)microtime(true);
$condition = ["`id` = ? AND `next_try` < ?", $queue['id'], DateTimeFormat::utcNow()];
if (DBA::update('workerqueue', ['done' => true], $condition)) {
DI::keyValue()->set('last_worker_execution', DateTimeFormat::utcNow());
self::$last_update = strtotime($queue['executed']);
}
- $age = (time() - self::$last_update) / 60;
+ $age = (time() - self::$last_update) / 60;
self::$last_update = time();
if ($age > 1) {
self::coolDown();
- self::$up_start = microtime(true);
- self::$db_duration = 0;
+ self::$up_start = microtime(true);
+ self::$db_duration = 0;
self::$db_duration_count = 0;
- self::$db_duration_stat = 0;
+ self::$db_duration_stat = 0;
self::$db_duration_write = 0;
- self::$lock_duration = 0;
+ self::$lock_duration = 0;
if ($duration > 3600) {
- Logger::info('Longer than 1 hour.', ['priority' => $queue['priority'], 'id' => $queue['id'], 'duration' => round($duration/60, 3)]);
+ Logger::info('Longer than 1 hour.', ['priority' => $queue['priority'], 'id' => $queue['id'], 'duration' => round($duration / 60, 3)]);
} elseif ($duration > 600) {
- Logger::info('Longer than 10 minutes.', ['priority' => $queue['priority'], 'id' => $queue['id'], 'duration' => round($duration/60, 3)]);
+ Logger::info('Longer than 10 minutes.', ['priority' => $queue['priority'], 'id' => $queue['id'], 'duration' => round($duration / 60, 3)]);
} elseif ($duration > 300) {
- Logger::info('Longer than 5 minutes.', ['priority' => $queue['priority'], 'id' => $queue['id'], 'duration' => round($duration/60, 3)]);
+ Logger::info('Longer than 5 minutes.', ['priority' => $queue['priority'], 'id' => $queue['id'], 'duration' => round($duration / 60, 3)]);
} elseif ($duration > 120) {
- Logger::info('Longer than 2 minutes.', ['priority' => $queue['priority'], 'id' => $queue['id'], 'duration' => round($duration/60, 3)]);
+ Logger::info('Longer than 2 minutes.', ['priority' => $queue['priority'], 'id' => $queue['id'], 'duration' => round($duration / 60, 3)]);
}
Logger::info('Process done.', ['function' => $funcname, 'priority' => $queue['priority'], 'retrial' => $queue['retrial'], 'id' => $queue['id'], 'duration' => round($duration, 3)]);
}
// Or it can be granted. This overrides the system variable
$stamp = (float)microtime(true);
- $r = DBA::p('SHOW GRANTS');
+ $r = DBA::p('SHOW GRANTS');
self::$db_duration += (microtime(true) - $stamp);
while ($grants = DBA::fetch($r)) {
$grant = array_pop($grants);
$stamp = (float)microtime(true);
$used = 0;
$sleep = 0;
- $data = DBA::p("SHOW PROCESSLIST");
+ $data = DBA::p("SHOW PROCESSLIST");
while ($row = DBA::fetch($data)) {
if ($row['Command'] != 'Sleep') {
++$used;
* With exponent 1, you could have 20 max queues at idle and 13 at 37% of $maxsysload.
*/
$exponent = intval(DI::config()->get('system', 'worker_load_exponent', 3));
- $slope = pow(max(0, $maxsysload - $load) / $maxsysload, $exponent);
- $queues = intval(ceil($slope * $maxqueues));
+ $slope = pow(max(0, $maxsysload - $load) / $maxsysload, $exponent);
+ $queues = intval(ceil($slope * $maxqueues));
$processlist = '';
if (DI::config()->get('system', 'worker_jpm')) {
- $intervals = explode(',', DI::config()->get('system', 'worker_jpm_range'));
+ $intervals = explode(',', DI::config()->get('system', 'worker_jpm_range'));
$jobs_per_minute = [];
foreach ($intervals as $interval) {
if ($interval == 0) {
}
$stamp = (float)microtime(true);
- $jobs = DBA::count('workerqueue', ["`done` AND `executed` > ?", DateTimeFormat::utc('now - ' . $interval . ' minute')]);
+ $jobs = DBA::count('workerqueue', ["`done` AND `executed` > ?", DateTimeFormat::utc('now - ' . $interval . ' minute')]);
self::$db_duration += (microtime(true) - $stamp);
self::$db_duration_stat += (microtime(true) - $stamp);
$jobs_per_minute[$interval] = number_format($jobs / $interval, 0);
$waiting_processes = 0;
// Now adding all processes with workerqueue entries
$stamp = (float)microtime(true);
- $jobs = DBA::p("SELECT COUNT(*) AS `entries`, `priority` FROM `workerqueue` WHERE NOT `done` GROUP BY `priority`");
+ $jobs = DBA::p("SELECT COUNT(*) AS `entries`, `priority` FROM `workerqueue` WHERE NOT `done` GROUP BY `priority`");
self::$db_duration += (microtime(true) - $stamp);
self::$db_duration_stat += (microtime(true) - $stamp);
while ($entry = DBA::fetch($jobs)) {
- $stamp = (float)microtime(true);
+ $stamp = (float)microtime(true);
$running = DBA::count('workerqueue-view', ['priority' => $entry['priority']]);
self::$db_duration += (microtime(true) - $stamp);
self::$db_duration_stat += (microtime(true) - $stamp);
}
DBA::close($jobs);
} else {
- $waiting_processes = self::totalEntries();
- $stamp = (float)microtime(true);
- $jobs = DBA::p("SELECT COUNT(*) AS `running`, `priority` FROM `workerqueue-view` GROUP BY `priority` ORDER BY `priority`");
+ $waiting_processes = self::totalEntries();
+ $stamp = (float)microtime(true);
+ $jobs = DBA::p("SELECT COUNT(*) AS `running`, `priority` FROM `workerqueue-view` GROUP BY `priority` ORDER BY `priority`");
self::$db_duration += (microtime(true) - $stamp);
self::$db_duration_stat += (microtime(true) - $stamp);
*/
private static function getWorkerPIDList(): array
{
- $ids = [];
+ $ids = [];
$stamp = (float)microtime(true);
$queues = DBA::p("SELECT `process`.`pid`, COUNT(`workerqueue`.`pid`) AS `entries` FROM `process`
private static function getWaitingJobForPID()
{
$stamp = (float)microtime(true);
- $r = DBA::select('workerqueue', [], ['pid' => getmypid(), 'done' => false]);
+ $r = DBA::select('workerqueue', [], ['pid' => getmypid(), 'done' => false]);
self::$db_duration += (microtime(true) - $stamp);
if (DBA::isResult($r)) {
return DBA::toArray($r);
return [];
}
- $ids = [];
- $stamp = (float)microtime(true);
+ $ids = [];
+ $stamp = (float)microtime(true);
$condition = ["`priority` = ? AND `pid` = 0 AND NOT `done` AND `next_try` < ?", $priority, DateTimeFormat::utcNow()];
- $tasks = DBA::select('workerqueue', ['id', 'command', 'parameter'], $condition, ['limit' => $limit, 'order' => ['retrial', 'created']]);
+ $tasks = DBA::select('workerqueue', ['id', 'command', 'parameter'], $condition, ['limit' => $limit, 'order' => ['retrial', 'created']]);
self::$db_duration += (microtime(true) - $stamp);
while ($task = DBA::fetch($tasks)) {
$ids[] = $task['id'];
*/
private static function nextPriority()
{
- $waiting = [];
+ $waiting = [];
$priorities = [self::PRIORITY_CRITICAL, self::PRIORITY_HIGH, self::PRIORITY_MEDIUM, self::PRIORITY_LOW, self::PRIORITY_NEGLIGIBLE];
foreach ($priorities as $priority) {
$stamp = (float)microtime(true);
return self::PRIORITY_CRITICAL;
}
- $running = [];
+ $running = [];
$running_total = 0;
- $stamp = (float)microtime(true);
- $processes = DBA::p("SELECT COUNT(DISTINCT(`pid`)) AS `running`, `priority` FROM `workerqueue-view` GROUP BY `priority`");
+ $stamp = (float)microtime(true);
+ $processes = DBA::p("SELECT COUNT(DISTINCT(`pid`)) AS `running`, `priority` FROM `workerqueue-view` GROUP BY `priority`");
self::$db_duration += (microtime(true) - $stamp);
while ($process = DBA::fetch($processes)) {
$running[$process['priority']] = $process['running'];
}
}
- $active = max(self::activeWorkers(), $running_total);
+ $active = max(self::activeWorkers(), $running_total);
$priorities = max(count($waiting), count($running));
- $exponent = 2;
+ $exponent = 2;
$total = 0;
for ($i = 1; $i <= $priorities; ++$i) {
}
$limit = $fetch_limit * count($pids);
} else {
- $pids = [getmypid()];
+ $pids = [getmypid()];
$limit = $fetch_limit;
}
// If there is not enough results we check without priority limit
if ($limit > 0) {
- $stamp = (float)microtime(true);
+ $stamp = (float)microtime(true);
$condition = ["`pid` = 0 AND NOT `done` AND `next_try` < ?", DateTimeFormat::utcNow()];
- $tasks = DBA::select('workerqueue', ['id', 'command', 'parameter'], $condition, ['limit' => $limit, 'order' => ['priority', 'retrial', 'created']]);
+ $tasks = DBA::select('workerqueue', ['id', 'command', 'parameter'], $condition, ['limit' => $limit, 'order' => ['priority', 'retrial', 'created']]);
self::$db_duration += (microtime(true) - $stamp);
while ($task = DBA::fetch($tasks)) {
$stamp = (float)microtime(true);
foreach ($worker as $worker_pid => $worker_ids) {
Logger::info('Set queue entry', ['pid' => $worker_pid, 'ids' => $worker_ids]);
- DBA::update('workerqueue', ['executed' => DateTimeFormat::utcNow(), 'pid' => $worker_pid],
- ['id' => $worker_ids, 'done' => false, 'pid' => 0]);
+ DBA::update(
+ 'workerqueue',
+ ['executed' => DateTimeFormat::utcNow(), 'pid' => $worker_pid],
+ ['id' => $worker_ids, 'done' => false, 'pid' => 0]
+ );
}
self::$db_duration += (microtime(true) - $stamp);
self::$db_duration_write += (microtime(true) - $stamp);
$priority = self::PRIORITY_MEDIUM;
// Don't fork from frontend tasks by default
- $dont_fork = DI::config()->get('system', 'worker_dont_fork', false) || !DI::mode()->isBackend();
- $created = DateTimeFormat::utcNow();
- $delayed = DBA::NULL_DATETIME;
+ $dont_fork = DI::config()->get('system', 'worker_dont_fork', false) || !DI::mode()->isBackend();
+ $created = DateTimeFormat::utcNow();
+ $delayed = DBA::NULL_DATETIME;
$force_priority = false;
$run_parameter = array_shift($args);
throw new \InvalidArgumentException('Priority number or task parameter array expected as first argument');
}
- $command = array_shift($args);
+ $command = array_shift($args);
$parameters = json_encode($args);
- $queue = DBA::selectFirst('workerqueue', ['id', 'priority'], ['command' => $command, 'parameter' => $parameters, 'done' => false]);
- $added = 0;
+ $queue = DBA::selectFirst('workerqueue', ['id', 'priority'], ['command' => $command, 'parameter' => $parameters, 'done' => false]);
+ $added = 0;
if (!is_int($priority) || !in_array($priority, self::PRIORITIES)) {
Logger::warning('Invalid priority', ['priority' => $priority, 'command' => $command]);
if (empty($queue)) {
if (!DBA::insert('workerqueue', ['command' => $command, 'parameter' => $parameters, 'created' => $created,
- 'priority' => $priority, 'next_try' => $delayed])) {
+ 'priority' => $priority, 'next_try' => $delayed])) {
return 0;
}
$added = DBA::lastInsertId();
*/
private static function getNextRetrial(array $queue, int $max_level): int
{
- $created = strtotime($queue['created']);
+ $created = strtotime($queue['created']);
$retrial_time = time() - $created;
$new_retrial = $queue['retrial'] + 1;
- $total = 0;
+ $total = 0;
for ($retrial = 0; $retrial <= $max_level + 1; ++$retrial) {
$delay = (($retrial + 3) ** 4) + (rand(1, 30) * ($retrial + 1));
$total += $delay;
return false;
}
- $id = $queue['id'];
+ $id = $queue['id'];
$priority = $queue['priority'];
$max_level = DI::config()->get('system', 'worker_defer_limit');
// Calculate the delay until the next trial
$delay = (($new_retrial + 2) ** 4) + (rand(1, 30) * ($new_retrial));
- $next = DateTimeFormat::utc('now + ' . $delay . ' seconds');
+ $next = DateTimeFormat::utc('now + ' . $delay . ' seconds');
if (($priority < self::PRIORITY_MEDIUM) && ($new_retrial > 3)) {
$priority = self::PRIORITY_MEDIUM;
Logger::info('Deferred task', ['id' => $id, 'retrial' => $new_retrial, 'created' => $queue['created'], 'next_execution' => $next, 'old_prio' => $queue['priority'], 'new_prio' => $priority]);
- $stamp = (float)microtime(true);
+ $stamp = (float)microtime(true);
$fields = ['retrial' => $new_retrial, 'next_try' => $next, 'executed' => DBA::NULL_DATETIME, 'pid' => 0, 'priority' => $priority];
DBA::update('workerqueue', $fields, ['id' => $id]);
self::$db_duration += (microtime(true) - $stamp);
{
// Calculate the seconds of the start and end of the maintenance window
$start = strtotime(DI::config()->get('system', 'maintenance_start')) % 86400;
- $end = strtotime(DI::config()->get('system', 'maintenance_end')) % 86400;
+ $end = strtotime(DI::config()->get('system', 'maintenance_end')) % 86400;
Logger::info('Maintenance window', ['start' => date('H:i:s', $start), 'end' => date('H:i:s', $end)]);