]> git.mxchange.org Git - friendica.git/blobdiff - src/Core/Worker.php
Add expected method MailBuilder->withHeaders
[friendica.git] / src / Core / Worker.php
index 04cd4cc60894f5c1ce286e4b211de402ea9e04c6..426389ab00c7a706393902736a58e4cf89accd2b 100644 (file)
@@ -22,6 +22,7 @@
 namespace Friendica\Core;
 
 use Friendica\Core;
+use Friendica\Core\Process as ProcessAlias;
 use Friendica\Database\DBA;
 use Friendica\DI;
 use Friendica\Model\Process;
@@ -67,12 +68,12 @@ class Worker
 
                // At first check the maximum load. We shouldn't continue with a high load
                if (DI::process()->isMaxLoadReached()) {
-                       Logger::info('Pre check: maximum load reached, quitting.');
+                       Logger::notice('Pre check: maximum load reached, quitting.');
                        return;
                }
 
                // We now start the process. This is done after the load check since this could increase the load.
-               self::startProcess();
+               DI::process()->start();
 
                // Kill stale processes every 5 minutes
                $last_cleanup = DI::config()->get('system', 'worker_last_cleaned', 0);
@@ -91,12 +92,13 @@ class Worker
                        self::runCron();
                }
 
-               $starttime = time();
+               $last_check = $starttime = time();
                self::$state = self::STATE_STARTUP;
 
                // We fetch the next queue entry that is about to be executed
                while ($r = self::workerProcess()) {
-                       $refetched = false;
+                       // Don't refetch when a worker fetches tasks for multiple workers
+                       $refetched = DI::config()->get('system', 'worker_multiple_fetch');
                        foreach ($r as $entry) {
                                // Assure that the priority is an integer value
                                $entry['priority'] = (int)$entry['priority'];
@@ -119,7 +121,7 @@ class Worker
                        }
 
                        // To avoid the quitting of multiple workers only one worker at a time will execute the check
-                       if (!self::getWaitingJobForPID()) {
+                       if ((time() > $last_check + 5) && !self::getWaitingJobForPID()) {
                                self::$state = self::STATE_LONG_LOOP;
 
                                if (DI::lock()->acquire(self::LOCK_WORKER, 0)) {
@@ -132,17 +134,19 @@ class Worker
 
                                        // Check free memory
                                        if (DI::process()->isMinMemoryReached()) {
-                                               Logger::info('Memory limit reached, quitting.');
+                                               Logger::notice('Memory limit reached, quitting.');
                                                DI::lock()->release(self::LOCK_WORKER);
                                                return;
                                        }
                                        DI::lock()->release(self::LOCK_WORKER);
                                }
+                               $last_check = time();
                        }
 
                        // Quit the worker once every cron interval
                        if (time() > ($starttime + (DI::config()->get('system', 'cron_interval') * 60))) {
                                Logger::info('Process lifetime reached, respawning.');
+                               self::unclaimProcess();
                                self::spawnWorker();
                                return;
                        }
@@ -172,19 +176,19 @@ class Worker
 
                // Do we have too few memory?
                if (DI::process()->isMinMemoryReached()) {
-                       Logger::info('Memory limit reached, quitting.');
+                       Logger::notice('Memory limit reached, quitting.');
                        return false;
                }
 
                // Possibly there are too much database connections
                if (self::maxConnectionsReached()) {
-                       Logger::info('Maximum connections reached, quitting.');
+                       Logger::notice('Maximum connections reached, quitting.');
                        return false;
                }
 
                // Possibly there are too much database processes that block the system
                if (DI::process()->isMaxProcessesReached()) {
-                       Logger::info('Maximum processes reached, quitting.');
+                       Logger::notice('Maximum processes reached, quitting.');
                        return false;
                }
                
@@ -282,25 +286,25 @@ class Worker
 
                // Quit when in maintenance
                if (DI::config()->get('system', 'maintenance', false, true)) {
-                       Logger::info("Maintenance mode - quit process", ['pid' => $mypid]);
+                       Logger::notice("Maintenance mode - quit process", ['pid' => $mypid]);
                        return false;
                }
 
                // Constantly check the number of parallel database processes
                if (DI::process()->isMaxProcessesReached()) {
-                       Logger::info("Max processes reached for process", ['pid' => $mypid]);
+                       Logger::notice("Max processes reached for process", ['pid' => $mypid]);
                        return false;
                }
 
                // Constantly check the number of available database connections to let the frontend be accessible at any time
                if (self::maxConnectionsReached()) {
-                       Logger::info("Max connection reached for process", ['pid' => $mypid]);
+                       Logger::notice("Max connection reached for process", ['pid' => $mypid]);
                        return false;
                }
 
                $argv = json_decode($queue["parameter"], true);
                if (empty($argv)) {
-                       Logger::error('Parameter is empty', ['queue' => $queue]);
+                       Logger::warning('Parameter is empty', ['queue' => $queue]);
                        return false;
                }
 
@@ -344,7 +348,7 @@ class Worker
                }
 
                if (!validate_include($include)) {
-                       Logger::log("Include file ".$argv[0]." is not valid!");
+                       Logger::warning("Include file is not valid", ['file' => $argv[0]]);
                        $stamp = (float)microtime(true);
                        DBA::delete('workerqueue', ['id' => $queue["id"]]);
                        self::$db_duration = (microtime(true) - $stamp);
@@ -381,7 +385,7 @@ class Worker
                        self::$db_duration = (microtime(true) - $stamp);
                        self::$db_duration_write += (microtime(true) - $stamp);
                } else {
-                       Logger::log("Function ".$funcname." does not exist");
+                       Logger::warning("Function does not exist", ['function' => $funcname]);
                        $stamp = (float)microtime(true);
                        DBA::delete('workerqueue', ['id' => $queue["id"]]);
                        self::$db_duration = (microtime(true) - $stamp);
@@ -529,7 +533,7 @@ class Worker
                        $level = ($used / $max) * 100;
 
                        if ($level >= $maxlevel) {
-                               Logger::log("Maximum level (".$maxlevel."%) of user connections reached: ".$used."/".$max);
+                               Logger::notice("Maximum level (".$maxlevel."%) of user connections reached: ".$used."/".$max);
                                return true;
                        }
                }
@@ -559,7 +563,7 @@ class Worker
                if ($level < $maxlevel) {
                        return false;
                }
-               Logger::log("Maximum level (".$level."%) of system connections reached: ".$used."/".$max);
+               Logger::notice("Maximum level (".$level."%) of system connections reached: ".$used."/".$max);
                return true;
        }
 
