]> git.mxchange.org Git - quix0rs-gnu-social.git/blobdiff - scripts/queuedaemon.php
Merge remote-tracking branch 'upstream/master' into social-master
[quix0rs-gnu-social.git] / scripts / queuedaemon.php
index 162f617e0d6da3a0838469f3fb98148acedd3b11..bdd630f3d5a2e6ef54c8ff3982a5fe7d8a27977b 100755 (executable)
@@ -21,7 +21,7 @@
 define('INSTALLDIR', realpath(dirname(__FILE__) . '/..'));
 
 $shortoptions = 'fi:at:';
-$longoptions = array('id=', 'foreground', 'all', 'threads=', 'skip-xmpp', 'xmpp-only');
+$longoptions = array('id=', 'foreground', 'all', 'threads=');
 
 /**
  * Attempts to get a count of the processors available on the current system
@@ -29,6 +29,8 @@ $longoptions = array('id=', 'foreground', 'all', 'threads=', 'skip-xmpp', 'xmpp-
  *
  * Recognizes Linux and Mac OS X; others will return default of 1.
  *
+ * @fixme move this to SpawningDaemon, but to get the default val for help
+ *        text we seem to need it before loading infrastructure
  * @return intval
  */
 function getProcessorCount()
@@ -66,14 +68,12 @@ Daemon script for running queued items.
 
 END_OF_QUEUE_HELP;
 
-require_once INSTALLDIR.'/scripts/commandline.inc';
+require_once INSTALLDIR.'/scripts/commandline.inc.php';
 
 require_once(INSTALLDIR.'/lib/daemon.php');
 require_once(INSTALLDIR.'/classes/Queue_item.php');
 require_once(INSTALLDIR.'/classes/Notice.php');
 
-define('CLAIM_TIMEOUT', 1200);
-
 /**
  * Queue handling daemon...
  *
@@ -83,156 +83,73 @@ define('CLAIM_TIMEOUT', 1200);
  * We can then pass individual items through the QueueHandler subclasses
  * they belong to.
  */
