]> git.mxchange.org Git - quix0rs-gnu-social.git/blobdiff - lib/stompqueuemanager.php
Merge branch 'mention_branch' into 'nightly'
[quix0rs-gnu-social.git] / lib / stompqueuemanager.php
index bfeeb23b7fc88e1b9a3d27a0a9f531e066036555..7c3af4945e2b1db7eb039a47bf1b007961a789a3 100644 (file)
@@ -39,7 +39,8 @@ class StompQueueManager extends QueueManager
     protected $base;
     protected $control;
 
-    protected $useTransactions = true;
+    protected $useTransactions;
+    protected $useAcks;
 
     protected $sites = array();
     protected $subscriptions = array();
@@ -59,11 +60,13 @@ class StompQueueManager extends QueueManager
         } else {
             $this->servers = array($server);
         }
-        $this->username = common_config('queue', 'stomp_username');
-        $this->password = common_config('queue', 'stomp_password');
-        $this->base     = common_config('queue', 'queue_basename');
-        $this->control  = common_config('queue', 'control_channel');
-        $this->subscriptions = array($this->control => $this->control);
+        $this->username        = common_config('queue', 'stomp_username');
+        $this->password        = common_config('queue', 'stomp_password');
+        $this->base            = common_config('queue', 'queue_basename');
+        $this->control         = common_config('queue', 'control_channel');
+        $this->breakout        = common_config('queue', 'breakout');
+        $this->useTransactions = common_config('queue', 'stomp_transactions');
+        $this->useAcks         = common_config('queue', 'stomp_acks');
     }
 
     /**
@@ -75,28 +78,6 @@ class StompQueueManager extends QueueManager
         return IoManager::INSTANCE_PER_PROCESS;
     }
 
-    /**
-     * Record queue subscriptions we'll need to handle the current site.
-     */
-    public function addSite()
-    {
-        $this->sites[] = StatusNet::currentSite();
-
-        // Set up handlers active for this site...
-        $this->initialize();
-
-        foreach ($this->activeGroups as $group) {
-            if (isset($this->groups[$group])) {
-                // Actual queues may be broken out or consolidated...
-                // Subscribe to all the target queues we'll need.
-                foreach ($this->groups[$group] as $transport => $class) {
-                    $target = $this->queueName($transport);
-                    $this->subscriptions[$target] = $target;
-                }
-            }
-        }
-    }
-
     /**
      * Optional; ping any running queue handler daemons with a notification
      * such as announcing a new site to handle or requesting clean shutdown.
@@ -134,14 +115,27 @@ class StompQueueManager extends QueueManager
      *
      * @param mixed $object
      * @param string $queue
+     * @param string $siteNickname optional override to drop into another site's queue
      *
      * @return boolean true on success
      * @throws StompException on connection or send error
      */
-    public function enqueue($object, $queue)
+    public function enqueue($object, $queue, $siteNickname=null)
     {
         $this->_connect();
-        return $this->_doEnqueue($object, $queue, $this->defaultIdx);
+        if (common_config('queue', 'stomp_enqueue_on')) {
+            // We're trying to force all writes to a single server.
+            // WARNING: this might do odd things if that server connection dies.
+            $idx = array_search(common_config('queue', 'stomp_enqueue_on'),
+                                $this->servers);
+            if ($idx === false) {
+                common_log(LOG_ERR, 'queue stomp_enqueue_on setting does not match our server list.');
+                $idx = $this->defaultIdx;
+            }
+        } else {
+            $idx = $this->defaultIdx;
+        }
+        return $this->_doEnqueue($object, $queue, $idx, $siteNickname);
     }
 
     /**
@@ -151,10 +145,10 @@ class StompQueueManager extends QueueManager
      * @return boolean true on success
      * @throws StompException on connection or send error
      */
