use Friendica\DI;
use Friendica\Model\Process;
use Friendica\Util\DateTimeFormat;
-use Friendica\Util\Network;
/**
* Contains the class for the worker background job processing
const FAST_COMMANDS = ['APDelivery', 'Delivery', 'CreateShadowEntry'];
+ const LOCK_PROCESS = 'worker_process';
+ const LOCK_WORKER = 'worker';
private static $up_start;
private static $db_duration = 0;
self::killStaleWorkers();
}
- // Count active workers and compare them with a maximum value that depends on the load
- if (self::tooMuchWorkers()) {
- Logger::info('Pre check: Active worker limit reached, quitting.');
- return;
- }
-
- // Do we have too few memory?
- if (DI::process()->isMinMemoryReached()) {
- Logger::info('Pre check: Memory limit reached, quitting.');
- return;
- }
-
- // Possibly there are too much database connections
- if (self::maxConnectionsReached()) {
- Logger::info('Pre check: maximum connections reached, quitting.');
- return;
- }
-
- // Possibly there are too much database processes that block the system
- if (DI::process()->isMaxProcessesReached()) {
- Logger::info('Pre check: maximum processes reached, quitting.');
+ // Check if the system is ready
+ if (!self::isReady()) {
return;
}
}
// Trying to fetch new processes - but only once when successful
- if (!$refetched && DI::lock()->acquire('worker_process', 0)) {
+ if (!$refetched && DI::lock()->acquire(self::LOCK_PROCESS, 0)) {
self::findWorkerProcesses();
- DI::lock()->release('worker_process');
+ DI::lock()->release(self::LOCK_PROCESS);
self::$state = self::STATE_REFETCH;
$refetched = true;
} else {
if (!self::getWaitingJobForPID()) {
self::$state = self::STATE_LONG_LOOP;
- if (DI::lock()->acquire('worker', 0)) {
+ if (DI::lock()->acquire(self::LOCK_WORKER, 0)) {
// Count active workers and compare them with a maximum value that depends on the load
if (self::tooMuchWorkers()) {
Logger::info('Active worker limit reached, quitting.');
- DI::lock()->release('worker');
+ DI::lock()->release(self::LOCK_WORKER);
return;
}
// Check free memory
if (DI::process()->isMinMemoryReached()) {
Logger::info('Memory limit reached, quitting.');
- DI::lock()->release('worker');
+ DI::lock()->release(self::LOCK_WORKER);
return;
}
- DI::lock()->release('worker');
+ DI::lock()->release(self::LOCK_WORKER);
}
}
Logger::info("Couldn't select a workerqueue entry, quitting process", ['pid' => getmypid()]);
}
+ /**
+ * Checks if the system is ready.
+ *
+ * Several system parameters like memory, connections and processes are checked.
+ *
+ * @return boolean
+ */
+ public static function isReady()
+ {
+ // Count active workers and compare them with a maximum value that depends on the load
+ if (self::tooMuchWorkers()) {
+ Logger::info('Active worker limit reached, quitting.');
+ return false;
+ }
+
+ // Do we have too few memory?
+ if (DI::process()->isMinMemoryReached()) {
+ Logger::info('Memory limit reached, quitting.');
+ return false;
+ }
+
+ // Possibly there are too much database connections
+ if (self::maxConnectionsReached()) {
+ Logger::info('Maximum connections reached, quitting.');
+ return false;
+ }
+
+ // Possibly there are too much database processes that block the system
+ if (DI::process()->isMaxProcessesReached()) {
+ Logger::info('Maximum processes reached, quitting.');
+ return false;
+ }
+
+ return true;
+ }
+
/**
* Check if non executed tasks do exist in the worker queue
*
* @return boolean Returns "true" if tasks are existing
* @throws \Exception
*/
- private static function entriesExists()
+ public static function entriesExists()
{
$stamp = (float)microtime(true);
$exists = DBA::exists('workerqueue', ["NOT `done` AND `pid` = 0 AND `next_try` < ?", DateTimeFormat::utcNow()]);
}
}
- Logger::log("Load: " . $load ."/" . $maxsysload . " - processes: " . $deferred . "/" . $active . "/" . $waiting_processes . $processlist . " - maximum: " . $queues . "/" . $maxqueues, Logger::DEBUG);
+ Logger::notice("Load: " . $load ."/" . $maxsysload . " - processes: " . $deferred . "/" . $active . "/" . $waiting_processes . $processlist . " - maximum: " . $queues . "/" . $maxqueues);
// Are there fewer workers running as possible? Then fork a new one.
if (!DI::config()->get("system", "worker_dont_fork", false) && ($queues > ($active + 1)) && self::entriesExists()) {
}
$stamp = (float)microtime(true);
- if (!DI::lock()->acquire('worker_process')) {
+ if (!DI::lock()->acquire(self::LOCK_PROCESS)) {
return false;
}
self::$lock_duration += (microtime(true) - $stamp);
$found = self::findWorkerProcesses();
- DI::lock()->release('worker_process');
+ DI::lock()->release(self::LOCK_PROCESS);
if ($found) {
$stamp = (float)microtime(true);
}
$url = DI::baseUrl() . '/worker';
- Network::fetchUrl($url, false, 1);
+ DI::httpRequest()->fetch($url, false, 1);
}
/**
}
// If there is a lock then we don't have to check for too much worker
- if (!DI::lock()->acquire('worker', 0)) {
+ if (!DI::lock()->acquire(self::LOCK_WORKER, 0)) {
return $added;
}
// If there are already enough workers running, don't fork another one
$quit = self::tooMuchWorkers();
- DI::lock()->release('worker');
+ DI::lock()->release(self::LOCK_WORKER);
if ($quit) {
return $added;