]> git.mxchange.org Git - quix0rs-gnu-social.git/blobdiff - plugins/OStatus/lib/ostatusqueuehandler.php
Merge branch '1.0.x' into shortcontrol10x
[quix0rs-gnu-social.git] / plugins / OStatus / lib / ostatusqueuehandler.php
index c1e50bffa1e50a8cb48839e519f2facc4187aab6..9814cab9f18d99fe4ff34e4811a82c22fa1baae0 100644 (file)
  */
 class OStatusQueueHandler extends QueueHandler
 {
+    // If we have more than this many subscribing sites on a single feed,
+    // break up the PuSH distribution into smaller batches which will be
+    // rolled into the queue progressively. This reduces disruption to
+    // other, shorter activities being enqueued while we work.
+    const MAX_UNBATCHED = 50;
+
+    // Each batch (a 'hubprep' entry) will have this many items.
+    // Selected to provide a balance between queue packet size
+    // and number of batches that will end up getting processed.
+    // For 20,000 target sites, 1000 should work acceptably.
+    const BATCH_SIZE = 1000;
+
     function transport()
     {
         return 'ostatus';
@@ -55,6 +67,17 @@ class OStatusQueueHandler extends QueueHandler
             }
         }
 
+        if (!empty($this->notice->reply_to)) {
+            $replyTo = Notice::staticGet('id', $this->notice->reply_to);
+            if (!empty($replyTo)) {
+                foreach($replyTo->getReplies() as $profile_id) {
+                    $oprofile = Ostatus_profile::staticGet('profile_id', $profile_id);
+                    if ($oprofile) {
+                        $this->pingReply($oprofile);
+                    }
+                }
+            }
+        }
         return true;
     }
 
@@ -83,23 +106,11 @@ class OStatusQueueHandler extends QueueHandler
     function pingReply($oprofile)
     {
         if ($this->user) {
-            if (!empty($oprofile->salmonuri)) {
-                // For local posts, send a Salmon ping to the mentioned
-                // remote user or group.
-                // @fixme as an optimization we can skip this if the
-                // remote profile is subscribed to the author.
-
-                common_log(LOG_INFO, "Prepping to send notice '{$this->notice->uri}' to remote profile '{$oprofile->uri}'.");
-
-                $xml = '<?xml version="1.0" encoding="UTF-8" ?' . '>';
-                $xml .= $this->notice->asAtomEntry(true, true);
-
-                $data = array('salmonuri' => $oprofile->salmonuri,
-                              'entry' => $xml);
-
-                $qm = QueueManager::get();
-                $qm->enqueue($data, 'salmonout');
-            }
+            // For local posts, send a Salmon ping to the mentioned
+            // remote user or group.
+            // @fixme as an optimization we can skip this if the
+            // remote profile is subscribed to the author.
+            $oprofile->notifyDeferred($this->notice, $this->user);
         }
     }
 
@@ -159,14 +170,31 @@ class OStatusQueueHandler extends QueueHandler
 
     /**
      * Queue up direct feed update pushes to subscribers on our internal hub.
+     * If there are a large number of subscriber sites, intermediate bulk
+     * distribution triggers may be queued.
+     *
      * @param string $atom update feed, containing only new/changed items
      * @param HubSub $sub open query of subscribers
      */
     function pushFeedInternal($atom, $sub)
     {
         common_log(LOG_INFO, "Preparing $sub->N PuSH distribution(s) for $sub->topic");
+        $n = 0;
+        $batch = array();
         while ($sub->fetch()) {
-            $sub->distribute($atom);
+            $n++;
+            if ($n < self::MAX_UNBATCHED) {
+                $sub->distribute($atom);
+            } else {
+                $batch[] = $sub->callback;
+                if (count($batch) >= self::BATCH_SIZE) {
+                    $sub->bulkDistribute($atom, $batch);
+                    $batch = array();
+                }
+            }
+        }
+        if (count($batch) >= 0) {
+            $sub->bulkDistribute($atom, $batch);
         }
     }
 
@@ -176,48 +204,22 @@ class OStatusQueueHandler extends QueueHandler
      */
     function userFeedForNotice()
     {
-        // @fixme this feels VERY hacky...
-        // should probably be a cleaner way to do it
-
-        ob_start();
-        $api = new ApiTimelineUserAction();
-        $api->prepare(array('id' => $this->notice->profile_id,
-                            'format' => 'atom',
-                            'max_id' => $this->notice->id,
-                            'since_id' => $this->notice->id - 1));
-        $api->showTimeline();
-        $feed = ob_get_clean();
-        
-        // ...and override the content-type back to something normal... eww!
-        // hope there's no other headers that got set while we weren't looking.
-        header('Content-Type: text/html; charset=utf-8');
-
-        common_log(LOG_DEBUG, $feed);
+        $atom = new AtomUserNoticeFeed($this->user);
+        $atom->addEntryFromNotice($this->notice);
+        $feed = $atom->getString();
+
         return $feed;
     }
 
     function groupFeedForNotice($group_id)
     {
-        // @fixme this feels VERY hacky...
-        // should probably be a cleaner way to do it
-
-        ob_start();
-        $api = new ApiTimelineGroupAction();
-        $args = array('id' => $group_id,
-                      'format' => 'atom',
-                      'max_id' => $this->notice->id,
-                      'since_id' => $this->notice->id - 1);
-        $api->prepare($args);
-        $api->handle($args);
-        $feed = ob_get_clean();
-        
-        // ...and override the content-type back to something normal... eww!
-        // hope there's no other headers that got set while we weren't looking.
-        header('Content-Type: text/html; charset=utf-8');
-
-        common_log(LOG_DEBUG, $feed);
+        $group = User_group::staticGet('id', $group_id);
+
+        $atom = new AtomGroupNoticeFeed($group);
+        $atom->addEntryFromNotice($this->notice);
+        $feed = $atom->getString();
+
         return $feed;
     }
 
 }
-