]> git.mxchange.org Git - quix0rs-gnu-social.git/blobdiff - lib/stompqueuemanager.php
Merge commit 'refs/merge-requests/41' of https://gitorious.org/social/mainline into...
[quix0rs-gnu-social.git] / lib / stompqueuemanager.php
index de4ba7f01fdce59b8ebfa3b94bbb68f4390db7f9..25a8e2a27ebaef546bfa6ea7017c83dd7583a709 100644 (file)
@@ -115,11 +115,12 @@ class StompQueueManager extends QueueManager
      *
      * @param mixed $object
      * @param string $queue
+     * @param string $siteNickname optional override to drop into another site's queue
      *
      * @return boolean true on success
      * @throws StompException on connection or send error
      */
-    public function enqueue($object, $queue)
+    public function enqueue($object, $queue, $siteNickname=null)
     {
         $this->_connect();
         if (common_config('queue', 'stomp_enqueue_on')) {
@@ -134,7 +135,7 @@ class StompQueueManager extends QueueManager
         } else {
             $idx = $this->defaultIdx;
         }
-        return $this->_doEnqueue($object, $queue, $idx);
+        return $this->_doEnqueue($object, $queue, $idx, $siteNickname);
     }
 
     /**
@@ -144,10 +145,10 @@ class StompQueueManager extends QueueManager
      * @return boolean true on success
      * @throws StompException on connection or send error
      */
-    protected function _doEnqueue($object, $queue, $idx)
+    protected function _doEnqueue($object, $queue, $idx, $siteNickname=null)
     {
         $rep = $this->logrep($object);
-        $envelope = array('site' => common_config('site', 'nickname'),
+        $envelope = array('site' => $siteNickname ? $siteNickname : common_config('site', 'nickname'),
                           'handler' => $queue,
                           'payload' => $this->encode($object));
         $msg = serialize($envelope);
@@ -479,6 +480,13 @@ class StompQueueManager extends QueueManager
     {
         $host = $this->cons[$this->defaultIdx]->getServer();
         $message = unserialize($frame->body);
+
+        if ($message === false) {
+            $this->_log(LOG_ERR, "Can't unserialize frame: {$frame->body}");
+            $this->_log(LOG_ERR, "Unserializable frame length: " . strlen($frame->body));
+            return false;
+        }
+
         $site = $message['site'];
         $queue = $message['handler'];
 
@@ -490,8 +498,9 @@ class StompQueueManager extends QueueManager
         // @fixme detect failing site switches
         $this->switchSite($site);
 
-        $item = $this->decode($message['payload']);
-        if (empty($item)) {
+        try {
+            $item = $this->decode($message['payload']);
+        } catch (Exception $e) {
             $this->_log(LOG_ERR, "Skipping empty or deleted item in queue $queue from $host");
             $this->stats('baditem', $queue);
             return false;
@@ -577,7 +586,7 @@ class StompQueueManager extends QueueManager
     function incDeliveryCount($msgId)
     {
            $count = 0;
-           $cache = common_memcache();
+           $cache = Cache::instance();
            if ($cache) {
                    $key = 'statusnet:stomp:message-retries:' . $msgId;
                    $count = $cache->increment($key);
@@ -648,7 +657,7 @@ class StompQueueManager extends QueueManager
      */
     protected function updateSiteConfig($nickname)
     {
-        $sn = Status_network::staticGet($nickname);
+        $sn = Status_network::getKV('nickname', $nickname);
         if ($sn) {
             $this->switchSite($nickname);
             if (!in_array($nickname, $this->sites)) {