3 * StatusNet, the distributed open-source microblogging tool
7 * LICENCE: This program is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU Affero General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU Affero General Public License for more details.
17 * You should have received a copy of the GNU Affero General Public License
18 * along with this program. If not, see <http://www.gnu.org/licenses/>.
22 * @author Brion Vibber <brion@status.net>
23 * @copyright 2010 StatusNet, Inc.
24 * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0
25 * @link http://status.net/
// Four public credential fields: consumer key/secret and token/secret.
// NOTE(review): presumably an OAuth credential holder; the enclosing class
// declaration is not visible in this excerpt, so confirm the owner there.
30 public $consumer_key, $consumer_secret, $token, $token_secret;
/**
 * Base class for readers of a long-lived HTTP response whose body is a
 * sequence of CRLF-terminated JSON documents.
 *
 * The reader opens the socket itself, sends a bare HTTP/1.1 GET, and is then
 * driven from outside: the owner polls getSockets() and calls handleInput()
 * whenever the socket becomes readable. Each complete line is dispatched
 * according to the current state:
 *
 *   'waiting' -> expecting the HTTP status line
 *   'headers' -> collecting response headers until the blank separator line
 *   'active'  -> every non-empty line is one JSON document
 *
 * Subclasses implement handleJson() to consume the decoded documents.
 */
abstract class JsonStreamReader
{
    const CRLF = "\r\n";

    /** @var string identifier used as a prefix in log messages */
    public $id;

    /** @var resource|null stream socket, or null when not connected */
    protected $socket = null;

    /** @var string one of 'init', 'connecting', 'waiting', 'headers', 'active' */
    protected $state = 'init';

    /**
     * Raw response header lines collected during the 'headers' state.
     * Declared public because it used to be created dynamically (and was
     * therefore publicly readable).
     *
     * @var array of strings
     */
    public $responseHeaders = array();

    /** @var string bytes received that do not yet form a complete line */
    protected $inputBuffer = '';

    public function __construct()
    {
        // Class name plus a short random suffix; only used to tell readers
        // apart in the logs, so it does not need to be unpredictable.
        $this->id = get_class($this) . '.' . substr(md5(mt_rand()), 0, 8);
    }

    /**
     * Opens the connection, sends the HTTP request, and switches the socket
     * to non-blocking mode so the response can be consumed via handleInput().
     *
     * The connect itself and the request write are still synchronous.
     *
     * @fixme Can we do the open-socket fully async too? (need write select infrastructure)
     *
     * @param string $url http:// or https:// URL of the stream
     *
     * @throws ServerException on an unsupported scheme or a failed connect
     * @throws Exception if the request cannot be written
     */
    public function connect($url)
    {
        common_log(LOG_DEBUG, "$this->id opening connection to $url");

        $scheme = parse_url($url, PHP_URL_SCHEME);
        if ($scheme == 'http') {
            $rawScheme = 'tcp';
        } else if ($scheme == 'https') {
            $rawScheme = 'ssl';
        } else {
            // TRANS: Server exception thrown when an invalid URL scheme is detected.
            throw new ServerException(_m('Invalid URL scheme for HTTP stream reader.'));
        }

        $host = parse_url($url, PHP_URL_HOST);
        $port = parse_url($url, PHP_URL_PORT);
        if (!$port) {
            $port = ($scheme == 'https') ? 443 : 80;
        }

        $path = parse_url($url, PHP_URL_PATH);
        if ($path === null || $path === false || $path === '') {
            // A request line needs a path even when the URL has none.
            $path = '/';
        }
        $query = parse_url($url, PHP_URL_QUERY);
        if ($query) {
            $path .= '?' . $query;
        }

        // Do not leak a socket left over from an earlier connect().
        $this->close();

        $errno = $errstr = null;
        // NOTE(review): connect timeout in seconds; confirm the intended value.
        $timeout = 5;
        // An async connect would use
        // STREAM_CLIENT_CONNECT | STREAM_CLIENT_ASYNC_CONNECT; see @fixme above.
        $flags = STREAM_CLIENT_CONNECT;
        // @fixme add SSL params
        $socket = stream_socket_client("$rawScheme://$host:$port", $errno, $errstr, $timeout, $flags);
        if ($socket === false) {
            // Never store the failed handle: isConnected() only checks for null.
            // TRANS: Server exception thrown when the stream connection cannot be opened.
            // TRANS: %1$s is the host, %2$s is the port, %3$s is the error message, %4$s is the error number.
            throw new ServerException(sprintf(_m('Could not connect to %1$s:%2$s: %3$s (%4$s).'), $host, $port, $errstr, $errno));
        }

        $this->socket = $socket;
        $this->responseHeaders = array();
        $this->inputBuffer = '';

        try {
            // Written while the socket is still blocking.
            $this->send($this->httpOpen($host, $path));
        } catch (Exception $e) {
            $this->close();
            throw $e;
        }

        stream_set_blocking($this->socket, false);
        $this->state = 'waiting';
    }

