From 0b35ce7c370bbb6cb9d55bb2a4256f58cb1158f1 Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Wed, 23 Mar 2011 11:29:55 -0400 Subject: [PATCH] New NoticeStream class to reify streams of notices We've been muddling through with 6- or 8-argument functions for managing streams. I'd like to start thinking of streams as their own thing, and give them some more value. So, the new NoticeStream class takes over the Notice::stream() function and Notice::getStreamByIds(). There's probably some fine-tuning to do on the object interface. --- classes/Fave.php | 12 +-- classes/File.php | 9 +- classes/Notice.php | 114 ++----------------------- classes/Notice_tag.php | 15 ++-- classes/Profile.php | 20 ++--- classes/Reply.php | 10 +-- classes/User.php | 24 +++--- classes/User_group.php | 9 +- lib/noticestream.php | 189 +++++++++++++++++++++++++++++++++++++++++ 9 files changed, 243 insertions(+), 159 deletions(-) create mode 100644 lib/noticestream.php diff --git a/classes/Fave.php b/classes/Fave.php index efbceee6a8..a61f35d190 100644 --- a/classes/Fave.php +++ b/classes/Fave.php @@ -79,12 +79,12 @@ class Fave extends Memcached_DataObject function stream($user_id, $offset=0, $limit=NOTICES_PER_PAGE, $own=false, $since_id=0, $max_id=0) { - $ids = Notice::stream(array('Fave', '_streamDirect'), - array($user_id, $own), - ($own) ? 'fave:ids_by_user_own:'.$user_id : - 'fave:ids_by_user:'.$user_id, - $offset, $limit, $since_id, $max_id); - return $ids; + $stream = new NoticeStream(array('Fave', '_streamDirect'), + array($user_id, $own), + ($own) ? 'fave:ids_by_user_own:'.$user_id : + 'fave:ids_by_user:'.$user_id); + + return $stream->getNotices($offset, $limit, $since_id, $max_id); } /** diff --git a/classes/File.php b/classes/File.php index e9a0131c4e..681c33f9cd 100644 --- a/classes/File.php +++ b/classes/File.php @@ -449,12 +449,11 @@ class File extends Memcached_DataObject function stream($offset=0, $limit=NOTICES_PER_PAGE, $since_id=0, $max_id=0) { - $ids = Notice::stream(array($this, '_streamDirect'), - array(), - 'file:notice-ids:'.$this->url, - $offset, $limit, $since_id, $max_id); + $stream = new NoticeStream(array($this, '_streamDirect'), + array(), + 'file:notice-ids:'.$this->url); - return Notice::getStreamByIds($ids); + return $stream->getNotices($offset, $limit, $since_id, $max_id); } /** diff --git a/classes/Notice.php b/classes/Notice.php index b228a49c7c..8200e4554f 100644 --- a/classes/Notice.php +++ b/classes/Notice.php @@ -629,54 +629,14 @@ class Notice extends Memcached_DataObject return $att; } - function getStreamByIds($ids) - { - $cache = Cache::instance(); - - if (!empty($cache)) { - $notices = array(); - foreach ($ids as $id) { - $n = Notice::staticGet('id', $id); - if (!empty($n)) { - $notices[] = $n; - } - } - return new ArrayWrapper($notices); - } else { - $notice = new Notice(); - if (empty($ids)) { - //if no IDs requested, just return the notice object - return $notice; - } - $notice->whereAdd('id in (' . implode(', ', $ids) . ')'); - - $notice->find(); - - $temp = array(); - - while ($notice->fetch()) { - $temp[$notice->id] = clone($notice); - } - - $wrapped = array(); - - foreach ($ids as $id) { - if (array_key_exists($id, $temp)) { - $wrapped[] = $temp[$id]; - } - } - - return new ArrayWrapper($wrapped); - } - } function publicStream($offset=0, $limit=20, $since_id=0, $max_id=0) { - $ids = Notice::stream(array('Notice', '_publicStreamDirect'), - array(), - 'public', - $offset, $limit, $since_id, $max_id); - return Notice::getStreamByIds($ids); + $stream = new NoticeStream(array('Notice', '_publicStreamDirect'), + array(), + 'public'); + + return $stream->getNotices($offset, $limit, $since_id, $max_id); } function _publicStreamDirect($offset=0, $limit=20, $since_id=0, $max_id=0) @@ -719,12 +679,11 @@ class Notice extends Memcached_DataObject function conversationStream($id, $offset=0, $limit=20, $since_id=0, $max_id=0) { - $ids = Notice::stream(array('Notice', '_conversationStreamDirect'), - array($id), - 'notice:conversation_ids:'.$id, - $offset, $limit, $since_id, $max_id); + $stream = new NoticeStream(array('Notice', '_conversationStreamDirect'), + array($id), + 'notice:conversation_ids:'.$id); - return Notice::getStreamByIds($ids); + return $stream->getNotices($offset, $limit, $since_id, $max_id); } function _conversationStreamDirect($id, $offset=0, $limit=20, $since_id=0, $max_id=0) @@ -1540,61 +1499,6 @@ class Notice extends Memcached_DataObject } } - function stream($fn, $args, $cachekey, $offset=0, $limit=20, $since_id=0, $max_id=0) - { - $cache = Cache::instance(); - - if (empty($cache) || - $since_id != 0 || $max_id != 0 || - is_null($limit) || - ($offset + $limit) > NOTICE_CACHE_WINDOW) { - return call_user_func_array($fn, array_merge($args, array($offset, $limit, $since_id, - $max_id))); - } - - $idkey = Cache::key($cachekey); - - $idstr = $cache->get($idkey); - - if ($idstr !== false) { - // Cache hit! Woohoo! - $window = explode(',', $idstr); - $ids = array_slice($window, $offset, $limit); - return $ids; - } - - $laststr = $cache->get($idkey.';last'); - - if ($laststr !== false) { - $window = explode(',', $laststr); - $last_id = $window[0]; - $new_ids = call_user_func_array($fn, array_merge($args, array(0, NOTICE_CACHE_WINDOW, - $last_id, 0, null))); - - $new_window = array_merge($new_ids, $window); - - $new_windowstr = implode(',', $new_window); - - $result = $cache->set($idkey, $new_windowstr); - $result = $cache->set($idkey . ';last', $new_windowstr); - - $ids = array_slice($new_window, $offset, $limit); - - return $ids; - } - - $window = call_user_func_array($fn, array_merge($args, array(0, NOTICE_CACHE_WINDOW, - 0, 0, null))); - - $windowstr = implode(',', $window); - - $result = $cache->set($idkey, $windowstr); - $result = $cache->set($idkey . ';last', $windowstr); - - $ids = array_slice($window, $offset, $limit); - - return $ids; - } /** * Determine which notice, if any, a new notice is in reply to. diff --git a/classes/Notice_tag.php b/classes/Notice_tag.php index 81d346c5d3..813242253d 100644 --- a/classes/Notice_tag.php +++ b/classes/Notice_tag.php @@ -36,14 +36,13 @@ class Notice_tag extends Memcached_DataObject /* the code above is auto generated do not remove the tag below */ ###END_AUTOCODE - static function getStream($tag, $offset=0, $limit=20) { - - $ids = Notice::stream(array('Notice_tag', '_streamDirect'), - array($tag), - 'notice_tag:notice_ids:' . Cache::keyize($tag), - $offset, $limit); - - return Notice::getStreamByIds($ids); + static function getStream($tag, $offset=0, $limit=20, $sinceId=0, $maxId=0) + { + $stream = new NoticeStream(array('Notice_tag', '_streamDirect'), + array($tag), + 'notice_tag:notice_ids:' . Cache::keyize($tag)); + + return $stream->getNotices($offset, $limit, $sinceId, $maxId); } function _streamDirect($tag, $offset, $limit, $since_id, $max_id) diff --git a/classes/Profile.php b/classes/Profile.php index d84d5da290..209e5ef84a 100644 --- a/classes/Profile.php +++ b/classes/Profile.php @@ -198,22 +198,20 @@ class Profile extends Memcached_DataObject function getTaggedNotices($tag, $offset=0, $limit=NOTICES_PER_PAGE, $since_id=0, $max_id=0) { - $ids = Notice::stream(array($this, '_streamTaggedDirect'), - array($tag), - 'profile:notice_ids_tagged:' . $this->id . ':' . $tag, - $offset, $limit, $since_id, $max_id); - return Notice::getStreamByIds($ids); + $stream = new NoticeStream(array($this, '_streamTaggedDirect'), + array($tag), + 'profile:notice_ids_tagged:'.$this->id.':'.$tag); + + return $stream->getNotices($offset, $limit, $since_id, $max_id); } function getNotices($offset=0, $limit=NOTICES_PER_PAGE, $since_id=0, $max_id=0) { - // XXX: I'm not sure this is going to be any faster. It probably isn't. - $ids = Notice::stream(array($this, '_streamDirect'), - array(), - 'profile:notice_ids:' . $this->id, - $offset, $limit, $since_id, $max_id); + $stream = new NoticeStream(array($this, '_streamDirect'), + array(), + 'profile:notice_ids:' . $this->id); - return Notice::getStreamByIds($ids); + return $stream->getNotices($offset, $limit, $since_id, $max_id); } function _streamTaggedDirect($tag, $offset, $limit, $since_id, $max_id) diff --git a/classes/Reply.php b/classes/Reply.php index 371c16cf48..d5341b9a05 100644 --- a/classes/Reply.php +++ b/classes/Reply.php @@ -38,11 +38,11 @@ class Reply extends Memcached_DataObject function stream($user_id, $offset=0, $limit=NOTICES_PER_PAGE, $since_id=0, $max_id=0) { - $ids = Notice::stream(array('Reply', '_streamDirect'), - array($user_id), - 'reply:stream:' . $user_id, - $offset, $limit, $since_id, $max_id); - return $ids; + $stream = new NoticeStream(array('Reply', '_streamDirect'), + array($user_id), + 'reply:stream:' . $user_id); + + return $stream->getNotices($offset, $limit, $since_id, $max_id); } function _streamDirect($user_id, $offset=0, $limit=NOTICES_PER_PAGE, $since_id=0, $max_id=0) diff --git a/classes/User.php b/classes/User.php index 31b132d0f3..4bd7b039df 100644 --- a/classes/User.php +++ b/classes/User.php @@ -448,8 +448,7 @@ class User extends Memcached_DataObject function getReplies($offset=0, $limit=NOTICES_PER_PAGE, $since_id=0, $before_id=0) { - $ids = Reply::stream($this->id, $offset, $limit, $since_id, $before_id); - return Notice::getStreamByIds($ids); + return Reply::stream($this->id, $offset, $limit, $since_id, $before_id); } function getTaggedNotices($tag, $offset=0, $limit=NOTICES_PER_PAGE, $since_id=0, $before_id=0) { @@ -465,8 +464,7 @@ class User extends Memcached_DataObject function favoriteNotices($own=false, $offset=0, $limit=NOTICES_PER_PAGE, $since_id=0, $max_id=0) { - $ids = Fave::stream($this->id, $offset, $limit, $own, $since_id, $max_id); - return Notice::getStreamByIds($ids); + return Fave::stream($this->id, $offset, $limit, $own, $since_id, $max_id); } function noticesWithFriends($offset=0, $limit=NOTICES_PER_PAGE, $since_id=0, $before_id=0) @@ -769,12 +767,11 @@ class User extends Memcached_DataObject function repeatedByMe($offset=0, $limit=20, $since_id=null, $max_id=null) { - $ids = Notice::stream(array($this, '_repeatedByMeDirect'), - array(), - 'user:repeated_by_me:'.$this->id, - $offset, $limit, $since_id, $max_id, null); + $stream = new NoticeStream(array($this, '_repeatedByMeDirect'), + array(), + 'user:repeated_by_me:'.$this->id); - return Notice::getStreamByIds($ids); + return $stream->getNotices($offset, $limit, $since_id, $max_id); } function _repeatedByMeDirect($offset, $limit, $since_id, $max_id) @@ -812,12 +809,11 @@ class User extends Memcached_DataObject function repeatsOfMe($offset=0, $limit=20, $since_id=null, $max_id=null) { - $ids = Notice::stream(array($this, '_repeatsOfMeDirect'), - array(), - 'user:repeats_of_me:'.$this->id, - $offset, $limit, $since_id, $max_id); + $stream = new NoticeStream(array($this, '_repeatsOfMeDirect'), + array(), + 'user:repeats_of_me:'.$this->id); - return Notice::getStreamByIds($ids); + return $stream->getNotices($offset, $limit, $since_id, $max_id); } function _repeatsOfMeDirect($offset, $limit, $since_id, $max_id) diff --git a/classes/User_group.php b/classes/User_group.php index 707acbd13c..4d6dcfab68 100644 --- a/classes/User_group.php +++ b/classes/User_group.php @@ -87,12 +87,11 @@ class User_group extends Memcached_DataObject function getNotices($offset, $limit, $since_id=null, $max_id=null) { - $ids = Notice::stream(array($this, '_streamDirect'), - array(), - 'user_group:notice_ids:' . $this->id, - $offset, $limit, $since_id, $max_id); + $stream = new NoticeStream(array($this, '_streamDirect'), + array(), + 'user_group:notice_ids:' . $this->id); - return Notice::getStreamByIds($ids); + return $stream->getNotices($offset, $limit, $since_id, $max_id); } function _streamDirect($offset, $limit, $since_id, $max_id) diff --git a/lib/noticestream.php b/lib/noticestream.php new file mode 100644 index 0000000000..2b6e10f7b9 --- /dev/null +++ b/lib/noticestream.php @@ -0,0 +1,189 @@ +. + * + * @category Stream + * @package StatusNet + * @author Evan Prodromou + * @copyright 2011 StatusNet, Inc. + * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html AGPL 3.0 + * @link http://status.net/ + */ + +if (!defined('STATUSNET')) { + // This check helps protect against security problems; + // your code file can't be executed directly from the web. + exit(1); +} + +/** + * Class for notice streams + * + * @category Stream + * @package StatusNet + * @author Evan Prodromou + * @copyright 2011 StatusNet, Inc. + * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html AGPL 3.0 + * @link http://status.net/ + */ + +class NoticeStream +{ + public $generator = null; + public $args = null; + public $cachekey = null; + + function __construct($generator, $args, $cachekey) + { + $this->generator = $generator; + $this->args = $args; + $this->cachekey = $cachekey; + } + + function getNotices($offset=0, $limit=20, $sinceId=0, $maxId=0) + { + $ids = $this->getNoticeIds($offset, $limit, $sinceId, $maxId); + + $notices = $this->getStreamByIds($ids); + + return $notices; + } + + function getNoticeIds($offset=0, $limit=20, $sinceId=0, $maxId=0) + { + $cache = Cache::instance(); + + // We cache NOTICE_CACHE_WINDOW elements at the tip of the stream. + // If the cache won't be hit, just generate directly. + + if (empty($cache) || + $sinceId != 0 || $maxId != 0 || + is_null($limit) || + ($offset + $limit) > NOTICE_CACHE_WINDOW) { + return $this->generate($offset, $limit, $sinceId, $maxId); + } + + // Check the cache to see if we have the stream. + + $idkey = Cache::key($this->cachekey); + + $idstr = $cache->get($idkey); + + if ($idstr !== false) { + // Cache hit! Woohoo! + $window = explode(',', $idstr); + $ids = array_slice($window, $offset, $limit); + return $ids; + } + + // Check the cache to see if we have a "last-known-good" version. + // The actual cache gets blown away when new notices are added, but + // the "last" value holds a lot of info. We might need to only generate + // a few at the "tip", which can bound our queries and save lots + // of time. + + $laststr = $cache->get($idkey.';last'); + + if ($laststr !== false) { + $window = explode(',', $laststr); + $last_id = $window[0]; + $new_ids = $this->generate(0, NOTICE_CACHE_WINDOW, $last_id, 0); + + $new_window = array_merge($new_ids, $window); + + $new_windowstr = implode(',', $new_window); + + $result = $cache->set($idkey, $new_windowstr); + $result = $cache->set($idkey . ';last', $new_windowstr); + + $ids = array_slice($new_window, $offset, $limit); + + return $ids; + } + + // No cache hits :( Generate directly and stick the results + // into the cache. Note we generate the full cache window. + + $window = $this->generate(0, NOTICE_CACHE_WINDOW, 0, 0); + + $windowstr = implode(',', $window); + + $result = $cache->set($idkey, $windowstr); + $result = $cache->set($idkey . ';last', $windowstr); + + // Return just the slice that was requested + + $ids = array_slice($window, $offset, $limit); + + return $ids; + } + + function getStreamByIds($ids) + { + $cache = Cache::instance(); + + if (!empty($cache)) { + $notices = array(); + foreach ($ids as $id) { + $n = Notice::staticGet('id', $id); + if (!empty($n)) { + $notices[] = $n; + } + } + return new ArrayWrapper($notices); + } else { + $notice = new Notice(); + if (empty($ids)) { + //if no IDs requested, just return the notice object + return $notice; + } + $notice->whereAdd('id in (' . implode(', ', $ids) . ')'); + + $notice->find(); + + $temp = array(); + + while ($notice->fetch()) { + $temp[$notice->id] = clone($notice); + } + + $wrapped = array(); + + foreach ($ids as $id) { + if (array_key_exists($id, $temp)) { + $wrapped[] = $temp[$id]; + } + } + + return new ArrayWrapper($wrapped); + } + } + + function generate($offset, $limit, $sinceId, $maxId) + { + $args = array_merge($this->args, array($offset, + $limit, + $sinceId, + $maxId)); + + return call_user_func_array($this->generator, $args); + } +} -- 2.39.5