]> git.mxchange.org Git - quix0rs-gnu-social.git/blob - plugins/TwitterBridge/jsonstreamreader.php
0d16b68bd49937c15e2c088ee820f3ea9cf9df97
[quix0rs-gnu-social.git] / plugins / TwitterBridge / jsonstreamreader.php
1 <?php
2 /**
3  * StatusNet, the distributed open-source microblogging tool
4  *
5  * PHP version 5
6  *
7  * LICENCE: This program is free software: you can redistribute it and/or modify
8  * it under the terms of the GNU Affero General Public License as published by
9  * the Free Software Foundation, either version 3 of the License, or
10  * (at your option) any later version.
11  *
12  * This program is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU Affero General Public License for more details.
16  *
17  * You should have received a copy of the GNU Affero General Public License
18  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
19  *
20  * @category  Plugin
21  * @package   StatusNet
22  * @author    Brion Vibber <brion@status.net>
23  * @copyright 2010 StatusNet, Inc.
24  * @license   http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0
25  * @link      http://status.net/
26  */
27
28 class OAuthData
29 {
30     public $consumer_key, $consumer_secret, $token, $token_secret;
31 }
32
33 /**
34  *
35  */
36 abstract class JsonStreamReader
37 {
38     const CRLF = "\r\n";
39
40     public $id;
41     protected $socket = null;
42     protected $state = 'init'; // 'init', 'connecting', 'waiting', 'headers', 'active'
43
44     public function __construct()
45     {
46         $this->id = get_class($this) . '.' . substr(md5(mt_rand()), 0, 8);
47     }
48
49     /**
50      * Starts asynchronous connect operation...
51      *
52      * @param <type> $url
53      */
54     public function connect($url)
55     {
56         common_log(LOG_DEBUG, "$this->id opening connection to $url");
57
58         $scheme = parse_url($url, PHP_URL_SCHEME);
59         if ($scheme == 'http') {
60             $rawScheme = 'tcp';
61         } else if ($scheme == 'https') {
62             $rawScheme = 'ssl';
63         } else {
64             throw new ServerException('Invalid URL scheme for HTTP stream reader');
65         }
66
67         $host = parse_url($url, PHP_URL_HOST);
68         $port = parse_url($url, PHP_URL_PORT);
69         if (!$port) {
70             if ($scheme == 'https') {
71                 $port = 443;
72             } else {
73                 $port = 80;
74             }
75         }
76
77         $path = parse_url($url, PHP_URL_PATH);
78         $query = parse_url($url, PHP_URL_QUERY);
79         if ($query) {
80             $path .= '?' . $query;
81         }
82
83         $errno = $errstr = null;
84         $timeout = 5;
85         //$flags = STREAM_CLIENT_CONNECT | STREAM_CLIENT_ASYNC_CONNECT;
86         $flags = STREAM_CLIENT_CONNECT;
87         // @fixme add SSL params
88         $this->socket = stream_socket_client("$rawScheme://$host:$port", $errno, $errstr, $timeout, $flags);
89
90         $this->send($this->httpOpen($host, $path));
91
92         stream_set_blocking($this->socket, false);
93         $this->state = 'waiting';
94     }
95
96     /**
97      * Send some fun data off to the server.
98      *
99      * @param string $buffer
100      */
101     function send($buffer)
102     {
103         fwrite($this->socket, $buffer);
104     }
105
106     /**
107      * Read next packet of data from the socket.
108      *
109      * @return string
110      */
111     function read()
112     {
113         $buffer = fread($this->socket, 65536);
114         return $buffer;
115     }
116
117     protected function httpOpen($host, $path)
118     {
119         $lines = array(
120             "GET $path HTTP/1.1",
121             "Host: $host",
122             "User-Agent: StatusNet/" . STATUSNET_VERSION . " (TwitterBridgePlugin)",
123             "Connection: close",
124             "",
125             ""
126         );
127         return implode(self::CRLF, $lines);
128     }
129
130     /**
131      * Close the current connection, if open.
132      */
133     public function close()
134     {
135         if ($this->isConnected()) {
136             common_log(LOG_DEBUG, "$this->id closing connection.");
137             fclose($this->socket);
138             $this->socket = null;
139         }
140     }
141
142     /**
143      * Are we currently connected?
144      *
145      * @return boolean
146      */
147     public function isConnected()
148     {
149         return $this->socket !== null;
150     }
151
152     /**
153      * Send any sockets we're listening on to the IO manager
154      * to wait for input.
155      *
156      * @return array of resources
157      */
158     public function getSockets()
159     {
160         if ($this->isConnected()) {
161             return array($this->socket);
162         }
163         return array();
164     }
165
166     /**
167      * Take a chunk of input over the horn and go go go! :D
168      * @param string $buffer
169      */
170     function handleInput($socket)
171     {
172         if ($this->socket !== $socket) {
173             throw new Exception('Got input from unexpected socket!');
174         }
175
176         $buffer = $this->read();
177         switch ($this->state)
178         {
179             case 'waiting':
180                 $this->handleInputWaiting($buffer);
181                 break;
182             case 'headers':
183                 $this->handleInputHeaders($buffer);
184                 break;
185             case 'active':
186                 $this->handleInputActive($buffer);
187                 break;
188             default:
189                 throw new Exception('Invalid state in handleInput: ' . $this->state);
190         }
191     }
192
193     function handleInputWaiting($buffer)
194     {
195         common_log(LOG_DEBUG, "$this->id Does this happen? " . $buffer);
196         $this->state = 'headers';
197         $this->handleInputHeaders($buffer);
198     }
199
200     function handleInputHeaders($buffer)
201     {
202         $lines = explode(self::CRLF, $buffer);
203         foreach ($lines as $line) {
204             if ($this->state == 'headers') {
205                 $this->handleLineHeaders($line);
206             } else if ($this->state == 'active') {
207                 $this->handleLineActive($line);
208             }
209         }
210     }
211
212     function handleInputActive($buffer)
213     {
214         // One JSON object on each line...
215         // Will we always deliver on packet boundaries?
216         $lines = explode(self::CRLF, $buffer);
217         foreach ($lines as $line) {
218             $this->handleLineActive($line);
219         }
220     }
221
222     function handleLineHeaders($line)
223     {
224         if ($line == '') {
225             $this->state = 'active';
226             common_log(LOG_DEBUG, "$this->id connection is active!");
227         } else {
228             common_log(LOG_DEBUG, "$this->id read HTTP header: $line");
229             $this->responseHeaders[] = $line;
230         }
231     }
232
233     function handleLineActive($line)
234     {
235         if ($line == "") {
236             // Server sends empty lines as keepalive.
237             return;
238         }
239         $data = json_decode($line, true);
240         if ($data) {
241             $this->handleJson($data);
242         } else {
243             common_log(LOG_ERR, "$this->id received bogus JSON data: " . var_export($line, true));
244         }
245     }
246
247     abstract protected function handleJson(array $data);
248 }