]> git.mxchange.org Git - friendica.git/blobdiff - src/Core/Worker.php
Add search types
[friendica.git] / src / Core / Worker.php
index 0a97c11c13724d25db490fab2b78cbb63a89bab2..0f4a527f6bd21aa95cb9f387ba1f29e5264aab73 100644 (file)
@@ -22,6 +22,14 @@ use Friendica\Util\Network;
  */
 class Worker
 {
+       const STATE_STARTUP    = 1; // Worker is in startup. This takes most time.
+       const STATE_LONG_LOOP  = 2; // Worker is processing the whole - long - loop.
+       const STATE_REFETCH    = 3; // Worker had refetched jobs in the execution loop.
+       const STATE_SHORT_LOOP = 4; // Worker is processing preassigned jobs, thus saving much time.
+
+       const FAST_COMMANDS = ['APDelivery', 'Delivery', 'CreateShadowEntry'];
+
+
        private static $up_start;
        private static $db_duration = 0;
        private static $db_duration_count = 0;
@@ -29,6 +37,7 @@ class Worker
        private static $db_duration_stat = 0;
        private static $lock_duration = 0;
        private static $last_update;
+       private static $state;
 
        /**
         * @brief Processes the tasks that are in the workerqueue table
@@ -92,9 +101,11 @@ class Worker
                }
 
                $starttime = time();
+               self::$state = self::STATE_STARTUP;
 
                // We fetch the next queue entry that is about to be executed
                while ($r = self::workerProcess()) {
+                       $refetched = false;
                        foreach ($r as $entry) {
                                // Assure that the priority is an integer value
                                $entry['priority'] = (int)$entry['priority'];
@@ -105,34 +116,43 @@ class Worker
                                        return;
                                }
 
-                               // If possible we will fetch new jobs for this worker
-                               if (!self::getWaitingJobForPID() && Lock::acquire('worker_process', 0)) {
+                               // Trying to fetch new processes - but only once when successful
+                               if (!$refetched && Lock::acquire('worker_process', 0)) {
                                        self::findWorkerProcesses();
                                        Lock::release('worker_process');
+                                       self::$state = self::STATE_REFETCH;
+                                       $refetched = true;
+                               } else {
+                                       self::$state = self::STATE_SHORT_LOOP;
                                }
                        }
 
                        // To avoid the quitting of multiple workers only one worker at a time will execute the check
-                       if (Lock::acquire('worker', 0)) {
+                       if (!self::getWaitingJobForPID()) {
+                               self::$state = self::STATE_LONG_LOOP;
+
+                               if (Lock::acquire('worker', 0)) {
                                // Count active workers and compare them with a maximum value that depends on the load
-                               if (self::tooMuchWorkers()) {
-                                       Logger::log('Active worker limit reached, quitting.', Logger::DEBUG);
-                                       Lock::release('worker');
-                                       return;
-                               }
+                                       if (self::tooMuchWorkers()) {
+                                               Logger::log('Active worker limit reached, quitting.', Logger::DEBUG);
+                                               Lock::release('worker');
+                                               return;
+                                       }
 
-                               // Check free memory
-                               if ($a->isMinMemoryReached()) {
-                                       Logger::log('Memory limit reached, quitting.', Logger::DEBUG);
+                                       // Check free memory
+                                       if ($a->isMinMemoryReached()) {
+                                               Logger::log('Memory limit reached, quitting.', Logger::DEBUG);
+                                               Lock::release('worker');
+                                               return;
+                                       }
                                        Lock::release('worker');
-                                       return;
                                }
-                               Lock::release('worker');
                        }
 
-                       // Quit the worker once every 5 minutes
-                       if (time() > ($starttime + 300)) {
-                               Logger::log('Process lifetime reached, quitting.', Logger::DEBUG);
+                       // Quit the worker once every cron interval
+                       if (time() > ($starttime + (Config::get('system', 'cron_interval') * 60))) {
+                               Logger::info('Process lifetime reached, respawning.');
+                               self::spawnWorker();
                                return;
                        }
                }
@@ -394,15 +414,16 @@ class Worker
                 * The execution time is the productive time.
                 * By changing parameters like the maximum number of workers we can check the effectivness.
                */
-               $dbtotal = number_format(self::$db_duration - (self::$db_duration_count + self::$db_duration_write + self::$db_duration_stat), 4);
-               $dbcount = number_format(self::$db_duration_count, 4);
-               $dbstat  = number_format(self::$db_duration_stat, 4);
-               $dbwrite = number_format(self::$db_duration_write, 4);
-               $dblock  = number_format(self::$lock_duration, 4);
-               $rest    = number_format(max(0, $up_duration - (self::$db_duration + self::$lock_duration)), 4);
-               $exec    = number_format($duration, 4);
+               $dbtotal = round(self::$db_duration, 2);
+               $dbread  = round(self::$db_duration - (self::$db_duration_count + self::$db_duration_write + self::$db_duration_stat), 2);
+               $dbcount = round(self::$db_duration_count, 2);
+               $dbstat  = round(self::$db_duration_stat, 2);
+               $dbwrite = round(self::$db_duration_write, 2);
+               $dblock  = round(self::$lock_duration, 2);
+               $rest    = round(max(0, $up_duration - (self::$db_duration + self::$lock_duration)), 2);
+               $exec    = round($duration, 2);
 
-               $logger->info('Performance log.', ['total' => $dbtotal, 'count' => $dbcount, 'stat' => $dbstat, 'write' => $dbwrite, 'block' => $dblock, 'rest' => $rest, 'exec' => $exec]);
+               $logger->info('Performance:', ['state' => self::$state, 'count' => $dbcount, 'stat' => $dbstat, 'write' => $dbwrite, 'lock' => $dblock, 'total' => $dbtotal, 'rest' => $rest, 'exec' => $exec]);
 
                self::$up_start = microtime(true);
                self::$db_duration = 0;
@@ -412,16 +433,16 @@ class Worker
                self::$lock_duration = 0;
 
                if ($duration > 3600) {
-                       $logger->info('Longer than 1 hour.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' =>   round($duration/60, 3)]);
+                       $logger->info('Longer than 1 hour.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]);
                } elseif ($duration > 600) {
-                       $logger->info('Longer than 10 minutes.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' =>   round($duration/60, 3)]);
+                       $logger->info('Longer than 10 minutes.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]);
                } elseif ($duration > 300) {
-                       $logger->info('Longer than 5 minutes.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' =>   round($duration/60, 3)]);
+                       $logger->info('Longer than 5 minutes.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]);
                } elseif ($duration > 120) {
-                       $logger->info('Longer than 2 minutes.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' =>   round($duration/60, 3)]);
+                       $logger->info('Longer than 2 minutes.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]);
                }
 
-               $workerLogger->info('Process done. ', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' =>   number_format($duration, 4)]);
+               $workerLogger->info('Process done.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' => round($duration, 3)]);
 
                $a->getProfiler()->saveLog($a->getLogger(), "ID " . $queue["id"] . ": " . $funcname);
 
@@ -596,7 +617,7 @@ class Worker
         */
        private static function tooMuchWorkers()
        {
-               $queues = Config::get("system", "worker_queues", 4);
+               $queues = Config::get("system", "worker_queues", 10);
 
                $maxqueues = $queues;
 
@@ -605,7 +626,7 @@ class Worker
                // Decrease the number of workers at higher load
                $load = System::currentLoad();
                if ($load) {
-                       $maxsysload = intval(Config::get("system", "maxloadavg", 50));
+                       $maxsysload = intval(Config::get("system", "maxloadavg", 20));
 
                        /* Default exponent 3 causes queues to rapidly decrease as load increases.
                         * If you have 20 max queues at idle, then you get only 5 queues at 37.1% of $maxsysload.
@@ -687,7 +708,7 @@ class Worker
 
                        $processlist .= ' ('.implode(', ', $listitem).')';
 
-                       if (Config::get("system", "worker_fastlane", false) && ($queues > 0) && self::entriesExists() && ($active >= $queues)) {
+                       if (Config::get("system", "worker_fastlane", false) && ($queues > 0) && ($active >= $queues) && self::entriesExists()) {
                                $top_priority = self::highestPriority();
                                $high_running = self::processWithPriorityActive($top_priority);
 
@@ -700,7 +721,7 @@ class Worker
                        Logger::log("Load: " . $load ."/" . $maxsysload . " - processes: " . $deferred . "/" . $active . "/" . $waiting_processes . $processlist . " - maximum: " . $queues . "/" . $maxqueues, Logger::DEBUG);
 
                        // Are there fewer workers running as possible? Then fork a new one.
-                       if (!Config::get("system", "worker_dont_fork", false) && ($queues > ($active + 1)) && ($entries > 1)) {
+                       if (!Config::get("system", "worker_dont_fork", false) && ($queues > ($active + 1)) && self::entriesExists()) {
                                Logger::log("Active workers: ".$active."/".$queues." Fork a new worker.", Logger::DEBUG);
                                if (Config::get('system', 'worker_daemon_mode', false)) {
                                        self::IPCSetJobState(true);
@@ -765,23 +786,24 @@ class Worker
                        return [];
                }
 
-               if ($priority <= PRIORITY_MEDIUM) {
-                       $limit = Config::get('system', 'worker_fetch_limit', 1);
-               } else {
-                       $limit = 1;
-               }
+               $limit = Config::get('system', 'worker_fetch_limit', 1);
 
                $ids = [];
                $stamp = (float)microtime(true);
                $condition = ["`priority` = ? AND `pid` = 0 AND NOT `done` AND `next_try` < ?", $priority, DateTimeFormat::utcNow()];
-               $tasks = DBA::select('workerqueue', ['id'], $condition, ['limit' => $limit, 'order' => ['created']]);
+               $tasks = DBA::select('workerqueue', ['id', 'parameter'], $condition, ['limit' => $limit, 'order' => ['created']]);
                self::$db_duration += (microtime(true) - $stamp);
                while ($task = DBA::fetch($tasks)) {
                        $ids[] = $task['id'];
+                       // Only continue that loop while we are storing commands that can be processed quickly
+                       $command = json_decode($task['parameter'])[0];
+                       if (!in_array($command, self::FAST_COMMANDS)) {
+                               break;
+                       }
                }
                DBA::close($tasks);
 
-               Logger::info('Found:', ['id' => $ids, 'priority' => $priority]);
+               Logger::info('Found:', ['priority' => $priority, 'id' => $ids]);
                return $ids;
        }
 
@@ -850,7 +872,7 @@ class Worker
                }
 
                if (!empty($waiting)) {
-                       $priority =  array_shift(array_keys($waiting));
+                       $priority = array_keys($waiting)[0];
                        Logger::info('No underassigned priority found, now taking the highest priority.', ['priority' => $priority]);
                        return $priority;
                }
@@ -872,15 +894,22 @@ class Worker
 
                // If there is no result we check without priority limit
                if (empty($ids)) {
+                       $limit = Config::get('system', 'worker_fetch_limit', 1);
+
                        $stamp = (float)microtime(true);
                        $condition = ["`pid` = 0 AND NOT `done` AND `next_try` < ?", DateTimeFormat::utcNow()];
-                       $result = DBA::select('workerqueue', ['id'], $condition, ['limit' => 1, 'order' => ['priority', 'created']]);
+                       $tasks = DBA::select('workerqueue', ['id', 'parameter'], $condition, ['limit' => $limit, 'order' => ['priority', 'created']]);
                        self::$db_duration += (microtime(true) - $stamp);
 
-                       while ($id = DBA::fetch($result)) {
-                               $ids[] = $id["id"];
+                       while ($task = DBA::fetch($tasks)) {
+                               $ids[] = $task['id'];
+                               // Only continue that loop while we are storing commands that can be processed quickly
+                               $command = json_decode($task['parameter'])[0];
+                               if (!in_array($command, self::FAST_COMMANDS)) {
+                                       break;
+                               }
                        }
-                       DBA::close($result);
+                       DBA::close($tasks);
                }
 
                if (!empty($ids)) {