fix up hub queueing to work w/ stomp queues
authorBrion Vibber <brion@pobox.com>
Tue, 9 Feb 2010 00:43:37 +0000 (16:43 -0800)
committerBrion Vibber <brion@pobox.com>
Wed, 10 Feb 2010 20:27:41 +0000 (12:27 -0800)
lib/queuemanager.php
lib/stompqueuemanager.php
plugins/OStatus/lib/hubdistribqueuehandler.php
plugins/OStatus/lib/huboutqueuehandler.php

index afe710e884dbc9d4aea2168a4f9548c4fc053b9c..149617eb508f1139c2502a110f57756b1a251744 100644 (file)
@@ -155,26 +155,26 @@ abstract class QueueManager extends IoManager
     }
 
     /**
-     * Encode an object for queued storage.
-     * Next gen may use serialization.
+     * Encode an object or variable for queued storage.
+     * Notice objects are currently stored as an id reference;
+     * other items are serialized.
      *
-     * @param mixed $object
+     * @param mixed $item
      * @return string
      */
-    protected function encode($object)
+    protected function encode($item)
     {
-        if ($object instanceof Notice) {
-            return $object->id;
-        } else if (is_string($object)) {
-            return $object;
+        if ($item instanceof Notice) {
+            // Backwards compat
+            return $item->id;
         } else {
-            throw new ServerException("Can't queue this type", 500);
+            return serialize($item);
         }
     }
 
     /**
      * Decode an object from queued storage.
-     * Accepts back-compat notice reference entries and strings for now.
+     * Accepts notice reference entries and serialized items.
      *
      * @param string
      * @return mixed
@@ -182,9 +182,23 @@ abstract class QueueManager extends IoManager
     protected function decode($frame)
     {
         if (is_numeric($frame)) {
+            // Back-compat for notices...
             return Notice::staticGet(intval($frame));
-        } else {
+        } elseif (substr($frame, 0, 1) == '<') {
+            // Back-compat for XML source
             return $frame;
+        } else {
+            // Deserialize!
+            #$old = error_reporting();
+            #error_reporting($old & ~E_NOTICE);
+            $out = unserialize($frame);
+            #error_reporting($old);
+
+            if ($out === false && $frame !== 'b:0;') {
+                common_log(LOG_ERR, "Couldn't unserialize queued frame: $frame");
+                return false;
+            }
+            return $out;
         }
     }
 
index cc4c817d8f7736be37dc1d4e6f00335413c57a30..cd62c25bd828c9a6bc37b158e2b9dc3ae5bb5180 100644 (file)
@@ -549,26 +549,14 @@ class StompQueueManager extends QueueManager
         }
 
         $host = $this->cons[$idx]->getServer();
-        if (is_numeric($frame->body)) {
-            $id = intval($frame->body);
-            $info = "notice $id posted at {$frame->headers['created']} in queue $queue from $host";
-
-            $notice = Notice::staticGet('id', $id);
-            if (empty($notice)) {
-                $this->_log(LOG_WARNING, "Skipping missing $info");
-                $this->ack($idx, $frame);
-                $this->commit($idx);
-                $this->begin($idx);
-                $this->stats('badnotice', $queue);
-                return false;
-            }
-
-            $item = $notice;
-        } else {
-            // @fixme should we serialize, or json, or what here?
-            $info = "string posted at {$frame->headers['created']} in queue $queue from $host";
-            $item = $frame->body;
+        $item = $this->decode($frame->body);
+        if (empty($item)) {
+            $this->_log(LOG_ERR, "Skipping empty or deleted item in queue $queue from $host");
+            return true;
         }
+        $info = $this->logrep($item) . " posted at " .
+                $frame->headers['created'] . " in queue $queue from $host";
+        $this->_log(LOG_DEBUG, "Dequeued $info");
 
         $handler = $this->getHandler($queue);
         if (!$handler) {
index 189ccbedf9fc26e25efd075ee048ef90e077fa1f..de3a8138580ca6c13e1ad466cbb98e69c4e48c68 100644 (file)
@@ -56,6 +56,7 @@ class HubDistribQueueHandler extends QueueHandler
         } else {
             common_log(LOG_INFO, "No PuSH subscribers for $feed");
         }
+        return true;
     }
 
     function pushGroup($notice, $group_id)
index cb44ad2c4edc55d298b2ae09060740d42eed5e52..0791c7e5db16c4fd186ba5158bddc93546172770 100644 (file)
@@ -43,7 +43,7 @@ class HubOutQueueHandler extends QueueHandler
             common_log(LOG_ERR, "Failed PuSH to $sub->callback for $sub->topic: " .
                                 $e->getMessage());
             // @fixme Reschedule a later delivery?
-            // Currently we have no way to do this other than 'send NOW'
+            return true;
         }
 
         return true;