-class QueueDaemon extends Daemon
+class QueueDaemon extends SpawningDaemon
 {
-    protected $allsites;
-    protected $threads=1;
+    protected $allsites = false;
 
     function __construct($id=null, $daemonize=true, $threads=1, $allsites=false)
     {
-        parent::__construct($daemonize);
-
-        if ($id) {
-            $this->set_id($id);
-        }
-        $this->all = $allsites;
-        $this->threads = $threads;
-    }
-
-    /**
-     * How many seconds a polling-based queue manager should wait between
-     * checks for new items to handle.
-     *
-     * Defaults to 60 seconds; override to speed up or slow down.
-     *
-     * @return int timeout in seconds
-     */
-    function timeout()
-    {
-        return 60;
-    }
-
-    function name()
-    {
-        return strtolower(get_class($this).'.'.$this->get_id());
-    }
-
-    function run()
-    {
-        if ($this->threads > 1) {
-            return $this->runThreads();
-        } else {
-            return $this->runLoop();
-        }
-    }
-    
-    function runThreads()
-    {
-        $children = array();
-        for ($i = 1; $i <= $this->threads; $i++) {
-            $pid = pcntl_fork();
-            if ($pid < 0) {
-                print "Couldn't fork for thread $i; aborting\n";
-                exit(1);
-            } else if ($pid == 0) {
-                $this->runChild($i);
-                exit(0);
-            } else {
-                $this->log(LOG_INFO, "Spawned thread $i as pid $pid");
-                $children[$i] = $pid;
-            }
-        }
-        
-        $this->log(LOG_INFO, "Waiting for children to complete.");
-        while (count($children) > 0) {
-            $status = null;
-            $pid = pcntl_wait($status);
-            if ($pid > 0) {
-                $i = array_search($pid, $children);
-                if ($i === false) {
-                    $this->log(LOG_ERR, "Unrecognized child pid $pid exited!");
-                    continue;
-                }
-                unset($children[$i]);
-                $this->log(LOG_INFO, "Thread $i pid $pid exited.");
-                
-                $pid = pcntl_fork();
-                if ($pid < 0) {
-                    print "Couldn't fork to respawn thread $i; aborting thread.\n";
-                } else if ($pid == 0) {
-                    $this->runChild($i);
-                    exit(0);
-                } else {
-                    $this->log(LOG_INFO, "Respawned thread $i as pid $pid");
-                    $children[$i] = $pid;
-                }
-            }
-        }
-        $this->log(LOG_INFO, "All child processes complete.");
-        return true;
-    }
-
-    function runChild($thread)
-    {
-        $this->set_id($this->get_id() . "." . $thread);
-        $this->resetDb();
-        $this->runLoop();
-    }
-
-    /**
-     * Reconnect to the database for each child process,
-     * or they'll get very confused trying to use the
-     * same socket.
-     */
-    function resetDb()
-    {
-        // @fixme do we need to explicitly open the db too
-        // or is this implied?
-        global $_DB_DATAOBJECT;
-        unset($_DB_DATAOBJECT['CONNECTIONS']);
-
-        // Reconnect main memcached, or threads will stomp on
-        // each other and corrupt their requests.
-        $cache = common_memcache();
-        if ($cache) {
-            $cache->reconnect();
-        }
-
-        // Also reconnect memcached for status_network table.
-        if (!empty(Status_network::$cache)) {
-            Status_network::$cache->close();
-            Status_network::$cache = null;
-        }
+        parent::__construct($id, $daemonize, $threads);
+        $this->allsites = $allsites;
     }
 
     /**
      * Setup and start of run loop for this queue handler as a daemon.
      * Most of the heavy lifting is passed on to the QueueManager's service()
-     * method, which passes control on to the QueueHandler's handle_notice()
-     * method for each notice that comes in on the queue.
-     *
-     * Most of the time this won't need to be overridden in a subclass.
+     * method, which passes control on to the QueueHandler's handle()
+     * method for each item that comes in on the queue.
      *
      * @return boolean true on success, false on failure
      */
-    function runLoop()
+    function runThread()
     {
         $this->log(LOG_INFO, 'checking for queued notices');
 
-        $master = new IoMaster($this->get_id());
-        $master->init($this->all);
-        $master->service();
+        $master = new QueueMaster($this->get_id(), $this->processManager());
+        $master->init($this->allsites);
+        try {
+            $master->service();
+        } catch (Exception $e) {
+            common_log(LOG_ERR, "Unhandled exception: " . $e->getMessage() . ' ' .
+                str_replace("\n", " ", $e->getTraceAsString()));
+            return self::EXIT_ERR;
+        }
 
         $this->log(LOG_INFO, 'finished servicing the queue');
 
         $this->log(LOG_INFO, 'terminating normally');
 
-        return true;
+        return $master->respawn ? self::EXIT_RESTART : self::EXIT_SHUTDOWN;
     }
+}
 
-    function log($level, $msg)
+class QueueMaster extends IoMaster
+{
+    protected $processManager;
+
+    function __construct($id, $processManager)
+    {
+        parent::__construct($id);
+        $this->processManager = $processManager;
+    }
+
+    /**
+     * Initialize IoManagers which are appropriate to this instance.
+     */
+    function initManagers()
     {
-        common_log($level, get_class($this) . ' ('. $this->get_id() .'): '.$msg);
+        $managers = array();
+        if (Event::handle('StartQueueDaemonIoManagers', array(&$managers))) {
+            $qm = QueueManager::get();
+            $qm->setActiveGroup('main');
+            $managers[] = $qm;
+            $managers[] = $this->processManager;
+        }
+        Event::handle('EndQueueDaemonIoManagers', array(&$managers));
+
+        foreach ($managers as $manager) {
+            $this->instantiate($manager);
+        }
     }
 }
 
@@ -260,13 +177,6 @@ if (!$threads) {
 $daemonize = !(have_option('f') || have_option('--foreground'));
 $all = have_option('a') || have_option('--all');
 
-if (have_option('--skip-xmpp')) {
-    define('XMPP_EMERGENCY_FLAG', true);
-}
-if (have_option('--xmpp-only')) {
-    define('XMPP_ONLY_FLAG', true);
-}
-
 $daemon = new QueueDaemon($id, $daemonize, $threads, $all);
 $daemon->runOnce();