]> git.mxchange.org Git - quix0rs-gnu-social.git/blobdiff - lib/stompqueuemanager.php
Merge branch 'master' into testing
[quix0rs-gnu-social.git] / lib / stompqueuemanager.php
index bfeeb23b7fc88e1b9a3d27a0a9f531e066036555..9af8b2f4826b24714f5f557af56d2a560681adcd 100644 (file)
@@ -63,7 +63,7 @@ class StompQueueManager extends QueueManager
         $this->password = common_config('queue', 'stomp_password');
         $this->base     = common_config('queue', 'queue_basename');
         $this->control  = common_config('queue', 'control_channel');
-        $this->subscriptions = array($this->control => $this->control);
+        $this->breakout = common_config('queue', 'breakout');
     }
 
     /**
@@ -75,28 +75,6 @@ class StompQueueManager extends QueueManager
         return IoManager::INSTANCE_PER_PROCESS;
     }
 
-    /**
-     * Record queue subscriptions we'll need to handle the current site.
-     */
-    public function addSite()
-    {
-        $this->sites[] = StatusNet::currentSite();
-
-        // Set up handlers active for this site...
-        $this->initialize();
-
-        foreach ($this->activeGroups as $group) {
-            if (isset($this->groups[$group])) {
-                // Actual queues may be broken out or consolidated...
-                // Subscribe to all the target queues we'll need.
-                foreach ($this->groups[$group] as $transport => $class) {
-                    $target = $this->queueName($transport);
-                    $this->subscriptions[$target] = $target;
-                }
-            }
-        }
-    }
-
     /**
      * Optional; ping any running queue handler daemons with a notification
      * such as announcing a new site to handle or requesting clean shutdown.
@@ -166,14 +144,15 @@ class StompQueueManager extends QueueManager
 
         $con = $this->cons[$idx];
         $host = $con->getServer();
-        $result = $con->send($this->queueName($queue), $msg, $props);
+        $target = $this->queueName($queue);
+        $result = $con->send($target, $msg, $props);
 
         if (!$result) {
-            $this->_log(LOG_ERR, "Error sending $rep to $queue queue on $host");
+            $this->_log(LOG_ERR, "Error sending $rep to $queue queue on $host $target");
             return false;
         }
 
-        $this->_log(LOG_DEBUG, "complete remote queueing $rep for $queue on $host");
+        $this->_log(LOG_DEBUG, "complete remote queueing $rep for $queue on $host $target");
         $this->stats('enqueued', $queue);
         return true;
     }
@@ -432,11 +411,42 @@ class StompQueueManager extends QueueManager
     protected function doSubscribe(LiberalStomp $con)
     {
         $host = $con->getServer();
-        foreach ($this->subscriptions as $queue) {
-            $this->_log(LOG_INFO, "Subscribing to $queue on $host");
-            $con->subscribe($queue);
+        foreach ($this->subscriptions() as $sub) {
+            $this->_log(LOG_INFO, "Subscribing to $sub on $host");
+            $con->subscribe($sub);
         }
     }
+    
+    /**
+     * Grab a full list of stomp-side queue subscriptions.
+     * Will include:
+     *  - control broadcast channel
+     *  - shared group queues for active groups
+     *  - per-handler and per-site breakouts from $config['queue']['breakout']
+     *    that are rooted in the active groups.
+     *
+     * @return array of strings
+     */
+    protected function subscriptions()
+    {
+        $subs = array();
+        $subs[] = $this->control;
+
+        foreach ($this->activeGroups as $group) {
+            $subs[] = $this->base . $group;
+        }
+
+        foreach ($this->breakout as $spec) {
+            $parts = explode('/', $spec);
+            if (count($parts) < 2 || count($parts) > 3) {
+                common_log(LOG_ERR, "Bad queue breakout specifier $spec");
+            }
+            if (in_array($parts[0], $this->activeGroups)) {
+                $subs[] = $this->base . $spec;
+            }
+        }
+        return array_unique($subs);
+    }
 
     /**
      * Handle and acknowledge an event that's come in through a queue.
@@ -612,32 +622,26 @@ class StompQueueManager extends QueueManager
     }
 
     /**
-     * Set us up with queue subscriptions for a new site added at runtime,
+     * (Re)load runtime configuration for a given site by nickname,
      * triggered by a broadcast to the 'statusnet-control' topic.
      *
+     * Configuration changes in database should update, but config
+     * files might not.
+     *
      * @param array $frame Stomp frame
      * @return bool true to continue; false to stop further processing.
      */
     protected function updateSiteConfig($nickname)
     {
-        if (empty($this->sites)) {
-            if ($nickname == common_config('site', 'nickname')) {
-                StatusNet::init(common_config('site', 'server'));
-            } else {
-                $this->_log(LOG_INFO, "Ignoring update ping for other site $nickname");
+        $sn = Status_network::staticGet($nickname);
+        if ($sn) {
+            $this->switchSite($nickname);
+            if (!in_array($nickname, $this->sites)) {
+                $this->addSite();
             }
+            $this->stats('siteupdate');
         } else {
-            $sn = Status_network::staticGet($nickname);
-            if ($sn) {
-                $this->switchSite($nickname);
-                if (!in_array($nickname, $this->sites)) {
-                    $this->addSite();
-                }
-                // @fixme update subscriptions, if applicable
-                $this->stats('siteupdate');
-            } else {
-                $this->_log(LOG_ERR, "Ignoring ping for unrecognized new site $nickname");
-            }
+            $this->_log(LOG_ERR, "Ignoring ping for unrecognized new site $nickname");
         }
     }
 
@@ -646,24 +650,25 @@ class StompQueueManager extends QueueManager
      * group name for this queue to give eg:
      *
      * /queue/statusnet/main
+     * /queue/statusnet/main/distrib
+     * /queue/statusnet/xmpp/xmppout/site01
      *
      * @param string $queue
      * @return string
      */
     protected function queueName($queue)
     {
-        $base = common_config('queue', 'queue_basename');
         $group = $this->queueGroup($queue);
-        $breakout = $this->breakoutMode($queue);
-        if ($breakout == 'shared') {
-            return $base . "$group";
-        } else if ($breakout == 'handler') {
-            return $base . "$group/$queue";
-        } else if ($breakout == 'site') {
-            $site = StatusNet::currentSite();
-            return $base . "$group/$queue/$site";
-        }
-        throw Exception("Unrecognized queue breakout mode '$breakout' for '$queue'");
+        $site = StatusNet::currentSite();
+
+        $specs = array("$group/$queue/$site",
+                       "$group/$queue");
+        foreach ($specs as $spec) {
+            if (in_array($spec, $this->breakout)) {
+                return $this->base . $spec;
+            }
+        }
+        return $this->base . $group;
     }
 
     /**