X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=lib%2Fstompqueuemanager.php;h=4084470e211f9cc0c656c18512f637bf6cd48e19;hb=1f890f03c885ce0761be80e69e678cce5f9ee46a;hp=de4ba7f01fdce59b8ebfa3b94bbb68f4390db7f9;hpb=ef51cc9ad4a37d6dcd2129828284def1c90d7402;p=quix0rs-gnu-social.git diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php index de4ba7f01f..4084470e21 100644 --- a/lib/stompqueuemanager.php +++ b/lib/stompqueuemanager.php @@ -115,11 +115,12 @@ class StompQueueManager extends QueueManager * * @param mixed $object * @param string $queue + * @param string $siteNickname optional override to drop into another site's queue * * @return boolean true on success * @throws StompException on connection or send error */ - public function enqueue($object, $queue) + public function enqueue($object, $queue, $siteNickname=null) { $this->_connect(); if (common_config('queue', 'stomp_enqueue_on')) { @@ -134,7 +135,7 @@ class StompQueueManager extends QueueManager } else { $idx = $this->defaultIdx; } - return $this->_doEnqueue($object, $queue, $idx); + return $this->_doEnqueue($object, $queue, $idx, $siteNickname); } /** @@ -144,10 +145,10 @@ class StompQueueManager extends QueueManager * @return boolean true on success * @throws StompException on connection or send error */ - protected function _doEnqueue($object, $queue, $idx) + protected function _doEnqueue($object, $queue, $idx, $siteNickname=null) { $rep = $this->logrep($object); - $envelope = array('site' => common_config('site', 'nickname'), + $envelope = array('site' => $siteNickname ? $siteNickname : common_config('site', 'nickname'), 'handler' => $queue, 'payload' => $this->encode($object)); $msg = serialize($envelope); @@ -479,6 +480,13 @@ class StompQueueManager extends QueueManager { $host = $this->cons[$this->defaultIdx]->getServer(); $message = unserialize($frame->body); + + if ($message === false) { + $this->_log(LOG_ERR, "Can't unserialize frame: {$frame->body}"); + $this->_log(LOG_ERR, "Unserializable frame length: " . strlen($frame->body)); + return false; + } + $site = $message['site']; $queue = $message['handler']; @@ -490,8 +498,9 @@ class StompQueueManager extends QueueManager // @fixme detect failing site switches $this->switchSite($site); - $item = $this->decode($message['payload']); - if (empty($item)) { + try { + $item = $this->decode($message['payload']); + } catch (Exception $e) { $this->_log(LOG_ERR, "Skipping empty or deleted item in queue $queue from $host"); $this->stats('baditem', $queue); return false; @@ -577,7 +586,7 @@ class StompQueueManager extends QueueManager function incDeliveryCount($msgId) { $count = 0; - $cache = common_memcache(); + $cache = Cache::instance(); if ($cache) { $key = 'statusnet:stomp:message-retries:' . $msgId; $count = $cache->increment($key); @@ -629,9 +638,9 @@ class StompQueueManager extends QueueManager */ function switchSite($site) { - if ($site != StatusNet::currentSite()) { + if ($site != GNUsocial::currentSite()) { $this->stats('switch'); - StatusNet::switchSite($site); + GNUsocial::switchSite($site); $this->initialize(); } } @@ -648,7 +657,7 @@ class StompQueueManager extends QueueManager */ protected function updateSiteConfig($nickname) { - $sn = Status_network::staticGet($nickname); + $sn = Status_network::getKV('nickname', $nickname); if ($sn) { $this->switchSite($nickname); if (!in_array($nickname, $this->sites)) { @@ -674,7 +683,7 @@ class StompQueueManager extends QueueManager protected function queueName($queue) { $group = $this->queueGroup($queue); - $site = StatusNet::currentSite(); + $site = GNUsocial::currentSite(); $specs = array("$group/$queue/$site", "$group/$queue");