]> git.mxchange.org Git - quix0rs-gnu-social.git/commitdiff
Offload inbox updates to a queue handler to speed up posting online
authorEvan Prodromou <evan@status.net>
Mon, 25 Jan 2010 23:08:21 +0000 (18:08 -0500)
committerEvan Prodromou <evan@status.net>
Mon, 25 Jan 2010 23:08:21 +0000 (18:08 -0500)
Moved much of the writing that happens when posting a notice to a new
queuehandler, distribqueuehandler. This updates tags, groups, replies
and inboxes at queue time (or at Web time, if queues are disabled).

To make this work well, I had to break up the monolithic
Notice::blowCaches() and make cache blowing happen closer to where
data is updated.

Squashed commit of the following:

commit 5257626c62750ac4ac1db0ce2b71410c5711cfa3
Author: Evan Prodromou <evan@status.net>
Date:   Mon Jan 25 14:56:41 2010 -0500

    slightly better handling of blowing tag memory cache

commit 8a22a3cdf6ec28685da129a0313e7b2a0837c9ef
Author: Evan Prodromou <evan@status.net>
Date:   Mon Jan 25 01:42:56 2010 -0500

    change 'distribute' to 'distrib' so not too long for dbqueue

commit 7a063315b0f7fad27cb6fbd2bdd74e253af83e4f
Author: Evan Prodromou <evan@status.net>
Date:   Mon Jan 25 01:39:15 2010 -0500

    change handle_notice() to handle() in distributqueuehandler

commit 1a39ccd28b9994137d7bfd21bb4f230546938e77
Author: Evan Prodromou <evan@status.net>
Date:   Mon Jan 25 16:05:25 2010 -0500

    error with queuemanager

commit e6b3bb93f305cfd2de71a6340b8aa6fb890049b7
Author: Evan Prodromou <evan@status.net>
Date:   Mon Jan 25 01:11:34 2010 -0500

    Blow memcache at different point rather than one big function for Notice class

commit 94d557cdc016187d1d0647ae1794cd94d6fb8ac8
Author: Evan Prodromou <evan@status.net>
Date:   Mon Jan 25 00:48:44 2010 -0500

    Blow memcache at different point rather than one big function for Notice class

commit 1c781dd08c88a35dafc5c01230b4872fd6b95182
Author: Evan Prodromou <evan@status.net>
Date:   Wed Jan 20 08:54:18 2010 -0500

    move broadcasting and distributing to new queuehandler

commit da3e46d26b84e4f028f34a13fd2ee373e4c1b954
Author: Evan Prodromou <evan@status.net>
Date:   Wed Jan 20 08:53:12 2010 -0500

    Move distribution of notices to new distribute queue handler

16 files changed:
actions/apistatusesretweet.php
actions/apistatusesupdate.php
actions/newnotice.php
actions/repeat.php
classes/Inbox.php
classes/Memcached_DataObject.php
classes/Notice.php
classes/Notice_tag.php
classes/User.php
lib/command.php
lib/distribqueuehandler.php [new file with mode: 0644]
lib/mailhandler.php
lib/oauthstore.php
lib/queuemanager.php
lib/util.php
plugins/Facebook/facebookaction.php

index d9d4820c0e35049623da216aa43c2317924432af..128c881e25aef3979e1f253f1f141f4ac46a07ce 100644 (file)
@@ -112,7 +112,7 @@ class ApiStatusesRetweetAction extends ApiAuthAction
 
         $repeat = $this->original->repeat($this->user->id, $this->source);
 
-        common_broadcast_notice($repeat);
+
 
         $this->showNotice($repeat);
     }
index f594bbf393626ffa67cccc8d5806ea9629df864a..9d831b9dbbc42c855dec90813c04861b0acd9029 100644 (file)
@@ -250,7 +250,7 @@ class ApiStatusesUpdateAction extends ApiAuthAction
                 $upload->attachToNotice($this->notice);
             }
 
-            common_broadcast_notice($this->notice);
+
         }
 
         $this->showNotice();
