]> git.mxchange.org Git - quix0rs-gnu-social.git/blobdiff - lib/stompqueuemanager.php
Merge branch 'ssltweak'
[quix0rs-gnu-social.git] / lib / stompqueuemanager.php
index 9af8b2f4826b24714f5f557af56d2a560681adcd..fc98c77d4086cffea3a90076790c19f39f8bbced 100644 (file)
@@ -39,7 +39,8 @@ class StompQueueManager extends QueueManager
     protected $base;
     protected $control;
 
-    protected $useTransactions = true;
+    protected $useTransactions;
+    protected $useAcks;
 
     protected $sites = array();
     protected $subscriptions = array();
@@ -59,11 +60,13 @@ class StompQueueManager extends QueueManager
         } else {
             $this->servers = array($server);
         }
-        $this->username = common_config('queue', 'stomp_username');
-        $this->password = common_config('queue', 'stomp_password');
-        $this->base     = common_config('queue', 'queue_basename');
-        $this->control  = common_config('queue', 'control_channel');
-        $this->breakout = common_config('queue', 'breakout');
+        $this->username        = common_config('queue', 'stomp_username');
+        $this->password        = common_config('queue', 'stomp_password');
+        $this->base            = common_config('queue', 'queue_basename');
+        $this->control         = common_config('queue', 'control_channel');
+        $this->breakout        = common_config('queue', 'breakout');
+        $this->useTransactions = common_config('queue', 'stomp_transactions');
+        $this->useAcks         = common_config('queue', 'stomp_acks');
     }
 
     /**
@@ -112,14 +115,27 @@ class StompQueueManager extends QueueManager
      *
      * @param mixed $object
      * @param string $queue
+     * @param string $siteNickname optional override to drop into another site's queue
      *
      * @return boolean true on success
      * @throws StompException on connection or send error
      */
-    public function enqueue($object, $queue)
+    public function enqueue($object, $queue, $siteNickname=null)
     {
         $this->_connect();
-        return $this->_doEnqueue($object, $queue, $this->defaultIdx);
+        if (common_config('queue', 'stomp_enqueue_on')) {
+            // We're trying to force all writes to a single server.
+            // WARNING: this might do odd things if that server connection dies.
+            $idx = array_search(common_config('queue', 'stomp_enqueue_on'),
+                                $this->servers);
+            if ($idx === false) {
+                common_log(LOG_ERR, 'queue stomp_enqueue_on setting does not match our server list.');
+                $idx = $this->defaultIdx;
+            }
+        } else {
+            $idx = $this->defaultIdx;
+        }
+        return $this->_doEnqueue($object, $queue, $idx, $siteNickname);
     }
 
     /**
@@ -129,10 +145,10 @@ class StompQueueManager extends QueueManager
      * @return boolean true on success
      * @throws StompException on connection or send error
      */
-    protected function _doEnqueue($object, $queue, $idx)
+    protected function _doEnqueue($object, $queue, $idx, $siteNickname=null)
     {
         $rep = $this->logrep($object);
-        $envelope = array('site' => common_config('site', 'nickname'),
+        $envelope = array('site' => $siteNickname ? $siteNickname : common_config('site', 'nickname'),
                           'handler' => $queue,
                           'payload' => $this->encode($object));
         $msg = serialize($envelope);
@@ -633,7 +649,7 @@ class StompQueueManager extends QueueManager
      */
     protected function updateSiteConfig($nickname)
     {
-        $sn = Status_network::staticGet($nickname);
+        $sn = Status_network::staticGet('nickname', $nickname);
         if ($sn) {
             $this->switchSite($nickname);
             if (!in_array($nickname, $this->sites)) {
@@ -703,13 +719,15 @@ class StompQueueManager extends QueueManager
 
     protected function ack($idx, $frame)
     {
-        if ($this->useTransactions) {
-            if (empty($this->transaction[$idx])) {
-                throw new Exception("Tried to ack but not in a transaction");
+        if ($this->useAcks) {
+            if ($this->useTransactions) {
+                if (empty($this->transaction[$idx])) {
+                    throw new Exception("Tried to ack but not in a transaction");
+                }
+                $this->cons[$idx]->ack($frame, $this->transaction[$idx]);
+            } else {
+                $this->cons[$idx]->ack($frame);
             }
-            $this->cons[$idx]->ack($frame, $this->transaction[$idx]);
-        } else {
-            $this->cons[$idx]->ack($frame);
         }
     }