X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=lib%2Fstompqueuemanager.php;h=1d9a5ad207ef4846aa7e8caf05d08dc9054fd1d1;hb=8cdf8cf33e8b98d8d49dc03777dd5c39fb28c4d6;hp=9af8b2f4826b24714f5f557af56d2a560681adcd;hpb=85528ccb1f5840a8c73f6c939f12d98bb3680cde;p=quix0rs-gnu-social.git diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php index 9af8b2f482..1d9a5ad207 100644 --- a/lib/stompqueuemanager.php +++ b/lib/stompqueuemanager.php @@ -39,7 +39,8 @@ class StompQueueManager extends QueueManager protected $base; protected $control; - protected $useTransactions = true; + protected $useTransactions; + protected $useAcks; protected $sites = array(); protected $subscriptions = array(); @@ -59,11 +60,13 @@ class StompQueueManager extends QueueManager } else { $this->servers = array($server); } - $this->username = common_config('queue', 'stomp_username'); - $this->password = common_config('queue', 'stomp_password'); - $this->base = common_config('queue', 'queue_basename'); - $this->control = common_config('queue', 'control_channel'); - $this->breakout = common_config('queue', 'breakout'); + $this->username = common_config('queue', 'stomp_username'); + $this->password = common_config('queue', 'stomp_password'); + $this->base = common_config('queue', 'queue_basename'); + $this->control = common_config('queue', 'control_channel'); + $this->breakout = common_config('queue', 'breakout'); + $this->useTransactions = common_config('queue', 'stomp_transactions'); + $this->useAcks = common_config('queue', 'stomp_acks'); } /** @@ -112,14 +115,27 @@ class StompQueueManager extends QueueManager * * @param mixed $object * @param string $queue + * @param string $siteNickname optional override to drop into another site's queue * * @return boolean true on success * @throws StompException on connection or send error */ - public function enqueue($object, $queue) + public function enqueue($object, $queue, $siteNickname=null) { $this->_connect(); - return $this->_doEnqueue($object, $queue, $this->defaultIdx); + if (common_config('queue', 'stomp_enqueue_on')) { + // We're trying to force all writes to a single server. + // WARNING: this might do odd things if that server connection dies. + $idx = array_search(common_config('queue', 'stomp_enqueue_on'), + $this->servers); + if ($idx === false) { + common_log(LOG_ERR, 'queue stomp_enqueue_on setting does not match our server list.'); + $idx = $this->defaultIdx; + } + } else { + $idx = $this->defaultIdx; + } + return $this->_doEnqueue($object, $queue, $idx, $siteNickname); } /** @@ -129,10 +145,10 @@ class StompQueueManager extends QueueManager * @return boolean true on success * @throws StompException on connection or send error */ - protected function _doEnqueue($object, $queue, $idx) + protected function _doEnqueue($object, $queue, $idx, $siteNickname=null) { $rep = $this->logrep($object); - $envelope = array('site' => common_config('site', 'nickname'), + $envelope = array('site' => $siteNickname ? $siteNickname : common_config('site', 'nickname'), 'handler' => $queue, 'payload' => $this->encode($object)); $msg = serialize($envelope); @@ -562,7 +578,7 @@ class StompQueueManager extends QueueManager function incDeliveryCount($msgId) { $count = 0; - $cache = common_memcache(); + $cache = Cache::instance(); if ($cache) { $key = 'statusnet:stomp:message-retries:' . $msgId; $count = $cache->increment($key); @@ -633,7 +649,7 @@ class StompQueueManager extends QueueManager */ protected function updateSiteConfig($nickname) { - $sn = Status_network::staticGet($nickname); + $sn = Status_network::staticGet('nickname', $nickname); if ($sn) { $this->switchSite($nickname); if (!in_array($nickname, $this->sites)) { @@ -703,13 +719,15 @@ class StompQueueManager extends QueueManager protected function ack($idx, $frame) { - if ($this->useTransactions) { - if (empty($this->transaction[$idx])) { - throw new Exception("Tried to ack but not in a transaction"); + if ($this->useAcks) { + if ($this->useTransactions) { + if (empty($this->transaction[$idx])) { + throw new Exception("Tried to ack but not in a transaction"); + } + $this->cons[$idx]->ack($frame, $this->transaction[$idx]); + } else { + $this->cons[$idx]->ack($frame); } - $this->cons[$idx]->ack($frame, $this->transaction[$idx]); - } else { - $this->cons[$idx]->ack($frame); } }