    /**
     * Writes the whole buffer to the socket.
     *
     * A single fwrite() on a network stream may accept only part of the
     * data, so this keeps writing until everything is out.
     *
     * @param string $buffer
     *
     * @throws Exception if the socket accepts no more data
     */
    public function send($buffer)
    {
        $remaining = (string)$buffer;
        while ($remaining !== '') {
            $written = fwrite($this->socket, $remaining);
            if ($written === false || $written === 0) {
                // TRANS: Exception thrown when the request could not be written to the stream socket.
                throw new Exception(_m('Could not write to stream socket.'));
            }
            // Cast: substr() yields false rather than '' at end-of-string on older PHP.
            $remaining = (string)substr($remaining, $written);
        }
    }

    /**
     * Reads the next packet of data from the socket.
     *
     * @return string|false up to 65536 bytes; may be empty on a non-blocking socket
     */
    public function read()
    {
        return fread($this->socket, 65536);
    }

    /**
     * Builds the HTTP request head, terminated by an empty line.
     *
     * @param string $host
     * @param string $path path including any query string
     *
     * @return string
     */
    protected function httpOpen($host, $path)
    {
        $lines = array(
            "GET $path HTTP/1.1",
            // HTTP/1.1 requires a Host header.
            "Host: $host",
            'User-Agent: ' . HTTPClient::userAgent() . ' (TwitterBridgePlugin)',
            // NOTE(review): one request per connection; confirm this header is wanted.
            'Connection: close',
            // Two empty entries make implode() end the head with CRLF CRLF.
            '',
            ''
        );
        return implode(self::CRLF, $lines);
    }

    /**
     * Closes the current connection, if open.
     */
    public function close()
    {
        if ($this->isConnected()) {
            common_log(LOG_DEBUG, "$this->id closing connection.");
            fclose($this->socket);
            $this->socket = null;
        }
    }

    /**
     * Are we currently connected?
     *
     * @return boolean
     */
    public function isConnected()
    {
        return $this->socket !== null;
    }

    /**
     * Sockets the IO manager should listen on for this reader.
     *
     * @return array of resources; empty when not connected
     */
    public function getSockets()
    {
        if ($this->isConnected()) {
            return array($this->socket);
        }
        return array();
    }

    /**
     * Reads whatever is available on the socket and dispatches every
     * complete line it yields.
     *
     * Reads are not aligned with line boundaries, so an unterminated tail is
     * kept in $inputBuffer and completed by a later call. On any error the
     * connection is closed (and marked as such) before the exception is
     * passed on to the caller.
     *
     * @param resource $socket the socket reported readable; must be ours
     *
     * @throws Exception on a foreign socket or a protocol error
     */
    public function handleInput($socket)
    {
        if ($this->socket !== $socket) {
            // TRANS: Exception thrown when input from an unexpected socket is encountered.
            throw new Exception(_m('Got input from unexpected socket!'));
        }

        try {
            $chunk = $this->read();
            if ($chunk === false || $chunk === '') {
                // Nothing read: either a spurious wakeup or the peer hung up.
                if (feof($this->socket)) {
                    common_log(LOG_DEBUG, "$this->id connection closed by remote host.");
                    $this->close();
                }
                return;
            }

            $this->inputBuffer .= $chunk;
            $crlfLength = strlen(self::CRLF);
            while (($end = strpos($this->inputBuffer, self::CRLF)) !== false) {
                $line = substr($this->inputBuffer, 0, $end);
                $this->inputBuffer = (string)substr($this->inputBuffer, $end + $crlfLength);
                $this->handleLine($line);
            }
        } catch (Exception $e) {
            common_log(LOG_ERR, "$this->id aborting connection due to error: " . $e->getMessage());
            // close() also nulls the socket so isConnected()/getSockets()
            // stop reporting a dead resource.
            $this->close();
            throw $e;
        }
    }

