X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=plugins%2FOStatus%2Flib%2Fostatusqueuehandler.php;h=964580cf49a208b8e2129857f8cc20a78587d468;hb=d6b28c64830f632bb2f4b6f3c9369b9e56ad217a;hp=0da85600fb99c39d07fa3f9d2abf0663f7e9fea3;hpb=c187bf55974347f7ddb4f28714af57861dce8f08;p=quix0rs-gnu-social.git diff --git a/plugins/OStatus/lib/ostatusqueuehandler.php b/plugins/OStatus/lib/ostatusqueuehandler.php index 0da85600fb..964580cf49 100644 --- a/plugins/OStatus/lib/ostatusqueuehandler.php +++ b/plugins/OStatus/lib/ostatusqueuehandler.php @@ -17,6 +17,10 @@ * along with this program. If not, see . */ +if (!defined('STATUSNET')) { + exit(1); +} + /** * Prepare PuSH and Salmon distributions for an outgoing message. * @@ -25,6 +29,18 @@ */ class OStatusQueueHandler extends QueueHandler { + // If we have more than this many subscribing sites on a single feed, + // break up the PuSH distribution into smaller batches which will be + // rolled into the queue progressively. This reduces disruption to + // other, shorter activities being enqueued while we work. + const MAX_UNBATCHED = 50; + + // Each batch (a 'hubprep' entry) will have this many items. + // Selected to provide a balance between queue packet size + // and number of batches that will end up getting processed. + // For 20,000 target sites, 1000 should work acceptably. + const BATCH_SIZE = 1000; + function transport() { return 'ostatus'; @@ -35,23 +51,62 @@ class OStatusQueueHandler extends QueueHandler assert($notice instanceof Notice); $this->notice = $notice; - $this->user = User::staticGet($notice->profile_id); + $this->user = User::getKV('id', $notice->profile_id); + + try { + $profile = $this->notice->getProfile(); + } catch (Exception $e) { + common_log(LOG_ERR, "Can't get profile for notice; skipping: " . $e->getMessage()); + return true; + } - $this->pushUser(); + if ($notice->isLocal()) { + // Notices generated on remote sites will have already + // been pushed to user's subscribers by their origin sites. + $this->pushUser(); + } foreach ($notice->getGroups() as $group) { - $oprofile = Ostatus_profile::staticGet('group_id', $group->id); + $oprofile = Ostatus_profile::getKV('group_id', $group->id); if ($oprofile) { - $this->pingReply($oprofile); + // remote group + if ($notice->isLocal()) { + $this->pingReply($oprofile); + } } else { + // local group $this->pushGroup($group->id); } } - foreach ($notice->getReplies() as $profile_id) { - $oprofile = Ostatus_profile::staticGet('profile_id', $profile_id); - if ($oprofile) { - $this->pingReply($oprofile); + if ($notice->isLocal()) { + // Notices generated on other sites will have already + // pinged their reply-targets. + + foreach ($notice->getReplies() as $profile_id) { + $oprofile = Ostatus_profile::getKV('profile_id', $profile_id); + if ($oprofile) { + $this->pingReply($oprofile); + } + } + + if (!empty($this->notice->reply_to)) { + $replyTo = Notice::getKV('id', $this->notice->reply_to); + if (!empty($replyTo)) { + foreach($replyTo->getReplies() as $profile_id) { + $oprofile = Ostatus_profile::getKV('profile_id', $profile_id); + if ($oprofile) { + $this->pingReply($oprofile); + } + } + } + } + + foreach ($notice->getProfileTags() as $ptag) { + $oprofile = Ostatus_profile::getKV('peopletag_id', $ptag->id); + if (!$oprofile) { + $this->pushPeopletag($ptag); + } } } @@ -80,14 +135,25 @@ class OStatusQueueHandler extends QueueHandler $this->pushFeed($feed, array($this, 'groupFeedForNotice'), $group_id); } - function pingReply($oprofile) + function pushPeopletag($ptag) + { + // For a local people tag, ping the PuSH hub to update its feed. + // Updates may come from either a local or a remote user. + $feed = common_local_url('ApiTimelineList', + array('id' => $ptag->id, + 'user' => $ptag->tagger, + 'format' => 'atom')); + $this->pushFeed($feed, array($this, 'peopletagFeedForNotice'), $ptag); + } + + function pingReply(OStatus_profile $oprofile) { if ($this->user) { // For local posts, send a Salmon ping to the mentioned // remote user or group. // @fixme as an optimization we can skip this if the // remote profile is subscribed to the author. - $oprofile->notifyDeferred($this->notice); + $oprofile->notifyDeferred($this->notice, $this->user->getProfile()); } } @@ -147,14 +213,31 @@ class OStatusQueueHandler extends QueueHandler /** * Queue up direct feed update pushes to subscribers on our internal hub. + * If there are a large number of subscriber sites, intermediate bulk + * distribution triggers may be queued. + * * @param string $atom update feed, containing only new/changed items * @param HubSub $sub open query of subscribers */ function pushFeedInternal($atom, $sub) { common_log(LOG_INFO, "Preparing $sub->N PuSH distribution(s) for $sub->topic"); + $n = 0; + $batch = array(); while ($sub->fetch()) { - $sub->distribute($atom); + $n++; + if ($n < self::MAX_UNBATCHED) { + $sub->distribute($atom); + } else { + $batch[] = $sub->callback; + if (count($batch) >= self::BATCH_SIZE) { + $sub->bulkDistribute($atom, $batch); + $batch = array(); + } + } + } + if (count($batch) >= 0) { + $sub->bulkDistribute($atom, $batch); } } @@ -164,48 +247,30 @@ class OStatusQueueHandler extends QueueHandler */ function userFeedForNotice() { - // @fixme this feels VERY hacky... - // should probably be a cleaner way to do it - - ob_start(); - $api = new ApiTimelineUserAction(); - $api->prepare(array('id' => $this->notice->profile_id, - 'format' => 'atom', - 'max_id' => $this->notice->id, - 'since_id' => $this->notice->id - 1)); - $api->showTimeline(); - $feed = ob_get_clean(); - - // ...and override the content-type back to something normal... eww! - // hope there's no other headers that got set while we weren't looking. - header('Content-Type: text/html; charset=utf-8'); - - common_log(LOG_DEBUG, $feed); + $atom = new AtomUserNoticeFeed($this->user); + $atom->addEntryFromNotice($this->notice); + $feed = $atom->getString(); + return $feed; } function groupFeedForNotice($group_id) { - // @fixme this feels VERY hacky... - // should probably be a cleaner way to do it - - ob_start(); - $api = new ApiTimelineGroupAction(); - $args = array('id' => $group_id, - 'format' => 'atom', - 'max_id' => $this->notice->id, - 'since_id' => $this->notice->id - 1); - $api->prepare($args); - $api->handle($args); - $feed = ob_get_clean(); - - // ...and override the content-type back to something normal... eww! - // hope there's no other headers that got set while we weren't looking. - header('Content-Type: text/html; charset=utf-8'); - - common_log(LOG_DEBUG, $feed); + $group = User_group::getKV('id', $group_id); + + $atom = new AtomGroupNoticeFeed($group); + $atom->addEntryFromNotice($this->notice); + $feed = $atom->getString(); + return $feed; } -} + function peopletagFeedForNotice($ptag) + { + $atom = new AtomListNoticeFeed($ptag); + $atom->addEntryFromNotice($this->notice); + $feed = $atom->getString(); + return $feed; + } +}