]> git.mxchange.org Git - friendica.git/blobdiff - src/Core/Worker.php
Merge pull request #11258 from MrPetovan/bug/11234-categories-profile
[friendica.git] / src / Core / Worker.php
index 3d1648ea45936bfee7b81cb80ac1923c57012b56..d43d8c7281008198beac9b812ee9bd160fbf9d56 100644 (file)
@@ -1,6 +1,6 @@
 <?php
 /**
- * @copyright Copyright (C) 2010-2021, the Friendica project
+ * @copyright Copyright (C) 2010-2022, the Friendica project
  *
  * @license GNU AGPL version 3 or any later version
  *
@@ -23,6 +23,7 @@ namespace Friendica\Core;
 
 use Friendica\App\Mode;
 use Friendica\Core;
+use Friendica\Core\Worker\Entity\Process;
 use Friendica\Database\DBA;
 use Friendica\DI;
 use Friendica\Util\DateTimeFormat;
@@ -51,26 +52,29 @@ class Worker
        private static $last_update;
        private static $state;
        private static $daemon_mode = null;
+       /** @var Process */
+       private static $process;
 
        /**
         * Processes the tasks that are in the workerqueue table
         *
         * @param boolean $run_cron Should the cron processes be executed?
+        * @param Process $process  The current running process
         * @return void
         * @throws \Friendica\Network\HTTPException\InternalServerErrorException
         */
