]> git.mxchange.org Git - quix0rs-gnu-social.git/commitdiff
- break OMB profile update pings to a background queue
authorBrion Vibber <brion@pobox.com>
Wed, 24 Feb 2010 20:36:36 +0000 (20:36 +0000)
committerBrion Vibber <brion@pobox.com>
Wed, 24 Feb 2010 20:36:36 +0000 (20:36 +0000)
- add event hooks to profile update pings
- send Salmon pings with custom update-profile event to OStatus subscribees and groups (subscribers will see it on your next post)
- fix OStatus queues with overlong transport names, should work on DB queues now
- Ostatus_profile::notifyActivity() and ::notifyDeferred() now can take XML, Notice, or Activity for convenience

15 files changed:
lib/activity.php
lib/profilequeuehandler.php [new file with mode: 0644]
lib/queuemanager.php
lib/util.php
plugins/OStatus/OStatusPlugin.php
plugins/OStatus/actions/pushcallback.php
plugins/OStatus/classes/HubSub.php
plugins/OStatus/classes/Ostatus_profile.php
plugins/OStatus/lib/hubconfqueuehandler.php [new file with mode: 0644]
plugins/OStatus/lib/hubverifyqueuehandler.php [deleted file]
plugins/OStatus/lib/ostatusqueuehandler.php
plugins/OStatus/lib/pushinputqueuehandler.php [deleted file]
plugins/OStatus/lib/pushinqueuehandler.php [new file with mode: 0644]
plugins/OStatus/lib/salmonoutqueuehandler.php [deleted file]
plugins/OStatus/lib/salmonqueuehandler.php [new file with mode: 0644]

index fa4ae02748d8d7ba29a4a6dff3760773d27b7158..33932ad0ef9c91e7fbb68ad0ecff50cc1de2b4f2 100644 (file)
@@ -691,6 +691,9 @@ class ActivityVerb
     const UNFAVORITE = 'http://ostatus.org/schema/1.0/unfavorite';
     const UNFOLLOW   = 'http://ostatus.org/schema/1.0/unfollow';
     const LEAVE      = 'http://ostatus.org/schema/1.0/leave';
+
+    // For simple profile-update pings; no content to share.
+    const UPDATE_PROFILE = 'http://ostatus.org/schema/1.0/update-profile';
 }
 
 class ActivityContext
diff --git a/lib/profilequeuehandler.php b/lib/profilequeuehandler.php
new file mode 100644 (file)
index 0000000..e8a00ae
--- /dev/null
@@ -0,0 +1,48 @@
+<?php
+/*
+ * StatusNet - the distributed open-source microblogging tool
+ * Copyright (C) 2010, StatusNet, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+/**
+ * @package QueueHandler
+ * @maintainer Brion Vibber <brion@status.net>
+ */
+
+class ProfileQueueHandler extends QueueHandler
+{
+
+    function transport()
+    {
+        return 'profile';
+    }
+
+    function handle($profile)
+    {
+        if (!($profile instanceof Profile)) {
+            common_log(LOG_ERR, "Got a bogus profile, not broadcasting");
+            return true;
+        }
+
+        if (Event::handle('StartBroadcastProfile', array($profile))) {
+            require_once(INSTALLDIR.'/lib/omb.php');
+            omb_broadcast_profile($profile);
+        }
+        Event::handle('EndBroadcastProfile', array($profile));
+        return true;
+    }
+
+}
index 8f8c8f133ffcea17ad51f4f6c8229e5704779e49..9fdc801100f46f58df9e9ab22c71529da55af6bd 100644 (file)
@@ -262,6 +262,9 @@ abstract class QueueManager extends IoManager
                 $this->connect('sms', 'SmsQueueHandler');
             }
 
