From: Michael <heluecht@pirati.ca>
Date: Sun, 17 Feb 2019 03:22:29 +0000 (+0000)
Subject: The number of workers per priority is now calculated dynamically
X-Git-Url: https://git.mxchange.org/?a=commitdiff_plain;h=01d6ba85ffac949fbd88d1157030056ddf921e4a;p=friendica.git

The number of workers per priority is now calculated dynamically
---

diff --git a/src/Core/Worker.php b/src/Core/Worker.php
index a435d86b64..b1e4aa41e1 100644
--- a/src/Core/Worker.php
+++ b/src/Core/Worker.php
@@ -834,64 +834,6 @@ class Worker
 		return $count;
 	}
 
-	/**
-	 * @brief Check if we should pass some slow processes
-	 *
-	 * When the active processes of the highest priority are using more than 2/3
-	 * of all processes, we let pass slower processes.
-	 *
-	 * @param string $highest_priority Returns the currently highest priority
-	 * @return bool We let pass a slower process than $highest_priority
-	 * @throws \Exception
-	 */
-	private static function passingSlow(&$highest_priority)
-	{
-		$highest_priority = 0;
-
-		$stamp = (float)microtime(true);
-		$r = DBA::p(
-			"SELECT `priority`
-				FROM `process`
-				INNER JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid` AND NOT `done`"
-		);
-		self::$db_duration += (microtime(true) - $stamp);
-
-		// No active processes at all? Fine
-		if (!DBA::isResult($r)) {
-			return false;
-		}
-		$priorities = [];
-		while ($line = DBA::fetch($r)) {
-			$priorities[] = $line["priority"];
-		}
-		DBA::close($r);
-
-		// Should not happen
-		if (count($priorities) == 0) {
-			return false;
-		}
-		$highest_priority = min($priorities);
-
-		// The highest process is already the slowest one?
-		// Then we quit
-		if ($highest_priority == PRIORITY_NEGLIGIBLE) {
-			return false;
-		}
-		$high = 0;
-		foreach ($priorities as $priority) {
-			if ($priority == $highest_priority) {
-				++$high;
-			}
-		}
-		Logger::log("Highest priority: ".$highest_priority." Total processes: ".count($priorities)." Count high priority processes: ".$high, Logger::DEBUG);
-		$passing_slow = (($high/count($priorities)) > (2/3));
-
-		if ($passing_slow) {
-			Logger::log("Passing slower processes than priority ".$highest_priority, Logger::DEBUG);
-		}
-		return $passing_slow;
-	}
-
 	public static function nextProcess()
 	{
 		$priority = self::nextPriority();
@@ -937,6 +879,7 @@ class Worker
 		}
 
 		$running = [];
+		$running_total = 0;
 		$stamp = (float)microtime(true);
 		$processes = DBA::p("SELECT COUNT(DISTINCT(`process`.`pid`)) AS `running`, `priority` FROM `process`
 			INNER JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid`
@@ -944,28 +887,43 @@ class Worker
 		self::$db_duration += (microtime(true) - $stamp);
 		while ($process = DBA::fetch($processes)) {
 			$running[$process['priority']] = $process['running'];
+			$running_total += $process['running'];
 		}
 		DBA::close($processes);
 
-		$active = self::activeWorkers();
-
 		foreach ($priorities as $priority) {
 			if (!empty($waiting[$priority]) && empty($running[$priority])) {
+				Logger::log('No running worker found with priority ' . $priority . ' - assigning it.', Logger::DEBUG);
 				return $priority;
 			}
 		}
 
-		// Temp
-		if (!empty($running[PRIORITY_LOW]) && ($running[PRIORITY_LOW] < 3)) {
-			return PRIORITY_LOW;
+		$active = max(self::activeWorkers(), $running_total);
+		$priorities = max(count($waiting), count($running));
+		$exponent = 2;
+
+		$total = 0;
+		for ($i = 1; $i <= $priorities; ++$i) {
+			$total += pow($i, $exponent);
 		}
 
-		if (!empty($running[PRIORITY_NEGLIGIBLE]) && ($running[PRIORITY_NEGLIGIBLE] < 2)) {
-			return PRIORITY_NEGLIGIBLE;
+		$limit = [];
+		for ($i = 1; $i <= $priorities; ++$i) {
+			$limit[$priorities - $i] = max(1, round($active * (pow($i, $exponent) / $total)));
+		}
+
+		$i = 0;
+		foreach ($running as $priority => $workers) {
+			if ($workers < $limit[$i++]) {
+				Logger::log('Priority ' . $priority . ' has got ' . $workers . ' workers out of a limit of ' . $limit[$i - 1], Logger::DEBUG);
+				return $priority;
+			}
 		}
 
 		if (!empty($waiting)) {
-			return array_shift(array_keys($waiting));
+			$priority =  array_shift(array_keys($waiting));
+			Logger::log('No underassigned priority found, now taking the highest priority (' . $priority . ').', Logger::DEBUG);
+			return $priority;
 		}
 
 		return false;
@@ -984,89 +942,11 @@ class Worker
 	{
 		$mypid = getmypid();
 
-		// Check if we should pass some low priority process
-		$highest_priority = 0;
-		$found = false;
 		$passing_slow = false;
 
-		// The higher the number of parallel workers, the more we prefetch to prevent concurring access
-		// We decrease the limit with the number of entries left in the queue
-		$worker_queues = Config::get("system", "worker_queues", 4);
-		$queue_length = Config::get('system', 'worker_fetch_limit', 1);
-		$lower_job_limit = $worker_queues * $queue_length * 2;
-		$entries = max($entries - $deferred, 0);
-
-		// Now do some magic
-		$exponent = 2;
-		$slope = $queue_length / pow($lower_job_limit, $exponent);
-		$limit = min($queue_length, ceil($slope * pow($entries, $exponent)));
-
-		Logger::log('Deferred: ' . $deferred . ' - Total: ' . $entries . ' - Maximum: ' . $queue_length . ' - jobs per queue: ' . $limit, Logger::DEBUG);
-
 		$ids = self::nextProcess();
 		$found = (count($ids) > 0);
 
-		if (!$found && self::passingSlow($highest_priority)) {
-			// Are there waiting processes with a higher priority than the currently highest?
-			$stamp = (float)microtime(true);
-			$result = DBA::select(
-				'workerqueue',
-				['id'],
-				["`pid` = 0 AND `priority` < ? AND NOT `done` AND `next_try` < ?",
-				$highest_priority, DateTimeFormat::utcNow()],
-				['limit' => 1, 'order' => ['priority', 'created']]
-			);
-			self::$db_duration += (microtime(true) - $stamp);
-
-			while ($id = DBA::fetch($result)) {
-				$ids[] = $id["id"];
-			}
-			DBA::close($result);
-
-			$found = (count($ids) > 0);
-
-			if (!$found) {
-				// Give slower processes some processing time
-				$stamp = (float)microtime(true);
-				$result = DBA::select(
-					'workerqueue',
-					['id'],
-					["`pid` = 0 AND `priority` > ? AND NOT `done` AND `next_try` < ?",
-					$highest_priority, DateTimeFormat::utcNow()],
-					['limit' => 1, 'order' => ['priority', 'created']]
-				);
-				self::$db_duration += (microtime(true) - $stamp);
-
-				while ($id = DBA::fetch($result)) {
-					$ids[] = $id["id"];
-				}
-				DBA::close($result);
-
-				$found = (count($ids) > 0);
-				$passing_slow = $found;
-			}
-		}
-
-		// At first try to fetch a bunch of high or medium tasks
-		if (!$found && ($limit > 1)) {
-			$stamp = (float)microtime(true);
-			$result = DBA::select(
-				'workerqueue',
-				['id'],
-				["`pid` = 0 AND NOT `done` AND `priority` <= ? AND `next_try` < ? AND `retrial` = 0",
-				PRIORITY_MEDIUM, DateTimeFormat::utcNow()],
-				['limit' => $limit, 'order' => ['created']]
-			);
-			self::$db_duration += (microtime(true) - $stamp);
-
-			while ($id = DBA::fetch($result)) {
-				$ids[] = $id["id"];
-			}
-			DBA::close($result);
-
-			$found = (count($ids) > 0);
-		}
-
 		// If there is no result (or we shouldn't pass lower processes) we check without priority limit
 		if (!$found) {
 			$stamp = (float)microtime(true);