X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=plugins%2FOStatus%2Flib%2Fostatusqueuehandler.php;h=5e0ab46a5b842657afd760b04226141f36e15e44;hb=5a6f6162061985c9d0e67b7d25b94b5a53205ace;hp=d1e58f1d68ec2b83925b9faa98043b2da1fd5dff;hpb=b218aee94e581230e1efa14d4ae1a19756986ddf;p=quix0rs-gnu-social.git diff --git a/plugins/OStatus/lib/ostatusqueuehandler.php b/plugins/OStatus/lib/ostatusqueuehandler.php index d1e58f1d68..5e0ab46a5b 100644 --- a/plugins/OStatus/lib/ostatusqueuehandler.php +++ b/plugins/OStatus/lib/ostatusqueuehandler.php @@ -25,6 +25,18 @@ */ class OStatusQueueHandler extends QueueHandler { + // If we have more than this many subscribing sites on a single feed, + // break up the PuSH distribution into smaller batches which will be + // rolled into the queue progressively. This reduces disruption to + // other, shorter activities being enqueued while we work. + const MAX_UNBATCHED = 50; + + // Each batch (a 'hubprep' entry) will have this many items. + // Selected to provide a balance between queue packet size + // and number of batches that will end up getting processed. + // For 20,000 target sites, 1000 should work acceptably. + const BATCH_SIZE = 1000; + function transport() { return 'ostatus'; @@ -55,6 +67,17 @@ class OStatusQueueHandler extends QueueHandler } } + if (!empty($this->notice->reply_to)) { + $replyTo = Notice::staticGet('id', $this->notice->reply_to); + if (!empty($replyTo)) { + foreach($replyTo->getReplies() as $profile_id) { + $oprofile = Ostatus_profile::staticGet('profile_id', $profile_id); + if ($oprofile) { + $this->pingReply($oprofile); + } + } + } + } return true; } @@ -147,14 +170,31 @@ class OStatusQueueHandler extends QueueHandler /** * Queue up direct feed update pushes to subscribers on our internal hub. + * If there are a large number of subscriber sites, intermediate bulk + * distribution triggers may be queued. + * * @param string $atom update feed, containing only new/changed items * @param HubSub $sub open query of subscribers */ function pushFeedInternal($atom, $sub) { common_log(LOG_INFO, "Preparing $sub->N PuSH distribution(s) for $sub->topic"); + $n = 0; + $batch = array(); while ($sub->fetch()) { - $sub->distribute($atom); + $n++; + if ($n < self::MAX_UNBATCHED) { + $sub->distribute($atom); + } else { + $batch[] = $sub->callback; + if (count($batch) >= self::BATCH_SIZE) { + $sub->bulkDistribute($atom, $batch); + $batch = array(); + } + } + } + if (count($batch) >= 0) { + $sub->bulkDistribute($atom, $batch); } } @@ -181,6 +221,4 @@ class OStatusQueueHandler extends QueueHandler return $feed; } - } -