]> git.mxchange.org Git - quix0rs-gnu-social.git/commitdiff
Break up stream code to use separate notice stream classes
authorEvan Prodromou <evan@status.net>
Thu, 24 Mar 2011 22:04:19 +0000 (18:04 -0400)
committerEvan Prodromou <evan@status.net>
Thu, 24 Mar 2011 22:04:19 +0000 (18:04 -0400)
Rearchitect (again!) notice stream code to delegate different functionality up and down the stack.

Now, different classes implement NoticeStream.

21 files changed:
classes/Fave.php
classes/File.php
classes/Notice.php
classes/Notice_tag.php
classes/Profile.php
classes/Reply.php
classes/User.php
classes/User_group.php
lib/cachingnoticestream.php [new file with mode: 0644]
lib/conversationnoticestream.php [new file with mode: 0644]
lib/favenoticestream.php [new file with mode: 0644]
lib/filenoticestream.php [new file with mode: 0644]
lib/groupnoticestream.php [new file with mode: 0644]
lib/noticestream.php
lib/profilenoticestream.php [new file with mode: 0644]
lib/publicnoticestream.php [new file with mode: 0644]
lib/repeatedbymenoticestream.php [new file with mode: 0644]
lib/repeatsofmenoticestream.php [new file with mode: 0644]
lib/replynoticestream.php [new file with mode: 0644]
lib/taggedprofilenoticestream.php [new file with mode: 0644]
lib/tagnoticestream.php [new file with mode: 0644]

index 7cd64982cd8e818408bc56adc3dc6b1314301d7d..e8fdbffc719d47ca934cecda0538145bb3e5ac7d 100644 (file)
@@ -79,82 +79,18 @@ class Fave extends Memcached_DataObject
 
     function stream($user_id, $offset=0, $limit=NOTICES_PER_PAGE, $own=false, $since_id=0, $max_id=0)
     {
-        $stream = new NoticeStream(array('Fave', '_streamDirect'),
-                                   array($user_id, $own),
-                                   ($own) ? 'fave:ids_by_user_own:'.$user_id :
-                                   'fave:ids_by_user:'.$user_id);
+        $stream = new FaveNoticeStream($user_id, $own);
 
         return $stream->getNotices($offset, $limit, $since_id, $max_id);
     }
 
     function idStream($user_id, $offset=0, $limit=NOTICES_PER_PAGE, $own=false, $since_id=0, $max_id=0)
     {
-        $stream = new NoticeStream(array('Fave', '_streamDirect'),
-                                   array($user_id, $own),
-                                   ($own) ? 'fave:ids_by_user_own:'.$user_id :
-                                   'fave:ids_by_user:'.$user_id);
+        $stream = new FaveNoticeStream($user_id, $own);
 
         return $stream->getNoticeIds($offset, $limit, $since_id, $max_id);
     }
 
