]> git.mxchange.org Git - quix0rs-gnu-social.git/blob - lib/stompqueuemanager.php
f057bd9e41718bda776bbf9ba48caf1eac4f2af7
[quix0rs-gnu-social.git] / lib / stompqueuemanager.php
1 <?php
2 /**
3  * StatusNet, the distributed open-source microblogging tool
4  *
5  * Abstract class for queue managers
6  *
7  * PHP version 5
8  *
9  * LICENCE: This program is free software: you can redistribute it and/or modify
10  * it under the terms of the GNU Affero General Public License as published by
11  * the Free Software Foundation, either version 3 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU Affero General Public License for more details.
18  *
19  * You should have received a copy of the GNU Affero General Public License
20  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
21  *
22  * @category  QueueManager
23  * @package   StatusNet
24  * @author    Evan Prodromou <evan@status.net>
25  * @author    Sarven Capadisli <csarven@status.net>
26  * @copyright 2009 StatusNet, Inc.
27  * @license   http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0
28  * @link      http://status.net/
29  */
30
31 require_once 'Stomp.php';
32
33
34 class StompQueueManager extends QueueManager
35 {
36     var $server = null;
37     var $username = null;
38     var $password = null;
39     var $base = null;
40     var $con = null;
41     
42     protected $sites = array();
43
44     function __construct()
45     {
46         parent::__construct();
47         $this->server   = common_config('queue', 'stomp_server');
48         $this->username = common_config('queue', 'stomp_username');
49         $this->password = common_config('queue', 'stomp_password');
50         $this->base     = common_config('queue', 'queue_basename');
51     }
52
53     /**
54      * Tell the i/o master we only need a single instance to cover
55      * all sites running in this process.
56      */
57     public static function multiSite()
58     {
59         return IoManager::INSTANCE_PER_PROCESS;
60     }
61
62     /**
63      * Record each site we'll be handling input for in this process,
64      * so we can listen to the necessary queues for it.
65      *
66      * @fixme possibly actually do subscription here to save another
67      *        loop over all sites later?
68      * @fixme possibly don't assume it's the current site
69      */
70     public function addSite($server)
71     {
72         $this->sites[] = $server;
73         $this->initialize();
74     }
75
76
77     /**
78      * Instantiate the appropriate QueueHandler class for the given queue.
79      *
80      * @param string $queue
81      * @return mixed QueueHandler or null
82      */
83     function getHandler($queue)
84     {
85         $handlers = $this->handlers[common_config('site', 'server')];
86         if (isset($handlers[$queue])) {
87             $class = $handlers[$queue];
88             if (class_exists($class)) {
89                 return new $class();
90             } else {
91                 common_log(LOG_ERR, "Nonexistent handler class '$class' for queue '$queue'");
92             }
93         } else {
94             common_log(LOG_ERR, "Requested handler for unkown queue '$queue'");
95         }
96         return null;
97     }
98
99     /**
100      * Get a list of all registered queue transport names.
101      *
102      * @return array of strings
103      */
104     function getQueues()
105     {
106         $group = $this->activeGroup();
107         $site = common_config('site', 'server');
108         if (empty($this->groups[$site][$group])) {
109             return array();
110         } else {
111             return array_keys($this->groups[$site][$group]);
112         }
113     }
114
115     /**
116      * Register a queue transport name and handler class for your plugin.
117      * Only registered transports will be reliably picked up!
118      *
119      * @param string $transport
120      * @param string $class
121      * @param string $group
122      */
123     public function connect($transport, $class, $group='queuedaemon')
124     {
125         $this->handlers[common_config('site', 'server')][$transport] = $class;
126         $this->groups[common_config('site', 'server')][$group][$transport] = $class;
127     }
128
129     /**
130      * Saves a notice object reference into the queue item table.
131      * @return boolean true on success
132      */
133     public function enqueue($object, $queue)
134     {
135         $msg = $this->encode($object);
136         $rep = $this->logrep($object);
137
138         $this->_connect();
139
140         // XXX: serialize and send entire notice
141
142         $result = $this->con->send($this->queueName($queue),
143                                    $msg,                // BODY of the message
144                                    array ('created' => common_sql_now()));
145
146         if (!$result) {
147             common_log(LOG_ERR, "Error sending $rep to $queue queue");
148             return false;
149         }
150
151         common_log(LOG_DEBUG, "complete remote queueing $rep for $queue");
152         $this->stats('enqueued', $queue);
153     }
154
155     /**
156      * Send any sockets we're listening on to the IO manager
157      * to wait for input.
158      *
159      * @return array of resources
160      */
161     public function getSockets()
162     {
163         return array($this->con->getSocket());
164     }
165
166     /**
167      * We've got input to handle on our socket!
168      * Read any waiting Stomp frame(s) and process them.
169      *
170      * @param resource $socket
171      * @return boolean ok on success
172      */
173     public function handleInput($socket)
174     {
175         assert($socket === $this->con->getSocket());
176         $ok = true;
177         $frames = $this->con->readFrames();
178         foreach ($frames as $frame) {
179             $ok = $ok && $this->_handleItem($frame);
180         }
181         return $ok;
182     }
183
184     /**
185      * Initialize our connection and subscribe to all the queues
186      * we're going to need to handle...
187      *
188      * Side effects: in multi-site mode, may reset site configuration.
189      *
190      * @param IoMaster $master process/event controller
191      * @return bool return false on failure
192      */
193     public function start($master)
194     {
195         parent::start($master);
196         if ($this->sites) {
197             foreach ($this->sites as $server) {
198                 StatusNet::init($server);
199                 $this->doSubscribe();
200             }
201         } else {
202             $this->doSubscribe();
203         }
204         return true;
205     }
206     
207     /**
208      * Subscribe to all the queues we're going to need to handle...
209      *
210      * Side effects: in multi-site mode, may reset site configuration.
211      *
212      * @return bool return false on failure
213      */
214     public function finish()
215     {
216         if ($this->sites) {
217             foreach ($this->sites as $server) {
218                 StatusNet::init($server);
219                 $this->doUnsubscribe();
220             }
221         } else {
222             $this->doUnsubscribe();
223         }
224         return true;
225     }
226     
227     /**
228      * Lazy open connection to Stomp queue server.
229      */
230     protected function _connect()
231     {
232         if (empty($this->con)) {
233             $this->_log(LOG_INFO, "Connecting to '$this->server' as '$this->username'...");
234             $this->con = new LiberalStomp($this->server);
235
236             if ($this->con->connect($this->username, $this->password)) {
237                 $this->_log(LOG_INFO, "Connected.");
238             } else {
239                 $this->_log(LOG_ERR, 'Failed to connect to queue server');
240                 throw new ServerException('Failed to connect to queue server');
241             }
242         }
243     }
244
245     /**
246      * Subscribe to all enabled notice queues for the current site.
247      */
248     protected function doSubscribe()
249     {
250         $this->_connect();
251         foreach ($this->getQueues() as $queue) {
252             $rawqueue = $this->queueName($queue);
253             $this->_log(LOG_INFO, "Subscribing to $rawqueue");
254             $this->con->subscribe($rawqueue);
255         }
256     }
257     
258     /**
259      * Subscribe from all enabled notice queues for the current site.
260      */
261     protected function doUnsubscribe()
262     {
263         $this->_connect();
264         foreach ($this->getQueues() as $queue) {
265             $this->con->unsubscribe($this->queueName($queue));
266         }
267     }
268
269     /**
270      * Handle and acknowledge an event that's come in through a queue.
271      *
272      * If the queue handler reports failure, the message is requeued for later.
273      * Missing notices or handler classes will drop the message.
274      *
275      * Side effects: in multi-site mode, may reset site configuration to
276      * match the site that queued the event.
277      *
278      * @param StompFrame $frame
279      * @return bool
280      */
281     protected function _handleItem($frame)
282     {
283         list($site, $queue) = $this->parseDestination($frame->headers['destination']);
284         if ($site != common_config('site', 'server')) {
285             $this->stats('switch');
286             StatusNet::init($site);
287         }
288
289         if (is_numeric($frame->body)) {
290             $id = intval($frame->body);
291             $info = "notice $id posted at {$frame->headers['created']} in queue $queue";
292
293             $notice = Notice::staticGet('id', $id);
294             if (empty($notice)) {
295                 $this->_log(LOG_WARNING, "Skipping missing $info");
296                 $this->con->ack($frame);
297                 $this->stats('badnotice', $queue);
298                 return false;
299             }
300
301             $item = $notice;
302         } else {
303             // @fixme should we serialize, or json, or what here?
304             $info = "string posted at {$frame->headers['created']} in queue $queue";
305             $item = $frame->body;
306         }
307
308         $handler = $this->getHandler($queue);
309         if (!$handler) {
310             $this->_log(LOG_ERROR, "Missing handler class; skipping $info");
311             $this->con->ack($frame);
312             $this->stats('badhandler', $queue);
313             return false;
314         }
315
316         $ok = $handler->handle($item);
317
318         if (!$ok) {
319             $this->_log(LOG_WARNING, "Failed handling $info");
320             // FIXME we probably shouldn't have to do
321             // this kind of queue management ourselves;
322             // if we don't ack, it should resend...
323             $this->con->ack($frame);
324             $this->enqueue($item, $queue);
325             $this->stats('requeued', $queue);
326             return false;
327         }
328
329         $this->_log(LOG_INFO, "Successfully handled $info");
330         $this->con->ack($frame);
331         $this->stats('handled', $queue);
332         return true;
333     }
334
335     /**
336      * Combines the queue_basename from configuration with the
337      * site server name and queue name to give eg:
338      *
339      * /queue/statusnet/identi.ca/sms
340      *
341      * @param string $queue
342      * @return string
343      */
344     protected function queueName($queue)
345     {
346         return common_config('queue', 'queue_basename') .
347             common_config('site', 'server') . '/' . $queue;
348     }
349
350     /**
351      * Returns the site and queue name from the server-side queue.
352      *
353      * @param string queue destination (eg '/queue/statusnet/identi.ca/sms')
354      * @return array of site and queue: ('identi.ca','sms') or false if unrecognized
355      */
356     protected function parseDestination($dest)
357     {
358         $prefix = common_config('queue', 'queue_basename');
359         if (substr($dest, 0, strlen($prefix)) == $prefix) {
360             $rest = substr($dest, strlen($prefix));
361             return explode("/", $rest, 2);
362         } else {
363             common_log(LOG_ERR, "Got a message from unrecognized stomp queue: $dest");
364             return array(false, false);
365         }
366     }
367
368     function _log($level, $msg)
369     {
370         common_log($level, 'StompQueueManager: '.$msg);
371     }
372 }
373