]> git.mxchange.org Git - friendica.git/blobdiff - src/Core/Worker.php
Merge pull request #10033 from friendica/bug/10032-event-query
[friendica.git] / src / Core / Worker.php
index eb769ad1c03d73f1fd1ecd0d506b7aedd9f31e4d..56c62451b9bfb8d17df722013ea7b00c2d865b05 100644 (file)
@@ -97,6 +97,10 @@ class Worker
 
                // We fetch the next queue entry that is about to be executed
                while ($r = self::workerProcess()) {
+                       if (self::IPCJobsExists(getmypid())) {
+                               self::IPCDeleteJobState(getmypid());
+                       }
+
                        // Don't refetch when a worker fetches tasks for multiple workers
                        $refetched = DI::config()->get('system', 'worker_multiple_fetch');
                        foreach ($r as $entry) {
@@ -417,6 +421,12 @@ class Worker
        {
                $a = DI::app();
 
+               $cooldown = DI::config()->get("system", "worker_cooldown", 0);
+               if ($cooldown > 0) {
+                       Logger::info('Pre execution cooldown.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'cooldown' => $cooldown]);
+                       sleep($cooldown);
+               }
+
                Logger::enableWorker($funcname);
 
                Logger::info("Process start.", ['priority' => $queue["priority"], 'id' => $queue["id"]]);
@@ -489,10 +499,8 @@ class Worker
 
                DI::profiler()->saveLog(DI::logger(), "ID " . $queue["id"] . ": " . $funcname);
 
-               $cooldown = DI::config()->get("system", "worker_cooldown", 0);
-
                if ($cooldown > 0) {
-                       Logger::info('Cooldown.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'cooldown' => $cooldown]);
+                       Logger::info('Post execution cooldown.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'cooldown' => $cooldown]);
                        sleep($cooldown);
                }
        }
@@ -1077,95 +1085,6 @@ class Worker
                self::$db_duration_write += (microtime(true) - $stamp);
        }
 
-       /**
-        * Call the front end worker
-        *
-        * @return void
-        * @throws \Friendica\Network\HTTPException\InternalServerErrorException
-        */
-       public static function callWorker()
-       {
-               if (!DI::config()->get("system", "frontend_worker")) {
-                       return;
-               }
-
-               $url = DI::baseUrl() . '/worker';
-               DI::httpRequest()->fetch($url, 1);
-       }
-
-       /**
-        * Call the front end worker if there aren't any active
-        *
-        * @return void
-        * @throws \Friendica\Network\HTTPException\InternalServerErrorException
-        */
-       public static function executeIfIdle()
-       {
-               self::checkDaemonState();
-
-               if (!DI::config()->get("system", "frontend_worker")) {
-                       return;
-               }
-
-               // Do we have "proc_open"? Then we can fork the worker
-               if (function_exists("proc_open")) {
-                       // When was the last time that we called the worker?
-                       // Less than one minute? Then we quit
-                       if ((time() - DI::config()->get("system", "worker_started")) < 60) {
-                               return;
-                       }
-
-                       DI::config()->set("system", "worker_started", time());
-
-                       // Do we have enough running workers? Then we quit here.
-                       if (self::tooMuchWorkers()) {
-                               // Cleaning dead processes
-                               self::killStaleWorkers();
-                               DI::modelProcess()->deleteInactive();
-
-                               return;
-                       }
-
-                       self::runCron();
-
-                       Logger::info('Call worker');
-                       self::spawnWorker();
-                       return;
-               }
-
-               // We cannot execute background processes.
-               // We now run the processes from the frontend.
-               // This won't work with long running processes.
-               self::runCron();
-
-               self::clearProcesses();
-
-               $workers = self::activeWorkers();
-
-               if ($workers == 0) {
-                       self::callWorker();
-               }
-       }
-
-       /**
-        * Removes long running worker processes
-        *
-        * @return void
-        * @throws \Friendica\Network\HTTPException\InternalServerErrorException
-        */
-       public static function clearProcesses()
-       {
-               $timeout = DI::config()->get("system", "frontend_worker_timeout", 10);
-
-               /// @todo We should clean up the corresponding workerqueue entries as well
-               $stamp = (float)microtime(true);
-               $condition = ["`created` < ? AND `command` = 'worker.php'",
-                               DateTimeFormat::utc("now - ".$timeout." minutes")];
-               DBA::delete('process', $condition);
-               self::$db_duration = (microtime(true) - $stamp);
-               self::$db_duration_write += (microtime(true) - $stamp);
-       }
-
        /**
         * Runs the cron processes
         *
@@ -1210,22 +1129,40 @@ class Worker
                } elseif ($pid) {
                        // The parent process continues here
                        DBA::connect();
-                       Logger::info('Spawned new worker', ['cron' => $do_cron, 'pid' => $pid]);
+
+                       self::IPCSetJobState(true, $pid);
+                       Logger::info('Spawned new worker', ['pid' => $pid]);
+
+                       $cycles = 0;
+                       while (self::IPCJobsExists($pid) && (++$cycles < 100)) {
+                               usleep(10000);
+                       }
+
+                       Logger::info('Spawned worker is ready', ['pid' => $pid, 'wait_cycles' => $cycles]);
                        return;
                }
 
                // We now are in the new worker
+               $pid = getmypid();
+
                DBA::connect();
-               Logger::info('Worker spawned', ['cron' => $do_cron, 'pid' => getmypid()]);
+               /// @todo Reinitialize the logger to set a new process_id and uid
+               DI::process()->setPid($pid);
 
-               DI::process()->start();
+               $cycles = 0;
+               while (!self::IPCJobsExists($pid) && (++$cycles < 100)) {
+                       usleep(10000);
+               }
+
+               Logger::info('Worker spawned', ['pid' => $pid, 'wait_cycles' => $cycles]);
 
                self::processQueue($do_cron);
 
                self::unclaimProcess();
 
+               self::IPCSetJobState(false, $pid);
                DI::process()->end();
-               Logger::info('Worker ended', ['cron' => $do_cron, 'pid' => getmypid()]);
+               Logger::info('Worker ended', ['pid' => $pid]);
                exit();
        }
 
@@ -1238,18 +1175,13 @@ class Worker
         */
        public static function spawnWorker($do_cron = false)
        {
-               Logger::notice("Spawn", ['do_cron' => $do_cron, 'callstack' => System::callstack(20)]);
-               // Worker and daemon are started from the command line.
-               // This means that this is executed by a PHP interpreter without runtime limitations
-               if (function_exists('pcntl_fork') && in_array(DI::mode()->getExecutor(), [Mode::DAEMON, Mode::WORKER])) {
+               if (self::isDaemonMode() && DI::config()->get('system', 'worker_fork')) {
                        self::forkProcess($do_cron);
                } else {
                        $process = new Core\Process(DI::logger(), DI::mode(), DI::config(),
                                DI::modelProcess(), DI::app()->getBasePath(), getmypid());
                        $process->run('bin/worker.php', ['no_cron' => !$do_cron]);
                }
-
-               // after spawning we have to remove the flag.
                if (self::isDaemonMode()) {
                        self::IPCSetJobState(false);
                }
@@ -1461,12 +1393,27 @@ class Worker
         * Set the flag if some job is waiting
         *
         * @param boolean $jobs Is there a waiting job?
+        * @param int $key Key number
         * @throws \Exception
         */
-       public static function IPCSetJobState($jobs)
+       public static function IPCSetJobState(bool $jobs, int $key = 0)
        {
                $stamp = (float)microtime(true);
-               DBA::update('worker-ipc', ['jobs' => $jobs], ['key' => 1], true);
+               DBA::replace('worker-ipc', ['jobs' => $jobs, 'key' => $key]);
+               self::$db_duration += (microtime(true) - $stamp);
+               self::$db_duration_write += (microtime(true) - $stamp);
+       }
+
+       /**
+        * Delete a key entry
+        *
+        * @param int $key Key number
+        * @throws \Exception
+        */
+       public static function IPCDeleteJobState(int $key)
+       {
+               $stamp = (float)microtime(true);
+               DBA::delete('worker-ipc', ['key' => $key]);
                self::$db_duration += (microtime(true) - $stamp);
                self::$db_duration_write += (microtime(true) - $stamp);
        }
@@ -1474,13 +1421,14 @@ class Worker
        /**
         * Checks if some worker job waits to be executed
         *
+        * @param int $key Key number
         * @return bool
         * @throws \Exception
         */
-       public static function IPCJobsExists()
+       public static function IPCJobsExists(int $key = 0)
        {
                $stamp = (float)microtime(true);
-               $row = DBA::selectFirst('worker-ipc', ['jobs'], ['key' => 1]);
+               $row = DBA::selectFirst('worker-ipc', ['jobs'], ['key' => $key]);
                self::$db_duration += (microtime(true) - $stamp);
 
                // When we don't have a row, no job is running
@@ -1511,6 +1459,11 @@ class Worker
                        return $daemon_mode;
                }
 
+               if (!function_exists('pcntl_fork')) {
+                       self::$daemon_mode = false;
+                       return false;
+               }
+
                $pidfile = DI::config()->get('system', 'pidfile');
                if (empty($pidfile)) {
                        // No pid file, no daemon