]> git.mxchange.org Git - friendica.git/commitdiff
Worker: Fetch jobs for multiple workers
authorMichael <heluecht@pirati.ca>
Sat, 29 Aug 2020 09:03:50 +0000 (09:03 +0000)
committerMichael <heluecht@pirati.ca>
Sat, 29 Aug 2020 09:03:50 +0000 (09:03 +0000)
src/Core/Worker.php
static/defaults.config.php

index 1cde61351c0f2fc0ce9a30fdc1f2b694b3226b70..04cd4cc60894f5c1ce286e4b211de402ea9e04c6 100644 (file)
@@ -785,6 +785,29 @@ class Worker
                return $count;
        }
 
+       /**
+        * Returns the number of active worker processes
+        *
+        * @return array List of worker process ids
+        * @throws \Exception
+        */
+       private static function getWorkerPIDList()
+       {
+               $ids = [];
+               $stamp = (float)microtime(true);
+
+               $queues = DBA::p("SELECT `process`.`pid`, COUNT(`workerqueue`.`pid`) AS `entries` FROM `process`
+                       LEFT JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid` AND NOT `workerqueue`.`done` 
+                       GROUP BY `process`.`pid`");
+               while ($queue = DBA::fetch($queues)) {
+                       $ids[$queue['pid']] = $queue['entries'];
+               }
+               DBA::close($queues);
+
+               self::$db_duration += (microtime(true) - $stamp);
+               return $ids;
+       }
+
        /**
         * Returns waiting jobs for the current process id
         *
@@ -806,11 +829,11 @@ class Worker
 
        /**
         * Returns the next jobs that should be executed
-        *
+        * @param int $limit
         * @return array array with next jobs
         * @throws \Exception
         */
-       private static function nextProcess()
+       private static function nextProcess(int $limit)
        {
                $priority = self::nextPriority();
                if (empty($priority)) {
@@ -818,8 +841,6 @@ class Worker
                        return [];
                }
 
-               $limit = DI::config()->get('system', 'worker_fetch_limit', 1);
-
                $ids = [];
                $stamp = (float)microtime(true);
                $condition = ["`priority` = ? AND `pid` = 0 AND NOT `done` AND `next_try` < ?", $priority, DateTimeFormat::utcNow()];
@@ -918,14 +939,30 @@ class Worker
         */
        private static function findWorkerProcesses()
        {
-               $mypid = getmypid();
-
-               $ids = self::nextProcess();
+               $fetch_limit = DI::config()->get('system', 'worker_fetch_limit', 1);
+
+               if (DI::config()->get('system', 'worker_multiple_fetch')) {
+                       $pids = [];
+                       $worker_pids = self::getWorkerPIDList();
+                       foreach ($worker_pids as $pid => $count) {
+                               if ($count <= $fetch_limit) {
+                                       $pids[] = $pid;
+                               }
+                       }
+                       if (empty($pids)) {
+                               return;
+                       }
+                       $limit = $fetch_limit * count($pids);
+               } else {
+                       $pids = [getmypid()];
+                       $limit = $fetch_limit;
+               }
 
-               // If there is no result we check without priority limit
-               if (empty($ids)) {
-                       $limit = DI::config()->get('system', 'worker_fetch_limit', 1);
+               $ids = self::nextProcess($limit);
+               $limit -= count($ids);
 
+               // If there is not enough results we check without priority limit
+               if ($limit > 0) {
                        $stamp = (float)microtime(true);
                        $condition = ["`pid` = 0 AND NOT `done` AND `next_try` < ?", DateTimeFormat::utcNow()];
                        $tasks = DBA::select('workerqueue', ['id', 'parameter'], $condition, ['limit' => $limit, 'order' => ['priority', 'created']]);
@@ -943,9 +980,21 @@ class Worker
                }
 
                if (!empty($ids)) {
+                       $worker = [];
+                       foreach (array_unique($ids) as $id) {
+                               $pid = next($pids);
+                               if (!$pid) {
+                                       $pid = reset($pids);
+                               }
+                               $worker[$pid][] = $id;
+                       }
+
                        $stamp = (float)microtime(true);
-                       $condition = ['id' => $ids, 'done' => false, 'pid' => 0];
-                       DBA::update('workerqueue', ['executed' => DateTimeFormat::utcNow(), 'pid' => $mypid], $condition);
+                       foreach ($worker as $worker_pid => $worker_ids) {
+                               Logger::info('Set queue entry', ['pid' => $worker_pid, 'ids' => $worker_ids]);
+                               DBA::update('workerqueue', ['executed' => DateTimeFormat::utcNow(), 'pid' => $worker_pid],
+                                       ['id' => $worker_ids, 'done' => false, 'pid' => 0]);
+                       }
                        self::$db_duration += (microtime(true) - $stamp);
                        self::$db_duration_write += (microtime(true) - $stamp);
                }
index 947ac1dc01d4af4c50a26bcd23909e0028d53859..4d595010ce5f92525560d86883be14b78c508a7b 100644 (file)
@@ -499,6 +499,11 @@ return [
                // Setting 0 would allow maximum worker queues at all times, which is not recommended.
                'worker_load_exponent' => 3,
 
+               // worker_multiple_fetch (Boolean)
+               // When activated, the worker fetches jobs for multiple workers (not only for itself).
+               // This is an experimental setting without knowing the performance impact.
+               'worker_multiple_fetch' => false,
+               
                // worker_defer_limit (Integer)
                // Per default the systems tries delivering for 15 times before dropping it.
                'worker_defer_limit' => 15,