]> git.mxchange.org Git - quix0rs-gnu-social.git/commitdiff
Fix for stuck queue messages: wrap processing in stomp transactions so our lack of...
authorBrion Vibber <brion@pobox.com>
Fri, 22 Jan 2010 20:35:05 +0000 (12:35 -0800)
committerBrion Vibber <brion@pobox.com>
Fri, 22 Jan 2010 20:52:56 +0000 (12:52 -0800)
Previously, messages once delivered would just get stuck in the queue seemingly forever if they never got ACKed.
Note this could lead to partial duplication, for instance if the OMB or Twitter queue handlers die after 1/2 of the outgoing sends.

Recommendations:
* catch exceptions more aggressively within queue handlers (so only PHP fatal errors are likely to kill in the middle)
* for processing that involves sending to multiple clients, consider a second queue similar to the XMPP output, eg for OMB:
 - first queue gets delivery list and builds message data, enqueueing it for each target address
 - second queue can handle each individual outgoing message (and attempt redelivery etc separately)

This would also protect better against a recurring error preventing delivery in the second part, and could spread out any slow sends over multiple threads.

lib/stompqueuemanager.php

index f057bd9e41718bda776bbf9ba48caf1eac4f2af7..8f0091a1384f51b1cf07ad6c4bddf7dfbd920f03 100644 (file)
@@ -41,6 +41,10 @@ class StompQueueManager extends QueueManager
     
     protected $sites = array();
 
+    protected $useTransactions = true;
+    protected $transaction = null;
+    protected $transactionCount = 0;
+
     function __construct()
     {
         parent::__construct();
@@ -201,6 +205,7 @@ class StompQueueManager extends QueueManager
         } else {
             $this->doSubscribe();
         }
+        $this->begin();
         return true;
     }
     
@@ -213,6 +218,9 @@ class StompQueueManager extends QueueManager
      */
     public function finish()
     {
+        // If there are any outstanding delivered messages we haven't processed,
+        // free them for another thread to take.
+        $this->rollback();
         if ($this->sites) {
             foreach ($this->sites as $server) {
                 StatusNet::init($server);
@@ -293,7 +301,9 @@ class StompQueueManager extends QueueManager
             $notice = Notice::staticGet('id', $id);
             if (empty($notice)) {
                 $this->_log(LOG_WARNING, "Skipping missing $info");
-                $this->con->ack($frame);
+                $this->ack($frame);
+                $this->commit();
+                $this->begin();
                 $this->stats('badnotice', $queue);
                 return false;
             }
@@ -308,7 +318,9 @@ class StompQueueManager extends QueueManager
         $handler = $this->getHandler($queue);
         if (!$handler) {
             $this->_log(LOG_ERROR, "Missing handler class; skipping $info");
-            $this->con->ack($frame);
+            $this->ack($frame);
+            $this->commit();
+            $this->begin();
             $this->stats('badhandler', $queue);
             return false;
         }
@@ -320,14 +332,18 @@ class StompQueueManager extends QueueManager
             // FIXME we probably shouldn't have to do
             // this kind of queue management ourselves;
             // if we don't ack, it should resend...
-            $this->con->ack($frame);
+            $this->ack($frame);
             $this->enqueue($item, $queue);
+            $this->commit();
+            $this->begin();
             $this->stats('requeued', $queue);
             return false;
         }
 
         $this->_log(LOG_INFO, "Successfully handled $info");
-        $this->con->ack($frame);
+        $this->ack($frame);
+        $this->commit();
+        $this->begin();
         $this->stats('handled', $queue);
         return true;
     }
@@ -369,5 +385,49 @@ class StompQueueManager extends QueueManager
     {
         common_log($level, 'StompQueueManager: '.$msg);
     }
+
+    protected function begin()
+    {
+        if ($this->useTransactions) {
+            if ($this->transaction) {
+                throw new Exception("Tried to start transaction in the middle of a transaction");
+            }
+            $this->transactionCount++;
+            $this->transaction = $this->master->id . '-' . $this->transactionCount . '-' . time();
+            $this->con->begin($this->transaction);
+        }
+    }
+
+    protected function ack($frame)
+    {
+        if ($this->useTransactions) {
+            if (!$this->transaction) {
+                throw new Exception("Tried to ack but not in a transaction");
+            }
+        }
+        $this->con->ack($frame, $this->transaction);
+    }
+
+    protected function commit()
+    {
+        if ($this->useTransactions) {
+            if (!$this->transaction) {
+                throw new Exception("Tried to commit but not in a transaction");
+            }
+            $this->con->commit($this->transaction);
+            $this->transaction = null;
+        }
+    }
+
+    protected function rollback()
+    {
+        if ($this->useTransactions) {
+            if (!$this->transaction) {
+                throw new Exception("Tried to rollback but not in a transaction");
+            }
+            $this->con->commit($this->transaction);
+            $this->transaction = null;
+        }
+    }
 }