+            // Broadcasting profile updates to OMB remote subscribers
+            $this->connect('profile', 'ProfileQueueHandler');
+
             // XMPP output handlers...
             if (common_config('xmpp', 'enabled')) {
                 // Delivery prep, read by queuedaemon.php:
index 7fb2c6c4b0315b090b2a14df3448908ae5499183..9354431f27b62f4214556afbad5c0e51ce0c23e4 100644 (file)
@@ -1119,12 +1119,16 @@ function common_enqueue_notice($notice)
     return true;
 }
 
-function common_broadcast_profile($profile)
+/**
+ * Broadcast profile updates to OMB and other remote subscribers.
+ *
+ * Since this may be slow with a lot of subscribers or bad remote sites,
+ * this is run through the background queues if possible.
+ */
+function common_broadcast_profile(Profile $profile)
 {
-    // XXX: optionally use a queue system like http://code.google.com/p/microapps/wiki/NQDQ
-    require_once(INSTALLDIR.'/lib/omb.php');
-    omb_broadcast_profile($profile);
-    // XXX: Other broadcasts...?
+    $qm = QueueManager::get();
+    $qm->enqueue($profile, "profile");
     return true;
 }
 
index 9376c048dee2f141a959a85a95b8ed077cc71ba4..90abe034de358565066ff8d9b2cec90d5223ca3c 100644 (file)
@@ -82,14 +82,14 @@ class OStatusPlugin extends Plugin
         $qm->connect('ostatus', 'OStatusQueueHandler');
 
         // Outgoing from our internal PuSH hub
-        $qm->connect('hubverify', 'HubVerifyQueueHandler');
+        $qm->connect('hubconf', 'HubConfQueueHandler');
         $qm->connect('hubout', 'HubOutQueueHandler');
 
         // Outgoing Salmon replies (when we don't need a return value)
-        $qm->connect('salmonout', 'SalmonOutQueueHandler');
+        $qm->connect('salmon', 'SalmonQueueHandler');
 
         // Incoming from a foreign PuSH hub
-        $qm->connect('pushinput', 'PushInputQueueHandler');
+        $qm->connect('pushin', 'PushInQueueHandler');
         return true;
     }
 
@@ -656,4 +656,51 @@ class OStatusPlugin extends Plugin
 
         return true;
     }
+
+    /**
+     * Ping remote profiles with updates to this profile.
+     * Salmon pings are queued for background processing.
+     */
+    function onEndBroadcastProfile(Profile $profile)
+    {
+        $user = User::staticGet('id', $profile->id);
+
+        // Find foreign accounts I'm subscribed to that support Salmon pings.
+        //
+        // @fixme we could run updates through the PuSH feed too,
+        // in which case we can skip Salmon pings to folks who
+        // are also subscribed to me.
+        $sql = "SELECT * FROM ostatus_profile " .
+               "WHERE profile_id IN " .
+               "(SELECT subscribed FROM subscription WHERE subscriber=%d) " .
+               "OR group_id IN " .
+               "(SELECT group_id FROM group_member WHERE profile_id=%d)";
+        $oprofile = new Ostatus_profile();
+        $oprofile->query(sprintf($sql, $profile->id, $profile->id));
+
+        if ($oprofile->N == 0) {
+            common_log(LOG_DEBUG, "No OStatus remote subscribees for $profile->nickname");
+            return true;
+        }
+
+        $act = new Activity();
+
+        $act->verb = ActivityVerb::UPDATE_PROFILE;
+        $act->id   = TagURI::mint('update-profile:%d:%s',
+                                  $profile->id,
+                                  common_date_iso8601(time()));
+        $act->time    = time();
+        $act->title   = _m("Profile update");
+        $act->content = sprintf(_m("%s has updated their profile page."),
+                               $profile->getBestName());
+
+        $act->actor   = ActivityObject::fromProfile($profile);
+        $act->object  = $act->actor;
+
+        while ($oprofile->fetch()) {
+            $oprofile->notifyDeferred($act);
+        }
+
+        return true;
+    }
 }
