]> git.mxchange.org Git - friendica.git/commitdiff
The number of workers per priority is now calculated dynamically
authorMichael <heluecht@pirati.ca>
Sun, 17 Feb 2019 03:22:29 +0000 (03:22 +0000)
committerMichael <heluecht@pirati.ca>
Sun, 17 Feb 2019 03:22:29 +0000 (03:22 +0000)
src/Core/Worker.php

index a435d86b64c7449960630e400e11a0098049e180..b1e4aa41e1c59acf8f1a07721a41ef363c291213 100644 (file)
@@ -834,64 +834,6 @@ class Worker
                return $count;
        }
 
-       /**
-        * @brief Check if we should pass some slow processes
-        *
-        * When the active processes of the highest priority are using more than 2/3
-        * of all processes, we let pass slower processes.
-        *
-        * @param string $highest_priority Returns the currently highest priority
-        * @return bool We let pass a slower process than $highest_priority
-        * @throws \Exception
-        */
-       private static function passingSlow(&$highest_priority)
-       {
-               $highest_priority = 0;
-
-               $stamp = (float)microtime(true);
-               $r = DBA::p(
-                       "SELECT `priority`
-                               FROM `process`
-                               INNER JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid` AND NOT `done`"
-               );
-               self::$db_duration += (microtime(true) - $stamp);
-
-               // No active processes at all? Fine
-               if (!DBA::isResult($r)) {
-                       return false;
-               }
-               $priorities = [];
-               while ($line = DBA::fetch($r)) {
-                       $priorities[] = $line["priority"];
-               }
-               DBA::close($r);
-
-               // Should not happen
-               if (count($priorities) == 0) {
-                       return false;
-               }
-               $highest_priority = min($priorities);
-
-               // The highest process is already the slowest one?
-               // Then we quit
-               if ($highest_priority == PRIORITY_NEGLIGIBLE) {
-                       return false;
-               }
-               $high = 0;
-               foreach ($priorities as $priority) {
-                       if ($priority == $highest_priority) {
-                               ++$high;
-                       }
-               }
-               Logger::log("Highest priority: ".$highest_priority." Total processes: ".count($priorities)." Count high priority processes: ".$high, Logger::DEBUG);
-               $passing_slow = (($high/count($priorities)) > (2/3));
-
-               if ($passing_slow) {
-                       Logger::log("Passing slower processes than priority ".$highest_priority, Logger::DEBUG);
-               }
-               return $passing_slow;
-       }
-
        public static function nextProcess()
        {
                $priority = self::nextPriority();
@@ -937,6 +879,7 @@ class Worker
                }
 
                $running = [];
+               $running_total = 0;
                $stamp = (float)microtime(true);
                $processes = DBA::p("SELECT COUNT(DISTINCT(`process`.`pid`)) AS `running`, `priority` FROM `process`
                        INNER JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid`
@@ -944,28 +887,43 @@ class Worker
                self::$db_duration += (microtime(true) - $stamp);
                while ($process = DBA::fetch($processes)) {
                        $running[$process['priority']] = $process['running'];
+                       $running_total += $process['running'];
                }
                DBA::close($processes);
 
-               $active = self::activeWorkers();
-
                foreach ($priorities as $priority) {
                        if (!empty($waiting[$priority]) && empty($running[$priority])) {
+                               Logger::log('No running worker found with priority ' . $priority . ' - assigning it.', Logger::DEBUG);
                                return $priority;
                        }
                }
 
-               // Temp
-               if (!empty($running[PRIORITY_LOW]) && ($running[PRIORITY_LOW] < 3)) {
-                       return PRIORITY_LOW;
+               $active = max(self::activeWorkers(), $running_total);
+               $priorities = max(count($waiting), count($running));
+               $exponent = 2;
+
+               $total = 0;
+               for ($i = 1; $i <= $priorities; ++$i) {
+                       $total += pow($i, $exponent);
                }
 
-               if (!empty($running[PRIORITY_NEGLIGIBLE]) && ($running[PRIORITY_NEGLIGIBLE] < 2)) {
-                       return PRIORITY_NEGLIGIBLE;
+               $limit = [];
+               for ($i = 1; $i <= $priorities; ++$i) {
+                       $limit[$priorities - $i] = max(1, round($active * (pow($i, $exponent) / $total)));
+               }
+
+               $i = 0;
+               foreach ($running as $priority => $workers) {
+                       if ($workers < $limit[$i++]) {
+                               Logger::log('Priority ' . $priority . ' has got ' . $workers . ' workers out of a limit of ' . $limit[$i - 1], Logger::DEBUG);
+                               return $priority;
+                       }
                }
 
                if (!empty($waiting)) {
-                       return array_shift(array_keys($waiting));
+                       $priority =  array_shift(array_keys($waiting));
+                       Logger::log('No underassigned priority found, now taking the highest priority (' . $priority . ').', Logger::DEBUG);
+                       return $priority;
                }
 
                return false;
@@ -984,89 +942,11 @@ class Worker
        {
                $mypid = getmypid();
 
-               // Check if we should pass some low priority process
-               $highest_priority = 0;
-               $found = false;
                $passing_slow = false;
 
-               // The higher the number of parallel workers, the more we prefetch to prevent concurring access
-               // We decrease the limit with the number of entries left in the queue
-               $worker_queues = Config::get("system", "worker_queues", 4);
-               $queue_length = Config::get('system', 'worker_fetch_limit', 1);
-               $lower_job_limit = $worker_queues * $queue_length * 2;
-               $entries = max($entries - $deferred, 0);
-
-               // Now do some magic
-               $exponent = 2;
-               $slope = $queue_length / pow($lower_job_limit, $exponent);
-               $limit = min($queue_length, ceil($slope * pow($entries, $exponent)));
-
-               Logger::log('Deferred: ' . $deferred . ' - Total: ' . $entries . ' - Maximum: ' . $queue_length . ' - jobs per queue: ' . $limit, Logger::DEBUG);
-
                $ids = self::nextProcess();
                $found = (count($ids) > 0);
 
-               if (!$found && self::passingSlow($highest_priority)) {
-                       // Are there waiting processes with a higher priority than the currently highest?
-                       $stamp = (float)microtime(true);
-                       $result = DBA::select(
-                               'workerqueue',
-                               ['id'],
-                               ["`pid` = 0 AND `priority` < ? AND NOT `done` AND `next_try` < ?",
-                               $highest_priority, DateTimeFormat::utcNow()],
-                               ['limit' => 1, 'order' => ['priority', 'created']]
-                       );
-                       self::$db_duration += (microtime(true) - $stamp);
-
-                       while ($id = DBA::fetch($result)) {
-                               $ids[] = $id["id"];
-                       }
-                       DBA::close($result);
-
-                       $found = (count($ids) > 0);
-
-                       if (!$found) {
-                               // Give slower processes some processing time
-                               $stamp = (float)microtime(true);
-                               $result = DBA::select(
-                                       'workerqueue',
-                                       ['id'],
-                                       ["`pid` = 0 AND `priority` > ? AND NOT `done` AND `next_try` < ?",
-                                       $highest_priority, DateTimeFormat::utcNow()],
-                                       ['limit' => 1, 'order' => ['priority', 'created']]
-                               );
-                               self::$db_duration += (microtime(true) - $stamp);
-
-                               while ($id = DBA::fetch($result)) {
-                                       $ids[] = $id["id"];
-                               }
-                               DBA::close($result);
-
-                               $found = (count($ids) > 0);
-                               $passing_slow = $found;
-                       }
-               }
-
-               // At first try to fetch a bunch of high or medium tasks
-               if (!$found && ($limit > 1)) {
-                       $stamp = (float)microtime(true);
-                       $result = DBA::select(
-                               'workerqueue',
-                               ['id'],
-                               ["`pid` = 0 AND NOT `done` AND `priority` <= ? AND `next_try` < ? AND `retrial` = 0",
-                               PRIORITY_MEDIUM, DateTimeFormat::utcNow()],
-                               ['limit' => $limit, 'order' => ['created']]
-                       );
-                       self::$db_duration += (microtime(true) - $stamp);
-
-                       while ($id = DBA::fetch($result)) {
-                               $ids[] = $id["id"];
-                       }
-                       DBA::close($result);
-
-                       $found = (count($ids) > 0);
-               }
-
                // If there is no result (or we shouldn't pass lower processes) we check without priority limit
                if (!$found) {
                        $stamp = (float)microtime(true);