OStatus PuSH fixlets:
authorBrion Vibber <brion@pobox.com>
Sun, 21 Feb 2010 22:28:06 +0000 (14:28 -0800)
committerBrion Vibber <brion@pobox.com>
Sun, 21 Feb 2010 22:28:06 +0000 (14:28 -0800)
- set minimal error page output on PuSH callback
- allow hub to retry ($config['ostatus']['hub_retries']), simplify internal iface a bit. Retries are pushed to end of queue but otherwise not delayed yet; makes delivery more robust to one-off transitory errors but not yet against downtime.

plugins/OStatus/actions/pushcallback.php
plugins/OStatus/classes/HubSub.php
plugins/OStatus/lib/hubdistribqueuehandler.php
plugins/OStatus/lib/huboutqueuehandler.php

index 9e976a80de413d8f93f7dc94dc11ed1568f78633..35c92c7323ad237af34f1c9fae7730ad9e04c617 100644 (file)
@@ -29,6 +29,7 @@ class PushCallbackAction extends Action
 {
     function handle()
     {
+        StatusNet::setApi(true); // Minimize error messages to aid in debugging
         parent::handle();
         if ($_SERVER['REQUEST_METHOD'] == 'POST') {
             $this->handlePost();
index 0cd4281f8f5208199fb7b28cb9f922916bb60aa2..a81de68e69daee2abe9cd01eb069dfac2150a471 100644 (file)
@@ -226,6 +226,26 @@ class HubSub extends Memcached_DataObject
         return parent::insert();
     }
 
+    /**
+     * Schedule delivery of a 'fat ping' to the subscriber's callback
+     * endpoint. If queues are disabled, this will run immediately.
+     *
+     * @param string $atom well-formed Atom feed
+     * @param int $retries optional count of retries if POST fails; defaults to hub_retries from config or 0 if unset
+     */
+    function distribute($atom, $retries=null)
+    {
+        if ($retries === null) {
+            $retries = intval(common_config('ostatus', 'hub_retries'));
+        }
+
+        $data = array('sub' => clone($this),
+                      'atom' => $atom,
+                      'retries' => $retries);
+        $qm = QueueManager::get();
+        $qm->enqueue($data, 'hubout');
+    }
+
     /**
      * Send a 'fat ping' to the subscriber's callback endpoint
      * containing the given Atom feed chunk.
@@ -234,6 +254,7 @@ class HubSub extends Memcached_DataObject
      * a higher level; don't just shove in a complete feed!
      *
      * @param string $atom well-formed Atom feed
+     * @throws Exception (HTTP or general)
      */
     function push($atom)
     {
@@ -245,24 +266,18 @@ class HubSub extends Memcached_DataObject
             $hmac = '(none)';
         }
         common_log(LOG_INFO, "About to push feed to $this->callback for $this->topic, HMAC $hmac");
-        try {
-            $request = new HTTPClient();
-            $request->setBody($atom);
-            $response = $request->post($this->callback, $headers);
 
-            if ($response->isOk()) {
-                return true;
-            }
-            common_log(LOG_ERR, "Error sending PuSH content " .
-                                "to $this->callback for $this->topic: " .
-                                $response->getStatus());
-            return false;
+        $request = new HTTPClient();
+        $request->setBody($atom);
+        $response = $request->post($this->callback, $headers);
 
-        } catch (Exception $e) {
-            common_log(LOG_ERR, "Error sending PuSH content " .
-                                "to $this->callback for $this->topic: " .
-                                $e->getMessage());
-            return false;
+        if ($response->isOk()) {
+            return true;
+        } else {
+            throw new Exception("Callback returned status: " .
+                                $response->getStatus() .
+                                "; body: " .
+                                trim($response->getBody()));
         }
     }
 }
index 245a57f7200dac1e321d3aefe98661d9edaec724..30a427e3fca8b79b1fe4c40403b2a6f77de70b44 100644 (file)
@@ -124,10 +124,7 @@ class HubDistribQueueHandler extends QueueHandler
         common_log(LOG_INFO, "Preparing $sub->N PuSH distribution(s) for $sub->topic");
         $qm = QueueManager::get();
         while ($sub->fetch()) {
-            common_log(LOG_INFO, "Prepping PuSH distribution to $sub->callback for $sub->topic");
-            $data = array('sub' => clone($sub),
-                          'atom' => $atom);
-            $qm->enqueue($data, 'hubout');
+            $sub->distribute($atom);
         }
     }
 
index 0791c7e5db16c4fd186ba5158bddc93546172770..3ad94646e6ddd98e51b0674b1b3536cd69a3e619 100644 (file)
@@ -33,6 +33,7 @@ class HubOutQueueHandler extends QueueHandler
     {
         $sub = $data['sub'];
         $atom = $data['atom'];
+        $retries = $data['retries'];
 
         assert($sub instanceof HubSub);
         assert(is_string($atom));
@@ -40,13 +41,20 @@ class HubOutQueueHandler extends QueueHandler
         try {
             $sub->push($atom);
         } catch (Exception $e) {
-            common_log(LOG_ERR, "Failed PuSH to $sub->callback for $sub->topic: " .
-                                $e->getMessage());
-            // @fixme Reschedule a later delivery?
-            return true;
+            $retries--;
+            $msg = "Failed PuSH to $sub->callback for $sub->topic: " .
+                   $e->getMessage();
+            if ($retries > 0) {
+                common_log(LOG_ERR, "$msg; scheduling for $retries more tries");
+
+                // @fixme when we have infrastructure to schedule a retry
+                // after a delay, use it.
+                $sub->distribute($atom, $retries);
+            } else {
+                common_log(LOG_ERR, "$msg; discarding");
+            }
         }
 
         return true;
     }
 }
-