]> git.mxchange.org Git - quix0rs-gnu-social.git/blob - lib/stompqueuemanager.php
Merge branch 'master' of gitorious.org:statusnet/mainline
[quix0rs-gnu-social.git] / lib / stompqueuemanager.php
1 <?php
2 /**
3  * StatusNet, the distributed open-source microblogging tool
4  *
5  * Abstract class for queue managers
6  *
7  * PHP version 5
8  *
9  * LICENCE: This program is free software: you can redistribute it and/or modify
10  * it under the terms of the GNU Affero General Public License as published by
11  * the Free Software Foundation, either version 3 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU Affero General Public License for more details.
18  *
19  * You should have received a copy of the GNU Affero General Public License
20  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
21  *
22  * @category  QueueManager
23  * @package   StatusNet
24  * @author    Evan Prodromou <evan@status.net>
25  * @author    Sarven Capadisli <csarven@status.net>
26  * @copyright 2009 StatusNet, Inc.
27  * @license   http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0
28  * @link      http://status.net/
29  */
30
31 require_once 'Stomp.php';
32
33
34 class StompQueueManager extends QueueManager
35 {
36     var $server = null;
37     var $username = null;
38     var $password = null;
39     var $base = null;
40     var $con = null;
41     
42     protected $master = null;
43     protected $sites = array();
44
45     function __construct()
46     {
47         parent::__construct();
48         $this->server   = common_config('queue', 'stomp_server');
49         $this->username = common_config('queue', 'stomp_username');
50         $this->password = common_config('queue', 'stomp_password');
51         $this->base     = common_config('queue', 'queue_basename');
52     }
53
54     /**
55      * Tell the i/o master we only need a single instance to cover
56      * all sites running in this process.
57      */
58     public static function multiSite()
59     {
60         return IoManager::INSTANCE_PER_PROCESS;
61     }
62
63     /**
64      * Record each site we'll be handling input for in this process,
65      * so we can listen to the necessary queues for it.
66      *
67      * @fixme possibly actually do subscription here to save another
68      *        loop over all sites later?
69      * @fixme possibly don't assume it's the current site
70      */
71     public function addSite($server)
72     {
73         $this->sites[] = $server;
74         $this->initialize();
75     }
76
77
78     /**
79      * Instantiate the appropriate QueueHandler class for the given queue.
80      *
81      * @param string $queue
82      * @return mixed QueueHandler or null
83      */
84     function getHandler($queue)
85     {
86         $handlers = $this->handlers[common_config('site', 'server')];
87         if (isset($handlers[$queue])) {
88             $class = $handlers[$queue];
89             if (class_exists($class)) {
90                 return new $class();
91             } else {
92                 common_log(LOG_ERR, "Nonexistent handler class '$class' for queue '$queue'");
93             }
94         } else {
95             common_log(LOG_ERR, "Requested handler for unkown queue '$queue'");
96         }
97         return null;
98     }
99
100     /**
101      * Get a list of all registered queue transport names.
102      *
103      * @return array of strings
104      */
105     function getQueues()
106     {
107         $site = common_config('site', 'server');
108         if (empty($this->handlers[$site])) {
109             return array();
110         } else {
111             return array_keys($this->handlers[$site]);
112         }
113     }
114
115     /**
116      * Register a queue transport name and handler class for your plugin.
117      * Only registered transports will be reliably picked up!
118      *
119      * @param string $transport
120      * @param string $class
121      */
122     public function connect($transport, $class)
123     {
124         $this->handlers[common_config('site', 'server')][$transport] = $class;
125     }
126
127     /**
128      * Saves a notice object reference into the queue item table.
129      * @return boolean true on success
130      */
131     public function enqueue($object, $queue)
132     {
133         $notice = $object;
134
135         $this->_connect();
136
137         // XXX: serialize and send entire notice
138
139         $result = $this->con->send($this->queueName($queue),
140                                    $notice->id,                 // BODY of the message
141                                    array ('created' => $notice->created));
142
143         if (!$result) {
144             common_log(LOG_ERR, 'Error sending to '.$queue.' queue');
145             return false;
146         }
147
148         common_log(LOG_DEBUG, 'complete remote queueing notice ID = '
149                    . $notice->id . ' for ' . $queue);
150         $this->stats('enqueued', $queue);
151     }
152
153     /**
154      * Send any sockets we're listening on to the IO manager
155      * to wait for input.
156      *
157      * @return array of resources
158      */
159     public function getSockets()
160     {
161         return array($this->con->getSocket());
162     }
163
164     /**
165      * We've got input to handle on our socket!
166      * Read any waiting Stomp frame(s) and process them.
167      *
168      * @param resource $socket
169      * @return boolean ok on success
170      */
171     public function handleInput($socket)
172     {
173         assert($socket === $this->con->getSocket());
174         $ok = true;
175         $frames = $this->con->readFrames();
176         foreach ($frames as $frame) {
177             $ok = $ok && $this->_handleNotice($frame);
178         }
179         return $ok;
180     }
181
182     /**
183      * Initialize our connection and subscribe to all the queues
184      * we're going to need to handle...
185      *
186      * Side effects: in multi-site mode, may reset site configuration.
187      *
188      * @param IoMaster $master process/event controller
189      * @return bool return false on failure
190      */
191     public function start($master)
192     {
193         parent::start($master);
194         if ($this->sites) {
195             foreach ($this->sites as $server) {
196                 StatusNet::init($server);
197                 $this->doSubscribe();
198             }
199         } else {
200             $this->doSubscribe();
201         }
202         return true;
203     }
204     
205     /**
206      * Subscribe to all the queues we're going to need to handle...
207      *
208      * Side effects: in multi-site mode, may reset site configuration.
209      *
210      * @return bool return false on failure
211      */
212     public function finish()
213     {
214         if ($this->sites) {
215             foreach ($this->sites as $server) {
216                 StatusNet::init($server);
217                 $this->doUnsubscribe();
218             }
219         } else {
220             $this->doUnsubscribe();
221         }
222         return true;
223     }
224     
225     /**
226      * Lazy open connection to Stomp queue server.
227      */
228     protected function _connect()
229     {
230         if (empty($this->con)) {
231             $this->_log(LOG_INFO, "Connecting to '$this->server' as '$this->username'...");
232             $this->con = new LiberalStomp($this->server);
233
234             if ($this->con->connect($this->username, $this->password)) {
235                 $this->_log(LOG_INFO, "Connected.");
236             } else {
237                 $this->_log(LOG_ERR, 'Failed to connect to queue server');
238                 throw new ServerException('Failed to connect to queue server');
239             }
240         }
241     }
242
243     /**
244      * Subscribe to all enabled notice queues for the current site.
245      */
246     protected function doSubscribe()
247     {
248         $this->_connect();
249         foreach ($this->getQueues() as $queue) {
250             $rawqueue = $this->queueName($queue);
251             $this->_log(LOG_INFO, "Subscribing to $rawqueue");
252             $this->con->subscribe($rawqueue);
253         }
254     }
255     
256     /**
257      * Subscribe from all enabled notice queues for the current site.
258      */
259     protected function doUnsubscribe()
260     {
261         $this->_connect();
262         foreach ($this->getQueues() as $queue) {
263             $this->con->unsubscribe($this->queueName($queue));
264         }
265     }
266
267     /**
268      * Handle and acknowledge a notice event that's come in through a queue.
269      *
270      * If the queue handler reports failure, the message is requeued for later.
271      * Missing notices or handler classes will drop the message.
272      *
273      * Side effects: in multi-site mode, may reset site configuration to
274      * match the site that queued the event.
275      *
276      * @param StompFrame $frame
277      * @return bool
278      */
279     protected function _handleNotice($frame)
280     {
281         list($site, $queue) = $this->parseDestination($frame->headers['destination']);
282         if ($site != common_config('site', 'server')) {
283             $this->stats('switch');
284             StatusNet::init($site);
285         }
286
287         $id = intval($frame->body);
288         $info = "notice $id posted at {$frame->headers['created']} in queue $queue";
289
290         $notice = Notice::staticGet('id', $id);
291         if (empty($notice)) {
292             $this->_log(LOG_WARNING, "Skipping missing $info");
293             $this->con->ack($frame);
294             $this->stats('badnotice', $queue);
295             return false;
296         }
297
298         $handler = $this->getHandler($queue);
299         if (!$handler) {
300             $this->_log(LOG_ERROR, "Missing handler class; skipping $info");
301             $this->con->ack($frame);
302             $this->stats('badhandler', $queue);
303             return false;
304         }
305
306         $ok = $handler->handle_notice($notice);
307
308         if (!$ok) {
309             $this->_log(LOG_WARNING, "Failed handling $info");
310             // FIXME we probably shouldn't have to do
311             // this kind of queue management ourselves;
312             // if we don't ack, it should resend...
313             $this->con->ack($frame);
314             $this->enqueue($notice, $queue);
315             $this->stats('requeued', $queue);
316             return false;
317         }
318
319         $this->_log(LOG_INFO, "Successfully handled $info");
320         $this->con->ack($frame);
321         $this->stats('handled', $queue);
322         return true;
323     }
324
325     /**
326      * Combines the queue_basename from configuration with the
327      * site server name and queue name to give eg:
328      *
329      * /queue/statusnet/identi.ca/sms
330      *
331      * @param string $queue
332      * @return string
333      */
334     protected function queueName($queue)
335     {
336         return common_config('queue', 'queue_basename') .
337             common_config('site', 'server') . '/' . $queue;
338     }
339
340     /**
341      * Returns the site and queue name from the server-side queue.
342      *
343      * @param string queue destination (eg '/queue/statusnet/identi.ca/sms')
344      * @return array of site and queue: ('identi.ca','sms') or false if unrecognized
345      */
346     protected function parseDestination($dest)
347     {
348         $prefix = common_config('queue', 'queue_basename');
349         if (substr($dest, 0, strlen($prefix)) == $prefix) {
350             $rest = substr($dest, strlen($prefix));
351             return explode("/", $rest, 2);
352         } else {
353             common_log(LOG_ERR, "Got a message from unrecognized stomp queue: $dest");
354             return array(false, false);
355         }
356     }
357
358     function _log($level, $msg)
359     {
360         common_log($level, 'StompQueueManager: '.$msg);
361     }
362 }
363