index 4184f0e0c008044ba9353a978f1b8d6386146f8a..9a2067b8ca0725e5b8ffa201add9bc4fedf4ba03 100644 (file)
@@ -68,7 +68,7 @@ class PushCallbackAction extends Action
                       'post' => $post,
                       'hmac' => $hmac);
         $qm = QueueManager::get();
-        $qm->enqueue($data, 'pushinput');
+        $qm->enqueue($data, 'pushin');
     }
     
     /**
index eae2928c329fcb0dfb58c1357443461f80e98050..1ac181feeb946db8097f0f0954892f3658ea1b0b 100644 (file)
@@ -164,7 +164,7 @@ class HubSub extends Memcached_DataObject
                       'token' => $token,
                       'retries' => $retries);
         $qm = QueueManager::get();
-        $qm->enqueue($data, 'hubverify');
+        $qm->enqueue($data, 'hubconf');
     }
 
     /**
index 9f9efb96ee98f3e3b219258c4c8859ed3490c6fb..61505206ece5ec5369f9e5f8ba5d150fd7080354 100644 (file)
@@ -431,21 +431,57 @@ class Ostatus_profile extends Memcached_DataObject
         return false;
     }
 
-    public function notifyActivity($activity)
+    /**
+     * Send a Salmon notification ping immediately, and confirm that we got
+     * an acceptable response from the remote site.
+     *
+     * @param mixed $entry XML string, Notice, or Activity
+     * @return boolean success
+     */
+    public function notifyActivity($entry)
     {
         if ($this->salmonuri) {
+            $salmon = new Salmon();
+            return $salmon->post($this->salmonuri, $this->notifyPrepXml($entry));
+        }
 
-            $xml = '<?xml version="1.0" encoding="UTF-8" ?' . '>' .
-                          $activity->asString(true);
+        return false;
+    }
 
-            $salmon = new Salmon(); // ?
+    /**
+     * Queue a Salmon notification for later. If queues are disabled we'll
+     * send immediately but won't get the return value.
+     *
+     * @param mixed $entry XML string, Notice, or Activity
+     * @return boolean success
+     */
+    public function notifyDeferred($entry)
+    {
+        if ($this->salmonuri) {
+            $data = array('salmonuri' => $this->salmonuri,
+                          'entry' => $this->notifyPrepXml($entry));
 
-            return $salmon->post($this->salmonuri, $xml);
+            $qm = QueueManager::get();
+            return $qm->enqueue($data, 'salmon');
         }
 
         return false;
     }
 