index a4ed87bb62e695b86cc02e026259dce025d01102..78480ababb069849508550141ee4c5df9862b78c 100644 (file)
@@ -201,7 +201,7 @@ class NewnoticeAction extends Action
             $upload->attachToNotice($notice);
         }
 
-        common_broadcast_notice($notice);
+
 
         if ($this->boolean('ajax')) {
             header('Content-Type: text/xml;charset=utf-8');
index b75523498b447cdfdbb489f53ed15d75d891a5c8..e112496bc120dbab109235077b5faa94260764a8 100644 (file)
@@ -106,7 +106,7 @@ class RepeatAction extends Action
     {
         $repeat = $this->notice->repeat($this->user->id, 'web');
 
-        common_broadcast_notice($repeat);
+
 
         if ($this->boolean('ajax')) {
             $this->startHTML('text/xml;charset=utf-8');
index 086dba1c9dbe266613dd84bfc3cd011673867139..26b27d2b58f71f47c2e6aa816a6cf668ab46cc3e 100644 (file)
@@ -120,11 +120,7 @@ class Inbox extends Memcached_DataObject
                                         $notice_id, $user_id));
 
         if ($result) {
-            $c = self::memcache();
-
-            if (!empty($c)) {
-                $c->delete(self::cacheKey('inbox', 'user_id', $user_id));
-            }
+            self::blow('inbox:user_id:%d', $user_id);
         }
 
         return $result;
index 6ddef4816015b7a5c3e8afb23629c1ebcc6a4241..2c9dcf59539784d0123503405b63287dd02abd01 100644 (file)
@@ -425,4 +425,23 @@ class Memcached_DataObject extends DB_DataObject
 
         return $dsn;
     }
+
+    static function blow()
+    {
+        $c = self::memcache();
+
+        if (empty($c)) {
+            return false;
+        }
+
+        $args = func_get_args();
+
+        $format = array_shift($args);
+
+        $keyPart = vsprintf($format, $args);
+
+        $cacheKey = common_cache_key($keyPart);
+
+        return $c->delete($cacheKey);
+    }
 }
index 38b10db048cf8cee663954b9210abcef7f5ab9f6..0966697e215431f05724a047433397c76799490b 100644 (file)
@@ -94,10 +94,6 @@ class Notice extends Memcached_DataObject
 
     function delete()
     {
-        $this->blowCaches(true);
-        $this->blowFavesCache(true);
-        $this->blowSubsCache(true);
-
         // For auditing purposes, save a record that the notice
         // was deleted.
 
@@ -109,31 +105,20 @@ class Notice extends Memcached_DataObject
         $deleted->created    = $this->created;
         $deleted->deleted    = common_sql_now();
 
-        $this->query('BEGIN');
-
         $deleted->insert();
 
-        //Null any notices that are replies to this notice
-        $this->query(sprintf("UPDATE notice set reply_to = null WHERE reply_to = %d", $this->id));
-
-        //Null any notices that are repeats of this notice
-        //XXX: probably need to uncache these, too
+        // Clear related records
 
-        $this->query(sprintf("UPDATE notice set repeat_of = null WHERE repeat_of = %d", $this->id));
+        $this->clearReplies();
+        $this->clearRepeats();
+        $this->clearFaves();
+        $this->clearTags();
+        $this->clearGroupInboxes();
 
-        $related = array('Reply',
-                         'Fave',
-                         'Notice_tag',
-                         'Group_inbox',
-                         'Queue_item');
+        // NOTE: we don't clear inboxes
+        // NOTE: we don't clear queue items
 
-        foreach ($related as $cls) {
-            $inst = new $cls();
-            $inst->notice_id = $this->id;
-            $inst->delete();
-        }
         $result = parent::delete();
-        $this->query('COMMIT');
     }
 
     function saveTags()
@@ -155,6 +140,7 @@ class Notice extends Memcached_DataObject
         foreach(array_unique($hashtags) as $hashtag) {
             /* elide characters we don't want in the tag */
             $this->saveTag($hashtag);
+            self::blow('profile:notice_ids_tagged:%d:%s', $this->profile_id, $tag->tag);
         }
         return true;
     }
@@ -172,6 +158,9 @@ class Notice extends Memcached_DataObject
                                               $last_error->message));
             return;
         }
