]> git.mxchange.org Git - quix0rs-gnu-social.git/commitdiff
New NoticeStream class to reify streams of notices
authorEvan Prodromou <evan@status.net>
Wed, 23 Mar 2011 15:29:55 +0000 (11:29 -0400)
committerEvan Prodromou <evan@status.net>
Wed, 23 Mar 2011 15:29:55 +0000 (11:29 -0400)
We've been muddling through with 6- or 8-argument functions for managing streams. I'd
like to start thinking of streams as their own thing, and give them some more value.

So, the new NoticeStream class takes over the Notice::stream() function and Notice::getStreamByIds().

There's probably some fine-tuning to do on the object interface.

classes/Fave.php
classes/File.php
classes/Notice.php
classes/Notice_tag.php
classes/Profile.php
classes/Reply.php
classes/User.php
classes/User_group.php
lib/noticestream.php [new file with mode: 0644]

index efbceee6a83ae8de1b69d8e49ff2095cacf97419..a61f35d19000f97e77a24044ea01de68ab0fa4f4 100644 (file)
@@ -79,12 +79,12 @@ class Fave extends Memcached_DataObject
 
     function stream($user_id, $offset=0, $limit=NOTICES_PER_PAGE, $own=false, $since_id=0, $max_id=0)
     {
-        $ids = Notice::stream(array('Fave', '_streamDirect'),
-                              array($user_id, $own),
-                              ($own) ? 'fave:ids_by_user_own:'.$user_id :
-                              'fave:ids_by_user:'.$user_id,
-                              $offset, $limit, $since_id, $max_id);
-        return $ids;
+        $stream = new NoticeStream(array('Fave', '_streamDirect'),
+                                   array($user_id, $own),
+                                   ($own) ? 'fave:ids_by_user_own:'.$user_id :
+                                   'fave:ids_by_user:'.$user_id);
+
+        return $stream->getNotices($offset, $limit, $since_id, $max_id);
     }
 
     /**
index e9a0131c4e4f6110713f0f572ea125d3890226b3..681c33f9cd4f788c962ed6379cfb8ed19d4b2d5c 100644 (file)
@@ -449,12 +449,11 @@ class File extends Memcached_DataObject
 
     function stream($offset=0, $limit=NOTICES_PER_PAGE, $since_id=0, $max_id=0)
     {
-        $ids = Notice::stream(array($this, '_streamDirect'),
-                              array(),
-                              'file:notice-ids:'.$this->url,
-                              $offset, $limit, $since_id, $max_id);
+        $stream = new NoticeStream(array($this, '_streamDirect'),
+                                   array(),
+                                   'file:notice-ids:'.$this->url);
 
-        return Notice::getStreamByIds($ids);
+        return $stream->getNotices($offset, $limit, $since_id, $max_id);
     }
 
     /**
index b228a49c7c7991a4215b5504b8e8686a2733ca0d..8200e4554ff1d64d38ecc792f661efc3b344a584 100644 (file)
@@ -629,54 +629,14 @@ class Notice extends Memcached_DataObject
         return $att;
     }
 
-    function getStreamByIds($ids)
-    {
-        $cache = Cache::instance();
-
-        if (!empty($cache)) {
-            $notices = array();
-            foreach ($ids as $id) {
-                $n = Notice::staticGet('id', $id);
-                if (!empty($n)) {
-                    $notices[] = $n;
-                }
-            }
-            return new ArrayWrapper($notices);
-        } else {
-            $notice = new Notice();
-            if (empty($ids)) {
-                //if no IDs requested, just return the notice object
-                return $notice;
-            }
-            $notice->whereAdd('id in (' . implode(', ', $ids) . ')');
-
-            $notice->find();
-
-            $temp = array();
-
-            while ($notice->fetch()) {
-                $temp[$notice->id] = clone($notice);
-            }
-
-            $wrapped = array();
-
-            foreach ($ids as $id) {
-                if (array_key_exists($id, $temp)) {
-                    $wrapped[] = $temp[$id];
-                }
-            }
-
-            return new ArrayWrapper($wrapped);
-        }
-    }
 
     function publicStream($offset=0, $limit=20, $since_id=0, $max_id=0)
     {
-        $ids = Notice::stream(array('Notice', '_publicStreamDirect'),
-                              array(),
-                              'public',
-                              $offset, $limit, $since_id, $max_id);
-        return Notice::getStreamByIds($ids);
+        $stream = new NoticeStream(array('Notice', '_publicStreamDirect'),
+                                   array(),
+                                   'public');
+
+        return $stream->getNotices($offset, $limit, $since_id, $max_id);
     }
 
     function _publicStreamDirect($offset=0, $limit=20, $since_id=0, $max_id=0)
@@ -719,12 +679,11 @@ class Notice extends Memcached_DataObject
 
     function conversationStream($id, $offset=0, $limit=20, $since_id=0, $max_id=0)
     {
-        $ids = Notice::stream(array('Notice', '_conversationStreamDirect'),
-                              array($id),
-                              'notice:conversation_ids:'.$id,
-                              $offset, $limit, $since_id, $max_id);
+        $stream = new NoticeStream(array('Notice', '_conversationStreamDirect'),
+                                   array($id),
+                                   'notice:conversation_ids:'.$id);
 
-        return Notice::getStreamByIds($ids);
+        return $stream->getNotices($offset, $limit, $since_id, $max_id);
     }
 
     function _conversationStreamDirect($id, $offset=0, $limit=20, $since_id=0, $max_id=0)
@@ -1540,61 +1499,6 @@ class Notice extends Memcached_DataObject
         }
     }
 
-    function stream($fn, $args, $cachekey, $offset=0, $limit=20, $since_id=0, $max_id=0)
-    {
-        $cache = Cache::instance();
-
-        if (empty($cache) ||
-            $since_id != 0 || $max_id != 0 ||
-            is_null($limit) ||
-            ($offset + $limit) > NOTICE_CACHE_WINDOW) {
-            return call_user_func_array($fn, array_merge($args, array($offset, $limit, $since_id,
-                                                                      $max_id)));
-        }
-
-        $idkey = Cache::key($cachekey);
-
-        $idstr = $cache->get($idkey);
-
-        if ($idstr !== false) {
-            // Cache hit! Woohoo!
-            $window = explode(',', $idstr);
-            $ids = array_slice($window, $offset, $limit);
-            return $ids;
-        }
-
-        $laststr = $cache->get($idkey.';last');
-
-        if ($laststr !== false) {
-            $window = explode(',', $laststr);
-            $last_id = $window[0];
-            $new_ids = call_user_func_array($fn, array_merge($args, array(0, NOTICE_CACHE_WINDOW,
-                                                                          $last_id, 0, null)));
-
-            $new_window = array_merge($new_ids, $window);
-
-            $new_windowstr = implode(',', $new_window);
-
-            $result = $cache->set($idkey, $new_windowstr);
-            $result = $cache->set($idkey . ';last', $new_windowstr);
-
-            $ids = array_slice($new_window, $offset, $limit);
-
-            return $ids;
-        }
-
-        $window = call_user_func_array($fn, array_merge($args, array(0, NOTICE_CACHE_WINDOW,
-                                                                     0, 0, null)));
-
-        $windowstr = implode(',', $window);
-
-        $result = $cache->set($idkey, $windowstr);
-        $result = $cache->set($idkey . ';last', $windowstr);
-
-        $ids = array_slice($window, $offset, $limit);
-
-        return $ids;
-    }
 
     /**
      * Determine which notice, if any, a new notice is in reply to.
index 81d346c5d3b2a5da148186492ff0f186175ff29c..813242253dbd2d7dc2706c5188f2e293b8defc5c 100644 (file)
@@ -36,14 +36,13 @@ class Notice_tag extends Memcached_DataObject
     /* the code above is auto generated do not remove the tag below */
     ###END_AUTOCODE
 
