From 3b304fc0efd0bb4d75b964fe5585703d113d9c99 Mon Sep 17 00:00:00 2001 From: Brion Vibber Date: Mon, 4 Oct 2010 18:28:54 -0700 Subject: [PATCH] Initial stub code for pulling data from Twitter's User Streams and Site Streams interfaces. This should allow us to make a much more efficient background importer which can use a relatively small number of connections getting push data for either a single user or for many users with credentials on the site. --- plugins/TwitterBridge/jsonstreamreader.php | 224 ++++++++++++++++++ plugins/TwitterBridge/scripts/streamtest.php | 142 +++++++++++ plugins/TwitterBridge/twitterstreamreader.php | 161 +++++++++++++ 3 files changed, 527 insertions(+) create mode 100644 plugins/TwitterBridge/jsonstreamreader.php create mode 100644 plugins/TwitterBridge/scripts/streamtest.php create mode 100644 plugins/TwitterBridge/twitterstreamreader.php diff --git a/plugins/TwitterBridge/jsonstreamreader.php b/plugins/TwitterBridge/jsonstreamreader.php new file mode 100644 index 0000000000..ad8e2626ad --- /dev/null +++ b/plugins/TwitterBridge/jsonstreamreader.php @@ -0,0 +1,224 @@ +. + * + * @category Plugin + * @package StatusNet + * @author Brion Vibber + * @copyright 2010 StatusNet, Inc. + * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0 + * @link http://status.net/ + */ + +class OAuthData +{ + public $consumer_key, $consumer_secret, $token, $token_secret; +} + +/** + * + */ +abstract class JsonStreamReader +{ + const CRLF = "\r\n"; + + public $id; + protected $socket = null; + protected $state = 'init'; // 'init', 'connecting', 'waiting', 'headers', 'active' + + public function __construct() + { + $this->id = get_class($this) . '.' . substr(md5(mt_rand()), 0, 8); + } + + /** + * Starts asynchronous connect operation... + * + * @param $url + */ + public function connect($url) + { + common_log(LOG_DEBUG, "$this->id opening connection to $url"); + + $scheme = parse_url($url, PHP_URL_SCHEME); + if ($scheme == 'http') { + $rawScheme = 'tcp'; + } else if ($scheme == 'https') { + $rawScheme = 'ssl'; + } else { + throw new ServerException('Invalid URL scheme for HTTP stream reader'); + } + + $host = parse_url($url, PHP_URL_HOST); + $port = parse_url($url, PHP_URL_PORT); + if (!$port) { + if ($scheme == 'https') { + $port = 443; + } else { + $port = 80; + } + } + + $path = parse_url($url, PHP_URL_PATH); + $query = parse_url($url, PHP_URL_QUERY); + if ($query) { + $path .= '?' . $query; + } + + $errno = $errstr = null; + $timeout = 5; + //$flags = STREAM_CLIENT_CONNECT | STREAM_CLIENT_ASYNC_CONNECT; + $flags = STREAM_CLIENT_CONNECT; + // @fixme add SSL params + $this->socket = stream_socket_client("$rawScheme://$host:$port", $errno, $errstr, $timeout, $flags); + + $this->send($this->httpOpen($host, $path)); + + stream_set_blocking($this->socket, false); + $this->state = 'waiting'; + } + + function send($buffer) + { + echo "Writing...\n"; + var_dump($buffer); + fwrite($this->socket, $buffer); + } + + function read() + { + echo "Reading...\n"; + $buffer = fread($this->socket, 65536); + var_dump($buffer); + return $buffer; + } + + protected function httpOpen($host, $path) + { + $lines = array( + "GET $path HTTP/1.1", + "Host: $host", + "User-Agent: StatusNet/" . STATUSNET_VERSION . " (TwitterBridgePlugin)", + "Connection: close", + "", + "" + ); + return implode(self::CRLF, $lines); + } + + /** + * Close the current connection, if open. + */ + public function close() + { + if ($this->isConnected()) { + common_log(LOG_DEBUG, "$this->id closing connection."); + fclose($this->socket); + $this->socket = null; + } + } + + /** + * Are we currently connected? + * + * @return boolean + */ + public function isConnected() + { + return $this->socket !== null; + } + + /** + * Send any sockets we're listening on to the IO manager + * to wait for input. + * + * @return array of resources + */ + public function getSockets() + { + if ($this->isConnected()) { + return array($this->socket); + } + return array(); + } + + /** + * Take a chunk of input over the horn and go go go! :D + * @param string $buffer + */ + function handleInput($socket) + { + if ($this->socket !== $socket) { + throw new Exception('Got input from unexpected socket!'); + } + + $buffer = $this->read(); + switch ($this->state) + { + case 'waiting': + $this->handleInputWaiting($buffer); + break; + case 'headers': + $this->handleInputHeaders($buffer); + break; + case 'active': + $this->handleInputActive($buffer); + break; + default: + throw new Exception('Invalid state in handleInput: ' . $this->state); + } + } + + function handleInputWaiting($buffer) + { + common_log(LOG_DEBUG, "$this->id Does this happen? " . $buffer); + $this->state = 'headers'; + $this->handleInputHeaders($buffer); + } + + function handleInputHeaders($buffer) + { + $lines = explode(self::CRLF, $buffer); + foreach ($lines as $line) { + if ($line == '') { + $this->state = 'active'; + common_log(LOG_DEBUG, "$this->id connection is active!"); + } else { + common_log(LOG_DEBUG, "$this->id read HTTP header: $line"); + $this->responseHeaders[] = $line; + } + } + } + + function handleInputActive($buffer) + { + // One JSON object on each line... + // Will we always deliver on packet boundaries? + $lines = explode("\n", $buffer); + foreach ($lines as $line) { + $data = json_decode($line, true); + if ($data) { + $this->handleJson($data); + } else { + common_log(LOG_ERR, "$this->id received bogus JSON data: " . $line); + } + } + } + + abstract function handleJson(array $data); +} diff --git a/plugins/TwitterBridge/scripts/streamtest.php b/plugins/TwitterBridge/scripts/streamtest.php new file mode 100644 index 0000000000..04d91ef439 --- /dev/null +++ b/plugins/TwitterBridge/scripts/streamtest.php @@ -0,0 +1,142 @@ +. + * + * @category Plugin + * @package StatusNet + * @author Brion Vibber + * @copyright 2010 StatusNet, Inc. + * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0 + * @link http://status.net/ + */ + +define('INSTALLDIR', realpath(dirname(__FILE__) . '/../../..')); + +$shortoptions = 'n:'; +$longoptions = array('nick='); + +$helptext = << + +Attempts a User Stream connection to Twitter as the given user, dumping +data as it comes. + +ENDOFHELP; + +require_once INSTALLDIR.'/scripts/commandline.inc'; +require_once dirname(dirname(__FILE__)) . '/jsonstreamreader.php'; +require_once dirname(dirname(__FILE__)) . '/twitterstreamreader.php'; + +if (have_option('n')) { + $nickname = get_option_value('n'); +} else if (have_option('nick')) { + $nickname = get_option_value('nickname'); +} else { + show_help($helptext); + exit(0); +} + +/** + * + * @param User $user + * @return TwitterOAuthClient + */ +function twitterAuthForUser(User $user) +{ + $flink = Foreign_link::getByUserID($user->id, + TWITTER_SERVICE); + if (!$flink) { + throw new ServerException("No Twitter config for this user."); + } + + $token = TwitterOAuthClient::unpackToken($flink->credentials); + if (!$token) { + throw new ServerException("No Twitter OAuth credentials for this user."); + } + + return new TwitterOAuthClient($token->key, $token->secret); +} + +function homeStreamForUser(User $user) +{ + $auth = twitterAuthForUser($user); + return new TwitterUserStream($auth); +} + +$user = User::staticGet('nickname', $nickname); +$stream = homeStreamForUser($user); +$stream->hookEvent('raw', function($data) { + var_dump($data); +}); + +class TwitterManager extends IoManager +{ + function __construct(TwitterStreamReader $stream) + { + $this->stream = $stream; + } + + function getSockets() + { + return $this->stream->getSockets(); + } + + function handleInput($data) + { + $this->stream->handleInput($data); + return true; + } + + function start() + { + $this->stream->connect(); + return true; + } + + function finish() + { + $this->stream->close(); + return true; + } + + public static function get() + { + throw new Exception('not a singleton'); + } +} + +class TwitterStreamMaster extends IoMaster +{ + function __construct($id, $ioManager) + { + parent::__construct($id); + $this->ioManager = $ioManager; + } + + /** + * Initialize IoManagers which are appropriate to this instance. + */ + function initManagers() + { + $this->instantiate($this->ioManager); + } +} + +$master = new TwitterStreamMaster('TwitterStream', new TwitterManager($stream)); +$master->init(); +$master->service(); diff --git a/plugins/TwitterBridge/twitterstreamreader.php b/plugins/TwitterBridge/twitterstreamreader.php new file mode 100644 index 0000000000..e746228a38 --- /dev/null +++ b/plugins/TwitterBridge/twitterstreamreader.php @@ -0,0 +1,161 @@ +. + * + * @category Plugin + * @package StatusNet + * @author Brion Vibber + * @copyright 2010 StatusNet, Inc. + * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0 + * @link http://status.net/ + */ + +// A single stream connection +abstract class TwitterStreamReader extends JsonStreamReader +{ + protected $callbacks = array(); + + function __construct(TwitterOAuthClient $auth, $baseUrl) + { + $this->baseUrl = $baseUrl; + $this->oauth = $auth; + } + + public function connect($method) + { + $url = $this->oAuthUrl($this->baseUrl . '/' . $method); + return parent::connect($url); + } + + /** + * Sign our target URL with OAuth auth stuff. + * + * @param string $url + * @param array $params + * @return string + */ + function oAuthUrl($url, $params=array()) + { + // In an ideal world this would be better encapsulated. :) + $request = OAuthRequest::from_consumer_and_token($this->oauth->consumer, + $this->oauth->token, 'GET', $url, $params); + $request->sign_request($this->oauth->sha1_method, + $this->oauth->consumer, $this->oauth->token); + + return $request->to_url(); + } + /** + * Add an event callback. Available event names include + * 'raw' (all data), 'friends', 'delete', 'scrubgeo', etc + * + * @param string $event + * @param callable $callback + */ + public function hookEvent($event, $callback) + { + $this->callbacks[$event][] = $callback; + } + + /** + * Call event handler callbacks for the given event. + * + * @param string $event + * @param mixed $arg1 ... one or more params to pass on + */ + public function fireEvent($event, $arg1) + { + if (array_key_exists($event, $this->callbacks)) { + $args = array_slice(func_get_args(), 1); + foreach ($this->callbacks[$event] as $callback) { + call_user_func_array($callback, $args); + } + } + } + + function handleJson(array $data) + { + $this->routeMessage($data); + } + + abstract function routeMessage($data); + + function handleMessage($data, $forUserId=null) + { + $this->fireEvent('raw', $data, $forUserId); + $known = array('friends'); + foreach ($known as $key) { + if (isset($data[$key])) { + $this->fireEvent($key, $data[$key], $forUserId); + } + } + } +} + +class TwitterSiteStream extends TwitterStreamReader +{ + protected $userIds; + + public function __construct(TwitterOAuthClient $auth, $baseUrl='https://stream.twitter.com') + { + parent::__construct($auth, $baseUrl); + } + + public function connect($method='2b/site.json') + { + return parent::connect($method); + } + + function followUsers($userIds) + { + $this->userIds = $userIds; + } + + /** + * Each message in the site stream tells us which user ID it should be + * routed to; we'll need that to let the caller know what to do. + * + * @param array $data + */ + function routeMessage($data) + { + parent::handleMessage($data['message'], $data['for_user']); + } +} + +class TwitterUserStream extends TwitterStreamReader +{ + public function __construct(TwitterOAuthClient $auth, $baseUrl='https://userstream.twitter.com') + { + parent::__construct($auth, $baseUrl); + } + + public function connect($method='2/user.json') + { + return parent::connect($method); + } + + /** + * Each message in the user stream is just ready to go. + * + * @param array $data + */ + function routeMessage($data) + { + parent::handleMessage($data, $this->userId); + } +} -- 2.39.5