]> git.mxchange.org Git - quix0rs-gnu-social.git/commitdiff
move handling code into queuemanager
authorEvan Prodromou <evan@controlyourself.ca>
Sat, 4 Jul 2009 04:31:28 +0000 (00:31 -0400)
committerEvan Prodromou <evan@controlyourself.ca>
Sat, 4 Jul 2009 04:31:28 +0000 (00:31 -0400)
lib/dbqueuemanager.php
lib/queuehandler.php
lib/queuemanager.php
lib/stompqueuemanager.php

index c9e5ef243f9a2b8beb15d5b57bff60f0dbbe1a50..6e7172de005ee7df94096e459122f936c424c3b4 100644 (file)
@@ -51,7 +51,36 @@ class DBQueueManager extends QueueManager
         return true;
     }
 
-    function nextItem($queue, $timeout=null)
+    function service($queue, $handler)
+    {
+        while (true) {
+            $this->_log(LOG_DEBUG, 'Checking for notices...');
+            $notice = $this->_nextItem($queue, null);
+            if (empty($notice)) {
+                $this->_log(LOG_DEBUG, 'No notices waiting; idling.');
+                // Nothing in the queue. Do you
+                // have other tasks, like servicing your
+                // XMPP connection, to do?
+                $handler->idle(QUEUE_HANDLER_MISS_IDLE);
+            } else {
+                $this->_log(LOG_INFO, 'Got notice '. $notice->id);
+                // Yay! Got one!
+                if ($handler->handle_notice($notice)) {
+                    $this->_log(LOG_INFO, 'Successfully handled notice '. $notice->id);
+                    $this->_done($notice, $queue);
+                } else {
+                    $this->_log(LOG_INFO, 'Failed to handle notice '. $notice->id);
+                    $this->_fail($notice, $queue);
+                }
+                // Chance to e.g. service your XMPP connection
+                $this->_log(LOG_DEBUG, 'Idling after success.');
+                $handler->idle(QUEUE_HANDLER_HIT_IDLE);
+            }
+            // XXX: when do we give up?
+        }
+    }
+
+    function _nextItem($queue, $timeout=null)
     {
         $start = time();
         $result = null;
@@ -74,7 +103,7 @@ class DBQueueManager extends QueueManager
         return $result;
     }
 
-    function done($object, $queue)
+    function _done($object, $queue)
     {
         // XXX: right now, we only handle notices
 
@@ -101,7 +130,7 @@ class DBQueueManager extends QueueManager
         $notice = null;
     }
 
-    function fail($object, $queue)
+    function _fail($object, $queue)
     {
         // XXX: right now, we only handle notices
 
index ddb47a28e947478a97a3ca988ffa81583459d787..c0f38f4e35f35fa9f7f132ecf56fcf3f236e9aec 100644 (file)
@@ -25,7 +25,7 @@ require_once(INSTALLDIR.'/classes/Notice.php');
 
 define('CLAIM_TIMEOUT', 1200);
 define('QUEUE_HANDLER_MISS_IDLE', 10);
-define('QUEUE_HANDLER_HIT_IDLE', 10);
+define('QUEUE_HANDLER_HIT_IDLE', 0);
 
 class QueueHandler extends Daemon
 {
@@ -42,7 +42,7 @@ class QueueHandler extends Daemon
 
     function timeout()
     {
-        return null;
+        return 60;
     }
 
     function class_name()
@@ -96,31 +96,7 @@ class QueueHandler extends Daemon
 
         $qm = QueueManager::get();
 
-        while (true) {
-            $this->log(LOG_DEBUG, 'Checking for notices...');
-            $notice = $qm->nextItem($queue, $timeout);
-            if (empty($notice)) {
-                $this->log(LOG_DEBUG, 'No notices waiting; idling.');
-                // Nothing in the queue. Do you
-                // have other tasks, like servicing your
-                // XMPP connection, to do?
-                $this->idle(QUEUE_HANDLER_MISS_IDLE);
-            } else {
-                $this->log(LOG_INFO, 'Got notice '. $notice->id);
-                // Yay! Got one!
-                if ($this->handle_notice($notice)) {
-                    $this->log(LOG_INFO, 'Successfully handled notice '. $notice->id);
-                    $qm->done($notice, $queue);
-                } else {
-                    $this->log(LOG_INFO, 'Failed to handle notice '. $notice->id);
-                    $qm->fail($notice, $queue);
-                }
-                // Chance to e.g. service your XMPP connection
-                $this->log(LOG_DEBUG, 'Idling after success.');
-                $this->idle(QUEUE_HANDLER_HIT_IDLE);
-            }
-            // XXX: when do we give up?
-        }
+        $qm->service($queue, $this);
 
         if (!$this->finish()) {
             return false;
index 1bf4d4decc7ce1e81ec2a363d00e967ee342d57c..f36e99d16ae807aaf5b052c24269e5c5f134a7ce 100644 (file)
@@ -67,23 +67,8 @@ class QueueManager
         throw ServerException("Unimplemented function 'enqueue' called");
     }
 
-    function peek($queue)
+    function service($queue, $handler)
     {
-        throw ServerException("Unimplemented function 'peek' called");
-    }
-
-    function nextItem($queue, $timeout=null)
-    {
-        throw ServerException("Unimplemented function 'nextItem' called");
-    }
-
-    function done($object, $queue)
-    {
-        throw ServerException("Unimplemented function 'done' called");
-    }
-
-    function fail($object, $queue)
-    {
-        throw ServerException("Unimplemented function 'fail' called");
+        throw ServerException("Unimplemented function 'service' called");
     }
 }
index 1ad6870363c5728d16d19c9b8eeb27ad2c66c60d..b8731d5439c4d936bc9bc6687e6b1f1b2610e175 100644 (file)
@@ -84,85 +84,33 @@ class StompQueueManager
                    . $notice->id . ' for ' . $transport);
     }
 
