]> git.mxchange.org Git - quix0rs-gnu-social.git/blob - plugins/OStatus/classes/HubSub.php
Merge remote-tracking branch 'mainline/1.0.x' into people_tags_rebase
[quix0rs-gnu-social.git] / plugins / OStatus / classes / HubSub.php
1 <?php
2 /*
3  * StatusNet - the distributed open-source microblogging tool
4  * Copyright (C) 2010, StatusNet, Inc.
5  *
6  * This program is free software: you can redistribute it and/or modify
7  * it under the terms of the GNU Affero General Public License as published by
8  * the Free Software Foundation, either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU Affero General Public License for more details.
15  *
16  * You should have received a copy of the GNU Affero General Public License
17  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
18  */
19
20 if (!defined('STATUSNET')) {
21     exit(1);
22 }
23
24 /**
25  * PuSH feed subscription record
26  * @package Hub
27  * @author Brion Vibber <brion@status.net>
28  */
29 class HubSub extends Memcached_DataObject
30 {
31     public $__table = 'hubsub';
32
33     public $hashkey; // sha1(topic . '|' . $callback); (topic, callback) key is too long for myisam in utf8
34     public $topic;
35     public $callback;
36     public $secret;
37     public $lease;
38     public $sub_start;
39     public $sub_end;
40     public $created;
41     public $modified;
42
43     public /*static*/ function staticGet($topic, $callback)
44     {
45         return parent::staticGet(__CLASS__, 'hashkey', self::hashkey($topic, $callback));
46     }
47
48     protected static function hashkey($topic, $callback)
49     {
50         return sha1($topic . '|' . $callback);
51     }
52
53     /**
54      * return table definition for DB_DataObject
55      *
56      * DB_DataObject needs to know something about the table to manipulate
57      * instances. This method provides all the DB_DataObject needs to know.
58      *
59      * @return array array of column definitions
60      */
61     function table()
62     {
63         return array('hashkey' => DB_DATAOBJECT_STR + DB_DATAOBJECT_NOTNULL,
64                      'topic' => DB_DATAOBJECT_STR + DB_DATAOBJECT_NOTNULL,
65                      'callback' => DB_DATAOBJECT_STR + DB_DATAOBJECT_NOTNULL,
66                      'secret' => DB_DATAOBJECT_STR,
67                      'lease' =>  DB_DATAOBJECT_INT,
68                      'sub_start' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME,
69                      'sub_end' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME,
70                      'created' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME + DB_DATAOBJECT_NOTNULL,
71                      'modified' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME + DB_DATAOBJECT_NOTNULL);
72     }
73
74     static function schemaDef()
75     {
76         return array(new ColumnDef('hashkey', 'char',
77                                    /*size*/40,
78                                    /*nullable*/false,
79                                    /*key*/'PRI'),
80                      new ColumnDef('topic', 'varchar',
81                                    /*size*/255,
82                                    /*nullable*/false,
83                                    /*key*/'MUL'),
84                      new ColumnDef('callback', 'varchar',
85                                    255, false),
86                      new ColumnDef('secret', 'text',
87                                    null, true),
88                      new ColumnDef('lease', 'int',
89                                    null, true),
90                      new ColumnDef('sub_start', 'datetime',
91                                    null, true),
92                      new ColumnDef('sub_end', 'datetime',
93                                    null, true),
94                      new ColumnDef('created', 'datetime',
95                                    null, false),
96                      new ColumnDef('modified', 'datetime',
97                                    null, false));
98     }
99
100     function keys()
101     {
102         return array_keys($this->keyTypes());
103     }
104
105     function sequenceKey()
106     {
107         return array(false, false, false);
108     }
109
110     /**
111      * return key definitions for DB_DataObject
112      *
113      * DB_DataObject needs to know about keys that the table has; this function
114      * defines them.
115      *
116      * @return array key definitions
117      */
118     function keyTypes()
119     {
120         return array('hashkey' => 'K');
121     }
122
123     /**
124      * Validates a requested lease length, sets length plus
125      * subscription start & end dates.
126      *
127      * Does not save to database -- use before insert() or update().
128      *
129      * @param int $length in seconds
130      */
131     function setLease($length)
132     {
133         assert(is_int($length));
134
135         $min = 86400;
136         $max = 86400 * 30;
137
138         if ($length == 0) {
139             // We want to garbage collect dead subscriptions!
140             $length = $max;
141         } elseif( $length < $min) {
142             $length = $min;
143         } else if ($length > $max) {
144             $length = $max;
145         }
146
147         $this->lease = $length;
148         $this->start_sub = common_sql_now();
149         $this->end_sub = common_sql_date(time() + $length);
150     }
151
152     /**
153      * Schedule a future verification ping to the subscriber.
154      * If queues are disabled, will be immediate.
155      *
156      * @param string $mode 'subscribe' or 'unsubscribe'
157      * @param string $token hub.verify_token value, if provided by client
158      */
159     function scheduleVerify($mode, $token=null, $retries=null)
160     {
161         if ($retries === null) {
162             $retries = intval(common_config('ostatus', 'hub_retries'));
163         }
164         $data = array('sub' => clone($this),
165                       'mode' => $mode,
166                       'token' => $token,
167                       'retries' => $retries);
168         $qm = QueueManager::get();
169         $qm->enqueue($data, 'hubconf');
170     }
171
172     /**
173      * Send a verification ping to subscriber, and if confirmed apply the changes.
174      * This may create, update, or delete the database record.
175      *
176      * @param string $mode 'subscribe' or 'unsubscribe'
177      * @param string $token hub.verify_token value, if provided by client
178      * @throws ClientException on failure
179      */
180     function verify($mode, $token=null)
181     {
182         assert($mode == 'subscribe' || $mode == 'unsubscribe');
183
184         $challenge = common_good_rand(32);
185         $params = array('hub.mode' => $mode,
186                         'hub.topic' => $this->topic,
187                         'hub.challenge' => $challenge);
188         if ($mode == 'subscribe') {
189             $params['hub.lease_seconds'] = $this->lease;
190         }
191         if ($token !== null) {
192             $params['hub.verify_token'] = $token;
193         }
194
195         // Any existing query string parameters must be preserved
196         $url = $this->callback;
197         if (strpos($url, '?') !== false) {
198             $url .= '&';
199         } else {
200             $url .= '?';
201         }
202         $url .= http_build_query($params, '', '&');
203
204         $request = new HTTPClient();
205         $response = $request->get($url);
206         $status = $response->getStatus();
207
208         if ($status >= 200 && $status < 300) {
209             common_log(LOG_INFO, "Verified $mode of $this->callback:$this->topic");
210         } else {
211             // TRANS: Client exception. %s is a HTTP status code.
212             throw new ClientException(sprintf(_m('Hub subscriber verification returned HTTP %s.'),$status));
213         }
214
215         $old = HubSub::staticGet($this->topic, $this->callback);
216         if ($mode == 'subscribe') {
217             if ($old) {
218                 $this->update($old);
219             } else {
220                 $ok = $this->insert();
221             }
222         } else if ($mode == 'unsubscribe') {
223             if ($old) {
224                 $old->delete();
225             } else {
226                 // That's ok, we're already unsubscribed.
227             }
228         }
229     }
230
231     /**
232      * Insert wrapper; transparently set the hash key from topic and callback columns.
233      * @return mixed success
234      */
235     function insert()
236     {
237         $this->hashkey = self::hashkey($this->topic, $this->callback);
238         $this->created = common_sql_now();
239         $this->modified = common_sql_now();
240         return parent::insert();
241     }
242
243     /**
244      * Update wrapper; transparently update modified column.
245      * @return boolean success
246      */
247     function update($old=null)
248     {
249         $this->modified = common_sql_now();
250         return parent::update($old);
251     }
252
253     /**
254      * Schedule delivery of a 'fat ping' to the subscriber's callback
255      * endpoint. If queues are disabled, this will run immediately.
256      *
257      * @param string $atom well-formed Atom feed
258      * @param int $retries optional count of retries if POST fails; defaults to hub_retries from config or 0 if unset
259      */
260     function distribute($atom, $retries=null)
261     {
262         if ($retries === null) {
263             $retries = intval(common_config('ostatus', 'hub_retries'));
264         }
265
266         if (common_config('ostatus', 'local_push_bypass')) {
267             // If target is a local site, bypass the web server and drop the
268             // item directly into the target's input queue.
269             $url = parse_url($this->callback);
270             $wildcard = common_config('ostatus', 'local_wildcard');
271             $site = Status_network::getFromHostname($url['host'], $wildcard);
272
273             if ($site) {
274                 if ($this->secret) {
275                     $hmac = 'sha1=' . hash_hmac('sha1', $atom, $this->secret);
276                 } else {
277                     $hmac = '';
278                 }
279
280                 // Hack: at the moment we stick the subscription ID in the callback
281                 // URL so we don't have to look inside the Atom to route the subscription.
282                 // For now this means we need to extract that from the target URL
283                 // so we can include it in the data.
284                 $parts = explode('/', $url['path']);
285                 $subId = intval(array_pop($parts));
286
287                 $data = array('feedsub_id' => $subId,
288                               'post' => $atom,
289                               'hmac' => $hmac);
290                 common_log(LOG_DEBUG, "Cross-site PuSH bypass enqueueing straight to $site->nickname feed $subId");
291                 $qm = QueueManager::get();
292                 $qm->enqueue($data, 'pushin', $site->nickname);
293                 return;
294             }
295         }
296
297         // We dare not clone() as when the clone is discarded it'll
298         // destroy the result data for the parent query.
299         // @fixme use clone() again when it's safe to copy an
300         // individual item from a multi-item query again.
301         $sub = HubSub::staticGet($this->topic, $this->callback);
302         $data = array('sub' => $sub,
303                       'atom' => $atom,
304                       'retries' => $retries);
305         common_log(LOG_INFO, "Queuing PuSH: $this->topic to $this->callback");
306         $qm = QueueManager::get();
307         $qm->enqueue($data, 'hubout');
308     }
309
310     /**
311      * Queue up a large batch of pushes to multiple subscribers
312      * for this same topic update.
313      *
314      * If queues are disabled, this will run immediately.
315      *
316      * @param string $atom well-formed Atom feed
317      * @param array $pushCallbacks list of callback URLs
318      */
319     function bulkDistribute($atom, $pushCallbacks)
320     {
321         $data = array('atom' => $atom,
322                       'topic' => $this->topic,
323                       'pushCallbacks' => $pushCallbacks);
324         common_log(LOG_INFO, "Queuing PuSH batch: $this->topic to " .
325                              count($pushCallbacks) . " sites");
326         $qm = QueueManager::get();
327         $qm->enqueue($data, 'hubprep');
328     }
329
330     /**
331      * Send a 'fat ping' to the subscriber's callback endpoint
332      * containing the given Atom feed chunk.
333      *
334      * Determination of which items to send should be done at
335      * a higher level; don't just shove in a complete feed!
336      *
337      * @param string $atom well-formed Atom feed
338      * @throws Exception (HTTP or general)
339      */
340     function push($atom)
341     {
342         $headers = array('Content-Type: application/atom+xml');
343         if ($this->secret) {
344             $hmac = hash_hmac('sha1', $atom, $this->secret);
345             $headers[] = "X-Hub-Signature: sha1=$hmac";
346         } else {
347             $hmac = '(none)';
348         }
349         common_log(LOG_INFO, "About to push feed to $this->callback for $this->topic, HMAC $hmac");
350
351         $request = new HTTPClient();
352         $request->setBody($atom);
353         $response = $request->post($this->callback, $headers);
354
355         if ($response->isOk()) {
356             return true;
357         } else {
358             // TRANS: Exception. %1$s is a response status code, %2$s is the body of the response.
359             throw new Exception(sprintf(_m('Callback returned status: %1$s. Body: %2$s'),
360                                 $response->getStatus(),trim($response->getBody())));
361         }
362     }
363 }