+
+        // if it's saved, blow its cache
+        $tag->blowCache(false);
     }
 
     /**
@@ -331,27 +320,43 @@ class Notice extends Memcached_DataObject
                 }
             }
 
-            // XXX: do we need to change this for remote users?
+        }
+
+        # Clear the cache for subscribed users, so they'll update at next request
+        # XXX: someone clever could prepend instead of clearing the cache
+        $notice->blowOnInsert();
 
-            $notice->saveTags();
+        $qm = QueueManager::get();
 
-            $groups = $notice->saveGroups();
+        $qm->enqueue($notice, 'distrib');
 
-            $recipients = $notice->saveReplies();
+        return $notice;
+    }
 
-            $notice->addToInboxes($groups, $recipients);
+    function blowOnInsert()
+    {
+        self::blow('profile:notice_ids:%d', $this->profile_id);
+        self::blow('public');
 
-            $notice->saveUrls();
+        if ($this->conversation != $this->id) {
+            self::blow('notice:conversation_ids:%d', $this->conversation);
+        }
 
-            Event::handle('EndNoticeSave', array($notice));
+        if (!empty($this->repeat_of)) {
+            self::blow('notice:repeats:%d', $this->repeat_of);
         }
 
-        # Clear the cache for subscribed users, so they'll update at next request
-        # XXX: someone clever could prepend instead of clearing the cache
+        $original = Notice::staticGet('id', $this->repeat_of);
 
-        $notice->blowCaches();
+        if (!empty($original)) {
+            $originalUser = User::staticGet('id', $original->profile_id);
+            if (!empty($originalUser)) {
+                self::blow('user:repeats_of_me:%d', $originalUser->id);
+            }
+        }
 
-        return $notice;
+        $profile = Profile::staticGet($this->profile_id);
+        $profile->blowNoticeCount();
     }
 
     /** save all urls in the notice to the db
@@ -456,227 +461,6 @@ class Notice extends Memcached_DataObject
         return $att;
     }
 
-    function blowCaches($blowLast=false)
-    {
-        $this->blowSubsCache($blowLast);
-        $this->blowNoticeCache($blowLast);
-        $this->blowRepliesCache($blowLast);
-        $this->blowPublicCache($blowLast);
-        $this->blowTagCache($blowLast);
-        $this->blowGroupCache($blowLast);
-        $this->blowConversationCache($blowLast);
-        $this->blowRepeatCache();
-        $profile = Profile::staticGet($this->profile_id);
-        $profile->blowNoticeCount();
-    }
-
-    function blowRepeatCache()
-    {
-        if (!empty($this->repeat_of)) {
-            $cache = common_memcache();
-            if (!empty($cache)) {
-                // XXX: only blow if <100 in cache
-                $ck = common_cache_key('notice:repeats:'.$this->repeat_of);
-                $result = $cache->delete($ck);
-
-                $user = User::staticGet('id', $this->profile_id);
-
-                if (!empty($user)) {
-                    $uk = common_cache_key('user:repeated_by_me:'.$user->id);
-                    $cache->delete($uk);
-                    $user->free();
-                    unset($user);
-                }
-
-                $original = Notice::staticGet('id', $this->repeat_of);
-
-                if (!empty($original)) {
-                    $originalUser = User::staticGet('id', $original->profile_id);
-                    if (!empty($originalUser)) {
-                        $ouk = common_cache_key('user:repeats_of_me:'.$originalUser->id);
-                        $cache->delete($ouk);
-                        $originalUser->free();
-                        unset($originalUser);
-                    }
-                    $original->free();
-                    unset($original);
-                }
-            }
-        }
-    }
-
-    function blowConversationCache($blowLast=false)
-    {
-        $cache = common_memcache();
-        if ($cache) {
-            $ck = common_cache_key('notice:conversation_ids:'.$this->conversation);
-            $cache->delete($ck);
-            if ($blowLast) {
-                $cache->delete($ck.';last');
-            }
-        }
-    }
-
-    function blowGroupCache($blowLast=false)
-    {
-        $cache = common_memcache();
-        if ($cache) {
-            $group_inbox = new Group_inbox();
-            $group_inbox->notice_id = $this->id;
-            if ($group_inbox->find()) {
-                while ($group_inbox->fetch()) {
-                    $cache->delete(common_cache_key('user_group:notice_ids:' . $group_inbox->group_id));
-                    if ($blowLast) {
-                        $cache->delete(common_cache_key('user_group:notice_ids:' . $group_inbox->group_id.';last'));
-                    }
-                    $member = new Group_member();
-                    $member->group_id = $group_inbox->group_id;
-                    if ($member->find()) {
-                        while ($member->fetch()) {
-                            $cache->delete(common_cache_key('notice_inbox:by_user:' . $member->profile_id));
-                            $cache->delete(common_cache_key('notice_inbox:by_user_own:' . $member->profile_id));
-                            if (empty($this->repeat_of)) {
-                                $cache->delete(common_cache_key('user:friends_timeline:' . $member->profile_id));
-                                $cache->delete(common_cache_key('user:friends_timeline_own:' . $member->profile_id));
-                            }
-                            if ($blowLast) {
-                                $cache->delete(common_cache_key('notice_inbox:by_user:' . $member->profile_id . ';last'));
-                                $cache->delete(common_cache_key('notice_inbox:by_user_own:' . $member->profile_id . ';last'));
-                                if (empty($this->repeat_of)) {
-                                    $cache->delete(common_cache_key('user:friends_timeline:' . $member->profile_id . ';last'));
-                                    $cache->delete(common_cache_key('user:friends_timeline_own:' . $member->profile_id . ';last'));
-                                }
-                            }
-                        }
-                    }
-                }
-            }
-            $group_inbox->free();
-            unset($group_inbox);
-        }
-    }
-
-    function blowTagCache($blowLast=false)
-    {
-        $cache = common_memcache();
-        if ($cache) {
-            $tag = new Notice_tag();
-            $tag->notice_id = $this->id;
-            if ($tag->find()) {
-                while ($tag->fetch()) {
-                    $tag->blowCache($blowLast);
-                    $ck = 'profile:notice_ids_tagged:' . $this->profile_id . ':' . $tag->tag;
-
-                    $cache->delete($ck);
-                    if ($blowLast) {
-                        $cache->delete($ck . ';last');
-                    }
-                }
-            }
-            $tag->free();
-            unset($tag);
-        }
-    }
-
-    function blowSubsCache($blowLast=false)
-    {
-        $cache = common_memcache();
-        if ($cache) {
-            $user = new User();
-
-            $UT = common_config('db','type')=='pgsql'?'"user"':'user';
-            $user->query('SELECT id ' .
-
-                         "FROM $UT JOIN subscription ON $UT.id = subscription.subscriber " .
-                         'WHERE subscription.subscribed = ' . $this->profile_id);
-
-            while ($user->fetch()) {
-                $cache->delete(common_cache_key('notice_inbox:by_user:'.$user->id));
-                $cache->delete(common_cache_key('notice_inbox:by_user_own:'.$user->id));
-                if (empty($this->repeat_of)) {
-                    $cache->delete(common_cache_key('user:friends_timeline:'.$user->id));
-                    $cache->delete(common_cache_key('user:friends_timeline_own:'.$user->id));
-                }
-                if ($blowLast) {
-                    $cache->delete(common_cache_key('notice_inbox:by_user:'.$user->id.';last'));
-                    $cache->delete(common_cache_key('notice_inbox:by_user_own:'.$user->id.';last'));
-                    if (empty($this->repeat_of)) {
-                        $cache->delete(common_cache_key('user:friends_timeline:'.$user->id.';last'));
-                        $cache->delete(common_cache_key('user:friends_timeline_own:'.$user->id.';last'));
-                    }
-                }
-            }
-            $user->free();
-            unset($user);
-        }
-    }
-
-    function blowNoticeCache($blowLast=false)
-    {
-        if ($this->is_local) {
-            $cache = common_memcache();
-            if (!empty($cache)) {
-                $cache->delete(common_cache_key('profile:notice_ids:'.$this->profile_id));
-                if ($blowLast) {
-                    $cache->delete(common_cache_key('profile:notice_ids:'.$this->profile_id.';last'));
-                }
-            }
-        }
-    }
-
-    function blowRepliesCache($blowLast=false)
-    {
-        $cache = common_memcache();
-        if ($cache) {
-            $reply = new Reply();
-            $reply->notice_id = $this->id;
-            if ($reply->find()) {
-                while ($reply->fetch()) {
-                    $cache->delete(common_cache_key('reply:stream:'.$reply->profile_id));
-                    if ($blowLast) {
-                        $cache->delete(common_cache_key('reply:stream:'.$reply->profile_id.';last'));
-                    }
-                }
-            }
-            $reply->free();
-            unset($reply);
-        }
-    }
-
-    function blowPublicCache($blowLast=false)
-    {
-        if ($this->is_local == Notice::LOCAL_PUBLIC) {
-            $cache = common_memcache();
-            if ($cache) {
-                $cache->delete(common_cache_key('public'));
-                if ($blowLast) {
-                    $cache->delete(common_cache_key('public').';last');
-                }
-            }
-        }
-    }
-
-    function blowFavesCache($blowLast=false)
-    {
-        $cache = common_memcache();
-        if ($cache) {
-            $fave = new Fave();
-            $fave->notice_id = $this->id;
-            if ($fave->find()) {
-                while ($fave->fetch()) {
-                    $cache->delete(common_cache_key('fave:ids_by_user:'.$fave->user_id));
-                    $cache->delete(common_cache_key('fave:by_user_own:'.$fave->user_id));
-                    if ($blowLast) {
-                        $cache->delete(common_cache_key('fave:ids_by_user:'.$fave->user_id.';last'));
-                        $cache->delete(common_cache_key('fave:by_user_own:'.$fave->user_id.';last'));
-                    }
-                }
-            }
-            $fave->free();
-            unset($fave);
-        }
-    }
-
     function getStreamByIds($ids)
     {
         $cache = common_memcache();
@@ -999,7 +783,14 @@ class Notice extends Memcached_DataObject
             $gi->notice_id = $this->id;
             $gi->created   = $this->created;
 
-            return $gi->insert();
+            $result = $gi->insert();
+
+            if (!result) {
+                common_log_db_error($gi, 'INSERT', __FILE__);
+                throw new ServerException(_('Problem saving group inbox.'));
+            }
+
+            self::blow('user_group:notice_ids:%d', $gi->group_id);
         }
 
         return true;
@@ -1094,7 +885,8 @@ class Notice extends Memcached_DataObject
 
         foreach ($recipientIds as $recipientId) {
             $user = User::staticGet('id', $recipientId);
-            if ($user) {
+            if (!empty($user)) {
+                self::blow('reply:stream:%d', $reply->profile_id);
                 mail_notify_attn($user, $this);
             }
         }
@@ -1553,4 +1345,104 @@ class Notice extends Memcached_DataObject
 
         return $options;
     }
+
+    function clearReplies()
+    {
+        $replyNotice = new Notice();
+        $replyNotice->reply_to = $this->id;
+
+        //Null any notices that are replies to this notice
+
+        if ($replyNotice->find()) {
+            while ($replyNotice->fetch()) {
+                $orig = clone($replyNotice);
+                $replyNotice->reply_to = null;
+                $replyNotice->update($orig);
+            }
+        }
+
+        // Reply records
+
+        $reply = new Reply();
+        $reply->notice_id = $this->id;
+
+        if ($reply->find()) {
+            while($reply->fetch()) {
+                self::blow('reply:stream:%d', $reply->profile_id);
+                $reply->delete();
+            }
+        }
+
+        $reply->free();
+
+        return $ids;
+    }
+
+    function clearRepeats()
+    {
+        $repeatNotice = new Notice();
+        $repeatNotice->repeat_of = $this->id;
+
+        //Null any notices that are repeats of this notice
+
+        if ($repeatNotice->find()) {
+            while ($repeatNotice->fetch()) {
+                $orig = clone($repeatNotice);
+                $repeatNotice->repeat_of = null;
+                $repeatNotice->update($orig);
+            }
+        }
+    }
+
+    function clearFaves()
+    {
+        $fave = new Fave();
+        $fave->notice_id = $this->id;
+
+        if ($fave->find()) {
+            while ($fave->fetch()) {
+                self::blow('fave:ids_by_user_own:%d', $fave->user_id);
+                self::blow('fave:ids_by_user_own:%d;last', $fave->user_id);
+                self::blow('fave:ids_by_user:%d', $fave->user_id);
+                self::blow('fave:ids_by_user:%d;last', $fave->user_id);
+                $fave->delete();
+            }
+        }
+
+        $fave->free();
+    }
+
+    function clearTags()
+    {
+        $tag = new Notice_tag();
+        $tag->notice_id = $this->id;
+
+        if ($tag->find()) {
+            while ($tag->fetch()) {
+                self::blow('profile:notice_ids_tagged:%d:%s', $this->profile_id, common_keyize($tag->tag));
+                self::blow('profile:notice_ids_tagged:%d:%s;last', $this->profile_id, common_keyize($tag->tag));
+                self::blow('notice_tag:notice_ids:%s', common_keyize($tag->tag));
+                self::blow('notice_tag:notice_ids:%s;last', common_keyize($tag->tag));
+                $tag->delete();
+            }
+        }
+
+        $tag->free();
+    }
+
+    function clearGroupInboxes()
+    {
+        $gi = new Group_inbox();
+
+        $gi->notice_id = $this->id;
+
+        if ($gi->find()) {
+            while ($gi->fetch()) {
+                self::blow('user_group:notice_ids:%d', $gi->group_id);
+                $gi->delete();
+            }
+        }
+
+        $gi->free();
+    }
 }
index 79231f0b0c0777749c09cf91da451192ed55cb2f..4fd76e8ea85e8c04c9119b370bf3c89a462d7dec 100644 (file)
@@ -86,13 +86,9 @@ class Notice_tag extends Memcached_DataObject
 
     function blowCache($blowLast=false)
     {
-        $cache = common_memcache();
-        if ($cache) {
-            $idkey = common_cache_key('notice_tag:notice_ids:' . common_keyize($this->tag));
-            $cache->delete($idkey);
-            if ($blowLast) {
-                $cache->delete($idkey.';last');
-            }
+        self::blow('notice_tag:notice_ids:%s', common_keyize($this->tag));
+        if ($blowLast) {
+            self::blow('notice_tag:notice_ids:%s;last', common_keyize($this->tag));
         }
     }
 
index d6b52be0170813b5547189354ee8a003faaa1c1c..6ea975202d2c732427a930a54451e647263605f3 100644 (file)
@@ -383,7 +383,7 @@ class User extends Memcached_DataObject
                                                   common_config('site', 'name'),
                                                   $user->nickname),
                                           'system');
-                common_broadcast_notice($notice);
+
             }
         }
 
index c0a32e1b1a4cb2184434e96bc5423a57749262cd..2a51fd6872835781c424cf7e718f226e73b2a79f 100644 (file)
@@ -422,7 +422,7 @@ class RepeatCommand extends Command
         $repeat = $notice->repeat($this->user->id, $channel->source);
 
         if ($repeat) {
-            common_broadcast_notice($repeat);
+
             $channel->output($this->user, sprintf(_('Notice from %s repeated'), $recipient->nickname));
         } else {
             $channel->error($this->user, _('Error repeating notice.'));
@@ -492,7 +492,7 @@ class ReplyCommand extends Command
         } else {
             $channel->error($this->user, _('Error saving notice.'));
         }
-        common_broadcast_notice($notice);
+
     }
 }
 
diff --git a/lib/distribqueuehandler.php b/lib/distribqueuehandler.php
new file mode 100644 (file)
index 0000000..f458d23
--- /dev/null
@@ -0,0 +1,84 @@
+<?php
+/*
+ * StatusNet - the distributed open-source microblogging tool
+ * Copyright (C) 2008, 2009, StatusNet, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+if (!defined('STATUSNET') && !defined('LACONICA')) { exit(1); }
+
+/**
+ * Base class for queue handlers.
+ *
+ * As extensions of the Daemon class, each queue handler has the ability
+ * to launch itself in the background, at which point it'll pass control
+ * to the configured QueueManager class to poll for updates.
+ *
+ * Subclasses must override at least the following methods:
+ * - transport
+ * - handle_notice
+ */
+
+class DistribQueueHandler
+{
+    /**
+     * Return transport keyword which identifies items this queue handler
+     * services; must be defined for all subclasses.
+     *
+     * Must be 8 characters or less to fit in the queue_item database.
+     * ex "email", "jabber", "sms", "irc", ...
+     *
+     * @return string
+     */
+
+    function transport()
+    {
+        return 'distrib';
+    }
+
+    /**
+     * Here's the meat of your queue handler -- you're handed a Notice
+     * object, which you may do as you will with.
+     *
+     * If this function indicates failure, a warning will be logged
+     * and the item is placed back in the queue to be re-run.
+     *
+     * @param Notice $notice
+     * @return boolean true on success, false on failure
+     */
+    function handle($notice)
+    {
+        // XXX: do we need to change this for remote users?
+
+        $notice->saveTags();
+
+        $groups = $notice->saveGroups();
+
+        $recipients = $notice->saveReplies();
+
+        $notice->addToInboxes($groups, $recipients);
+
+        $notice->saveUrls();
+
+        Event::handle('EndNoticeSave', array($notice));
+
+        // Enqueue for other handlers
+
+        common_enqueue_notice($notice);
+
+        return true;
+    }
+}
+
index 85be89f1868540b10e41f232ed9d4cdff82f26e6..890f6d5b49fea9decfaf8476b0c3dbe5e28cb70c 100644 (file)
@@ -160,7 +160,7 @@ class MailHandler
         foreach($mediafiles as $mf){
             $mf->attachToNotice($notice);
         }
