*
* @param mixed $object
* @param string $queue
+ * @param string $siteNickname optional override to drop into another site's queue
*
* @return boolean true on success
* @throws StompException on connection or send error
*/
- public function enqueue($object, $queue)
+ public function enqueue($object, $queue, $siteNickname=null)
{
$this->_connect();
if (common_config('queue', 'stomp_enqueue_on')) {
} else {
$idx = $this->defaultIdx;
}
- return $this->_doEnqueue($object, $queue, $idx);
+ return $this->_doEnqueue($object, $queue, $idx, $siteNickname);
}
/**
* @return boolean true on success
* @throws StompException on connection or send error
*/
- protected function _doEnqueue($object, $queue, $idx)
+ protected function _doEnqueue($object, $queue, $idx, $siteNickname=null)
{
$rep = $this->logrep($object);
- $envelope = array('site' => common_config('site', 'nickname'),
+ $envelope = array('site' => $siteNickname ? $siteNickname : common_config('site', 'nickname'),
'handler' => $queue,
'payload' => $this->encode($object));
$msg = serialize($envelope);
{
$host = $this->cons[$this->defaultIdx]->getServer();
$message = unserialize($frame->body);
+
+ if ($message === false) {
+ $this->_log(LOG_ERR, "Can't unserialize frame: {$frame->body}");
+ $this->_log(LOG_ERR, "Unserializable frame length: " . strlen($frame->body));
+ return false;
+ }
+
$site = $message['site'];
$queue = $message['handler'];
// @fixme detect failing site switches
$this->switchSite($site);
- $item = $this->decode($message['payload']);
- if (empty($item)) {
+ try {
+ $item = $this->decode($message['payload']);
+ } catch (Exception $e) {
$this->_log(LOG_ERR, "Skipping empty or deleted item in queue $queue from $host");
$this->stats('baditem', $queue);
return false;
function incDeliveryCount($msgId)
{
$count = 0;
- $cache = common_memcache();
+ $cache = Cache::instance();
if ($cache) {
$key = 'statusnet:stomp:message-retries:' . $msgId;
$count = $cache->increment($key);
*/
protected function updateSiteConfig($nickname)
{
- $sn = Status_network::staticGet($nickname);
+ $sn = Status_network::getKV('nickname', $nickname);
if ($sn) {
$this->switchSite($nickname);
if (!in_array($nickname, $this->sites)) {