+    protected function notifyPrepXml($entry)
+    {
+        $preamble = '<?xml version="1.0" encoding="UTF-8" ?' . '>';
+        if (is_string($entry)) {
+            return $entry;
+        } else if ($entry instanceof Activity) {
+            return $preamble . $entry->asString(true);
+        } else if ($entry instanceof Notice) {
+            return $preamble . $entry->asAtomEntry(true, true);
+        } else {
+            throw new ServerException("Invalid type passed to Ostatus_profile::notify; must be XML string or Activity entry");
+        }
+    }
+
     function getBestName()
     {
         if ($this->isGroup()) {
diff --git a/plugins/OStatus/lib/hubconfqueuehandler.php b/plugins/OStatus/lib/hubconfqueuehandler.php
new file mode 100644 (file)
index 0000000..c8e0b72
--- /dev/null
@@ -0,0 +1,54 @@
+<?php
+/*
+ * StatusNet - the distributed open-source microblogging tool
+ * Copyright (C) 2010, StatusNet, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+/**
+ * Send a PuSH subscription verification from our internal hub.
+ * @package Hub
+ * @author Brion Vibber <brion@status.net>
+ */
+class HubConfQueueHandler extends QueueHandler
+{
+    function transport()
+    {
+        return 'hubconf';
+    }
+
+    function handle($data)
+    {
+        $sub = $data['sub'];
+        $mode = $data['mode'];
+        $token = $data['token'];
+
+        assert($sub instanceof HubSub);
+        assert($mode === 'subscribe' || $mode === 'unsubscribe');
+
+        common_log(LOG_INFO, __METHOD__ . ": $mode $sub->callback $sub->topic");
+        try {
+            $sub->verify($mode, $token);
+        } catch (Exception $e) {
+            common_log(LOG_ERR, "Failed PuSH $mode verify to $sub->callback for $sub->topic: " .
+                                $e->getMessage());
+            // @fixme schedule retry?
+            // @fixme just kill it?
+        }
+
+        return true;
+    }
+}
+
diff --git a/plugins/OStatus/lib/hubverifyqueuehandler.php b/plugins/OStatus/lib/hubverifyqueuehandler.php
deleted file mode 100644 (file)
index 7ce9e14..0000000
+++ /dev/null
@@ -1,54 +0,0 @@
-<?php
-/*
- * StatusNet - the distributed open-source microblogging tool
- * Copyright (C) 2010, StatusNet, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program.  If not, see <http://www.gnu.org/licenses/>.
- */
-
-/**
- * Send a PuSH subscription verification from our internal hub.
- * @package Hub
- * @author Brion Vibber <brion@status.net>
- */
-class HubVerifyQueueHandler extends QueueHandler
-{
-    function transport()
-    {
-        return 'hubverify';
-    }
-
-    function handle($data)
-    {
-        $sub = $data['sub'];
-        $mode = $data['mode'];
-        $token = $data['token'];
-
-        assert($sub instanceof HubSub);
-        assert($mode === 'subscribe' || $mode === 'unsubscribe');
-
-        common_log(LOG_INFO, __METHOD__ . ": $mode $sub->callback $sub->topic");
-        try {
-            $sub->verify($mode, $token);
-        } catch (Exception $e) {
-            common_log(LOG_ERR, "Failed PuSH $mode verify to $sub->callback for $sub->topic: " .
-                                $e->getMessage());
-            // @fixme schedule retry?
-            // @fixme just kill it?
-        }
-
-        return true;
-    }
-}
-
index c1e50bffa1e50a8cb48839e519f2facc4187aab6..0da85600fb99c39d07fa3f9d2abf0663f7e9fea3 100644 (file)
@@ -83,23 +83,11 @@ class OStatusQueueHandler extends QueueHandler
     function pingReply($oprofile)
     {
         if ($this->user) {
-            if (!empty($oprofile->salmonuri)) {
-                // For local posts, send a Salmon ping to the mentioned
-                // remote user or group.
-                // @fixme as an optimization we can skip this if the
-                // remote profile is subscribed to the author.
-
-                common_log(LOG_INFO, "Prepping to send notice '{$this->notice->uri}' to remote profile '{$oprofile->uri}'.");
-
-                $xml = '<?xml version="1.0" encoding="UTF-8" ?' . '>';
-                $xml .= $this->notice->asAtomEntry(true, true);
-
-                $data = array('salmonuri' => $oprofile->salmonuri,
-                              'entry' => $xml);
-
-                $qm = QueueManager::get();
-                $qm->enqueue($data, 'salmonout');
-            }
+            // For local posts, send a Salmon ping to the mentioned
+            // remote user or group.
+            // @fixme as an optimization we can skip this if the
+            // remote profile is subscribed to the author.
+            $oprofile->notifyDeferred($this->notice);
         }
     }
 
diff --git a/plugins/OStatus/lib/pushinputqueuehandler.php b/plugins/OStatus/lib/pushinputqueuehandler.php
deleted file mode 100644 (file)
index cbd9139..0000000
+++ /dev/null
@@ -1,49 +0,0 @@
-<?php
-/*
- * StatusNet - the distributed open-source microblogging tool
- * Copyright (C) 2010, StatusNet, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program.  If not, see <http://www.gnu.org/licenses/>.
- */
-
-/**
- * Process a feed distribution POST from a PuSH hub.
- * @package FeedSub
- * @author Brion Vibber <brion@status.net>
- */
-
-class PushInputQueueHandler extends QueueHandler
-{
-    function transport()
-    {
-        return 'pushinput';
-    }
-
-    function handle($data)
-    {
-        assert(is_array($data));
-
-        $feedsub_id = $data['feedsub_id'];
-        $post = $data['post'];
-        $hmac = $data['hmac'];
-
-        $feedsub = FeedSub::staticGet('id', $feedsub_id);
-        if ($feedsub) {
-            $feedsub->receive($post, $hmac);
-        } else {
-            common_log(LOG_ERR, "Discarding POST to unknown feed subscription id $feedsub_id");
-        }
-        return true;
-    }
-}
diff --git a/plugins/OStatus/lib/pushinqueuehandler.php b/plugins/OStatus/lib/pushinqueuehandler.php
new file mode 100644 (file)
index 0000000..a90f52d
--- /dev/null
@@ -0,0 +1,49 @@
+<?php
+/*
+ * StatusNet - the distributed open-source microblogging tool
+ * Copyright (C) 2010, StatusNet, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+/**
+ * Process a feed distribution POST from a PuSH hub.
+ * @package FeedSub
+ * @author Brion Vibber <brion@status.net>
+ */
+
+class PushInQueueHandler extends QueueHandler
+{
+    function transport()
+    {
+        return 'pushin';
+    }
+
+    function handle($data)
+    {
+        assert(is_array($data));
+
+        $feedsub_id = $data['feedsub_id'];
+        $post = $data['post'];
+        $hmac = $data['hmac'];
+
+        $feedsub = FeedSub::staticGet('id', $feedsub_id);
+        if ($feedsub) {
+            $feedsub->receive($post, $hmac);
+        } else {
+            common_log(LOG_ERR, "Discarding POST to unknown feed subscription id $feedsub_id");
+        }
+        return true;
+    }
+}
diff --git a/plugins/OStatus/lib/salmonoutqueuehandler.php b/plugins/OStatus/lib/salmonoutqueuehandler.php
deleted file mode 100644 (file)
index 536ff94..0000000
+++ /dev/null
@@ -1,44 +0,0 @@
-<?php
-/*
- * StatusNet - the distributed open-source microblogging tool
- * Copyright (C) 2010, StatusNet, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program.  If not, see <http://www.gnu.org/licenses/>.
- */
-
-/**
- * Send a Salmon notification in the background.
- * @package OStatusPlugin
- * @author Brion Vibber <brion@status.net>
- */
-class SalmonOutQueueHandler extends QueueHandler
-{
-    function transport()
-    {
-        return 'salmonout';
-    }
-
-    function handle($data)
-    {
-        assert(is_array($data));
-        assert(is_string($data['salmonuri']));
-        assert(is_string($data['entry']));
-
-        $salmon = new Salmon();
-        $salmon->post($data['salmonuri'], $data['entry']);
-
-        // @fixme detect failure and attempt to resend
-        return true;
-    }
-}
diff --git a/plugins/OStatus/lib/salmonqueuehandler.php b/plugins/OStatus/lib/salmonqueuehandler.php
new file mode 100644 (file)
index 0000000..aa97018
--- /dev/null
@@ -0,0 +1,44 @@
+<?php
+/*
+ * StatusNet - the distributed open-source microblogging tool
+ * Copyright (C) 2010, StatusNet, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+/**
+ * Send a Salmon notification in the background.
+ * @package OStatusPlugin
+ * @author Brion Vibber <brion@status.net>
+ */
+class SalmonQueueHandler extends QueueHandler
+{
+    function transport()
+    {
+        return 'salmon';
+    }
+
+    function handle($data)
+    {
+        assert(is_array($data));
+        assert(is_string($data['salmonuri']));
+        assert(is_string($data['entry']));
+
+        $salmon = new Salmon();
+        $salmon->post($data['salmonuri'], $data['entry']);
+
+        // @fixme detect failure and attempt to resend
+        return true;
+    }
+}