-    function nextItem($queue, $timeout=null)
+    function service($queue, $handler)
     {
         $result = null;
 
         $this->_connect();
 
-        $frame = $this->con->readFrame();
+        $this->con->setReadTimeout($handler->timeout());
 
-        if ($frame) {
-            $this->log(LOG_INFO, 'Got item enqueued '.common_exact_date($frame->headers['created']));
+        $this->con->subscribe($this->_queueName($queue));
 
-            // XXX: Now the queue handler receives only the ID of the
-            // notice, and it has to get it from the DB
-            // A massive improvement would be avoid DB query by transmitting
-            // all the notice details via queue server...
+        while (true) {
 
-            $notice = Notice::staticGet($frame->body);
+            $frame = $this->con->readFrame();
 
-            if ($notice) {
-                $this->_saveFrame($notice, $queue, $frame);
-            } else {
-                $this->log(LOG_WARNING, 'queue item for notice that does not exist');
-            }
-        }
-    }
-
-    function done($object, $queue)
-    {
-        $notice = $object;
+            if ($frame) {
+                $notice = Notice::staticGet($frame->body);
 
-        $this->_connect();
-
-        $frame = $this->_getFrame($notice, $queue);
-
-        if (empty($frame)) {
-            $this->log(LOG_ERR, 'Cannot find frame for notice '.$notice->id.' in queue '.$queue);
-        } else {
-            // if the msg has been handled positively, ack it
-            // and the queue server will remove it from the queue
-            $this->con->ack($frame);
-            $this->_clearFrame($notice, $queue);
+                if ($handler->handle_notice($notice)) {
+                    $this->_log(LOG_INFO, 'Successfully handled notice '. $notice->id);
+                    $this->con->ack($frame);
+                }
+            }
 
-            $this->log(LOG_INFO, 'finished broadcasting notice ID = ' . $notice->id);
+            $handler->idle(0);
         }
-    }
-
-    function fail($object, $queue)
-    {
-        $notice = $object;
-
-        // STOMP server will requeue it after a while anyways,
-        // so no need to notify. Just get it out of our little
-        // array
-
-        $this->_clearFrame($notice, $queue);
-    }
-
-    function _frameKey($notice, $queue)
-    {
-        return ((string)$notice->id) . '-' . $queue;
-    }
 
-    function _saveFrame($notice, $queue, $frame)
-    {
-        $k = $this->_frameKey($notice, $queue);
-        $this->_frames[$k] = $frame;
-        return true;
-    }
-
-    function _getFrame($notice, $queue)
-    {
-        $k = $this->_frameKey($notice, $queue);
-        return $this->_frames[$k];
-    }
-
-    function _clearFrame($notice, $queue)
-    {
-        $k = $this->_frameKey($notice, $queue);
-        unset($this->_frames[$k]);
+        $this->con->unsubscribe($this->_queueName($queue));
     }
 
     function _queueName($queue)