]> git.mxchange.org Git - quix0rs-gnu-social.git/commitdiff
Queues: redid the breakout control model so we can start up and subscribe to queues...
authorBrion Vibber <brion@pobox.com>
Thu, 18 Feb 2010 00:49:00 +0000 (16:49 -0800)
committerBrion Vibber <brion@pobox.com>
Thu, 18 Feb 2010 00:49:00 +0000 (16:49 -0800)
All breakout queues that we're going to need to listen to now need to be explicitly listed in $config['queue']['breakout'].

Until XMPP is moved to component model, this setting will let the individual processes work with their own queues:
$config['queue']['breakout'][] = 'xmpp/xmppout/' . $config['site']['nickname'];

lib/default.php
lib/iomaster.php
lib/stompqueuemanager.php
scripts/queuedaemon.php

index a74cccae12fb29fa76053a84bbff58be440d31ea..c969c3b337cc5f7d647a57795e834a6dd88219fa 100644 (file)
@@ -91,10 +91,13 @@ $default =
               'spawndelay' => 1, // Wait at least N seconds between (re)spawns of child processes to avoid slamming the queue server with subscription startup
               'debug_memory' => false, // true to spit memory usage to log
               'inboxes' => true, // true to do inbox distribution & output queueing from in background via 'distrib' queue
-              'breakout' => array('*' => 'shared'), // set global or per-handler queue breakout
-                                      // 'shared': use a shared queue for all sites
-                                      // 'handler': share each/this handler over multiple sites
-                                      // 'site': break out for each/this handler on this site
+              'breakout' => array(), // List queue specifiers to break out when using Stomp queue.
+                                     // Default will share all queues for all sites within each group.
+                                     // Specify as <group>/<queue> or <group>/<queue>/<site>,
+                                     // using nickname identifier as site.
+                                     //
+                                     // 'main/distrib' separate "distrib" queue covering all sites
+                                     // 'xmpp/xmppout/mysite' separate "xmppout" queue covering just 'mysite'
               'max_retries' => 10, // drop messages after N failed attempts to process (Stomp)
               'dead_letter_dir' => false, // set to directory to save dropped messages into (Stomp)
               ),
index 54e2dfe84165cda3c3ca92c5a2a0587b5bc29eee..d20837ba54a3dac70e5731a2eb1b705912beb8ac 100644 (file)
@@ -55,27 +55,18 @@ abstract class IoMaster
         if ($multiSite !== null) {
             $this->multiSite = $multiSite;
         }
-        if ($this->multiSite) {
-            $this->sites = StatusNet::findAllSites();
-        } else {
-            $this->sites = array(StatusNet::currentSite());
-        }
-
-        if (empty($this->sites)) {
-            throw new Exception("Empty status_network table, cannot init");
-        }
 
-        foreach ($this->sites as $site) {
-            StatusNet::switchSite($site);
-            $this->initManagers();
-        }
+        $this->initManagers();
     }
 
     /**
-     * Initialize IoManagers for the currently configured site
-     * which are appropriate to this instance.
+     * Initialize IoManagers which are appropriate to this instance;
+     * pass class names or instances into $this->instantiate().
+     *
+     * If setup and configuration may vary between sites in multi-site
+     * mode, it's the subclass's responsibility to set them up here.
      *
-     * Pass class names into $this->instantiate()
+     * Switching site configurations is an acceptable side effect.
      */
     abstract function initManagers();
 
index bfeeb23b7fc88e1b9a3d27a0a9f531e066036555..9af8b2f4826b24714f5f557af56d2a560681adcd 100644 (file)
@@ -63,7 +63,7 @@ class StompQueueManager extends QueueManager
         $this->password = common_config('queue', 'stomp_password');
         $this->base     = common_config('queue', 'queue_basename');
         $this->control  = common_config('queue', 'control_channel');