    /**
     * Routes one line to the handler for the current state.
     *
     * @param string $line line without its CRLF terminator
     *
     * @throws Exception if the reader is in a state that cannot take input
     */
    protected function handleLine($line)
    {
        switch ($this->state)
        {
        case 'waiting':
            $this->handleLineWaiting($line);
            break;
        case 'headers':
            $this->handleLineHeaders($line);
            break;
        case 'active':
            $this->handleLineActive($line);
            break;
        default:
            // TRANS: Exception thrown when an invalid state is encountered in handleLine.
            // TRANS: %s is the invalid state.
            throw new Exception(sprintf(_m('Invalid state in handleLine: %s.'),$this->state));
        }
    }

    /**
     * Validates the HTTP status line and moves on to the 'headers' state.
     *
     * @param string $line expected form: "HTTP/<version> <status> <reason>"
     *
     * @throws Exception if the line is malformed or the status is not 200
     */
    protected function handleLineWaiting($line)
    {
        $bits = explode(' ', $line, 3);
        if (count($bits) != 3) {
            // TRANS: Exception thrown when an invalid response line is encountered.
            // TRANS: %s is the invalid line.
            throw new Exception(sprintf(_m('Invalid HTTP response line: %s.'),$line));
        }

        list($http, $status, $text) = $bits;
        if (substr($http, 0, 5) !== 'HTTP/') {
            // TRANS: Exception thrown when an invalid response line part is encountered.
            // TRANS: %1$s is the chunk, %2$s is the line.
            throw new Exception(sprintf(_m('Invalid HTTP response line chunk "%1$s": %2$s.'),$http, $line));
        }
        // Strict comparison: a loose one would also accept e.g. "200.0".
        if ($status !== '200') {
            // TRANS: Exception thrown when an invalid response code is encountered.
            // TRANS: %1$s is the response code, %2$s is the line.
            throw new Exception(sprintf(_m('Bad HTTP response code %1$s: %2$s.'),$status,$line));
        }
        common_log(LOG_DEBUG, "$this->id $line");
        $this->state = 'headers';
    }

    /**
     * Collects one response header; the empty separator line switches the
     * reader to the 'active' state.
     *
     * @param string $line
     */
    protected function handleLineHeaders($line)
    {
        if ($line == '') {
            $this->state = 'active';
            common_log(LOG_DEBUG, "$this->id connection is active!");
        } else {
            common_log(LOG_DEBUG, "$this->id read HTTP header: $line");
            $this->responseHeaders[] = $line;
        }
    }

    /**
     * Decodes one body line and hands it to handleJson().
     *
     * @param string $line
     */
    protected function handleLineActive($line)
    {
        if ($line == '') {
            // Server sends empty lines as keepalive.
            return;
        }

        $data = json_decode($line);
        // handleJson() only accepts objects; scalars and arrays are valid
        // JSON but are rejected here instead of triggering a type error.
        if ($data instanceof stdClass) {
            $this->handleJson($data);
        } else {
            common_log(LOG_ERR, "$this->id received bogus JSON data: " . var_export($line, true));
        }
    }

    /**
     * Consumes one decoded JSON document from the stream.
     *
     * @param stdClass $data
     */
    abstract protected function handleJson(stdClass $data);
}