]> git.mxchange.org Git - quix0rs-gnu-social.git/commitdiff
Control channel for queue daemons to request graceful shutdown, restart, or update...
authorBrion Vibber <brion@pobox.com>
Tue, 26 Jan 2010 19:49:49 +0000 (11:49 -0800)
committerBrion Vibber <brion@pobox.com>
Tue, 26 Jan 2010 19:49:49 +0000 (11:49 -0800)
  queuectl.php --update -s<site>
  queuectl.php --stop
  queuectl.php --restart

Default control channel is /topic/statusnet-control. For external utilities to send a site update ping direct to the queue server, connect via Stomp and send a message formatted thus:

  update:<nickname>

(Nickname here, *not* server hostname! The rest of the queues will be updated to use nicknames later.)

Note that all currently-connected queue daemons will get these notifications, including both queuedaemon.php and xmppdaemon.php. (XMPP will ignore site update requests for sites that it's not handling.)

Limitations:
* only implemented for stomp queue manager so far
* --update may not yet handle a changed server name properly
* --restart won't reload PHP code files that were already loaded at startup. Still need to stop and restart the daemons from 'outside' when updating code base.

classes/Status_network.php
lib/default.php
lib/iomaster.php
lib/queuemanager.php
lib/spawningdaemon.php
lib/stompqueuemanager.php
scripts/queuectl.php [new file with mode: 0755]
scripts/queuedaemon.php
scripts/xmppdaemon.php

index f1314d61513c6802c3d1922f7b2c25c36ad28111..4bda24b6a02d253aeb03ad2efdf65605da0ee65d 100644 (file)
@@ -42,7 +42,16 @@ class Status_network extends DB_DataObject
     public $tags;                            // text
 
     /* Static get */
-    function staticGet($k,$v=NULL) { return DB_DataObject::staticGet('Status_network',$k,$v); }
+    function staticGet($k,$v=NULL) {
+        $i = DB_DataObject::staticGet('Status_network',$k,$v);
+
+        // Don't use local process cache; if we're fetching multiple
+        // times it's because we're reloading it in a long-running
+        // process; we need a fresh copy!
+        global $_DB_DATAOBJECT;
+        unset($_DB_DATAOBJECT['CACHE']['status_network']);
+        return $i;
+    }
 
     /* the code above is auto generated do not remove the tag below */
     ###END_AUTOCODE
index 35115542f29e0260bf9e71a6533339bdb3ab3c17..d2dd8ab33c9f6452f33e02dd7343af332ac5a2a5 100644 (file)
@@ -81,6 +81,7 @@ $default =
               'subsystem' => 'db', # default to database, or 'stomp'
               'stomp_server' => null,
               'queue_basename' => '/queue/statusnet/',
+              'control_channel' => '/topic/statusnet-control', // broadcasts to all queue daemons
               'stomp_username' => null,
               'stomp_password' => null,
               'monitor' => null, // URL to monitor ping endpoint (work in progress)
index 3bf82bc6b47903b76fc42631ddb1fdbc4014a0aa..bcab3542be5af791892d949c2ec8f80074a183c3 100644 (file)
@@ -38,6 +38,9 @@ abstract class IoMaster
     protected $pollTimeouts = array();
     protected $lastPoll = array();
 
+    public $shutdown = false; // Did we do a graceful shutdown?
+    public $respawn = true; // Should we respawn after shutdown?
+
     /**
      * @param string $id process ID to use in logging/monitoring
      */
@@ -144,7 +147,7 @@ abstract class IoMaster
         $this->logState('init');
         $this->start();
 
-        while (true) {
+        while (!$this->shutdown) {
             $timeouts = array_values($this->pollTimeouts);
             $timeouts[] = 60; // default max timeout
 
@@ -196,22 +199,31 @@ abstract class IoMaster
             $this->logState('idle');
             $this->idle();
 
-            $memoryLimit = $this->softMemoryLimit();
-            if ($memoryLimit > 0) {
-                $usage = memory_get_usage();
-                if ($usage > $memoryLimit) {
-                    common_log(LOG_INFO, "Queue thread hit soft memory limit ($usage > $memoryLimit); gracefully restarting.");
-                    break;
-                } else if (common_config('queue', 'debug_memory')) {
-                    common_log(LOG_DEBUG, "Memory usage $usage");
-                }
-            }
+            $this->checkMemory();
         }
 
         $this->logState('shutdown');
         $this->finish();
     }
 
+    /**
+     * Check runtime memory usage, possibly triggering a graceful shutdown
+     * and thread respawn if we've crossed the soft limit.
+     */
+    protected function checkMemory()
+    {
+        $memoryLimit = $this->softMemoryLimit();
+        if ($memoryLimit > 0) {
+            $usage = memory_get_usage();
+            if ($usage > $memoryLimit) {
+                common_log(LOG_INFO, "Queue thread hit soft memory limit ($usage > $memoryLimit); gracefully restarting.");
+                $this->requestRestart();
+            } else if (common_config('queue', 'debug_memory')) {
+                common_log(LOG_DEBUG, "Memory usage $usage");
+            }
+        }
+    }
+
     /**
      * Return fully-parsed soft memory limit in bytes.
      * @return intval 0 or -1 if not set
@@ -354,5 +366,24 @@ abstract class IoMaster
         $owners[] = "thread:" . $this->id;
         $this->monitor->stats($key, $owners);
     }
+
+    /**
+     * For IoManagers to request a graceful shutdown at end of event loop.
+     */
+    public function requestShutdown()
+    {
+        $this->shutdown = true;
+        $this->respawn = false;
+    }
+
+    /**
+     * For IoManagers to request a graceful restart at end of event loop.
+     */
+    public function requestRestart()
+    {
+        $this->shutdown = true;
+        $this->respawn = true;
+    }
+
 }
 