-       public static function processQueue($run_cron = true)
+       public static function processQueue($run_cron, Process $process)
        {
                self::$up_start = microtime(true);
 
                // At first check the maximum load. We shouldn't continue with a high load
-               if (DI::process()->isMaxLoadReached()) {
+               if (DI::system()->isMaxLoadReached()) {
                        Logger::notice('Pre check: maximum load reached, quitting.');
                        return;
                }
 
                // We now start the process. This is done after the load check since this could increase the load.
-               DI::process()->start();
+               self::$process = $process;
 
                // Kill stale processes every 5 minutes
                $last_cleanup = DI::config()->get('system', 'worker_last_cleaned', 0);
@@ -134,7 +138,7 @@ class Worker
                                        }
 
                                        // Check free memory
-                                       if (DI::process()->isMinMemoryReached()) {
+                                       if (DI::system()->isMinMemoryReached()) {
                                                Logger::warning('Memory limit reached, quitting.');
                                                DI::lock()->release(self::LOCK_WORKER);
                                                return;
@@ -147,7 +151,7 @@ class Worker
                        // Quit the worker once every cron interval
                        if (time() > ($starttime + (DI::config()->get('system', 'cron_interval') * 60))) {
                                Logger::info('Process lifetime reached, respawning.');
-                               self::unclaimProcess();
+                               self::unclaimProcess($process);
                                if (self::isDaemonMode()) {
                                        self::IPCSetJobState(true);
                                } else {
@@ -180,7 +184,7 @@ class Worker
                }
 
                // Do we have too few memory?
-               if (DI::process()->isMinMemoryReached()) {
+               if (DI::system()->isMinMemoryReached()) {
                        Logger::warning('Memory limit reached, quitting.');
                        return false;
                }
@@ -192,7 +196,7 @@ class Worker
                }
 
                // Possibly there are too much database processes that block the system
-               if (DI::process()->isMaxProcessesReached()) {
+               if (DI::system()->isMaxProcessesReached()) {
                        Logger::warning('Maximum processes reached, quitting.');
                        return false;
                }
@@ -334,7 +338,7 @@ class Worker
                }
 
                // Constantly check the number of parallel database processes
-               if (DI::process()->isMaxProcessesReached()) {
+               if (DI::system()->isMaxProcessesReached()) {
                        Logger::warning("Max processes reached for process", ['pid' => $mypid]);
                        return false;
                }
@@ -750,7 +754,7 @@ class Worker
                                        }
 
                                        $stamp = (float)microtime(true);
-                                       $jobs = DBA::count('workerqueue', ["`done` AND `executed` > UTC_TIMESTAMP() - INTERVAL ? MINUTE", $interval]);
+                                       $jobs = DBA::count('workerqueue', ["`done` AND `executed` > ?", DateTimeFormat::utc('now - ' . $interval . ' minute')]);
                                        self::$db_duration += (microtime(true) - $stamp);
                                        self::$db_duration_stat += (microtime(true) - $stamp);
                                        $jobs_per_minute[$interval] = number_format($jobs / $interval, 0);
@@ -842,7 +846,7 @@ class Worker
        private static function activeWorkers()
        {
                $stamp = (float)microtime(true);
-               $count = DBA::count('process', ['command' => 'Worker.php']);
+               $count = DI::process()->countCommand('Worker.php');
                self::$db_duration += (microtime(true) - $stamp);
                self::$db_duration_count += (microtime(true) - $stamp);
                return $count;
@@ -1105,15 +1109,15 @@ class Worker
        /**
         * Removes a workerqueue entry from the current process
         *
+        * @param Process $process the process behind the workerqueue
+        *
         * @return void
         * @throws \Exception
         */
-       public static function unclaimProcess()
+       public static function unclaimProcess(Process $process)
        {
-               $mypid = getmypid();
-
                $stamp = (float)microtime(true);
-               DBA::update('workerqueue', ['executed' => DBA::NULL_DATETIME, 'pid' => 0], ['pid' => $mypid, 'done' => false]);
+               DBA::update('workerqueue', ['executed' => DBA::NULL_DATETIME, 'pid' => 0], ['pid' => $process->pid, 'done' => false]);
                self::$db_duration += (microtime(true) - $stamp);
                self::$db_duration_write += (microtime(true) - $stamp);
        }
@@ -1146,7 +1150,7 @@ class Worker
         */
        private static function forkProcess(bool $do_cron)
        {
-               if (DI::process()->isMinMemoryReached()) {
+               if (DI::system()->isMinMemoryReached()) {
                        Logger::warning('Memory limit reached - quitting');
                        return;
                }
@@ -1176,26 +1180,25 @@ class Worker
                }
 
                // We now are in the new worker
-               $pid = getmypid();
-
                DBA::connect();
-               /// @todo Reinitialize the logger to set a new process_id and uid
-               DI::process()->setPid($pid);
+
+               DI::flushLogger();
+               $process = DI::process()->create(getmypid(), basename(__FILE__));
 
                $cycles = 0;
-               while (!self::IPCJobsExists($pid) && (++$cycles < 100)) {
+               while (!self::IPCJobsExists($process->pid) && (++$cycles < 100)) {
                        usleep(10000);
                }
 
-               Logger::info('Worker spawned', ['pid' => $pid, 'wait_cycles' => $cycles]);
+               Logger::info('Worker spawned', ['pid' => $process->pid, 'wait_cycles' => $cycles]);
 
-               self::processQueue($do_cron);
+               self::processQueue($do_cron, $process);
 
-               self::unclaimProcess();
+               self::unclaimProcess($process);
 
-               self::IPCSetJobState(false, $pid);
-               DI::process()->end();
-               Logger::info('Worker ended', ['pid' => $pid]);
+               self::IPCSetJobState(false, $process->pid);
+               DI::process()->delete($process);
+               Logger::info('Worker ended', ['pid' => $process->pid]);
                exit();
        }
 
@@ -1211,9 +1214,7 @@ class Worker
                if (self::isDaemonMode() && DI::config()->get('system', 'worker_fork')) {
                        self::forkProcess($do_cron);
                } else {
-                       $process = new Core\Process(DI::logger(), DI::mode(), DI::config(),
-                               DI::modelProcess(), DI::app()->getBasePath(), getmypid());
-                       $process->run('bin/worker.php', ['no_cron' => !$do_cron]);
+                       DI::system()->run('bin/worker.php', ['no_cron' => !$do_cron]);
                }
                if (self::isDaemonMode()) {
                        self::IPCSetJobState(false);
@@ -1571,8 +1572,7 @@ class Worker
                Logger::notice('Starting new daemon process');
                $command = 'bin/daemon.php';
                $a = DI::app();
-               $process = new Core\Process(DI::logger(), DI::mode(), DI::config(), DI::modelProcess(), $a->getBasePath(), getmypid());
-               $process->run($command, ['start']);
+               DI::system()->run($command, ['start']);
                Logger::notice('New daemon process started');
        }