X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=lib%2Fstompqueuemanager.php;h=b1afe176a158d6c861ebed6b6c7ba37f8a4746e8;hb=3290227b50582ed29790f0bb10210362ca2f4093;hp=fc98c77d4086cffea3a90076790c19f39f8bbced;hpb=718317542940a8976f2c6f2a9ea09a04f2f00ca6;p=quix0rs-gnu-social.git diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php index fc98c77d40..b1afe176a1 100644 --- a/lib/stompqueuemanager.php +++ b/lib/stompqueuemanager.php @@ -151,7 +151,7 @@ class StompQueueManager extends QueueManager $envelope = array('site' => $siteNickname ? $siteNickname : common_config('site', 'nickname'), 'handler' => $queue, 'payload' => $this->encode($object)); - $msg = serialize($envelope); + $msg = base64_encode(serialize($envelope)); $props = array('created' => common_sql_now()); if ($this->isPersistent($queue)) { @@ -479,11 +479,18 @@ class StompQueueManager extends QueueManager protected function handleItem($frame) { $host = $this->cons[$this->defaultIdx]->getServer(); - $message = unserialize($frame->body); + $message = unserialize(base64_decode($frame->body)); + + if ($message === false) { + $this->_log(LOG_ERR, "Can't unserialize frame: {$frame->body}"); + $this->_log(LOG_ERR, "Unserializable frame length: " . strlen($frame->body)); + return false; + } + $site = $message['site']; $queue = $message['handler']; - if ($this->isDeadletter($frame, $message)) { + if ($this->isDeadLetter($frame, $message)) { $this->stats('deadletter', $queue); return false; } @@ -491,8 +498,9 @@ class StompQueueManager extends QueueManager // @fixme detect failing site switches $this->switchSite($site); - $item = $this->decode($message['payload']); - if (empty($item)) { + try { + $item = $this->decode($message['payload']); + } catch (Exception $e) { $this->_log(LOG_ERR, "Skipping empty or deleted item in queue $queue from $host"); $this->stats('baditem', $queue); return false; @@ -501,15 +509,13 @@ class StompQueueManager extends QueueManager $frame->headers['created'] . " in queue $queue from $host"; $this->_log(LOG_DEBUG, "Dequeued $info"); - $handler = $this->getHandler($queue); - if (!$handler) { + try { + $handler = $this->getHandler($queue); + $ok = $handler->handle($item); + } catch (NoQueueHandlerException $e) { $this->_log(LOG_ERR, "Missing handler class; skipping $info"); $this->stats('badhandler', $queue); return false; - } - - try { - $ok = $handler->handle($item); } catch (Exception $e) { $this->_log(LOG_ERR, "Exception on queue $queue: " . $e->getMessage()); $ok = false; @@ -578,7 +584,7 @@ class StompQueueManager extends QueueManager function incDeliveryCount($msgId) { $count = 0; - $cache = common_memcache(); + $cache = Cache::instance(); if ($cache) { $key = 'statusnet:stomp:message-retries:' . $msgId; $count = $cache->increment($key); @@ -630,9 +636,9 @@ class StompQueueManager extends QueueManager */ function switchSite($site) { - if ($site != StatusNet::currentSite()) { + if ($site != GNUsocial::currentSite()) { $this->stats('switch'); - StatusNet::switchSite($site); + GNUsocial::switchSite($site); $this->initialize(); } } @@ -649,7 +655,7 @@ class StompQueueManager extends QueueManager */ protected function updateSiteConfig($nickname) { - $sn = Status_network::staticGet('nickname', $nickname); + $sn = Status_network::getKV('nickname', $nickname); if ($sn) { $this->switchSite($nickname); if (!in_array($nickname, $this->sites)) { @@ -675,7 +681,7 @@ class StompQueueManager extends QueueManager protected function queueName($queue) { $group = $this->queueGroup($queue); - $site = StatusNet::currentSite(); + $site = GNUsocial::currentSite(); $specs = array("$group/$queue/$site", "$group/$queue");