-    static function getStream($tag, $offset=0, $limit=20) {
-
-        $ids = Notice::stream(array('Notice_tag', '_streamDirect'),
-                              array($tag),
-                              'notice_tag:notice_ids:' . Cache::keyize($tag),
-                              $offset, $limit);
-
-        return Notice::getStreamByIds($ids);
+    static function getStream($tag, $offset=0, $limit=20, $sinceId=0, $maxId=0)
+    {
+        $stream = new NoticeStream(array('Notice_tag', '_streamDirect'),
+                                   array($tag),
+                                   'notice_tag:notice_ids:' . Cache::keyize($tag));
+        
+        return $stream->getNotices($offset, $limit, $sinceId, $maxId);
     }
 
     function _streamDirect($tag, $offset, $limit, $since_id, $max_id)
index d84d5da290dee37111a7c15fefb250079006b2dd..209e5ef84ae6700a14bbfdb3bdb735719c7f39d8 100644 (file)
@@ -198,22 +198,20 @@ class Profile extends Memcached_DataObject
 
     function getTaggedNotices($tag, $offset=0, $limit=NOTICES_PER_PAGE, $since_id=0, $max_id=0)
     {
-        $ids = Notice::stream(array($this, '_streamTaggedDirect'),
-                              array($tag),
-                              'profile:notice_ids_tagged:' . $this->id . ':' . $tag,
-                              $offset, $limit, $since_id, $max_id);
-        return Notice::getStreamByIds($ids);
+        $stream = new NoticeStream(array($this, '_streamTaggedDirect'),
+                                   array($tag),
+                                   'profile:notice_ids_tagged:'.$this->id.':'.$tag);
+
+        return $stream->getNotices($offset, $limit, $since_id, $max_id);
     }
 
     function getNotices($offset=0, $limit=NOTICES_PER_PAGE, $since_id=0, $max_id=0)
     {
-        // XXX: I'm not sure this is going to be any faster. It probably isn't.
-        $ids = Notice::stream(array($this, '_streamDirect'),
-                              array(),
-                              'profile:notice_ids:' . $this->id,
-                              $offset, $limit, $since_id, $max_id);
+        $stream = new NoticeStream(array($this, '_streamDirect'),
+                                   array(),
+                                   'profile:notice_ids:' . $this->id);
 
-        return Notice::getStreamByIds($ids);
+        return $stream->getNotices($offset, $limit, $since_id, $max_id);
     }
 
     function _streamTaggedDirect($tag, $offset, $limit, $since_id, $max_id)
index 371c16cf48388ff5d1cace8e2fb28e5e5914318c..d5341b9a05cdd49f18afe2b3f7d760ce7223b3c9 100644 (file)
@@ -38,11 +38,11 @@ class Reply extends Memcached_DataObject
 
     function stream($user_id, $offset=0, $limit=NOTICES_PER_PAGE, $since_id=0, $max_id=0)
     {
-        $ids = Notice::stream(array('Reply', '_streamDirect'),
-                              array($user_id),
-                              'reply:stream:' . $user_id,
-                              $offset, $limit, $since_id, $max_id);
-        return $ids;
+        $stream = new NoticeStream(array('Reply', '_streamDirect'),
+                                   array($user_id),
+                                   'reply:stream:' . $user_id);
+
+        return $stream->getNotices($offset, $limit, $since_id, $max_id);
     }
 
     function _streamDirect($user_id, $offset=0, $limit=NOTICES_PER_PAGE, $since_id=0, $max_id=0)
index 31b132d0f33a7fed3b910b35694b71586f82c1db..4bd7b039df33df7a05732358762222b353ab10d4 100644 (file)
@@ -448,8 +448,7 @@ class User extends Memcached_DataObject
 
     function getReplies($offset=0, $limit=NOTICES_PER_PAGE, $since_id=0, $before_id=0)
     {
-        $ids = Reply::stream($this->id, $offset, $limit, $since_id, $before_id);
-        return Notice::getStreamByIds($ids);
+        return Reply::stream($this->id, $offset, $limit, $since_id, $before_id);
     }
 
     function getTaggedNotices($tag, $offset=0, $limit=NOTICES_PER_PAGE, $since_id=0, $before_id=0) {
@@ -465,8 +464,7 @@ class User extends Memcached_DataObject
 
     function favoriteNotices($own=false, $offset=0, $limit=NOTICES_PER_PAGE, $since_id=0, $max_id=0)
     {
-        $ids = Fave::stream($this->id, $offset, $limit, $own, $since_id, $max_id);
-        return Notice::getStreamByIds($ids);
+        return Fave::stream($this->id, $offset, $limit, $own, $since_id, $max_id);
     }
 
     function noticesWithFriends($offset=0, $limit=NOTICES_PER_PAGE, $since_id=0, $before_id=0)
@@ -769,12 +767,11 @@ class User extends Memcached_DataObject
 
     function repeatedByMe($offset=0, $limit=20, $since_id=null, $max_id=null)
     {
-        $ids = Notice::stream(array($this, '_repeatedByMeDirect'),
-                              array(),
-                              'user:repeated_by_me:'.$this->id,
-                              $offset, $limit, $since_id, $max_id, null);
+        $stream = new NoticeStream(array($this, '_repeatedByMeDirect'),
+                                   array(),
+                                   'user:repeated_by_me:'.$this->id);
 
-        return Notice::getStreamByIds($ids);
+        return $stream->getNotices($offset, $limit, $since_id, $max_id);
     }
 
     function _repeatedByMeDirect($offset, $limit, $since_id, $max_id)
@@ -812,12 +809,11 @@ class User extends Memcached_DataObject
 
     function repeatsOfMe($offset=0, $limit=20, $since_id=null, $max_id=null)
     {
-        $ids = Notice::stream(array($this, '_repeatsOfMeDirect'),
-                              array(),
-                              'user:repeats_of_me:'.$this->id,
-                              $offset, $limit, $since_id, $max_id);
+        $stream = new NoticeStream(array($this, '_repeatsOfMeDirect'),
+                                   array(),
+                                   'user:repeats_of_me:'.$this->id);
 
-        return Notice::getStreamByIds($ids);
+        return $stream->getNotices($offset, $limit, $since_id, $max_id);
     }
 
     function _repeatsOfMeDirect($offset, $limit, $since_id, $max_id)
index 707acbd13cca7f77cc286ade0a74e1de176c970b..4d6dcfab689d536834e7b79e7f3bc6ddb16f6e0f 100644 (file)
@@ -87,12 +87,11 @@ class User_group extends Memcached_DataObject
 
     function getNotices($offset, $limit, $since_id=null, $max_id=null)
     {
-        $ids = Notice::stream(array($this, '_streamDirect'),
-                              array(),
-                              'user_group:notice_ids:' . $this->id,
-                              $offset, $limit, $since_id, $max_id);
+        $stream = new NoticeStream(array($this, '_streamDirect'),
+                                   array(),
+                                   'user_group:notice_ids:' . $this->id);
 
-        return Notice::getStreamByIds($ids);
+        return $stream->getNotices($offset, $limit, $since_id, $max_id);
     }
 
     function _streamDirect($offset, $limit, $since_id, $max_id)
diff --git a/lib/noticestream.php b/lib/noticestream.php
new file mode 100644 (file)
index 0000000..2b6e10f
--- /dev/null
@@ -0,0 +1,189 @@
+<?php
+/**
+ * StatusNet - the distributed open-source microblogging tool
+ * Copyright (C) 2011, StatusNet, Inc.
+ *
+ * A stream of notices
+ * 
+ * PHP version 5
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ * @category  Stream
+ * @package   StatusNet
+ * @author    Evan Prodromou <evan@status.net>
+ * @copyright 2011 StatusNet, Inc.
+ * @license   http://www.fsf.org/licensing/licenses/agpl-3.0.html AGPL 3.0
+ * @link      http://status.net/
+ */
+
+if (!defined('STATUSNET')) {
+    // This check helps protect against security problems;
+    // your code file can't be executed directly from the web.
+    exit(1);
+}
+
+/**
+ * Class for notice streams
+ *
+ * @category  Stream
+ * @package   StatusNet
+ * @author    Evan Prodromou <evan@status.net>
+ * @copyright 2011 StatusNet, Inc.
+ * @license   http://www.fsf.org/licensing/licenses/agpl-3.0.html AGPL 3.0
+ * @link      http://status.net/
+ */
+
+class NoticeStream
+{
+    public $generator = null;
+    public $args      = null;
+    public $cachekey  = null;
+
+    function __construct($generator, $args, $cachekey)
+    {
+        $this->generator = $generator;
+        $this->args      = $args;
+        $this->cachekey  = $cachekey;
+    }
+
+    function getNotices($offset=0, $limit=20, $sinceId=0, $maxId=0)
+    {
+        $ids = $this->getNoticeIds($offset, $limit, $sinceId, $maxId);
+
+        $notices = $this->getStreamByIds($ids);
+
+        return $notices;
+    }
+
+    function getNoticeIds($offset=0, $limit=20, $sinceId=0, $maxId=0)
+    {
+        $cache = Cache::instance();
+
+        // We cache NOTICE_CACHE_WINDOW elements at the tip of the stream.
+        // If the cache won't be hit, just generate directly.
+
+        if (empty($cache) ||
+            $sinceId != 0 || $maxId != 0 ||
+            is_null($limit) ||
+            ($offset + $limit) > NOTICE_CACHE_WINDOW) {
+            return $this->generate($offset, $limit, $sinceId, $maxId);
+        }
+
+        // Check the cache to see if we have the stream.
+
+        $idkey = Cache::key($this->cachekey);
+
+        $idstr = $cache->get($idkey);
+
+        if ($idstr !== false) {
+            // Cache hit! Woohoo!
+            $window = explode(',', $idstr);
+            $ids = array_slice($window, $offset, $limit);
+            return $ids;
+        }
+
+        // Check the cache to see if we have a "last-known-good" version.
+        // The actual cache gets blown away when new notices are added, but
+        // the "last" value holds a lot of info. We might need to only generate
+        // a few at the "tip", which can bound our queries and save lots
+        // of time.
+
+        $laststr = $cache->get($idkey.';last');
+
+        if ($laststr !== false) {
+            $window = explode(',', $laststr);
+            $last_id = $window[0];
+            $new_ids = $this->generate(0, NOTICE_CACHE_WINDOW, $last_id, 0);
+
+            $new_window = array_merge($new_ids, $window);
+
+            $new_windowstr = implode(',', $new_window);
+
+            $result = $cache->set($idkey, $new_windowstr);
+            $result = $cache->set($idkey . ';last', $new_windowstr);
+
+            $ids = array_slice($new_window, $offset, $limit);
+
+            return $ids;
+        }
+
+        // No cache hits :( Generate directly and stick the results
+        // into the cache. Note we generate the full cache window.
+
+        $window = $this->generate(0, NOTICE_CACHE_WINDOW, 0, 0);
+
+        $windowstr = implode(',', $window);
+
+        $result = $cache->set($idkey, $windowstr);
+        $result = $cache->set($idkey . ';last', $windowstr);
+
+        // Return just the slice that was requested
+
+        $ids = array_slice($window, $offset, $limit);
+
+        return $ids;
+    }
+
+    function getStreamByIds($ids)
+    {
+        $cache = Cache::instance();
+
+        if (!empty($cache)) {
+            $notices = array();
+            foreach ($ids as $id) {
+                $n = Notice::staticGet('id', $id);
+                if (!empty($n)) {
+                    $notices[] = $n;
+                }
+            }
+            return new ArrayWrapper($notices);
+        } else {
+            $notice = new Notice();
+            if (empty($ids)) {
+                //if no IDs requested, just return the notice object
+                return $notice;
+            }
+            $notice->whereAdd('id in (' . implode(', ', $ids) . ')');
+
+            $notice->find();
+
+            $temp = array();
+
+            while ($notice->fetch()) {
+                $temp[$notice->id] = clone($notice);
+            }
+
+            $wrapped = array();
+
+            foreach ($ids as $id) {
+                if (array_key_exists($id, $temp)) {
+                    $wrapped[] = $temp[$id];
+                }
+            }
+
+            return new ArrayWrapper($wrapped);
+        }
+    }
+
+    function generate($offset, $limit, $sinceId, $maxId)
+    {
+        $args = array_merge($this->args, array($offset,
+                                               $limit,
+                                               $sinceId,
+                                               $maxId));
+
+        return call_user_func_array($this->generator, $args);
+    }
+}