*
* @param mixed $object
* @param string $queue
+ * @param string $siteNickname optional override to drop into another site's queue
*
* @return boolean true on success
* @throws StompException on connection or send error
*/
- public function enqueue($object, $queue)
+ public function enqueue($object, $queue, $siteNickname=null)
{
$this->_connect();
- return $this->_doEnqueue($object, $queue, $this->defaultIdx);
+ if (common_config('queue', 'stomp_enqueue_on')) {
+ // We're trying to force all writes to a single server.
+ // WARNING: this might do odd things if that server connection dies.
+ $idx = array_search(common_config('queue', 'stomp_enqueue_on'),
+ $this->servers);
+ if ($idx === false) {
+ common_log(LOG_ERR, 'queue stomp_enqueue_on setting does not match our server list.');
+ $idx = $this->defaultIdx;
+ }
+ } else {
+ $idx = $this->defaultIdx;
+ }
+ return $this->_doEnqueue($object, $queue, $idx, $siteNickname);
}
/**
* @return boolean true on success
* @throws StompException on connection or send error
*/
- protected function _doEnqueue($object, $queue, $idx)
+ protected function _doEnqueue($object, $queue, $idx, $siteNickname=null)
{
$rep = $this->logrep($object);
- $envelope = array('site' => common_config('site', 'nickname'),
+ $envelope = array('site' => $siteNickname ? $siteNickname : common_config('site', 'nickname'),
'handler' => $queue,
'payload' => $this->encode($object));
$msg = serialize($envelope);
{
$host = $this->cons[$this->defaultIdx]->getServer();
$message = unserialize($frame->body);
+
+ if ($message === false) {
+ $this->_log(LOG_ERR, "Can't unserialize frame: {$frame->body}");
+ $this->_log(LOG_ERR, "Unserializable frame length: " . strlen($frame->body));
+ return false;
+ }
+
$site = $message['site'];
$queue = $message['handler'];
function incDeliveryCount($msgId)
{
$count = 0;
- $cache = common_memcache();
+ $cache = Cache::instance();
if ($cache) {
$key = 'statusnet:stomp:message-retries:' . $msgId;
$count = $cache->increment($key);
*/
protected function updateSiteConfig($nickname)
{
- $sn = Status_network::staticGet($nickname);
+ $sn = Status_network::staticGet('nickname', $nickname);
if ($sn) {
$this->switchSite($nickname);
if (!in_array($nickname, $this->sites)) {