]> git.mxchange.org Git - quix0rs-gnu-social.git/commitdiff
OStatus plugin: Rolling batch queueing for PuSH output to >50 subscribing sites....
authorBrion Vibber <brion@pobox.com>
Fri, 4 Jun 2010 18:48:54 +0000 (11:48 -0700)
committerBrion Vibber <brion@pobox.com>
Mon, 7 Jun 2010 17:03:43 +0000 (10:03 -0700)
plugins/OStatus/OStatusPlugin.php
plugins/OStatus/classes/HubSub.php
plugins/OStatus/lib/ostatusqueuehandler.php

index 5a657c83d00875ee8af6923cc620fb465853e763..c61e2cc5f3984234940b694ea74132ece1d6b9db 100644 (file)
@@ -87,6 +87,8 @@ class OStatusPlugin extends Plugin
 
         // Outgoing from our internal PuSH hub
         $qm->connect('hubconf', 'HubConfQueueHandler');
+        $qm->connect('hubprep', 'HubPrepQueueHandler');
+
         $qm->connect('hubout', 'HubOutQueueHandler');
 
         // Outgoing Salmon replies (when we don't need a return value)
index 9748b4a569971d90041512c75830b52c8ff8be84..7db528a4e85772a23ad35823cb5592e0edde6951 100644 (file)
@@ -304,6 +304,26 @@ class HubSub extends Memcached_DataObject
         $qm->enqueue($data, 'hubout');
     }
 
+    /**
+     * Queue up a large batch of pushes to multiple subscribers
+     * for this same topic update.
+     * 
+     * If queues are disabled, this will run immediately.
+     * 
+     * @param string $atom well-formed Atom feed
+     * @param array $pushCallbacks list of callback URLs
+     */
+    function bulkDistribute($atom, $pushCallbacks)
+    {
+        $data = array('atom' => $atom,
+                      'topic' => $this->topic,
+                      'pushCallbacks' => $pushCallbacks);
+        common_log(LOG_INFO, "Queuing PuSH batch: $this->topic to " .
+                             count($pushCallbacks) . " sites");
+        $qm = QueueManager::get();
+        $qm->enqueue($data, 'hubprep');
+    }
+
     /**
      * Send a 'fat ping' to the subscriber's callback endpoint
      * containing the given Atom feed chunk.
index d1e58f1d68ec2b83925b9faa98043b2da1fd5dff..8905d2e21069f22851c81c208b20ba650fbc2fd6 100644 (file)
  */
 class OStatusQueueHandler extends QueueHandler
 {
+    // If we have more than this many subscribing sites on a single feed,
+    // break up the PuSH distribution into smaller batches which will be
+    // rolled into the queue progressively. This reduces disruption to
+    // other, shorter activities being enqueued while we work.
+    const MAX_UNBATCHED = 50;
+
+    // Each batch (a 'hubprep' entry) will have this many items.
+    // Selected to provide a balance between queue packet size
+    // and number of batches that will end up getting processed.
+    // For 20,000 target sites, 1000 should work acceptably.
+    const BATCH_SIZE = 1000;
+
     function transport()
     {
         return 'ostatus';
@@ -147,14 +159,31 @@ class OStatusQueueHandler extends QueueHandler
 
     /**
      * Queue up direct feed update pushes to subscribers on our internal hub.
+     * If there are a large number of subscriber sites, intermediate bulk
+     * distribution triggers may be queued.
+     * 
      * @param string $atom update feed, containing only new/changed items
      * @param HubSub $sub open query of subscribers
      */
     function pushFeedInternal($atom, $sub)
     {
         common_log(LOG_INFO, "Preparing $sub->N PuSH distribution(s) for $sub->topic");
+        $n = 0;
+        $batch = array();
         while ($sub->fetch()) {
-            $sub->distribute($atom);
+            $n++;
+            if ($n < self::MAX_UNBATCHED) {
+                $sub->distribute($atom);
+            } else {
+                $batch[] = $sub->callback;
+                if (count($batch) >= self::BATCH_SIZE) {
+                    $sub->bulkDistribute($atom, $batch);
+                    $batch = array();
+                }
+            }
+        }
+        if (count($batch) >= 0) {
+            $sub->bulkDistribute($atom, $batch);
         }
     }