-        common_broadcast_notice($notice);
+
         $this->log(LOG_INFO,
                    'Added notice ' . $notice->id . ' from user ' . $user->nickname);
         return true;
index df63cc1512c12e800158d335606c52a8b9294b6c..b30fb49d57e2f16b0c737df00d0354492dbd4aa9 100644 (file)
@@ -362,7 +362,7 @@ class StatusNetOAuthDataStore extends OAuthDataStore
                                   array('is_local' => Notice::REMOTE_OMB,
                                         'uri' => $omb_notice->getIdentifierURI()));
 
-        common_broadcast_notice($notice, true);
+
     }
 
     /**
index 4eb39bfa8c72ecf75ef76630244b3a513fe42a97..e5cf8239e9cb1ee49dd5681032995b20a8eaa412 100644 (file)
@@ -217,6 +217,7 @@ abstract class QueueManager extends IoManager
             $this->connect('plugin', 'PluginQueueHandler');
             $this->connect('omb', 'OmbQueueHandler');
             $this->connect('ping', 'PingQueueHandler');
+            $this->connect('distrib', 'DistribQueueHandler');
             if (common_config('sms', 'enabled')) {
                 $this->connect('sms', 'SmsQueueHandler');
             }
@@ -224,7 +225,7 @@ abstract class QueueManager extends IoManager
             // XMPP output handlers...
             $this->connect('jabber', 'JabberQueueHandler');
             $this->connect('public', 'PublicQueueHandler');
-            
+
             // @fixme this should get an actual queue
             //$this->connect('confirm', 'XmppConfirmHandler');
 
index fb3b8be8765b3ab7c5a79924170b11ebe049d636..4312f9876b36ecf1916a972b9985d093909c6c5b 100644 (file)
@@ -987,7 +987,7 @@ function common_redirect($url, $code=307)
 
 function common_broadcast_notice($notice, $remote=false)
 {
-    return common_enqueue_notice($notice);
+    // DO NOTHING!
 }
 
 // Stick the notice on the queue
index bf9c037a579b791e240d99767005bcbe61d9aee1..815fee094ca69510036994c9a628583cc0eb799d 100644 (file)
@@ -397,7 +397,7 @@ class FacebookAction extends Action
             return;
         }
 
-        common_broadcast_notice($notice);
+
 
     }