-    /**
-     * Note that the sorting for this is by order of *fave* not order of *notice*.
-     *
-     * @fixme add since_id, max_id support?
-     *
-     * @param <type> $user_id
-     * @param <type> $own
-     * @param <type> $offset
-     * @param <type> $limit
-     * @param <type> $since_id
-     * @param <type> $max_id
-     * @return <type>
-     */
-    function _streamDirect($user_id, $own, $offset, $limit, $since_id, $max_id)
-    {
-        $fav = new Fave();
-        $qry = null;
-
-        if ($own) {
-            $qry  = 'SELECT fave.* FROM fave ';
-            $qry .= 'WHERE fave.user_id = ' . $user_id . ' ';
-        } else {
-             $qry =  'SELECT fave.* FROM fave ';
-             $qry .= 'INNER JOIN notice ON fave.notice_id = notice.id ';
-             $qry .= 'WHERE fave.user_id = ' . $user_id . ' ';
-             $qry .= 'AND notice.is_local != ' . Notice::GATEWAY . ' ';
-        }
-
-        if ($since_id != 0) {
-            $qry .= 'AND notice_id > ' . $since_id . ' ';
-        }
-
-        if ($max_id != 0) {
-            $qry .= 'AND notice_id <= ' . $max_id . ' ';
-        }
-
-        // NOTE: we sort by fave time, not by notice time!
-
-        $qry .= 'ORDER BY modified DESC ';
-
-        if (!is_null($offset)) {
-            $qry .= "LIMIT $limit OFFSET $offset";
-        }
-
-        $fav->query($qry);
-
-        $ids = array();
-
-        while ($fav->fetch()) {
-            $ids[] = $fav->notice_id;
-        }
-
-        $fav->free();
-        unset($fav);
-
-        return $ids;
-    }
-
     function asActivity()
     {
         $notice  = Notice::staticGet('id', $this->notice_id);
index 681c33f9cd4f788c962ed6379cfb8ed19d4b2d5c..36ffff585c58c6f3b56762bde062847e14b8572b 100644 (file)
@@ -449,53 +449,10 @@ class File extends Memcached_DataObject
 
     function stream($offset=0, $limit=NOTICES_PER_PAGE, $since_id=0, $max_id=0)
     {
-        $stream = new NoticeStream(array($this, '_streamDirect'),
-                                   array(),
-                                   'file:notice-ids:'.$this->url);
-
+        $stream = new FileNoticeStream($this);
         return $stream->getNotices($offset, $limit, $since_id, $max_id);
     }
 
-    /**
-     * Stream of notices linking to this URL
-     *
-     * @param integer $offset   Offset to show; default is 0
-     * @param integer $limit    Limit of notices to show
-     * @param integer $since_id Since this notice
-     * @param integer $max_id   Before this notice
-     *
-     * @return array ids of notices that link to this file
-     */
-
-    function _streamDirect($offset, $limit, $since_id, $max_id)
-    {
-        $f2p = new File_to_post();
-
-        $f2p->selectAdd();
-        $f2p->selectAdd('post_id');
-
-        $f2p->file_id = $this->id;
-
-        Notice::addWhereSinceId($f2p, $since_id, 'post_id', 'modified');
-        Notice::addWhereMaxId($f2p, $max_id, 'post_id', 'modified');
-
-        $f2p->orderBy('modified DESC, post_id DESC');
-
-        if (!is_null($offset)) {
-            $f2p->limit($offset, $limit);
-        }
-
-        $ids = array();
-
-        if ($f2p->find()) {
-            while ($f2p->fetch()) {
-                $ids[] = $f2p->post_id;
-            }
-        }
-
-        return $ids;
-    }
-
     function noticeCount()
     {
         $cacheKey = sprintf('file:notice-count:%d', $this->id);
index 1201dd902b82e846ca31cac00bf657a473a98c30..114119bfc9f7027f23d56b9877a58b94a4e48f7b 100644 (file)
@@ -45,7 +45,7 @@ require_once INSTALLDIR.'/classes/Memcached_DataObject.php';
 /* We keep 200 notices, the max number of notices available per API request,
  * in the memcached cache. */
 
-define('NOTICE_CACHE_WINDOW', NoticeStream::CACHE_WINDOW);
+define('NOTICE_CACHE_WINDOW', CachingNoticeStream::CACHE_WINDOW);
 
 define('MAX_BOXCARS', 128);
 
@@ -548,7 +548,7 @@ class Notice extends Memcached_DataObject
         if (empty($profile)) {
             return false;
         }
-        $notice = $profile->getNotices(0, NoticeStream::CACHE_WINDOW);
+        $notice = $profile->getNotices(0, CachingNoticeStream::CACHE_WINDOW);
         if (!empty($notice)) {
             $last = 0;
             while ($notice->fetch()) {
@@ -632,92 +632,18 @@ class Notice extends Memcached_DataObject
 
     function publicStream($offset=0, $limit=20, $since_id=0, $max_id=0)
     {
-        $stream = new NoticeStream(array('Notice', '_publicStreamDirect'),
-                                   array(),
-                                   'public');
-
+        $stream = new PublicNoticeStream();
         return $stream->getNotices($offset, $limit, $since_id, $max_id);
     }
 
-    function _publicStreamDirect($offset=0, $limit=20, $since_id=0, $max_id=0)
-    {
-        $notice = new Notice();
-
-        $notice->selectAdd(); // clears it
-        $notice->selectAdd('id');
-
-        $notice->orderBy('created DESC, id DESC');
-
-        if (!is_null($offset)) {
-            $notice->limit($offset, $limit);
-        }
-
-        if (common_config('public', 'localonly')) {
-            $notice->whereAdd('is_local = ' . Notice::LOCAL_PUBLIC);
-        } else {
-            // -1 == blacklisted, -2 == gateway (i.e. Twitter)
-            $notice->whereAdd('is_local !='. Notice::LOCAL_NONPUBLIC);
-            $notice->whereAdd('is_local !='. Notice::GATEWAY);
-        }
-
-        Notice::addWhereSinceId($notice, $since_id);
-        Notice::addWhereMaxId($notice, $max_id);
-
-        $ids = array();
-
-        if ($notice->find()) {
-            while ($notice->fetch()) {
-                $ids[] = $notice->id;
-            }
-        }
-
-        $notice->free();
-        $notice = NULL;
-
-        return $ids;
-    }
 
     function conversationStream($id, $offset=0, $limit=20, $since_id=0, $max_id=0)
     {
-        $stream = new NoticeStream(array('Notice', '_conversationStreamDirect'),
-                                   array($id),
-                                   'notice:conversation_ids:'.$id);
+        $stream = new ConversationNoticeStream($id);
 
         return $stream->getNotices($offset, $limit, $since_id, $max_id);
     }
 
-    function _conversationStreamDirect($id, $offset=0, $limit=20, $since_id=0, $max_id=0)
-    {
-        $notice = new Notice();
-
-        $notice->selectAdd(); // clears it
-        $notice->selectAdd('id');
-
-        $notice->conversation = $id;
-
-        $notice->orderBy('created DESC, id DESC');
-
-        if (!is_null($offset)) {
-            $notice->limit($offset, $limit);
-        }
-
-        Notice::addWhereSinceId($notice, $since_id);
-        Notice::addWhereMaxId($notice, $max_id);
-
-        $ids = array();
-
-        if ($notice->find()) {
-            while ($notice->fetch()) {
-                $ids[] = $notice->id;
-            }
-        }
-
-        $notice->free();
-        $notice = NULL;
-
-        return $ids;
-    }
-
     /**
      * Is this notice part of an active conversation?
      *
index 813242253dbd2d7dc2706c5188f2e293b8defc5c..809403a9bd99ac89be0785ad0e1c01655f15d307 100644 (file)
@@ -38,42 +38,11 @@ class Notice_tag extends Memcached_DataObject
 
     static function getStream($tag, $offset=0, $limit=20, $sinceId=0, $maxId=0)
     {
-        $stream = new NoticeStream(array('Notice_tag', '_streamDirect'),
-                                   array($tag),
-                                   'notice_tag:notice_ids:' . Cache::keyize($tag));
+        $stream = new TagNoticeStream($tag);
         
         return $stream->getNotices($offset, $limit, $sinceId, $maxId);
     }
 
-    function _streamDirect($tag, $offset, $limit, $since_id, $max_id)
-    {
-        $nt = new Notice_tag();
-
-        $nt->tag = $tag;
-
-        $nt->selectAdd();
-        $nt->selectAdd('notice_id');
-
-        Notice::addWhereSinceId($nt, $since_id, 'notice_id');
-        Notice::addWhereMaxId($nt, $max_id, 'notice_id');
-
-        $nt->orderBy('created DESC, notice_id DESC');
-
-        if (!is_null($offset)) {
-            $nt->limit($offset, $limit);
-        }
-
-        $ids = array();
-
-        if ($nt->find()) {
-            while ($nt->fetch()) {
-                $ids[] = $nt->notice_id;
-            }
-        }
-
-        return $ids;
-    }
-
     function blowCache($blowLast=false)
     {
         self::blow('notice_tag:notice_ids:%s', Cache::keyize($this->tag));
index a36024842e8651b565701ebb390dd0a35aeb9fe6..b582451350f94551e74b30a8df31ffa21dcfe941 100644 (file)
@@ -198,90 +198,18 @@ class Profile extends Memcached_DataObject
 
     function getTaggedNotices($tag, $offset=0, $limit=NOTICES_PER_PAGE, $since_id=0, $max_id=0)
     {
-        $stream = new NoticeStream(array($this, '_streamTaggedDirect'),
-                                   array($tag),
-                                   'profile:notice_ids_tagged:'.$this->id.':'.$tag);
+        $stream = new TaggedProfileNoticeStream($this, $tag);
 
         return $stream->getNotices($offset, $limit, $since_id, $max_id);
     }
 
     function getNotices($offset=0, $limit=NOTICES_PER_PAGE, $since_id=0, $max_id=0)
     {
-        $stream = new NoticeStream(array($this, '_streamDirect'),
-                                   array(),
-                                   'profile:notice_ids:' . $this->id);
+        $stream = new ProfileNoticeStream($this);
 
         return $stream->getNotices($offset, $limit, $since_id, $max_id);
     }
 
-    function _streamTaggedDirect($tag, $offset, $limit, $since_id, $max_id)
-    {
-        // XXX It would be nice to do this without a join
-        // (necessary to do it efficiently on accounts with long history)
-
-        $notice = new Notice();
-
-        $query =
-          "select id from notice join notice_tag on id=notice_id where tag='".
-          $notice->escape($tag) .
-          "' and profile_id=" . intval($this->id);
-
-        $since = Notice::whereSinceId($since_id, 'id', 'notice.created');
-        if ($since) {
-            $query .= " and ($since)";
-        }
-
-        $max = Notice::whereMaxId($max_id, 'id', 'notice.created');
-        if ($max) {
-            $query .= " and ($max)";
-        }
-
-        $query .= ' order by notice.created DESC, id DESC';
-
-        if (!is_null($offset)) {
-            $query .= " LIMIT " . intval($limit) . " OFFSET " . intval($offset);
-        }
-
-        $notice->query($query);
-
-        $ids = array();
-
-        while ($notice->fetch()) {
-            $ids[] = $notice->id;
-        }
-
-        return $ids;
-    }
-
-    function _streamDirect($offset, $limit, $since_id, $max_id)
-    {
-        $notice = new Notice();
-
-        $notice->profile_id = $this->id;
-
-        $notice->selectAdd();
-        $notice->selectAdd('id');
-
-        Notice::addWhereSinceId($notice, $since_id);
-        Notice::addWhereMaxId($notice, $max_id);
-
-        $notice->orderBy('created DESC, id DESC');
-
-        if (!is_null($offset)) {
-            $notice->limit($offset, $limit);
-        }
-
-        $notice->find();
-
-        $ids = array();
-
-        while ($notice->fetch()) {
-            $ids[] = $notice->id;
-        }
-
-        return $ids;
-    }
-
     function isMember($group)
     {
         $mem = new Group_member();
@@ -551,7 +479,7 @@ class Profile extends Memcached_DataObject
             // This is the stream of favorite notices, in rev chron
             // order. This forces it into cache.
 
-            $ids = Fave::idStream($this->id, 0, NoticeStream::CACHE_WINDOW);
+            $ids = Fave::idStream($this->id, 0, CachingNoticeStream::CACHE_WINDOW);
 
             // If it's in the list, then it's a fave
 
@@ -563,7 +491,7 @@ class Profile extends Memcached_DataObject
             // then the cache has all available faves, so this one
             // is not a fave.
 
-            if (count($ids) < NoticeStream::CACHE_WINDOW) {
+            if (count($ids) < CachingNoticeStream::CACHE_WINDOW) {
                 return false;
             }
 
index d5341b9a05cdd49f18afe2b3f7d760ce7223b3c9..9ba623ba3fea630d9f8a3fb76e5265dbf5f3a2fb 100644 (file)
@@ -38,35 +38,8 @@ class Reply extends Memcached_DataObject
 
     function stream($user_id, $offset=0, $limit=NOTICES_PER_PAGE, $since_id=0, $max_id=0)
     {
-        $stream = new NoticeStream(array('Reply', '_streamDirect'),
-                                   array($user_id),
-                                   'reply:stream:' . $user_id);
+        $stream = new ReplyNoticeStream($user_id);
 
         return $stream->getNotices($offset, $limit, $since_id, $max_id);
     }
-
-    function _streamDirect($user_id, $offset=0, $limit=NOTICES_PER_PAGE, $since_id=0, $max_id=0)
-    {
-        $reply = new Reply();
-        $reply->profile_id = $user_id;
-
-        Notice::addWhereSinceId($reply, $since_id, 'notice_id', 'modified');
-        Notice::addWhereMaxId($reply, $max_id, 'notice_id', 'modified');
-
-        $reply->orderBy('modified DESC, notice_id DESC');
-
-        if (!is_null($offset)) {
-            $reply->limit($offset, $limit);
-        }
-
-        $ids = array();
-
-        if ($reply->find()) {
-            while ($reply->fetch()) {
-                $ids[] = $reply->notice_id;
-            }
-        }
-
-        return $ids;
-    }
 }
index 4bd7b039df33df7a05732358762222b353ab10d4..1a3a7dfd72246d107103361f8f471cda441b35d6 100644 (file)
@@ -767,93 +767,18 @@ class User extends Memcached_DataObject
 
     function repeatedByMe($offset=0, $limit=20, $since_id=null, $max_id=null)
     {
-        $stream = new NoticeStream(array($this, '_repeatedByMeDirect'),
-                                   array(),
-                                   'user:repeated_by_me:'.$this->id);
-
+        $stream = new RepeatedByMeNoticeStream($this);
         return $stream->getNotices($offset, $limit, $since_id, $max_id);
     }
 
-    function _repeatedByMeDirect($offset, $limit, $since_id, $max_id)
-    {
-        $notice = new Notice();
-
-        $notice->selectAdd(); // clears it
-        $notice->selectAdd('id');
-
-        $notice->profile_id = $this->id;
-        $notice->whereAdd('repeat_of IS NOT NULL');
-
-        $notice->orderBy('created DESC, id DESC');
-
-        if (!is_null($offset)) {
-            $notice->limit($offset, $limit);
-        }
-
-        Notice::addWhereSinceId($notice, $since_id);
-        Notice::addWhereMaxId($notice, $max_id);
-
-        $ids = array();
-
-        if ($notice->find()) {
-            while ($notice->fetch()) {
-                $ids[] = $notice->id;
-            }
-        }
-
-        $notice->free();
-        $notice = NULL;
-
-        return $ids;
-    }
 
     function repeatsOfMe($offset=0, $limit=20, $since_id=null, $max_id=null)
     {
-        $stream = new NoticeStream(array($this, '_repeatsOfMeDirect'),
-                                   array(),
-                                   'user:repeats_of_me:'.$this->id);
+        $stream = new RepeatsOfMeNoticeStream($this);
 
         return $stream->getNotices($offset, $limit, $since_id, $max_id);
     }
 
-    function _repeatsOfMeDirect($offset, $limit, $since_id, $max_id)
-    {
-        $qry =
-          'SELECT DISTINCT original.id AS id ' .
-          'FROM notice original JOIN notice rept ON original.id = rept.repeat_of ' .
-          'WHERE original.profile_id = ' . $this->id . ' ';
-
-        $since = Notice::whereSinceId($since_id, 'original.id', 'original.created');
-        if ($since) {
-            $qry .= "AND ($since) ";
-        }
-
-        $max = Notice::whereMaxId($max_id, 'original.id', 'original.created');
-        if ($max) {
-            $qry .= "AND ($max) ";
-        }
-
-        $qry .= 'ORDER BY original.created, original.id DESC ';
-
-        if (!is_null($offset)) {
-            $qry .= "LIMIT $limit OFFSET $offset";
-        }
-
-        $ids = array();
-
-        $notice = new Notice();
-
-        $notice->query($qry);
-
-        while ($notice->fetch()) {
-            $ids[] = $notice->id;
-        }
-
-        $notice->free();
-        $notice = NULL;
-
-        return $ids;
-    }
 
     function repeatedToMe($offset=0, $limit=20, $since_id=null, $max_id=null)
     {
index 4d6dcfab689d536834e7b79e7f3bc6ddb16f6e0f..8587f15771d161e597914c982e38b2ce733043fb 100644 (file)
@@ -87,41 +87,11 @@ class User_group extends Memcached_DataObject
 
     function getNotices($offset, $limit, $since_id=null, $max_id=null)
     {
-        $stream = new NoticeStream(array($this, '_streamDirect'),
-                                   array(),
-                                   'user_group:notice_ids:' . $this->id);
+        $stream = new GroupNoticeStream($this);
 
         return $stream->getNotices($offset, $limit, $since_id, $max_id);
     }
 
-    function _streamDirect($offset, $limit, $since_id, $max_id)
-    {
-        $inbox = new Group_inbox();
-
-        $inbox->group_id = $this->id;
-
-        $inbox->selectAdd();
-        $inbox->selectAdd('notice_id');
-
-        Notice::addWhereSinceId($inbox, $since_id, 'notice_id');
-        Notice::addWhereMaxId($inbox, $max_id, 'notice_id');
-
-        $inbox->orderBy('created DESC, notice_id DESC');
-
-        if (!is_null($offset)) {
-            $inbox->limit($offset, $limit);
-        }
-
-        $ids = array();
-
-        if ($inbox->find()) {
-            while ($inbox->fetch()) {
-                $ids[] = $inbox->notice_id;
-            }
-        }
-
-        return $ids;
-    }
 
     function allowedNickname($nickname)
     {
diff --git a/lib/cachingnoticestream.php b/lib/cachingnoticestream.php
new file mode 100644 (file)
index 0000000..a1b3b3f
--- /dev/null
@@ -0,0 +1,129 @@
+<?php
+/**
+ * StatusNet - the distributed open-source microblogging tool
+ * Copyright (C) 2011, StatusNet, Inc.
+ *
+ * A stream of notices
+ * 
+ * PHP version 5
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ * @category  Stream
+ * @package   StatusNet
+ * @author    Evan Prodromou <evan@status.net>
+ * @copyright 2011 StatusNet, Inc.
+ * @license   http://www.fsf.org/licensing/licenses/agpl-3.0.html AGPL 3.0
+ * @link      http://status.net/
+ */
+
+if (!defined('STATUSNET')) {
+    // This check helps protect against security problems;
+    // your code file can't be executed directly from the web.
+    exit(1);
+}
+
+/**
+ * Class for notice streams
+ *
+ * @category  Stream
+ * @package   StatusNet
+ * @author    Evan Prodromou <evan@status.net>
+ * @copyright 2011 StatusNet, Inc.
+ * @license   http://www.fsf.org/licensing/licenses/agpl-3.0.html AGPL 3.0
+ * @link      http://status.net/
+ */
+
+class CachingNoticeStream extends NoticeStream
+{
+    const CACHE_WINDOW = 200;
+
+    public $stream   = null;
+    public $cachekey = null;
+
+    function __construct($stream, $cachekey)
+    {
+        $this->stream   = $stream;
+        $this->cachekey = $cachekey;
+    }
+
+    function getNoticeIds($offset=0, $limit=20, $sinceId=0, $maxId=0)
+    {
+        $cache = Cache::instance();
+
+        // We cache self::CACHE_WINDOW elements at the tip of the stream.
+        // If the cache won't be hit, just generate directly.
+
+        if (empty($cache) ||
+            $sinceId != 0 || $maxId != 0 ||
+            is_null($limit) ||
+            ($offset + $limit) > self::CACHE_WINDOW) {
+            return $this->stream->getNoticeIds($offset, $limit, $sinceId, $maxId);
+        }
+
+        // Check the cache to see if we have the stream.
+
+        $idkey = Cache::key($this->cachekey);
+
+        $idstr = $cache->get($idkey);
+
+        if ($idstr !== false) {
+            // Cache hit! Woohoo!
+            $window = explode(',', $idstr);
+            $ids = array_slice($window, $offset, $limit);
+            return $ids;
+        }
+
+        // Check the cache to see if we have a "last-known-good" version.
+        // The actual cache gets blown away when new notices are added, but
+        // the "last" value holds a lot of info. We might need to only generate
+        // a few at the "tip", which can bound our queries and save lots
+        // of time.
+
+        $laststr = $cache->get($idkey.';last');
+
+        if ($laststr !== false) {
+            $window = explode(',', $laststr);
+            $last_id = $window[0];
+            $new_ids = $this->stream->getNoticeIds(0, self::CACHE_WINDOW, $last_id, 0);
+
+            $new_window = array_merge($new_ids, $window);
+
+            $new_windowstr = implode(',', $new_window);
+
+            $result = $cache->set($idkey, $new_windowstr);
+            $result = $cache->set($idkey . ';last', $new_windowstr);
+
+            $ids = array_slice($new_window, $offset, $limit);
+
+            return $ids;
+        }
+
+        // No cache hits :( Generate directly and stick the results
+        // into the cache. Note we generate the full cache window.
+
+        $window = $this->stream->getNoticeIds(0, self::CACHE_WINDOW, 0, 0);
+
+        $windowstr = implode(',', $window);
+
+        $result = $cache->set($idkey, $windowstr);
+        $result = $cache->set($idkey . ';last', $windowstr);
+
+        // Return just the slice that was requested
+
+        $ids = array_slice($window, $offset, $limit);
+
+        return $ids;
+    }
+}
diff --git a/lib/conversationnoticestream.php b/lib/conversationnoticestream.php
new file mode 100644 (file)
index 0000000..b26e898
--- /dev/null
@@ -0,0 +1,52 @@
+<?php
+
+class ConversationNoticeStream extends CachingNoticeStream
+{
+    function __construct($id)
+    {
+        parent::__construct(new RawConversationNoticeStream($id),
+                            'notice:conversation_ids:'.$id);
+    }
+}
+
+class RawConversationNoticeStream extends NoticeStream
+{
+    protected $id;
+
+    function __construct($id)
+    {
+        $this->id = $id;
+    }
+
+    function getNoticeIds($offset=0, $limit=20, $since_id=0, $max_id=0)
+    {
+        $notice = new Notice();
+
+        $notice->selectAdd(); // clears it
+        $notice->selectAdd('id');
+
+        $notice->conversation = $this->id;
+
+        $notice->orderBy('created DESC, id DESC');
+
+        if (!is_null($offset)) {
+            $notice->limit($offset, $limit);
+        }
+
+        Notice::addWhereSinceId($notice, $since_id);
+        Notice::addWhereMaxId($notice, $max_id);
+
+        $ids = array();
+
+        if ($notice->find()) {
+            while ($notice->fetch()) {
+                $ids[] = $notice->id;
+            }
+        }
+
+        $notice->free();
+        $notice = NULL;
+
+        return $ids;
+    }
+}
\ No newline at end of file
diff --git a/lib/favenoticestream.php b/lib/favenoticestream.php
new file mode 100644 (file)
index 0000000..5aaad5c
--- /dev/null
@@ -0,0 +1,86 @@
+<?php
+
+class FaveNoticeStream extends CachingNoticeStream
+{
+    function __construct($user_id, $own)
+    {
+        $stream = new RawFaveNoticeStream($user_id, $own);
+        if ($own) {
+            $key = 'fave:ids_by_user_own:'.$user_id;
+        } else {
+            $key = 'fave:ids_by_user:'.$user_id;
+        }
+        parent::__construct($stream, $key);
+    }
+}
+
+class RawFaveNoticeStream extends NoticeStream
+{
+    protected $user_id;
+    protected $own;
+
+    function __construct($user_id, $own)
+    {
+        $this->user_id = $user_id;
+        $this->own     = $own;
+    }
+
+    /**
+     * Note that the sorting for this is by order of *fave* not order of *notice*.
+     *
+     * @fixme add since_id, max_id support?
+     *
+     * @param <type> $user_id
+     * @param <type> $own
+     * @param <type> $offset
+     * @param <type> $limit
+     * @param <type> $since_id
+     * @param <type> $max_id
+     * @return <type>
+     */
+    function getNoticeIds($offset, $limit, $since_id, $max_id)
+    {
+        $fav = new Fave();
+        $qry = null;
+
+        if ($this->own) {
+            $qry  = 'SELECT fave.* FROM fave ';
+            $qry .= 'WHERE fave.user_id = ' . $this->user_id . ' ';
+        } else {
+             $qry =  'SELECT fave.* FROM fave ';
+             $qry .= 'INNER JOIN notice ON fave.notice_id = notice.id ';
+             $qry .= 'WHERE fave.user_id = ' . $this->user_id . ' ';
+             $qry .= 'AND notice.is_local != ' . Notice::GATEWAY . ' ';
+        }
+
+        if ($since_id != 0) {
+            $qry .= 'AND notice_id > ' . $since_id . ' ';
+        }
+
+        if ($max_id != 0) {
+            $qry .= 'AND notice_id <= ' . $max_id . ' ';
+        }
+
+        // NOTE: we sort by fave time, not by notice time!
+
+        $qry .= 'ORDER BY modified DESC ';
+
+        if (!is_null($offset)) {
+            $qry .= "LIMIT $limit OFFSET $offset";
+        }
+
+        $fav->query($qry);
+
+        $ids = array();
+
+        while ($fav->fetch()) {
+            $ids[] = $fav->notice_id;
+        }
+
+        $fav->free();
+        unset($fav);
+
+        return $ids;
+    }
+}
+
diff --git a/lib/filenoticestream.php b/lib/filenoticestream.php
new file mode 100644 (file)
index 0000000..fddc5d3
--- /dev/null
@@ -0,0 +1,60 @@
+<?php
+
+class FileNoticeStream extends CachingNoticeStream
+{
+    function __construct($file)
+    {
+        parent::__construct(new RawFileNoticeStream($file),
+                            'file:notice-ids:'.$this->url);
+    }
+}
+
+class RawFileNoticeStream extends NoticeStream
+{
+    protected $file = null;
+
+    function __construct($file)
+    {
+        $this->file = $file;
+        parent::__construct();
+    }
+
+    /**
+     * Stream of notices linking to this URL
+     *
+     * @param integer $offset   Offset to show; default is 0
+     * @param integer $limit    Limit of notices to show
+     * @param integer $since_id Since this notice
+     * @param integer $max_id   Before this notice
+     *
+     * @return array ids of notices that link to this file
+     */
+    function getNoticeIds($offset, $limit, $since_id, $max_id)
+    {
+        $f2p = new File_to_post();
+
+        $f2p->selectAdd();
+        $f2p->selectAdd('post_id');
+
+        $f2p->file_id = $this->file->id;
+
+        Notice::addWhereSinceId($f2p, $since_id, 'post_id', 'modified');
+        Notice::addWhereMaxId($f2p, $max_id, 'post_id', 'modified');
+
+        $f2p->orderBy('modified DESC, post_id DESC');
+
+        if (!is_null($offset)) {
+            $f2p->limit($offset, $limit);
+        }
+
+        $ids = array();
+
+        if ($f2p->find()) {
+            while ($f2p->fetch()) {
+                $ids[] = $f2p->post_id;
+            }
+        }
+
+        return $ids;
+    }
+}
diff --git a/lib/groupnoticestream.php b/lib/groupnoticestream.php
new file mode 100644 (file)
index 0000000..a6aa2c3
--- /dev/null
@@ -0,0 +1,49 @@
+<?
+
+class GroupNoticeStream extends CachingNoticeStream
+{
+    function __construct($group)
+    {
+        parent::__construct(new RawGroupNoticeStream($group),
+                            'user_group:notice_ids:' . $group->id);
+    }
+}
+
+class RawGroupNoticeStream extends NoticeStream
+{
+    protected $group;
+
+    function __construct($group)
+    {
+        $this->group = $group;
+    }
+
+    function getNoticeIds($offset, $limit, $since_id, $max_id)
+    {
+        $inbox = new Group_inbox();
+
+        $inbox->group_id = $this->group->id;
+
+        $inbox->selectAdd();
+        $inbox->selectAdd('notice_id');
+
+        Notice::addWhereSinceId($inbox, $since_id, 'notice_id');
+        Notice::addWhereMaxId($inbox, $max_id, 'notice_id');
+
+        $inbox->orderBy('created DESC, notice_id DESC');
+
+        if (!is_null($offset)) {
+            $inbox->limit($offset, $limit);
+        }
+
+        $ids = array();
+
+        if ($inbox->find()) {
+            while ($inbox->fetch()) {
+                $ids[] = $inbox->notice_id;
+            }
+        }
+
+        return $ids;
+    }
+}
index 2ceef17d74e5e633b76cedef53ab8076571ea7de..b60fc236f706e60e34ffa8d57bf4999ea61f8ceb 100644 (file)
@@ -44,20 +44,9 @@ if (!defined('STATUSNET')) {
  * @license   http://www.fsf.org/licensing/licenses/agpl-3.0.html AGPL 3.0
  * @link      http://status.net/
  */
-class NoticeStream
+abstract class NoticeStream
 {
-    const CACHE_WINDOW = 200;
-
-    public $generator = null;
-    public $args      = null;
-    public $cachekey  = null;
-
-    function __construct($generator, $args, $cachekey)
-    {
-        $this->generator = $generator;
-        $this->args      = $args;
-        $this->cachekey  = $cachekey;
-    }
+    abstract function getNoticeIds($offset, $limit, $sinceId, $maxId);
 
     function getNotices($offset=0, $limit=20, $sinceId=0, $maxId=0)
     {
@@ -68,75 +57,6 @@ class NoticeStream
         return $notices;
     }
 
-    function getNoticeIds($offset=0, $limit=20, $sinceId=0, $maxId=0)
-    {
-        $cache = Cache::instance();
-
-        // We cache self::CACHE_WINDOW elements at the tip of the stream.
-        // If the cache won't be hit, just generate directly.
-
-        if (empty($cache) ||
-            $sinceId != 0 || $maxId != 0 ||
-            is_null($limit) ||
-            ($offset + $limit) > self::CACHE_WINDOW) {
-            return $this->generate($offset, $limit, $sinceId, $maxId);
-        }
-
-        // Check the cache to see if we have the stream.
-
-        $idkey = Cache::key($this->cachekey);
-
-        $idstr = $cache->get($idkey);
-
-        if ($idstr !== false) {
-            // Cache hit! Woohoo!
-            $window = explode(',', $idstr);
-            $ids = array_slice($window, $offset, $limit);
-            return $ids;
-        }
-
-        // Check the cache to see if we have a "last-known-good" version.
-        // The actual cache gets blown away when new notices are added, but
-        // the "last" value holds a lot of info. We might need to only generate
-        // a few at the "tip", which can bound our queries and save lots
-        // of time.
-
-        $laststr = $cache->get($idkey.';last');
-
-        if ($laststr !== false) {
-            $window = explode(',', $laststr);
-            $last_id = $window[0];
-            $new_ids = $this->generate(0, self::CACHE_WINDOW, $last_id, 0);
-
-            $new_window = array_merge($new_ids, $window);
-
-            $new_windowstr = implode(',', $new_window);
-
-            $result = $cache->set($idkey, $new_windowstr);
-            $result = $cache->set($idkey . ';last', $new_windowstr);
-
-            $ids = array_slice($new_window, $offset, $limit);
-
-            return $ids;
-        }
-
-        // No cache hits :( Generate directly and stick the results
-        // into the cache. Note we generate the full cache window.
-
-        $window = $this->generate(0, self::CACHE_WINDOW, 0, 0);
-
-        $windowstr = implode(',', $window);
-
-        $result = $cache->set($idkey, $windowstr);
-        $result = $cache->set($idkey . ';last', $windowstr);
-
-        // Return just the slice that was requested
-
-        $ids = array_slice($window, $offset, $limit);
-
-        return $ids;
-    }
-
     static function getStreamByIds($ids)
     {
         $cache = Cache::instance();
@@ -177,14 +97,4 @@ class NoticeStream
             return new ArrayWrapper($wrapped);
         }
     }
-
-    function generate($offset, $limit, $sinceId, $maxId)
-    {
-        $args = array_merge($this->args, array($offset,
-                                               $limit,
-                                               $sinceId,
-                                               $maxId));
-
-        return call_user_func_array($this->generator, $args);
-    }
 }
diff --git a/lib/profilenoticestream.php b/lib/profilenoticestream.php
new file mode 100644 (file)
index 0000000..9324bfb
--- /dev/null
@@ -0,0 +1,105 @@
+<?php
+/**
+ * StatusNet - the distributed open-source microblogging tool
+ * Copyright (C) 2011, StatusNet, Inc.
+ *
+ * Stream of notices by a profile
+ * 
+ * PHP version 5
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ * @category  Stream
+ * @package   StatusNet
+ * @author    Evan Prodromou <evan@status.net>
+ * @copyright 2011 StatusNet, Inc.
+ * @license   http://www.fsf.org/licensing/licenses/agpl-3.0.html AGPL 3.0
+ * @link      http://status.net/
+ */
+
+if (!defined('STATUSNET')) {
+    // This check helps protect against security problems;
+    // your code file can't be executed directly from the web.
+    exit(1);
+}
+
+/**
+ * Stream of notices by a profile
+ *
+ * @category  General
+ * @package   StatusNet
+ * @author    Evan Prodromou <evan@status.net>
+ * @copyright 2011 StatusNet, Inc.
+ * @license   http://www.fsf.org/licensing/licenses/agpl-3.0.html AGPL 3.0
+ * @link      http://status.net/
+ */
+
+class ProfileNoticeStream extends CachingNoticeStream
+{
+    function __construct($profile)
+    {
+        parent::__construct(new RawProfileNoticeStream($profile),
+                            'profile:notice_ids:' . $profile->id);
+    }
+}
+
+/**
+ * Raw stream of notices by a profile
+ *
+ * @category  General
+ * @package   StatusNet
+ * @author    Evan Prodromou <evan@status.net>
+ * @copyright 2011 StatusNet, Inc.
+ * @license   http://www.fsf.org/licensing/licenses/agpl-3.0.html AGPL 3.0
+ * @link      http://status.net/
+ */
+
+class RawProfileNoticeStream extends NoticeStream
+{
+    protected $profile;
+
+    function __construct($profile)
+    {
+        $this->profile = $profile;
+    }
+
+    function getNoticeIds($offset, $limit, $since_id, $max_id)
+    {
+        $notice = new Notice();
+
+        $notice->profile_id = $this->profile->id;
+
+        $notice->selectAdd();
+        $notice->selectAdd('id');
+
+        Notice::addWhereSinceId($notice, $since_id);
+        Notice::addWhereMaxId($notice, $max_id);
+
+        $notice->orderBy('created DESC, id DESC');
+
+        if (!is_null($offset)) {
+            $notice->limit($offset, $limit);
+        }
+
+        $notice->find();
+
+        $ids = array();
+
+        while ($notice->fetch()) {
+            $ids[] = $notice->id;
+        }
+
+        return $ids;
+    }
+}
diff --git a/lib/publicnoticestream.php b/lib/publicnoticestream.php
new file mode 100644 (file)
index 0000000..0162375
--- /dev/null
@@ -0,0 +1,50 @@
+<?php
+
+class PublicNoticeStream extends CachingNoticeStream
+{
+    function __construct()
+    {
+        parent::__construct(new RawPublicNoticeStream(), 'public');
+    }
+}
+
+class RawPublicNoticeStream extends NoticeStream
+{
+    function getNoticeIds($offset=0, $limit=20, $since_id=0, $max_id=0)
+    {
+        $notice = new Notice();
+
+        $notice->selectAdd(); // clears it
+        $notice->selectAdd('id');
+
+        $notice->orderBy('created DESC, id DESC');
+
+        if (!is_null($offset)) {
+            $notice->limit($offset, $limit);
+        }
+
+        if (common_config('public', 'localonly')) {
+            $notice->whereAdd('is_local = ' . Notice::LOCAL_PUBLIC);
+        } else {
+            // -1 == blacklisted, -2 == gateway (i.e. Twitter)
+            $notice->whereAdd('is_local !='. Notice::LOCAL_NONPUBLIC);
+            $notice->whereAdd('is_local !='. Notice::GATEWAY);
+        }
+
+        Notice::addWhereSinceId($notice, $since_id);
+        Notice::addWhereMaxId($notice, $max_id);
+
+        $ids = array();
+
+        if ($notice->find()) {
+            while ($notice->fetch()) {
+                $ids[] = $notice->id;
+            }
+        }
+
+        $notice->free();
+        $notice = NULL;
+
+        return $ids;
+    }
+}
\ No newline at end of file
diff --git a/lib/repeatedbymenoticestream.php b/lib/repeatedbymenoticestream.php
new file mode 100644 (file)
index 0000000..2c4c00e
--- /dev/null
@@ -0,0 +1,53 @@
+<?php
+
+class RepeatedByMeNoticeStream extends CachingNoticeStream
+{
+    function __construct($user)
+    {
+        parent::__construct(new RawRepeatedByMeNoticeStream($user),
+                            'user:repeated_by_me:'.$user->id);
+    }
+}
+
+class RawRepeatedByMeNoticeStream extends NoticeStream
+{
+    protected $user;
+
+    function __construct($user)
+    {
+        $this->user = $user;
+    }
+
+    function getNoticeIds($offset, $limit, $since_id, $max_id)
+    {
+        $notice = new Notice();
+
+        $notice->selectAdd(); // clears it
+        $notice->selectAdd('id');
+
+        $notice->profile_id = $this->user->id;
+        $notice->whereAdd('repeat_of IS NOT NULL');
+
+        $notice->orderBy('created DESC, id DESC');
+
+        if (!is_null($offset)) {
+            $notice->limit($offset, $limit);
+        }
+
+        Notice::addWhereSinceId($notice, $since_id);
+        Notice::addWhereMaxId($notice, $max_id);
+
+        $ids = array();
+
+        if ($notice->find()) {
+            while ($notice->fetch()) {
+                $ids[] = $notice->id;
+            }
+        }
+
+        $notice->free();
+        $notice = NULL;
+
+        return $ids;
+    }
+}
\ No newline at end of file
diff --git a/lib/repeatsofmenoticestream.php b/lib/repeatsofmenoticestream.php
new file mode 100644 (file)
index 0000000..1441908
--- /dev/null
@@ -0,0 +1,59 @@
+<?php
+
+class RepeatsOfMeNoticeStream extends CachingNoticeStream
+{
+    function __construct($user)
+    {
+        parent::__construct(new RawRepeatsOfMeNoticeStream($user),
+                            'user:repeats_of_me:'.$user->id);
+    }
+}
+
+class RawRepeatsOfMeNoticeStream extends NoticeStream
+{
+    protected $user;
+
+    function __construct($user)
+    {
+        $this->user = $user;
+    }
+
+    function getNoticeIds($offset, $limit, $since_id, $max_id)
+    {
+        $qry =
+          'SELECT DISTINCT original.id AS id ' .
+          'FROM notice original JOIN notice rept ON original.id = rept.repeat_of ' .
+          'WHERE original.profile_id = ' . $this->user->id . ' ';
+
+        $since = Notice::whereSinceId($since_id, 'original.id', 'original.created');
+        if ($since) {
+            $qry .= "AND ($since) ";
+        }
+
+        $max = Notice::whereMaxId($max_id, 'original.id', 'original.created');
+        if ($max) {
+            $qry .= "AND ($max) ";
+        }
+
+        $qry .= 'ORDER BY original.created, original.id DESC ';
+
+        if (!is_null($offset)) {
+            $qry .= "LIMIT $limit OFFSET $offset";
+        }
+
+        $ids = array();
+
+        $notice = new Notice();
+
+        $notice->query($qry);
+
+        while ($notice->fetch()) {
+            $ids[] = $notice->id;
+        }
+
+        $notice->free();
+        $notice = NULL;
+
+        return $ids;
+    }
+}
diff --git a/lib/replynoticestream.php b/lib/replynoticestream.php
new file mode 100644 (file)
index 0000000..f358afc
--- /dev/null
@@ -0,0 +1,45 @@
+<?php
+
+class ReplyNoticeStream extends CachingNoticeStream
+{
+    function __construct($userId)
+    {
+        parent::__construct(new RawReplyNoticeStream($userId),
+                            'reply:stream:' . $userId);
+    }
+}
+
+class RawReplyNoticeStream extends NoticeStream
+{
+    protected $userId;
+
+    function __construct($userId)
+    {
+        $this->userId = $userId;
+    }
+
+    function getNoticeIds($offset=0, $limit=NOTICES_PER_PAGE, $since_id=0, $max_id=0)
+    {
+        $reply = new Reply();
+        $reply->profile_id = $this->userId;
+
+        Notice::addWhereSinceId($reply, $since_id, 'notice_id', 'modified');
+        Notice::addWhereMaxId($reply, $max_id, 'notice_id', 'modified');
+
+        $reply->orderBy('modified DESC, notice_id DESC');
+
+        if (!is_null($offset)) {
+            $reply->limit($offset, $limit);
+        }
+
+        $ids = array();
+
+        if ($reply->find()) {
+            while ($reply->fetch()) {
+                $ids[] = $reply->notice_id;
+            }
+        }
+
+        return $ids;
+    }
+}
\ No newline at end of file
diff --git a/lib/taggedprofilenoticestream.php b/lib/taggedprofilenoticestream.php
new file mode 100644 (file)
index 0000000..d171187
--- /dev/null
@@ -0,0 +1,61 @@
+<?
+
+class TaggedProfileNoticeStream extends CachingNoticeStream
+{
+    function __construct($profile, $tag)
+    {
+        parent::__construct(new RawTaggedProfileNoticeStream($profile, $tag),
+                            'profile:notice_ids_tagged:'.$profile->id.':'.Cache::keyize($tag));
+    }
+}
+
+class RawTaggedProfileNoticeStream extends NoticeStream
+{
+    protected $profile;
+    protected $tag;
+
+    function __construct($profile, $tag)
+    {
+        $this->profile = $profile;
+        $this->tag     = $tag;
+    }
+
+    function getNoticeIds($offset, $limit, $since_id, $max_id)
+    {
+        // XXX It would be nice to do this without a join
+        // (necessary to do it efficiently on accounts with long history)
+
+        $notice = new Notice();
+
+        $query =
+          "select id from notice join notice_tag on id=notice_id where tag='".
+          $notice->escape($this->tag) .
+          "' and profile_id=" . intval($this->profile->id);
+
+        $since = Notice::whereSinceId($since_id, 'id', 'notice.created');
+        if ($since) {
+            $query .= " and ($since)";
+        }
+
+        $max = Notice::whereMaxId($max_id, 'id', 'notice.created');
+        if ($max) {
+            $query .= " and ($max)";
+        }
+
+        $query .= ' order by notice.created DESC, id DESC';
+
+        if (!is_null($offset)) {
+            $query .= " LIMIT " . intval($limit) . " OFFSET " . intval($offset);
+        }
+
+        $notice->query($query);
+
+        $ids = array();
+
+        while ($notice->fetch()) {
+            $ids[] = $notice->id;
+        }
+
+        return $ids;
+    }
+}
diff --git a/lib/tagnoticestream.php b/lib/tagnoticestream.php
new file mode 100644 (file)
index 0000000..0e28774
--- /dev/null
@@ -0,0 +1,49 @@
+<?php
+
+class TagNoticeStream extends CachingNoticeStream
+{
+    function __construct($tag)
+    {
+        parent::__construct(new RawTagNoticeStream($tag),
+                            'notice_tag:notice_ids:' . Cache::keyize($tag));
+    }
+}
+
+class RawTagNoticeStream extends NoticeStream
+{
+    protected $tag;
+
+    function __construct($tag)
+    {
+        $this->tag = $tag;
+    }
+
+    function getNoticeIds($offset, $limit, $since_id, $max_id)
+    {
+        $nt = new Notice_tag();
+
+        $nt->tag = $this->tag;
+
+        $nt->selectAdd();
+        $nt->selectAdd('notice_id');
+
+        Notice::addWhereSinceId($nt, $since_id, 'notice_id');
+        Notice::addWhereMaxId($nt, $max_id, 'notice_id');
+
+        $nt->orderBy('created DESC, notice_id DESC');
+
+        if (!is_null($offset)) {
+            $nt->limit($offset, $limit);
+        }
+
+        $ids = array();
+
+        if ($nt->find()) {
+            while ($nt->fetch()) {
+                $ids[] = $nt->notice_id;
+            }
+        }
+
+        return $ids;
+    }
+}