class Worker
{
const STATE_STARTUP = 1; // Worker is in startup. This takes most time.
- const STATE_SHORT_LOOP = 2; // Worker is processing preassigned jobs, thus saving much time.
+ const STATE_LONG_LOOP = 2; // Worker is processing the whole - long - loop.
const STATE_REFETCH = 3; // Worker had refetched jobs in the execution loop.
- const STATE_LONG_LOOP = 4; // Worker is processing the whole - long - loop.
+ const STATE_SHORT_LOOP = 4; // Worker is processing preassigned jobs, thus saving much time.
private static $up_start;
private static $db_duration = 0;
// We fetch the next queue entry that is about to be executed
while ($r = self::workerProcess()) {
+ $refetched = false;
foreach ($r as $entry) {
// Assure that the priority is an integer value
$entry['priority'] = (int)$entry['priority'];
return;
}
- // If possible we will fetch new jobs for this worker
- if (!self::getWaitingJobForPID() && Lock::acquire('worker_process', 0)) {
+ // Trying to fetch new processes - but only once when successful
+ if (!$refetched && Lock::acquire('worker_process', 0)) {
self::findWorkerProcesses();
Lock::release('worker_process');
self::$state = self::STATE_REFETCH;
+ $refetched = true;
+ } else {
+ self::$state = self::STATE_SHORT_LOOP;
}
}
- if (self::$state != self::STATE_REFETCH) {
+ // To avoid the quitting of multiple workers only one worker at a time will execute the check
+ if (!self::getWaitingJobForPID()) {
self::$state = self::STATE_LONG_LOOP;
- }
- // To avoid the quitting of multiple workers only one worker at a time will execute the check
- if (Lock::acquire('worker', 0)) {
+ if (Lock::acquire('worker', 0)) {
// Count active workers and compare them with a maximum value that depends on the load
- if (self::tooMuchWorkers()) {
- Logger::log('Active worker limit reached, quitting.', Logger::DEBUG);
- Lock::release('worker');
- return;
- }
+ if (self::tooMuchWorkers()) {
+ Logger::log('Active worker limit reached, quitting.', Logger::DEBUG);
+ Lock::release('worker');
+ return;
+ }
- // Check free memory
- if ($a->isMinMemoryReached()) {
- Logger::log('Memory limit reached, quitting.', Logger::DEBUG);
+ // Check free memory
+ if ($a->isMinMemoryReached()) {
+ Logger::log('Memory limit reached, quitting.', Logger::DEBUG);
+ Lock::release('worker');
+ return;
+ }
Lock::release('worker');
- return;
}
- Lock::release('worker');
}
// Quit the worker once every cron interval
self::$db_duration_stat = 0;
self::$db_duration_write = 0;
self::$lock_duration = 0;
- self::$state = self::STATE_SHORT_LOOP;
if ($duration > 3600) {
$logger->info('Longer than 1 hour.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]);