]> git.mxchange.org Git - friendica.git/commitdiff
Workers can now be started exclusively from the daemon and other workers
authorMichael <heluecht@pirati.ca>
Fri, 1 Jun 2018 22:09:27 +0000 (22:09 +0000)
committerMichael <heluecht@pirati.ca>
Fri, 1 Jun 2018 22:09:27 +0000 (22:09 +0000)
bin/daemon.php
include/dba.php
src/Core/Worker.php

index b51dd392ef7fdd1e7c831871ab15a55f38df4575..8ba86b3de08ce29b4ce6a2f476a7b1283e9afbfb 100755 (executable)
@@ -6,8 +6,38 @@
  *
  * This script was taken from http://php.net/manual/en/function.pcntl-fork.php
  */
-function shutdown() {
-       posix_kill(posix_getpid(), SIGHUP);
+
+use Friendica\App;
+use Friendica\BaseObject;
+use Friendica\Core\Config;
+use Friendica\Core\Worker;
+
+// Ensure that daemon.php is executed from the base path of the installation
+if (!file_exists("boot.php") && (sizeof($_SERVER["argv"]) != 0)) {
+       $directory = dirname($_SERVER["argv"][0]);
+
+       if (substr($directory, 0, 1) != "/") {
+               $directory = $_SERVER["PWD"]."/".$directory;
+       }
+       $directory = realpath($directory."/..");
+
+       chdir($directory);
+}
+
+require_once "boot.php";
+require_once "include/dba.php";
+
+$a = new App(dirname(__DIR__));
+BaseObject::setApp($a);
+
+require_once ".htconfig.php";
+dba::connect($db_host, $db_user, $db_pass, $db_data);
+
+Config::load();
+
+if (!isset($pidfile)) {
+       die('Please specify a pid file in the variable $pidfile in the .htconfig.php. For example:'."\n".
+               '$pidfile = "/path/to/daemon.pid";'."\n");
 }
 
 if (in_array("start", $_SERVER["argv"])) {
@@ -30,27 +60,11 @@ if (empty($_SERVER["argv"][0])) {
        die("Unexpected script behaviour. This message should never occur.\n");
 }
 
-// Fetch the base directory
-$directory = dirname($_SERVER["argv"][0]);
+$pid = @file_get_contents($pidfile);
 
-if (substr($directory, 0, 1) != "/") {
-       $directory = $_SERVER["PWD"]."/".$directory;
-}
-$directory = realpath($directory."/..");
-
-include $directory."/.htconfig.php";
-
-if (!isset($pidfile)) {
-       die('Please specify a pid file in the variable $pidfile in the .htconfig.php. For example:'."\n".
-               '$pidfile = "/path/to/daemon.pid";'."\n");
-}
-
-if (in_array($mode, array("stop", "status"))) {
-       $pid = @file_get_contents($pidfile);
-
-       if (!$pid) {
-               die("Pidfile wasn't found. Is the daemon running?\n");
-       }
+if (empty($pid) && in_array($mode, ["stop", "status"])) {
+       Config::set('system', 'worker_daemon_mode', false);
+       die("Pidfile wasn't found. Is the daemon running?\n");
 }
 
 if ($mode == "status") {
@@ -60,6 +74,7 @@ if ($mode == "status") {
 
        unlink($pidfile);
 
+       Config::set('system', 'worker_daemon_mode', false);
        die("Daemon process $pid isn't running.\n");
 }
 
@@ -68,17 +83,19 @@ if ($mode == "stop") {
 
        unlink($pidfile);
 
+       logger("Worker daemon process $pid was killed.", LOGGER_DEBUG);
+
+       Config::set('system', 'worker_daemon_mode', false);
        die("Worker daemon process $pid was killed.\n");
 }
 
-echo "Starting worker daemon.\n";
-
-if (isset($a->config['php_path'])) {
-       $php = $a->config['php_path'];
-} else {
-       $php = "php";
+if (!empty($pid) && posix_kill($pid, 0)) {
+       die("Daemon process $pid is already running.\n");
 }
 
+logger('Starting worker daemon.', LOGGER_DEBUG);
+echo "Starting worker daemon.\n";
+
 // Switch over to daemon mode.
 if ($pid = pcntl_fork())
        return;     // Parent
@@ -95,32 +112,32 @@ if (posix_setsid() < 0)
 if ($pid = pcntl_fork())
        return;     // Parent
 
-$pid = getmypid();
-file_put_contents($pidfile, $pid);
-
-// Now running as a daemon.
-while (true) {
-       // Just to be sure that this script really runs endlessly
-       set_time_limit(0);
+// We lose the database connection upon forking
+dba::connect($db_host, $db_user, $db_pass, $db_data);
+unset($db_host, $db_user, $db_pass, $db_data);
 
-       // Call the worker
-       $cmdline = $php.' bin/worker.php';
+Config::set('system', 'worker_daemon_mode', true);
 
-       $executed = false;
+// Just to be sure that this script really runs endlessly
+set_time_limit(0);
 
-       if (function_exists('proc_open')) {
-               $resource = proc_open($cmdline . ' &', array(), $foo, $directory);
+$pid = getmypid();
+file_put_contents($pidfile, $pid);
 
-               if (is_resource($resource)) {
-                       $executed = true;
-                       proc_close($resource);
-               }
-       }
+$wait_interval = intval(Config::get('system', 'cron_interval', 5)) * 60;
 
-       if (!$executed) {
-               exec($cmdline.' spawn');
-       }
+// Now running as a daemon.
+while (true) {
+       logger('Call the worker', LOGGER_DEBUG);
+       Worker::spawnWorker();
+
+       logger("Sleep for $wait_interval seconds - or when a worker needs to be called", LOGGER_DEBUG);
+       $i = 0;
+       do {
+               sleep(1);
+       } while (($i++ < $wait_interval) && !Worker::IPCJobsExists());
+}
 
-       // Now sleep for 5 minutes
-       sleep(300);
+function shutdown() {
+       posix_kill(posix_getpid(), SIGHUP);
 }
index aa63a5b36145d06da05b0faaab8db10aedf4e424..5245538cedea08d0c96560849d0e9381d5d8cb83 100644 (file)
@@ -26,7 +26,7 @@ class dba {
        private static $relation = [];
 
        public static function connect($serveraddr, $user, $pass, $db) {
-               if (!is_null(self::$db)) {
+               if (!is_null(self::$db) && self::connected()) {
                        return true;
                }
 
index 81cdf1bbd28a8985a5db29800dd18790343529a2..e2135f0daf2e447955b93f99f6957a0c289fd5c4 100644 (file)
@@ -53,10 +53,15 @@ class Worker
                // We now start the process. This is done after the load check since this could increase the load.
                self::startProcess();
 
+               // The daemon doesn't need to fork new workers anymore, since we started a process
+               if (Config::get('system', 'worker_daemon_mode', false)) {
+                       self::IPCSetJobState(false);
+               }
+
                // Kill stale processes every 5 minutes
-               $last_cleanup = Config::get('system', 'poller_last_cleaned', 0);
+               $last_cleanup = Config::get('system', 'worker_last_cleaned', 0);
                if (time() > ($last_cleanup + 300)) {
-                       Config::set('system', 'poller_last_cleaned', time());
+                       Config::set('system', 'worker_last_cleaned', time());
                        self::killStaleWorkers();
                }
 
@@ -108,16 +113,16 @@ class Worker
                                }
 
                                // If possible we will fetch new jobs for this worker
-                               if (!$refetched && Lock::set('poller_worker_process', 0)) {
+                               if (!$refetched && Lock::set('worker_process', 0)) {
                                        $stamp = (float)microtime(true);
                                        $refetched = self::findWorkerProcesses($passing_slow);
                                        self::$db_duration += (microtime(true) - $stamp);
-                                       Lock::remove('poller_worker_process');
+                                       Lock::remove('worker_process');
                                }
                        }
 
                        // To avoid the quitting of multiple workers only one worker at a time will execute the check
-                       if (Lock::set('poller_worker', 0)) {
+                       if (Lock::set('worker', 0)) {
                                $stamp = (float)microtime(true);
                                // Count active workers and compare them with a maximum value that depends on the load
                                if (self::tooMuchWorkers()) {
@@ -130,7 +135,7 @@ class Worker
                                        logger('Memory limit reached, quitting.', LOGGER_DEBUG);
                                        return;
                                }
-                               Lock::remove('poller_worker');
+                               Lock::remove('worker');
                                self::$db_duration += (microtime(true) - $stamp);
                        }
 
@@ -140,6 +145,9 @@ class Worker
                                return;
                        }
                }
+               if (Config::get('system', 'worker_daemon_mode', false)) {
+                       self::IPCSetJobState(false);
+               }
                logger("Couldn't select a workerqueue entry, quitting.", LOGGER_DEBUG);
        }
 
@@ -244,7 +252,7 @@ class Worker
 
                        $stamp = (float)microtime(true);
                        if (dba::update('workerqueue', ['done' => true], ['id' => $queue["id"]])) {
-                               Config::set('system', 'last_poller_execution', DateTimeFormat::utcNow());
+                               Config::set('system', 'last_worker_execution', DateTimeFormat::utcNow());
                        }
                        self::$db_duration = (microtime(true) - $stamp);
 
@@ -285,7 +293,7 @@ class Worker
 
                        $stamp = (float)microtime(true);
                        if (dba::update('workerqueue', ['done' => true], ['id' => $queue["id"]])) {
-                               Config::set('system', 'last_poller_execution', DateTimeFormat::utcNow());
+                               Config::set('system', 'last_worker_execution', DateTimeFormat::utcNow());
                        }
                        self::$db_duration = (microtime(true) - $stamp);
                } else {
@@ -851,6 +859,11 @@ class Worker
                        dba::update('workerqueue', ['executed' => DateTimeFormat::utcNow(), 'pid' => $mypid], $ids);
                }
 
+               // The daemon doesn't need to fork new workers anymore, since we are inside the worker
+               if (Config::get('system', 'worker_daemon_mode', false)) {
+                       self::IPCSetJobState(false);
+               }
+
                return $found;
        }
 
@@ -873,7 +886,7 @@ class Worker
                dba::close($r);
 
                $stamp = (float)microtime(true);
-               if (!Lock::set('poller_worker_process')) {
+               if (!Lock::set('worker_process')) {
                        return false;
                }
                self::$lock_duration = (microtime(true) - $stamp);
@@ -882,7 +895,7 @@ class Worker
                $found = self::findWorkerProcesses($passing_slow);
                self::$db_duration += (microtime(true) - $stamp);
 
-               Lock::remove('poller_worker_process');
+               Lock::remove('worker_process');
 
                if ($found) {
                        $r = dba::select('workerqueue', [], ['pid' => getmypid(), 'done' => false]);
@@ -1071,19 +1084,25 @@ class Worker
                        dba::insert('workerqueue', ['parameter' => $parameters, 'created' => $created, 'priority' => $priority]);
                }
 
+               // We tell the daemon that a new job entry exists
+               if (Config::get('system', 'worker_daemon_mode', false)) {
+                       self::IPCSetJobState(true);
+                       return true;
+               }
+
                // Should we quit and wait for the worker to be called as a cronjob?
                if ($dont_fork) {
                        return true;
                }
 
                // If there is a lock then we don't have to check for too much worker
-               if (!Lock::set('poller_worker', 0)) {
+               if (!Lock::set('worker', 0)) {
                        return true;
                }
 
                // If there are already enough workers running, don't fork another one
                $quit = self::tooMuchWorkers();
-               Lock::remove('poller_worker');
+               Lock::remove('worker');
 
                if ($quit) {
                        return true;
@@ -1121,4 +1140,30 @@ class Worker
        {
                return Process::deleteByPid();
        }
+
+       private static function checkIPC()
+       {
+               dba::e("CREATE TABLE IF NOT EXISTS `worker-ipc` (`key` integer, `jobs` boolean) ENGINE = MEMORY;");
+       }
+
+       public static function IPCSetJobState($jobs)
+       {
+               self::checkIPC();
+
+               dba::update('worker-ipc', ['jobs' => $jobs], ['key' => 1], true);
+       }
+
+       public static function IPCJobsExists()
+       {
+               self::checkIPC();
+
+               $row = dba::selectFirst('worker-ipc', ['jobs'], ['key' => 1]);
+
+               // When we don't have a row, no job is running
+               if (!DBM::is_result($row)) {
+                       return false;
+               }
+
+               return (bool)$row['jobs'];
+       }
 }