index 1bc8de1f787178598ac0cfef3f238c895f546fd7..afe710e884dbc9d4aea2168a4f9548c4fc053b9c 100644 (file)
@@ -100,6 +100,23 @@ abstract class QueueManager extends IoManager
         $this->initialize();
     }
 
+    /**
+     * Optional; ping any running queue handler daemons with a notification
+     * such as announcing a new site to handle or requesting clean shutdown.
+     * This avoids having to restart all the daemons manually to update configs
+     * and such.
+     *
+     * Called from scripts/queuectl.php controller utility.
+     *
+     * @param string $event event key
+     * @param string $param optional parameter to append to key
+     * @return boolean success
+     */
+    public function sendControlSignal($event, $param='')
+    {
+        throw new Exception(get_class($this) . " does not support control signals.");
+    }
+
     /**
      * Store an object (usually/always a Notice) into the given queue
      * for later processing. No guarantee is made on when it will be
index 8baefe88e8184db42b216f589f19e79f9207ea5f..b1961d68801c308fc25429a4cee80b028439f72d 100644 (file)
@@ -36,6 +36,11 @@ abstract class SpawningDaemon extends Daemon
 {
     protected $threads=1;
 
+    const EXIT_OK = 0;
+    const EXIT_ERR = 1;
+    const EXIT_SHUTDOWN = 100;
+    const EXIT_RESTART = 101;
+
     function __construct($id=null, $daemonize=true, $threads=1)
     {
         parent::__construct($daemonize);
@@ -49,7 +54,7 @@ abstract class SpawningDaemon extends Daemon
     /**
      * Perform some actual work!
      *
-     * @return boolean true on success, false on failure
+     * @return int exit code; use self::EXIT_SHUTDOWN to request not to respawn.
      */
     public abstract function runThread();
 