-    protected function _doEnqueue($object, $queue, $idx)
+    protected function _doEnqueue($object, $queue, $idx, $siteNickname=null)
     {
         $rep = $this->logrep($object);
-        $envelope = array('site' => common_config('site', 'nickname'),
+        $envelope = array('site' => $siteNickname ? $siteNickname : common_config('site', 'nickname'),
                           'handler' => $queue,
                           'payload' => $this->encode($object));
         $msg = serialize($envelope);
@@ -166,14 +160,15 @@ class StompQueueManager extends QueueManager
 
         $con = $this->cons[$idx];
         $host = $con->getServer();
-        $result = $con->send($this->queueName($queue), $msg, $props);
+        $target = $this->queueName($queue);
+        $result = $con->send($target, $msg, $props);
 
         if (!$result) {
-            $this->_log(LOG_ERR, "Error sending $rep to $queue queue on $host");
+            $this->_log(LOG_ERR, "Error sending $rep to $queue queue on $host $target");
             return false;
         }
 
-        $this->_log(LOG_DEBUG, "complete remote queueing $rep for $queue on $host");
+        $this->_log(LOG_DEBUG, "complete remote queueing $rep for $queue on $host $target");
         $this->stats('enqueued', $queue);
         return true;
     }
@@ -432,10 +427,41 @@ class StompQueueManager extends QueueManager
     protected function doSubscribe(LiberalStomp $con)
     {
         $host = $con->getServer();
-        foreach ($this->subscriptions as $queue) {
-            $this->_log(LOG_INFO, "Subscribing to $queue on $host");
-            $con->subscribe($queue);
+        foreach ($this->subscriptions() as $sub) {
+            $this->_log(LOG_INFO, "Subscribing to $sub on $host");
+            $con->subscribe($sub);
+        }
+    }
+    
+    /**
+     * Grab a full list of stomp-side queue subscriptions.
+     * Will include:
+     *  - control broadcast channel
+     *  - shared group queues for active groups
+     *  - per-handler and per-site breakouts from $config['queue']['breakout']
+     *    that are rooted in the active groups.
+     *
+     * @return array of strings
+     */
+    protected function subscriptions()
+    {
+        $subs = array();
+        $subs[] = $this->control;
+
+        foreach ($this->activeGroups as $group) {
+            $subs[] = $this->base . $group;
+        }
+
+        foreach ($this->breakout as $spec) {
+            $parts = explode('/', $spec);
+            if (count($parts) < 2 || count($parts) > 3) {
+                common_log(LOG_ERR, "Bad queue breakout specifier $spec");
+            }
+            if (in_array($parts[0], $this->activeGroups)) {
+                $subs[] = $this->base . $spec;
+            }
         }
+        return array_unique($subs);
     }
 
     /**
@@ -454,6 +480,13 @@ class StompQueueManager extends QueueManager
     {
         $host = $this->cons[$this->defaultIdx]->getServer();
         $message = unserialize($frame->body);
+
+        if ($message === false) {
+            $this->_log(LOG_ERR, "Can't unserialize frame: {$frame->body}");
+            $this->_log(LOG_ERR, "Unserializable frame length: " . strlen($frame->body));
+            return false;
+        }
+
         $site = $message['site'];
         $queue = $message['handler'];
 
@@ -465,8 +498,9 @@ class StompQueueManager extends QueueManager
         // @fixme detect failing site switches
         $this->switchSite($site);
 
-        $item = $this->decode($message['payload']);
-        if (empty($item)) {
+        try {
+            $item = $this->decode($message['payload']);
+        } catch (Exception $e) {
             $this->_log(LOG_ERR, "Skipping empty or deleted item in queue $queue from $host");
             $this->stats('baditem', $queue);
             return false;
@@ -475,15 +509,13 @@ class StompQueueManager extends QueueManager
                 $frame->headers['created'] . " in queue $queue from $host";
         $this->_log(LOG_DEBUG, "Dequeued $info");
 
-        $handler = $this->getHandler($queue);
-        if (!$handler) {
+        try {
+            $handler = $this->getHandler($queue);
+            $ok = $handler->handle($item);
+        } catch (NoQueueHandlerException $e) {
             $this->_log(LOG_ERR, "Missing handler class; skipping $info");
             $this->stats('badhandler', $queue);
             return false;
-        }
-
-        try {
-            $ok = $handler->handle($item);
         } catch (Exception $e) {
             $this->_log(LOG_ERR, "Exception on queue $queue: " . $e->getMessage());
             $ok = false;
@@ -552,7 +584,7 @@ class StompQueueManager extends QueueManager
     function incDeliveryCount($msgId)
     {
            $count = 0;
-           $cache = common_memcache();
+           $cache = Cache::instance();
            if ($cache) {
                    $key = 'statusnet:stomp:message-retries:' . $msgId;
                    $count = $cache->increment($key);
@@ -604,40 +636,34 @@ class StompQueueManager extends QueueManager
      */
     function switchSite($site)
     {
-        if ($site != StatusNet::currentSite()) {
+        if ($site != GNUsocial::currentSite()) {
             $this->stats('switch');
-            StatusNet::switchSite($site);
+            GNUsocial::switchSite($site);
             $this->initialize();
         }
     }
 
     /**
-     * Set us up with queue subscriptions for a new site added at runtime,
+     * (Re)load runtime configuration for a given site by nickname,
      * triggered by a broadcast to the 'statusnet-control' topic.
      *
+     * Configuration changes in database should update, but config
+     * files might not.
+     *
      * @param array $frame Stomp frame
      * @return bool true to continue; false to stop further processing.
      */
     protected function updateSiteConfig($nickname)
     {
-        if (empty($this->sites)) {
-            if ($nickname == common_config('site', 'nickname')) {
-                StatusNet::init(common_config('site', 'server'));
-            } else {
-                $this->_log(LOG_INFO, "Ignoring update ping for other site $nickname");
+        $sn = Status_network::getKV('nickname', $nickname);
+        if ($sn) {
+            $this->switchSite($nickname);
+            if (!in_array($nickname, $this->sites)) {
+                $this->addSite();
             }
+            $this->stats('siteupdate');
         } else {
-            $sn = Status_network::staticGet($nickname);
-            if ($sn) {
-                $this->switchSite($nickname);
-                if (!in_array($nickname, $this->sites)) {
-                    $this->addSite();
-                }
-                // @fixme update subscriptions, if applicable
-                $this->stats('siteupdate');
-            } else {
-                $this->_log(LOG_ERR, "Ignoring ping for unrecognized new site $nickname");
-            }
+            $this->_log(LOG_ERR, "Ignoring ping for unrecognized new site $nickname");
         }
     }
 
@@ -646,24 +672,25 @@ class StompQueueManager extends QueueManager
      * group name for this queue to give eg:
      *
      * /queue/statusnet/main
+     * /queue/statusnet/main/distrib
+     * /queue/statusnet/xmpp/xmppout/site01
      *
      * @param string $queue
      * @return string
      */
     protected function queueName($queue)
     {
-        $base = common_config('queue', 'queue_basename');
         $group = $this->queueGroup($queue);
-        $breakout = $this->breakoutMode($queue);
-        if ($breakout == 'shared') {
-            return $base . "$group";
-        } else if ($breakout == 'handler') {
-            return $base . "$group/$queue";
-        } else if ($breakout == 'site') {
-            $site = StatusNet::currentSite();
-            return $base . "$group/$queue/$site";
-        }
-        throw Exception("Unrecognized queue breakout mode '$breakout' for '$queue'");
+        $site = GNUsocial::currentSite();
+
+        $specs = array("$group/$queue/$site",
+                       "$group/$queue");
+        foreach ($specs as $spec) {
+            if (in_array($spec, $this->breakout)) {
+                return $this->base . $spec;
+            }
+        }
+        return $this->base . $group;
     }
 
     /**
@@ -698,13 +725,15 @@ class StompQueueManager extends QueueManager
 
     protected function ack($idx, $frame)
     {
-        if ($this->useTransactions) {
-            if (empty($this->transaction[$idx])) {
-                throw new Exception("Tried to ack but not in a transaction");
+        if ($this->useAcks) {
+            if ($this->useTransactions) {
+                if (empty($this->transaction[$idx])) {
+                    throw new Exception("Tried to ack but not in a transaction");
+                }
+                $this->cons[$idx]->ack($frame, $this->transaction[$idx]);
+            } else {
+                $this->cons[$idx]->ack($frame);
             }
-            $this->cons[$idx]->ack($frame, $this->transaction[$idx]);
-        } else {
-            $this->cons[$idx]->ack($frame);
         }
     }