-        $this->subscriptions = array($this->control => $this->control);
+        $this->breakout = common_config('queue', 'breakout');
     }
 
     /**
@@ -75,28 +75,6 @@ class StompQueueManager extends QueueManager
         return IoManager::INSTANCE_PER_PROCESS;
     }
 
-    /**
-     * Record queue subscriptions we'll need to handle the current site.
-     */
-    public function addSite()
-    {
-        $this->sites[] = StatusNet::currentSite();
-
-        // Set up handlers active for this site...
-        $this->initialize();
-
-        foreach ($this->activeGroups as $group) {
-            if (isset($this->groups[$group])) {
-                // Actual queues may be broken out or consolidated...
-                // Subscribe to all the target queues we'll need.
-                foreach ($this->groups[$group] as $transport => $class) {
-                    $target = $this->queueName($transport);
-                    $this->subscriptions[$target] = $target;
-                }
-            }
-        }
-    }
-
     /**
      * Optional; ping any running queue handler daemons with a notification
      * such as announcing a new site to handle or requesting clean shutdown.
@@ -166,14 +144,15 @@ class StompQueueManager extends QueueManager
 
         $con = $this->cons[$idx];
         $host = $con->getServer();
-        $result = $con->send($this->queueName($queue), $msg, $props);
+        $target = $this->queueName($queue);
+        $result = $con->send($target, $msg, $props);
 
         if (!$result) {
-            $this->_log(LOG_ERR, "Error sending $rep to $queue queue on $host");
+            $this->_log(LOG_ERR, "Error sending $rep to $queue queue on $host $target");
             return false;
         }
 
-        $this->_log(LOG_DEBUG, "complete remote queueing $rep for $queue on $host");
+        $this->_log(LOG_DEBUG, "complete remote queueing $rep for $queue on $host $target");
         $this->stats('enqueued', $queue);
         return true;
     }
@@ -432,11 +411,42 @@ class StompQueueManager extends QueueManager
     protected function doSubscribe(LiberalStomp $con)
     {
         $host = $con->getServer();
-        foreach ($this->subscriptions as $queue) {
-            $this->_log(LOG_INFO, "Subscribing to $queue on $host");
-            $con->subscribe($queue);
+        foreach ($this->subscriptions() as $sub) {
+            $this->_log(LOG_INFO, "Subscribing to $sub on $host");
+            $con->subscribe($sub);
         }
     }
+    
+    /**
+     * Grab a full list of stomp-side queue subscriptions.
+     * Will include:
+     *  - control broadcast channel
+     *  - shared group queues for active groups
+     *  - per-handler and per-site breakouts from $config['queue']['breakout']
+     *    that are rooted in the active groups.
+     *
+     * @return array of strings
+     */
+    protected function subscriptions()
+    {
+        $subs = array();
+        $subs[] = $this->control;
+
+        foreach ($this->activeGroups as $group) {
+            $subs[] = $this->base . $group;
+        }
+
+        foreach ($this->breakout as $spec) {
+            $parts = explode('/', $spec);
+            if (count($parts) < 2 || count($parts) > 3) {
+                common_log(LOG_ERR, "Bad queue breakout specifier $spec");
+            }
+            if (in_array($parts[0], $this->activeGroups)) {
+                $subs[] = $this->base . $spec;
+            }
+        }
+        return array_unique($subs);
+    }
 
     /**
      * Handle and acknowledge an event that's come in through a queue.
@@ -612,32 +622,26 @@ class StompQueueManager extends QueueManager
     }
 
     /**
-     * Set us up with queue subscriptions for a new site added at runtime,
+     * (Re)load runtime configuration for a given site by nickname,
      * triggered by a broadcast to the 'statusnet-control' topic.
      *
+     * Configuration changes in database should update, but config
+     * files might not.
+     *
      * @param array $frame Stomp frame
      * @return bool true to continue; false to stop further processing.
      */
     protected function updateSiteConfig($nickname)
     {
-        if (empty($this->sites)) {
-            if ($nickname == common_config('site', 'nickname')) {
-                StatusNet::init(common_config('site', 'server'));
-            } else {
-                $this->_log(LOG_INFO, "Ignoring update ping for other site $nickname");
+        $sn = Status_network::staticGet($nickname);
+        if ($sn) {
+            $this->switchSite($nickname);
+            if (!in_array($nickname, $this->sites)) {
+                $this->addSite();
             }
+            $this->stats('siteupdate');
         } else {
-            $sn = Status_network::staticGet($nickname);
-            if ($sn) {
-                $this->switchSite($nickname);
-                if (!in_array($nickname, $this->sites)) {
-                    $this->addSite();
-                }
-                // @fixme update subscriptions, if applicable
-                $this->stats('siteupdate');
-            } else {
-                $this->_log(LOG_ERR, "Ignoring ping for unrecognized new site $nickname");
-            }
+            $this->_log(LOG_ERR, "Ignoring ping for unrecognized new site $nickname");
         }
     }
 
@@ -646,24 +650,25 @@ class StompQueueManager extends QueueManager
      * group name for this queue to give eg:
      *
      * /queue/statusnet/main
+     * /queue/statusnet/main/distrib
+     * /queue/statusnet/xmpp/xmppout/site01
      *
      * @param string $queue
      * @return string
      */
     protected function queueName($queue)
     {
-        $base = common_config('queue', 'queue_basename');
         $group = $this->queueGroup($queue);
-        $breakout = $this->breakoutMode($queue);
-        if ($breakout == 'shared') {
-            return $base . "$group";
-        } else if ($breakout == 'handler') {
-            return $base . "$group/$queue";
-        } else if ($breakout == 'site') {
-            $site = StatusNet::currentSite();
-            return $base . "$group/$queue/$site";
-        }
-        throw Exception("Unrecognized queue breakout mode '$breakout' for '$queue'");
+        $site = StatusNet::currentSite();
+
+        $specs = array("$group/$queue/$site",
+                       "$group/$queue");
+        foreach ($specs as $spec) {
+            if (in_array($spec, $this->breakout)) {
+                return $this->base . $spec;
+            }
+        }
+        return $this->base . $group;
     }
 
     /**
index d372d898fa23f3d7a003973ac5e9edf4c36a20a0..6dba16f95388a03d9533bcc46a04af6514cfb139 100755 (executable)
@@ -126,8 +126,7 @@ class QueueDaemon extends SpawningDaemon
 class QueueMaster extends IoMaster
 {
     /**
-     * Initialize IoManagers for the currently configured site
-     * which are appropriate to this instance.
+     * Initialize IoManagers which are appropriate to this instance.
      */
     function initManagers()
     {