]> git.mxchange.org Git - quix0rs-gnu-social.git/blob - plugins/OStatus/lib/hubprepqueuehandler.php
typo mixing up and in salmonaction
[quix0rs-gnu-social.git] / plugins / OStatus / lib / hubprepqueuehandler.php
1 <?php
2 /*
3  * StatusNet - the distributed open-source microblogging tool
4  * Copyright (C) 2010, StatusNet, Inc.
5  *
6  * This program is free software: you can redistribute it and/or modify
7  * it under the terms of the GNU Affero General Public License as published by
8  * the Free Software Foundation, either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU Affero General Public License for more details.
15  *
16  * You should have received a copy of the GNU Affero General Public License
17  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
18  */
19
20 /**
21  * When we have a large batch of PuSH consumers, we break the data set
22  * into smaller chunks. Enqueue final destinations...
23  *
24  * @package Hub
25  * @author Brion Vibber <brion@status.net>
26  */
27 class HubPrepQueueHandler extends QueueHandler
28 {
29     // Enqueue this many low-level distributions before re-queueing the rest
30     // of the batch to be processed later. Helps to keep latency down for other
31     // things happening during a particularly long OStatus delivery session.
32     //
33     // [Could probably ditch this if we had working message delivery priorities
34     // for queueing, but this isn't supported in ActiveMQ 5.3.]
35     const ROLLING_BATCH = 20;
36
37     function transport()
38     {
39         return 'hubprep';
40     }
41
42     function handle($data)
43     {
44         $topic = $data['topic'];
45         $atom = $data['atom'];
46         $pushCallbacks = $data['pushCallbacks'];
47
48         assert(is_string($atom));
49         assert(is_string($topic));
50         assert(is_array($pushCallbacks));
51
52         // Set up distribution for the first n subscribing sites...
53         // If we encounter an uncatchable error, queue handling should
54         // automatically re-run the batch, which could lead to some dupe
55         // distributions.
56         //
57         // Worst case is if one of these hubprep entries dies too many
58         // times and gets dropped; the rest of the batch won't get processed.
59         try {
60             $n = 0;
61             while (count($pushCallbacks) && $n < self::ROLLING_BATCH) {
62                 $n++;
63                 $callback = array_shift($pushCallbacks);
64                 $sub = HubSub::staticGet($topic, $callback);
65                 if (!$sub) {
66                     common_log(LOG_ERR, "Skipping PuSH delivery for deleted(?) consumer $callback on $topic");
67                     continue;
68                 }
69
70                 $sub->distribute($atom);
71             }
72         } catch (Exception $e) {
73             common_log(LOG_ERR, "Exception during PuSH batch out: " .
74                                 $e->getMessage() .
75                                 " prepping $topic to $callback");
76         }
77
78         // And re-queue the rest of the batch!
79         if (count($pushCallbacks) > 0) {
80             $sub = new HubSub();
81             $sub->topic = $topic;
82             $sub->bulkDistribute($atom, $pushCallbacks);
83         }
84
85         return true;
86     }
87 }