@@ -84,23 +89,30 @@ abstract class SpawningDaemon extends Daemon
         while (count($children) > 0) {
             $status = null;
             $pid = pcntl_wait($status);
-            if ($pid > 0) {
+            if ($pid > 0 && pcntl_wifexited($status)) {
+                $exitCode = pcntl_wexitstatus($status);
+
                 $i = array_search($pid, $children);
                 if ($i === false) {
-                    $this->log(LOG_ERR, "Unrecognized child pid $pid exited!");
+                    $this->log(LOG_ERR, "Unrecognized child pid $pid exited with status $exitCode");
                     continue;
                 }
                 unset($children[$i]);
-                $this->log(LOG_INFO, "Thread $i pid $pid exited.");
-                
-                $pid = pcntl_fork();
-                if ($pid < 0) {
-                    $this->log(LOG_ERROR, "Couldn't fork to respawn thread $i; aborting thread.\n");
-                } else if ($pid == 0) {
-                    $this->initAndRunChild($i);
+
+                if ($this->shouldRespawn($exitCode)) {
+                    $this->log(LOG_INFO, "Thread $i pid $pid exited with status $exitCode; respawing.");
+
+                    $pid = pcntl_fork();
+                    if ($pid < 0) {
+                        $this->log(LOG_ERROR, "Couldn't fork to respawn thread $i; aborting thread.\n");
+                    } else if ($pid == 0) {
+                        $this->initAndRunChild($i);
+                    } else {
+                        $this->log(LOG_INFO, "Respawned thread $i as pid $pid");
+                        $children[$i] = $pid;
+                    }
                 } else {
-                    $this->log(LOG_INFO, "Respawned thread $i as pid $pid");
-                    $children[$i] = $pid;
+                    $this->log(LOG_INFO, "Thread $i pid $pid exited with status $exitCode; closing out thread.");
                 }
             }
         }
@@ -108,6 +120,24 @@ abstract class SpawningDaemon extends Daemon
         return true;
     }
 
+    /**
+     * Determine whether to respawn an exited subprocess based on its exit code.
+     * Otherwise we'll respawn all exits by default.
+     *
+     * @param int $exitCode
+     * @return boolean true to respawn
+     */
+    protected function shouldRespawn($exitCode)
+    {
+        if ($exitCode == self::EXIT_SHUTDOWN) {
+            // Thread requested a clean shutdown.
+            return false;
+        } else {
+            // Otherwise we should always respawn!
+            return true;
+        }
+    }
+
     /**
      * Initialize things for a fresh thread, call runThread(), and
      * exit at completion with appropriate return value.
@@ -116,8 +146,8 @@ abstract class SpawningDaemon extends Daemon
     {
         $this->set_id($this->get_id() . "." . $thread);
         $this->resetDb();
-        $ok = $this->runThread();
-        exit($ok ? 0 : 1);
+        $exitCode = $this->runThread();
+        exit($exitCode);
     }
 
     /**
index 8f0091a1384f51b1cf07ad6c4bddf7dfbd920f03..89f3d74cce0e7fb6628552c6538a0a758760ac2d 100644 (file)
@@ -38,8 +38,10 @@ class StompQueueManager extends QueueManager
     var $password = null;
     var $base = null;
     var $con = null;
+    protected $control;
     
     protected $sites = array();
+    protected $subscriptions = array();
 
     protected $useTransactions = true;
     protected $transaction = null;
@@ -52,6 +54,7 @@ class StompQueueManager extends QueueManager
         $this->username = common_config('queue', 'stomp_username');
         $this->password = common_config('queue', 'stomp_password');
         $this->base     = common_config('queue', 'queue_basename');
+        $this->control  = common_config('queue', 'control_channel');
     }
 
     /**
@@ -77,6 +80,36 @@ class StompQueueManager extends QueueManager
         $this->initialize();
     }
 
+    /**
+     * Optional; ping any running queue handler daemons with a notification
+     * such as announcing a new site to handle or requesting clean shutdown.
+     * This avoids having to restart all the daemons manually to update configs
+     * and such.
+     *
+     * Currently only relevant for multi-site queue managers such as Stomp.
+     *
+     * @param string $event event key
+     * @param string $param optional parameter to append to key
+     * @return boolean success
+     */
+    public function sendControlSignal($event, $param='')
+    {
+        $message = $event;
+        if ($param != '') {
+            $message .= ':' . $param;
+        }
+        $this->_connect();
+        $result = $this->con->send($this->control,
+                                   $message,
+                                   array ('created' => common_sql_now()));
+        if ($result) {
+            $this->_log(LOG_INFO, "Sent control ping to queue daemons: $message");
+            return true;
+        } else {
+            $this->_log(LOG_ERR, "Failed sending control ping to queue daemons: $message");
+            return false;
+        }
+    }
 
     /**
      * Instantiate the appropriate QueueHandler class for the given queue.
@@ -86,7 +119,7 @@ class StompQueueManager extends QueueManager
      */
     function getHandler($queue)
     {
-        $handlers = $this->handlers[common_config('site', 'server')];
+        $handlers = $this->handlers[$this->currentSite()];
         if (isset($handlers[$queue])) {
             $class = $handlers[$queue];
             if (class_exists($class)) {
@@ -108,7 +141,7 @@ class StompQueueManager extends QueueManager
     function getQueues()
     {
         $group = $this->activeGroup();
-        $site = common_config('site', 'server');
+        $site = $this->currentSite();
         if (empty($this->groups[$site][$group])) {
             return array();
         } else {
@@ -126,8 +159,8 @@ class StompQueueManager extends QueueManager
      */
     public function connect($transport, $class, $group='queuedaemon')
     {
-        $this->handlers[common_config('site', 'server')][$transport] = $class;
-        $this->groups[common_config('site', 'server')][$group][$transport] = $class;
+        $this->handlers[$this->currentSite()][$transport] = $class;
+        $this->groups[$this->currentSite()][$group][$transport] = $class;
     }
 
     /**
@@ -180,7 +213,16 @@ class StompQueueManager extends QueueManager
         $ok = true;
         $frames = $this->con->readFrames();
         foreach ($frames as $frame) {
-            $ok = $ok && $this->_handleItem($frame);
+            $dest = $frame->headers['destination'];
+            if ($dest == $this->control) {
+                if (!$this->handleControlSignal($frame)) {
+                    // We got a control event that requests a shutdown;
+                    // close out and stop handling anything else!
+                    break;
+                }
+            } else {
+                $ok = $ok && $this->handleItem($frame);
+            }
         }
         return $ok;
     }
@@ -197,6 +239,9 @@ class StompQueueManager extends QueueManager
     public function start($master)
     {
         parent::start($master);
+        $this->_connect();
+
+        $this->con->subscribe($this->control);
         if ($this->sites) {
             foreach ($this->sites as $server) {
                 StatusNet::init($server);
@@ -221,6 +266,7 @@ class StompQueueManager extends QueueManager
         // If there are any outstanding delivered messages we haven't processed,
         // free them for another thread to take.
         $this->rollback();
+        $this->con->unsubscribe($this->control);
         if ($this->sites) {
             foreach ($this->sites as $server) {
                 StatusNet::init($server);
@@ -231,7 +277,16 @@ class StompQueueManager extends QueueManager
         }
         return true;
     }
-    
+
+    /**
+     * Get identifier of the currently active site configuration
+     * @return string
+     */
+    protected function currentSite()
+    {
+        return common_config('site', 'server'); // @fixme switch to nickname
+    }
+
     /**
      * Lazy open connection to Stomp queue server.
      */
@@ -255,22 +310,29 @@ class StompQueueManager extends QueueManager
      */
     protected function doSubscribe()
     {
+        $site = $this->currentSite();
         $this->_connect();
         foreach ($this->getQueues() as $queue) {
             $rawqueue = $this->queueName($queue);
+            $this->subscriptions[$site][$queue] = $rawqueue;
             $this->_log(LOG_INFO, "Subscribing to $rawqueue");
             $this->con->subscribe($rawqueue);
         }
     }
-    
+
     /**
      * Subscribe from all enabled notice queues for the current site.
      */
     protected function doUnsubscribe()
     {
+        $site = $this->currentSite();
         $this->_connect();
-        foreach ($this->getQueues() as $queue) {
-            $this->con->unsubscribe($this->queueName($queue));
+        if (!empty($this->subscriptions[$site])) {
+            foreach ($this->subscriptions[$site] as $queue => $rawqueue) {
+                $this->_log(LOG_INFO, "Unsubscribing from $rawqueue");
+                $this->con->unsubscribe($rawqueue);
+                unset($this->subscriptions[$site][$queue]);
+            }
         }
     }
 
@@ -286,10 +348,10 @@ class StompQueueManager extends QueueManager
      * @param StompFrame $frame
      * @return bool
      */
-    protected function _handleItem($frame)
+    protected function handleItem($frame)
     {
         list($site, $queue) = $this->parseDestination($frame->headers['destination']);
-        if ($site != common_config('site', 'server')) {
+        if ($site != $this->currentSite()) {
             $this->stats('switch');
             StatusNet::init($site);
         }
@@ -317,7 +379,7 @@ class StompQueueManager extends QueueManager
 
         $handler = $this->getHandler($queue);
         if (!$handler) {
-            $this->_log(LOG_ERROR, "Missing handler class; skipping $info");
+            $this->_log(LOG_ERR, "Missing handler class; skipping $info");
             $this->ack($frame);
             $this->commit();
             $this->begin();
@@ -348,6 +410,77 @@ class StompQueueManager extends QueueManager
         return true;
     }
 
+    /**
+     * Process a control signal broadcast.
+     *
+     * @param array $frame Stomp frame
+     * @return bool true to continue; false to stop further processing.
+     */
+    protected function handleControlSignal($frame)
+    {
+        $message = trim($frame->body);
+        if (strpos($message, ':') !== false) {
+            list($event, $param) = explode(':', $message, 2);
+        } else {
+            $event = $message;
+            $param = '';
+        }
+
+        $shutdown = false;
+
+        if ($event == 'shutdown') {
+            $this->master->requestShutdown();
+            $shutdown = true;
+        } else if ($event == 'restart') {
+            $this->master->requestRestart();
+            $shutdown = true;
+        } else if ($event == 'update') {
+            $this->updateSiteConfig($param);
+        } else {
+            $this->_log(LOG_ERR, "Ignoring unrecognized control message: $message");
+        }
+
+        $this->ack($frame);
+        $this->commit();
+        $this->begin();
+        return $shutdown;
+    }
+    
+    /**
+     * Set us up with queue subscriptions for a new site added at runtime,
+     * triggered by a broadcast to the 'statusnet-control' topic.
+     *
+     * @param array $frame Stomp frame
+     * @return bool true to continue; false to stop further processing.
+     */
+    protected function updateSiteConfig($nickname)
+    {
+        if (empty($this->sites)) {
+            if ($nickname == common_config('site', 'nickname')) {
+                StatusNet::init(common_config('site', 'server'));
+                $this->doUnsubscribe();
+                $this->doSubscribe();
+            } else {
+                $this->_log(LOG_INFO, "Ignoring update ping for other site $nickname");
+            }
+        } else {
+            $sn = Status_network::staticGet($nickname);
+            if ($sn) {
+                $server = $sn->getServerName(); // @fixme do config-by-nick
+                StatusNet::init($server);
+                if (empty($this->sites[$server])) {
+                    $this->addSite($server);
+                }
+                $this->_log(LOG_INFO, "(Re)subscribing to queues for site $nickname / $server");
+                $this->doUnsubscribe();
+                $this->doSubscribe();
+                $this->stats('siteupdate');
+            } else {
+                $this->_log(LOG_ERR, "Ignoring ping for unrecognized new site $nickname");
+            }
+        }
+    }
+
     /**
      * Combines the queue_basename from configuration with the
      * site server name and queue name to give eg:
@@ -360,7 +493,7 @@ class StompQueueManager extends QueueManager
     protected function queueName($queue)
     {
         return common_config('queue', 'queue_basename') .
-            common_config('site', 'server') . '/' . $queue;
+            $this->currentSite() . '/' . $queue;
     }
 
     /**
diff --git a/scripts/queuectl.php b/scripts/queuectl.php
new file mode 100755 (executable)
index 0000000..1c9ea33
--- /dev/null
@@ -0,0 +1,85 @@
+#!/usr/bin/env php
+<?php
+/*
+ * StatusNet - the distributed open-source microblogging tool
+ * Copyright (C) 2010, StatusNet, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+/**
+ * Sends control signals to running queue daemons.
+ *
+ * @author Brion Vibber <brion@status.net>
+ * @package QueueHandler
+ */
+
+define('INSTALLDIR', realpath(dirname(__FILE__) . '/..'));
+
+$shortoptions = 'ur';
+$longoptions = array('update', 'restart', 'stop');
+
+$helptext = <<<END_OF_QUEUECTL_HELP
+Send broadcast events to control any running queue handlers.
+(Currently for Stomp queues only.)
+
+Events relating to current site (as selected with -s etc)
+    -u --update       Announce new site or updated configuration. Running
+                      daemons will start subscribing to any new queues needed
+                      for this site.
+
+Global events:
+    -r --restart      Graceful restart of all threads
+       --stop         Graceful shutdown of all threads
+
+END_OF_QUEUECTL_HELP;
+
+require_once INSTALLDIR.'/scripts/commandline.inc';
+
+function doSendControl($message, $event, $param='')
+{
+    print $message;
+    $qm = QueueManager::get();
+    if ($qm->sendControlSignal($event, $param)) {
+        print " sent.\n";
+    } else {
+        print " FAILED.\n";
+    }
+}
+
+$actions = 0;
+
+if (have_option('u') || have_option('--update')) {
+    $nickname = common_config('site', 'nickname');
+    doSendControl("Sending site update signal to queue daemons for $nickname",
+                  "update", $nickname);
+    $actions++;
+}
+
+if (have_option('r') || have_option('--restart')) {
+    doSendControl("Sending graceful restart signal to queue daemons...",
+                  "restart");
+    $actions++;
+}
+
+if (have_option('--stop')) {
+    doSendControl("Sending graceful shutdown signal to queue daemons...",
+                  "shutdown");
+    $actions++;
+}
+
+if (!$actions) {
+    show_help();
+}
+
index bedd14b1a3fbce867534e2c3a0eeb8e4857ce52c..c2e2351c3910e307331303c0509e30692f89b012 100755 (executable)
@@ -115,7 +115,7 @@ class QueueDaemon extends SpawningDaemon
 
         $this->log(LOG_INFO, 'terminating normally');
 
-        return true;
+        return $master->respawn ? self::EXIT_RESTART : self::EXIT_SHUTDOWN;
     }
 }
 
index fd7cf055b485cedaae3f327952c16baf4f634456..46dd9b90cc636be75028f7ddbfac0ff563143879 100755 (executable)
@@ -56,7 +56,7 @@ class XMPPDaemon extends SpawningDaemon
 
         common_log(LOG_INFO, 'terminating normally');
 
-        return true;
+        return $master->respawn ? self::EXIT_RESTART : self::EXIT_SHUTDOWN;
     }
 
 }