]> git.mxchange.org Git - friendica.git/commitdiff
Wait for child being ready
authorMichael <heluecht@pirati.ca>
Tue, 5 Jan 2021 10:18:25 +0000 (10:18 +0000)
committerMichael <heluecht@pirati.ca>
Tue, 5 Jan 2021 10:18:25 +0000 (10:18 +0000)
src/Core/Worker.php
static/defaults.config.php

index bb17e430c14ab736dee2934b9bc010ad22050e95..f84dd294752fba3f8aeda4f7601f2ab478531d3a 100644 (file)
@@ -94,101 +94,70 @@ class Worker
 
                $last_check = $starttime = time();
                self::$state = self::STATE_STARTUP;
-               $wait_interval = self::isDaemonMode() ? 360 : 10;
-               $start = time();
-
-               do {
-                       // We fetch the next queue entry that is about to be executed
-                       while ($r = self::workerProcess()) {
-                               // Don't refetch when a worker fetches tasks for multiple workers
-                               $refetched = DI::config()->get('system', 'worker_multiple_fetch');
-                               foreach ($r as $entry) {
-                                       // Assure that the priority is an integer value
-                                       $entry['priority'] = (int)$entry['priority'];
-
-                                       // The work will be done
-                                       if (!self::execute($entry)) {
-                                               Logger::notice('Process execution failed, quitting.');
-                                               return;
-                                       }
-
-                                       // Trying to fetch new processes - but only once when successful
-                                       if (!$refetched && DI::lock()->acquire(self::LOCK_PROCESS, 0)) {
-                                               self::findWorkerProcesses();
-                                               DI::lock()->release(self::LOCK_PROCESS);
-                                               self::$state = self::STATE_REFETCH;
-                                               $refetched = true;
-                                       } else {
-                                               self::$state = self::STATE_SHORT_LOOP;
-                                       }
-                               }
 
-                               // To avoid the quitting of multiple workers only one worker at a time will execute the check
-                               if ((time() > $last_check + 5) && !self::getWaitingJobForPID()) {
-                                       self::$state = self::STATE_LONG_LOOP;
-
-                                       if (DI::lock()->acquire(self::LOCK_WORKER, 0)) {
-                                       // Count active workers and compare them with a maximum value that depends on the load
-                                               if (self::tooMuchWorkers()) {
-                                                       Logger::notice('Active worker limit reached, quitting.');
-                                                       DI::lock()->release(self::LOCK_WORKER);
-                                                       return;
-                                               }
-
-                                               // Check free memory
-                                               if (DI::process()->isMinMemoryReached()) {
-                                                       Logger::warning('Memory limit reached, quitting.');
-                                                       DI::lock()->release(self::LOCK_WORKER);
-                                                       return;
-                                               }
-                                               DI::lock()->release(self::LOCK_WORKER);
-                                       }
-                                       $last_check = time();
+               // We fetch the next queue entry that is about to be executed
+               while ($r = self::workerProcess()) {
+                       if (self::IPCJobsExists(getmypid())) {
+                               self::IPCSetJobState(false, getmypid());
+                       }
+                       // Don't refetch when a worker fetches tasks for multiple workers
+                       $refetched = DI::config()->get('system', 'worker_multiple_fetch');
+                       foreach ($r as $entry) {
+                               // Assure that the priority is an integer value
+                               $entry['priority'] = (int)$entry['priority'];
+
+                               // The work will be done
+                               if (!self::execute($entry)) {
+                                       Logger::notice('Process execution failed, quitting.');
+                                       return;
                                }
 
-                               // Quit the worker once every cron interval
-                               if (time() > ($starttime + (DI::config()->get('system', 'cron_interval') * 60))) {
-                                       Logger::info('Process lifetime reached, respawning.');
-                                       self::unclaimProcess();
-                                       if (self::isDaemonMode()) {
-                                               self::IPCSetJobState(true);
-                                       } else {
-                                               self::spawnWorker();
-                                       }
-                                       return;
+                               // Trying to fetch new processes - but only once when successful
+                               if (!$refetched && DI::lock()->acquire(self::LOCK_PROCESS, 0)) {
+                                       self::findWorkerProcesses();
+                                       DI::lock()->release(self::LOCK_PROCESS);
+                                       self::$state = self::STATE_REFETCH;
+                                       $refetched = true;
+                               } else {
+                                       self::$state = self::STATE_SHORT_LOOP;
                                }
-                               $start = time();
                        }
 
-                       $seconds = (time() - $start);
+                       // To avoid the quitting of multiple workers only one worker at a time will execute the check
+                       if ((time() > $last_check + 5) && !self::getWaitingJobForPID()) {
+                               self::$state = self::STATE_LONG_LOOP;
 
-                       // logarithmic wait time calculation.
-                       $arg = (($seconds + 1) / ($wait_interval / 9)) + 1;
-                       $sleep = min(1000000, round(log10($arg) * 1000000, 0));
-                       usleep($sleep);
-
-                       $timeout = ($seconds >= $wait_interval);
-                       Logger::info('Timeout', ['timeout' => $timeout, 'seconds' => $seconds, 'sleep' => $sleep]);
+                               if (DI::lock()->acquire(self::LOCK_WORKER, 0)) {
+                               // Count active workers and compare them with a maximum value that depends on the load
+                                       if (self::tooMuchWorkers()) {
+                                               Logger::notice('Active worker limit reached, quitting.');
+                                               DI::lock()->release(self::LOCK_WORKER);
+                                               return;
+                                       }
 
-                       if (!$timeout) {
-                               if (DI::process()->isMaxLoadReached()) {
-                                       Logger::notice('maximum load reached, quitting.');
-                                       return;
+                                       // Check free memory
+                                       if (DI::process()->isMinMemoryReached()) {
+                                               Logger::warning('Memory limit reached, quitting.');
+                                               DI::lock()->release(self::LOCK_WORKER);
+                                               return;
+                                       }
+                                       DI::lock()->release(self::LOCK_WORKER);
                                }
+                               $last_check = time();
+                       }
 
-                               // Kill stale processes every 5 minutes
-                               $last_cleanup = DI::config()->get('system', 'worker_last_cleaned', 0);
-                               if (time() > ($last_cleanup + 300)) {
-                                       DI::config()->set('system', 'worker_last_cleaned', time());
-                                       self::killStaleWorkers();
+                       // Quit the worker once every cron interval
+                       if (time() > ($starttime + (DI::config()->get('system', 'cron_interval') * 60))) {
+                               Logger::info('Process lifetime reached, respawning.');
+                               self::unclaimProcess();
+                               if (self::isDaemonMode()) {
+                                       self::IPCSetJobState(true);
+                               } else {
+                                       self::spawnWorker();
                                }
-
-                               // Check if the system is ready
-                               if (!self::isReady()) {
-                                       return;
-                               }               
+                               return;
                        }
-               } while (!$timeout);
+               }
 
                // Cleaning up. Possibly not needed, but it doesn't harm anything.
                if (self::isDaemonMode()) {
@@ -1248,33 +1217,36 @@ class Worker
                } elseif ($pid) {
                        // The parent process continues here
                        DBA::connect();
-                       Logger::info('Spawned new worker', ['cron' => $do_cron, 'pid' => $pid]);
+                       Logger::info('Spawned new worker', ['pid' => $pid]);
+                       self::IPCSetJobState(true, $pid);
+
+                       $cycles = 0;
+                       while (self::IPCJobsExists($pid) && (++$cycles < 100)) {
+                               usleep(10000);
+                       }
+
+                       Logger::info('Spawned worker is ready', ['pid' => $pid, 'wait_cycles' => $cycles]);
                        return;
                }
 
                // We now are in the new worker
                DBA::connect();
-               Logger::info('Worker spawned', ['cron' => $do_cron, 'pid' => getmypid()]);
+               Logger::info('Worker spawned', ['pid' => getmypid()]);
 
-               DI::process()->start();
+               $cycles = 0;
+               while (!self::IPCJobsExists($pid) && (++$cycles < 100)) {
+                       usleep(10000);
+               }
+
+               Logger::info('Parent is ready', ['pid' => getmypid(), 'wait_cycles' => $cycles]);
 
                self::processQueue($do_cron);
 
                self::unclaimProcess();
 
+               self::IPCSetJobState(false, getmypid());
                DI::process()->end();
-               Logger::info('Worker ended', ['cron' => $do_cron, 'pid' => getmypid()]);
-
-               DBA::disconnect();
-/*
-               $php = '/usr/bin/php';
-               $param = ['bin/worker.php'];
-               if ($do_cron) {
-                       $param[] = 'no_cron';
-               }
-               pcntl_exec($php, $param);
-               Logger::warning('Error calling worker', ['cron' => $do_cron, 'pid' => getmypid()]);
-*/
+               Logger::info('Worker ended', ['pid' => getmypid()]);
                exit();
        }
 
@@ -1287,14 +1259,16 @@ class Worker
         */
        public static function spawnWorker($do_cron = false)
        {
-               if (self::isDaemonMode()) {
+               if (self::isDaemonMode() && DI::config()->get('system', 'worker_fork')) {
                        self::forkProcess($do_cron);
-                       self::IPCSetJobState(false);
                } else {
                        $process = new Core\Process(DI::logger(), DI::mode(), DI::config(),
                                DI::modelProcess(), DI::app()->getBasePath(), getmypid());
                        $process->run('bin/worker.php', ['no_cron' => !$do_cron]);
                }
+               if (self::isDaemonMode()) {
+                       self::IPCSetJobState(false);
+               }
        }
 
        /**
@@ -1505,10 +1479,10 @@ class Worker
         * @param boolean $jobs Is there a waiting job?
         * @throws \Exception
         */
-       public static function IPCSetJobState($jobs)
+       public static function IPCSetJobState(bool $jobs, int $key = 0)
        {
                $stamp = (float)microtime(true);
-               DBA::update('worker-ipc', ['jobs' => $jobs], ['key' => 1], true);
+               DBA::replace('worker-ipc', ['jobs' => $jobs, 'key' => $key]);
                self::$db_duration += (microtime(true) - $stamp);
                self::$db_duration_write += (microtime(true) - $stamp);
        }
@@ -1519,10 +1493,10 @@ class Worker
         * @return bool
         * @throws \Exception
         */
-       public static function IPCJobsExists()
+       public static function IPCJobsExists(int $key = 0)
        {
                $stamp = (float)microtime(true);
-               $row = DBA::selectFirst('worker-ipc', ['jobs'], ['key' => 1]);
+               $row = DBA::selectFirst('worker-ipc', ['jobs'], ['key' => $key]);
                self::$db_duration += (microtime(true) - $stamp);
 
                // When we don't have a row, no job is running
index 310d1ea08e77da527e609c0f13992b2ea7f7aa03..ed0f8f871a32de45f97effcfc0a2c61d7f0c39bf 100644 (file)
@@ -538,6 +538,10 @@ return [
                // Number of worker tasks that are fetched in a single query.
                'worker_fetch_limit' => 1,
 
+               // worker_fork (Boolean)
+               // Experimental setting. use pcntl_fork to spawn a new worker process
+               'worker_fork' => false,
+
                // worker_jpm (Boolean)
                // If enabled, it prints out the jobs per minute.
                'worker_jpm' => false,