]> git.mxchange.org Git - quix0rs-gnu-social.git/blobdiff - lib/spawningdaemon.php
Fixed group representation in Directory plugin, also some ->raw calls
[quix0rs-gnu-social.git] / lib / spawningdaemon.php
index 8baefe88e8184db42b216f589f19e79f9207ea5f..732237403dfa49cd75450f81120e7378f9ad71cb 100644 (file)
@@ -36,6 +36,11 @@ abstract class SpawningDaemon extends Daemon
 {
     protected $threads=1;
 
+    const EXIT_OK = 0;
+    const EXIT_ERR = 1;
+    const EXIT_SHUTDOWN = 100;
+    const EXIT_RESTART = 101;
+
     function __construct($id=null, $daemonize=true, $threads=1)
     {
         parent::__construct($daemonize);
@@ -49,7 +54,7 @@ abstract class SpawningDaemon extends Daemon
     /**
      * Perform some actual work!
      *
-     * @return boolean true on success, false on failure
+     * @return int exit code; use self::EXIT_SHUTDOWN to request not to respawn.
      */
     public abstract function runThread();
 
@@ -66,11 +71,13 @@ abstract class SpawningDaemon extends Daemon
      */
     function run()
     {
+        $this->initPipes();
+
         $children = array();
         for ($i = 1; $i <= $this->threads; $i++) {
             $pid = pcntl_fork();
             if ($pid < 0) {
-                $this->log(LOG_ERROR, "Couldn't fork for thread $i; aborting\n");
+                $this->log(LOG_ERR, "Couldn't fork for thread $i; aborting\n");
                 exit(1);
             } else if ($pid == 0) {
                 $this->initAndRunChild($i);
@@ -78,6 +85,7 @@ abstract class SpawningDaemon extends Daemon
                 $this->log(LOG_INFO, "Spawned thread $i as pid $pid");
                 $children[$i] = $pid;
             }
+            sleep(common_config('queue', 'spawndelay'));
         }
         
         $this->log(LOG_INFO, "Waiting for children to complete.");
@@ -87,20 +95,34 @@ abstract class SpawningDaemon extends Daemon
             if ($pid > 0) {
                 $i = array_search($pid, $children);
                 if ($i === false) {
-                    $this->log(LOG_ERR, "Unrecognized child pid $pid exited!");
+                    $this->log(LOG_ERR, "Ignoring exit of unrecognized child pid $pid");
                     continue;
                 }
+                if (pcntl_wifexited($status)) {
+                    $exitCode = pcntl_wexitstatus($status);
+                    $info = "status $exitCode";
+                } else if (pcntl_wifsignaled($status)) {
+                    $exitCode = self::EXIT_ERR;
+                    $signal = pcntl_wtermsig($status);
+                    $info = "signal $signal";
+                }
                 unset($children[$i]);
-                $this->log(LOG_INFO, "Thread $i pid $pid exited.");
-                
-                $pid = pcntl_fork();
-                if ($pid < 0) {
-                    $this->log(LOG_ERROR, "Couldn't fork to respawn thread $i; aborting thread.\n");
-                } else if ($pid == 0) {
-                    $this->initAndRunChild($i);
+
+                if ($this->shouldRespawn($exitCode)) {
+                    $this->log(LOG_INFO, "Thread $i pid $pid exited with $info; respawing.");
+
+                    $pid = pcntl_fork();
+                    if ($pid < 0) {
+                        $this->log(LOG_ERR, "Couldn't fork to respawn thread $i; aborting thread.\n");
+                    } else if ($pid == 0) {
+                        $this->initAndRunChild($i);
+                    } else {
+                        $this->log(LOG_INFO, "Respawned thread $i as pid $pid");
+                        $children[$i] = $pid;
+                    }
+                    sleep(common_config('queue', 'spawndelay'));
                 } else {
-                    $this->log(LOG_INFO, "Respawned thread $i as pid $pid");
-                    $children[$i] = $pid;
+                    $this->log(LOG_INFO, "Thread $i pid $pid exited with status $exitCode; closing out thread.");
                 }
             }
         }
@@ -108,16 +130,64 @@ abstract class SpawningDaemon extends Daemon
         return true;
     }
 
+    /**
+     * Create an IPC socket pair which child processes can use to detect
+     * if the parent process has been killed.
+     */
+    function initPipes()
+    {
+        $sockets = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, 0);
+        if ($sockets) {
+            $this->parentWriter = $sockets[0];
+            $this->parentReader = $sockets[1];
+        } else {
+            $this->log(LOG_ERR, "Couldn't create inter-process sockets");
+            exit(1);
+        }
+    }
+
+    /**
+     * Build an IOManager that simply ensures that we have a connection
+     * to the parent process open. If it breaks, the child process will
+     * die.
+     *
+     * @return ProcessManager
+     */
+    public function processManager()
+    {
+        return new ProcessManager($this->parentReader);
+    }
+
+    /**
+     * Determine whether to respawn an exited subprocess based on its exit code.
+     * Otherwise we'll respawn all exits by default.
+     *
+     * @param int $exitCode
+     * @return boolean true to respawn
+     */
+    protected function shouldRespawn($exitCode)
+    {
+        if ($exitCode == self::EXIT_SHUTDOWN) {
+            // Thread requested a clean shutdown.
+            return false;
+        } else {
+            // Otherwise we should always respawn!
+            return true;
+        }
+    }
+
     /**
      * Initialize things for a fresh thread, call runThread(), and
      * exit at completion with appropriate return value.
      */
     protected function initAndRunChild($thread)
     {
+        // Close the writer end of our parent<->children pipe.
+        fclose($this->parentWriter);
         $this->set_id($this->get_id() . "." . $thread);
         $this->resetDb();
-        $ok = $this->runThread();
-        exit($ok ? 0 : 1);
+        $exitCode = $this->runThread();
+        exit($exitCode);
     }
 
     /**
@@ -134,7 +204,7 @@ abstract class SpawningDaemon extends Daemon
 
         // Reconnect main memcached, or threads will stomp on
         // each other and corrupt their requests.
-        $cache = common_memcache();
+        $cache = Cache::instance();
         if ($cache) {
             $cache->reconnect();
         }