@@ -611,7 +615,7 @@ class Worker
                                // How long is the process already running?
                                $duration = (time() - strtotime($entry["executed"])) / 60;
                                if ($duration > $max_duration) {
-                                       Logger::log("Worker process ".$entry["pid"]." (".substr(json_encode($argv), 0, 50).") took more than ".$max_duration." minutes. It will be killed now.");
+                                       Logger::notice("Worker process ".$entry["pid"]." (".substr(json_encode($argv), 0, 50).") took more than ".$max_duration." minutes. It will be killed now.");
                                        posix_kill($entry["pid"], SIGTERM);
 
                                        // We killed the stale process.
@@ -782,6 +786,7 @@ class Worker
                $stamp = (float)microtime(true);
                $count = DBA::count('process', ['command' => 'Worker.php']);
                self::$db_duration += (microtime(true) - $stamp);
+               self::$db_duration_count += (microtime(true) - $stamp);
                return $count;
        }
 
@@ -805,6 +810,7 @@ class Worker
                DBA::close($queues);
 
                self::$db_duration += (microtime(true) - $stamp);
+               self::$db_duration_count += (microtime(true) - $stamp);
                return $ids;
        }
 
@@ -943,8 +949,7 @@ class Worker
 
                if (DI::config()->get('system', 'worker_multiple_fetch')) {
                        $pids = [];
-                       $worker_pids = self::getWorkerPIDList();
-                       foreach ($worker_pids as $pid => $count) {
+                       foreach (self::getWorkerPIDList() as $pid => $count) {
                                if ($count <= $fetch_limit) {
                                        $pids[] = $pid;
                                }
@@ -979,27 +984,28 @@ class Worker
                        DBA::close($tasks);
                }
 
-               if (!empty($ids)) {
-                       $worker = [];
-                       foreach (array_unique($ids) as $id) {
-                               $pid = next($pids);
-                               if (!$pid) {
-                                       $pid = reset($pids);
-                               }
-                               $worker[$pid][] = $id;
-                       }
+               if (empty($ids)) {
+                       return;
+               }
 
-                       $stamp = (float)microtime(true);
-                       foreach ($worker as $worker_pid => $worker_ids) {
-                               Logger::info('Set queue entry', ['pid' => $worker_pid, 'ids' => $worker_ids]);
-                               DBA::update('workerqueue', ['executed' => DateTimeFormat::utcNow(), 'pid' => $worker_pid],
-                                       ['id' => $worker_ids, 'done' => false, 'pid' => 0]);
+               // Assign the task ids to the workers
+               $worker = [];
+               foreach (array_unique($ids) as $id) {
+                       $pid = next($pids);
+                       if (!$pid) {
+                               $pid = reset($pids);
                        }
-                       self::$db_duration += (microtime(true) - $stamp);
-                       self::$db_duration_write += (microtime(true) - $stamp);
+                       $worker[$pid][] = $id;
                }
 
-               return !empty($ids);
+               $stamp = (float)microtime(true);
+               foreach ($worker as $worker_pid => $worker_ids) {
+                       Logger::info('Set queue entry', ['pid' => $worker_pid, 'ids' => $worker_ids]);
+                       DBA::update('workerqueue', ['executed' => DateTimeFormat::utcNow(), 'pid' => $worker_pid],
+                               ['id' => $worker_ids, 'done' => false, 'pid' => 0]);
+               }
+               self::$db_duration += (microtime(true) - $stamp);
+               self::$db_duration_write += (microtime(true) - $stamp);
        }
 
        /**
@@ -1022,17 +1028,11 @@ class Worker
                }
                self::$lock_duration += (microtime(true) - $stamp);
 
-               $found = self::findWorkerProcesses();
+               self::findWorkerProcesses();
 
                DI::lock()->release(self::LOCK_PROCESS);
 
-               if ($found) {
-                       $stamp = (float)microtime(true);
-                       $r = DBA::select('workerqueue', [], ['pid' => getmypid(), 'done' => false]);
-                       self::$db_duration += (microtime(true) - $stamp);
-                       return DBA::toArray($r);
-               }
-               return false;
+               return self::getWaitingJobForPID();
        }
 
        /**
@@ -1093,7 +1093,7 @@ class Worker
                        if (self::tooMuchWorkers()) {
                                // Cleaning dead processes
                                self::killStaleWorkers();
-                               Process::deleteInactive();
+                               DI::modelProcess()->deleteInactive();
 
                                return;
                        }
@@ -1172,7 +1172,7 @@ class Worker
                $args = ['no_cron' => !$do_cron];
 
                $a = DI::app();
-               $process = new Core\Process(DI::logger(), DI::mode(), DI::config(), $a->getBasePath());
+               $process = new Core\Process(DI::logger(), DI::mode(), DI::config(), DI::modelProcess(), $a->getBasePath(), getmypid());
                $process->run($command, $args);
 
                // after spawning we have to remove the flag.
@@ -1361,31 +1361,6 @@ class Worker
                return true;
        }
 
-       /**
-        * Log active processes into the "process" table
-        */
-       public static function startProcess()
-       {
-               $trace = debug_backtrace(DEBUG_BACKTRACE_IGNORE_ARGS, 1);
-
-               $command = basename($trace[0]['file']);
-
-               Process::deleteInactive();
-
-               Process::insert($command);
-       }
-
-       /**
-        * Remove the active process from the "process" table
-        *
-        * @return bool
-        * @throws \Exception
-        */
-       public static function endProcess()
-       {
-               return Process::deleteByPid();
-       }
-
        /**
         * Set the flag if some job is waiting
         *