]> git.mxchange.org Git - friendica.git/commitdiff
Command based pre fetching of worker tasks
authorMichael <heluecht@pirati.ca>
Sat, 23 Mar 2019 06:08:18 +0000 (06:08 +0000)
committerMichael <heluecht@pirati.ca>
Sat, 23 Mar 2019 06:08:18 +0000 (06:08 +0000)
src/Core/Worker.php

index ac5c12fdd21808443cdd461d5618ec4a5730981a..0f4a527f6bd21aa95cb9f387ba1f29e5264aab73 100644 (file)
@@ -27,6 +27,9 @@ class Worker
        const STATE_REFETCH    = 3; // Worker had refetched jobs in the execution loop.
        const STATE_SHORT_LOOP = 4; // Worker is processing preassigned jobs, thus saving much time.
 
+       const FAST_COMMANDS = ['APDelivery', 'Delivery', 'CreateShadowEntry'];
+
+
        private static $up_start;
        private static $db_duration = 0;
        private static $db_duration_count = 0;
@@ -783,23 +786,24 @@ class Worker
                        return [];
                }
 
-               if ($priority <= PRIORITY_MEDIUM) {
-                       $limit = Config::get('system', 'worker_fetch_limit', 1);
-               } else {
-                       $limit = 1;
-               }
+               $limit = Config::get('system', 'worker_fetch_limit', 1);
 
                $ids = [];
                $stamp = (float)microtime(true);
                $condition = ["`priority` = ? AND `pid` = 0 AND NOT `done` AND `next_try` < ?", $priority, DateTimeFormat::utcNow()];
-               $tasks = DBA::select('workerqueue', ['id'], $condition, ['limit' => $limit, 'order' => ['created']]);
+               $tasks = DBA::select('workerqueue', ['id', 'parameter'], $condition, ['limit' => $limit, 'order' => ['created']]);
                self::$db_duration += (microtime(true) - $stamp);
                while ($task = DBA::fetch($tasks)) {
                        $ids[] = $task['id'];
+                       // Only continue that loop while we are storing commands that can be processed quickly
+                       $command = json_decode($task['parameter'])[0];
+                       if (!in_array($command, self::FAST_COMMANDS)) {
+                               break;
+                       }
                }
                DBA::close($tasks);
 
-               Logger::info('Found:', ['id' => $ids, 'priority' => $priority]);
+               Logger::info('Found:', ['priority' => $priority, 'id' => $ids]);
                return $ids;
        }
 
@@ -890,15 +894,22 @@ class Worker
 
                // If there is no result we check without priority limit
                if (empty($ids)) {
+                       $limit = Config::get('system', 'worker_fetch_limit', 1);
+
                        $stamp = (float)microtime(true);
                        $condition = ["`pid` = 0 AND NOT `done` AND `next_try` < ?", DateTimeFormat::utcNow()];
-                       $result = DBA::select('workerqueue', ['id'], $condition, ['limit' => 1, 'order' => ['priority', 'created']]);
+                       $tasks = DBA::select('workerqueue', ['id', 'parameter'], $condition, ['limit' => $limit, 'order' => ['priority', 'created']]);
                        self::$db_duration += (microtime(true) - $stamp);
 
-                       while ($id = DBA::fetch($result)) {
-                               $ids[] = $id["id"];
+                       while ($task = DBA::fetch($tasks)) {
+                               $ids[] = $task['id'];
+                               // Only continue that loop while we are storing commands that can be processed quickly
+                               $command = json_decode($task['parameter'])[0];
+                               if (!in_array($command, self::FAST_COMMANDS)) {
+                                       break;
+                               }
                        }
-                       DBA::close($result);
+                       DBA::close($tasks);
                }
 
                if (!empty($ids)) {