]> git.mxchange.org Git - quix0rs-gnu-social.git/blob - lib/stompqueuemanager.php
a7d735d1cf835ec808b6148d8cb7ff95c49caa77
[quix0rs-gnu-social.git] / lib / stompqueuemanager.php
1 <?php
2 /**
3  * StatusNet, the distributed open-source microblogging tool
4  *
5  * Abstract class for queue managers
6  *
7  * PHP version 5
8  *
9  * LICENCE: This program is free software: you can redistribute it and/or modify
10  * it under the terms of the GNU Affero General Public License as published by
11  * the Free Software Foundation, either version 3 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU Affero General Public License for more details.
18  *
19  * You should have received a copy of the GNU Affero General Public License
20  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
21  *
22  * @category  QueueManager
23  * @package   StatusNet
24  * @author    Evan Prodromou <evan@status.net>
25  * @author    Sarven Capadisli <csarven@status.net>
26  * @copyright 2009 StatusNet, Inc.
27  * @license   http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0
28  * @link      http://status.net/
29  */
30
31 require_once 'Stomp.php';
32
33
34 class StompQueueManager extends QueueManager
35 {
36     var $server = null;
37     var $username = null;
38     var $password = null;
39     var $base = null;
40     var $con = null;
41     
42     protected $master = null;
43     protected $sites = array();
44
45     function __construct()
46     {
47         parent::__construct();
48         $this->server   = common_config('queue', 'stomp_server');
49         $this->username = common_config('queue', 'stomp_username');
50         $this->password = common_config('queue', 'stomp_password');
51         $this->base     = common_config('queue', 'queue_basename');
52     }
53
54     /**
55      * Tell the i/o master we only need a single instance to cover
56      * all sites running in this process.
57      */
58     public static function multiSite()
59     {
60         return IoManager::INSTANCE_PER_PROCESS;
61     }
62
63     /**
64      * Record each site we'll be handling input for in this process,
65      * so we can listen to the necessary queues for it.
66      *
67      * @fixme possibly actually do subscription here to save another
68      *        loop over all sites later?
69      * @fixme possibly don't assume it's the current site
70      */
71     public function addSite($server)
72     {
73         $this->sites[] = $server;
74         $this->initialize();
75     }
76
77
78     /**
79      * Instantiate the appropriate QueueHandler class for the given queue.
80      *
81      * @param string $queue
82      * @return mixed QueueHandler or null
83      */
84     function getHandler($queue)
85     {
86         $handlers = $this->handlers[common_config('site', 'server')];
87         if (isset($handlers[$queue])) {
88             $class = $handlers[$queue];
89             if (class_exists($class)) {
90                 return new $class();
91             } else {
92                 common_log(LOG_ERR, "Nonexistent handler class '$class' for queue '$queue'");
93             }
94         } else {
95             common_log(LOG_ERR, "Requested handler for unkown queue '$queue'");
96         }
97         return null;
98     }
99
100     /**
101      * Get a list of all registered queue transport names.
102      *
103      * @return array of strings
104      */
105     function getQueues()
106     {
107         return array_keys($this->handlers[common_config('site', 'server')]);
108     }
109
110     /**
111      * Register a queue transport name and handler class for your plugin.
112      * Only registered transports will be reliably picked up!
113      *
114      * @param string $transport
115      * @param string $class
116      */
117     public function connect($transport, $class)
118     {
119         $this->handlers[common_config('site', 'server')][$transport] = $class;
120     }
121
122     /**
123      * Saves a notice object reference into the queue item table.
124      * @return boolean true on success
125      */
126     public function enqueue($object, $queue)
127     {
128         $notice = $object;
129
130         $this->_connect();
131
132         // XXX: serialize and send entire notice
133
134         $result = $this->con->send($this->queueName($queue),
135                                    $notice->id,                 // BODY of the message
136                                    array ('created' => $notice->created));
137
138         if (!$result) {
139             common_log(LOG_ERR, 'Error sending to '.$queue.' queue');
140             return false;
141         }
142
143         common_log(LOG_DEBUG, 'complete remote queueing notice ID = '
144                    . $notice->id . ' for ' . $queue);
145         $this->stats('enqueued', $queue);
146     }
147
148     /**
149      * Send any sockets we're listening on to the IO manager
150      * to wait for input.
151      *
152      * @return array of resources
153      */
154     public function getSockets()
155     {
156         return array($this->con->getSocket());
157     }
158
159     /**
160      * We've got input to handle on our socket!
161      * Read any waiting Stomp frame(s) and process them.
162      *
163      * @param resource $socket
164      * @return boolean ok on success
165      */
166     public function handleInput($socket)
167     {
168         assert($socket === $this->con->getSocket());
169         $ok = true;
170         $frames = $this->con->readFrames();
171         foreach ($frames as $frame) {
172             $ok = $ok && $this->_handleNotice($frame);
173         }
174         return $ok;
175     }
176
177     /**
178      * Initialize our connection and subscribe to all the queues
179      * we're going to need to handle...
180      *
181      * Side effects: in multi-site mode, may reset site configuration.
182      *
183      * @param IoMaster $master process/event controller
184      * @return bool return false on failure
185      */
186     public function start($master)
187     {
188         parent::start($master);
189         if ($this->sites) {
190             foreach ($this->sites as $server) {
191                 StatusNet::init($server);
192                 $this->doSubscribe();
193             }
194         } else {
195             $this->doSubscribe();
196         }
197         return true;
198     }
199     
200     /**
201      * Subscribe to all the queues we're going to need to handle...
202      *
203      * Side effects: in multi-site mode, may reset site configuration.
204      *
205      * @return bool return false on failure
206      */
207     public function finish()
208     {
209         if ($this->sites) {
210             foreach ($this->sites as $server) {
211                 StatusNet::init($server);
212                 $this->doUnsubscribe();
213             }
214         } else {
215             $this->doUnsubscribe();
216         }
217         return true;
218     }
219     
220     /**
221      * Lazy open connection to Stomp queue server.
222      */
223     protected function _connect()
224     {
225         if (empty($this->con)) {
226             $this->_log(LOG_INFO, "Connecting to '$this->server' as '$this->username'...");
227             $this->con = new LiberalStomp($this->server);
228
229             if ($this->con->connect($this->username, $this->password)) {
230                 $this->_log(LOG_INFO, "Connected.");
231             } else {
232                 $this->_log(LOG_ERR, 'Failed to connect to queue server');
233                 throw new ServerException('Failed to connect to queue server');
234             }
235         }
236     }
237
238     /**
239      * Subscribe to all enabled notice queues for the current site.
240      */
241     protected function doSubscribe()
242     {
243         $this->_connect();
244         foreach ($this->getQueues() as $queue) {
245             $rawqueue = $this->queueName($queue);
246             $this->_log(LOG_INFO, "Subscribing to $rawqueue");
247             $this->con->subscribe($rawqueue);
248         }
249     }
250     
251     /**
252      * Subscribe from all enabled notice queues for the current site.
253      */
254     protected function doUnsubscribe()
255     {
256         $this->_connect();
257         foreach ($this->getQueues() as $queue) {
258             $this->con->unsubscribe($this->queueName($queue));
259         }
260     }
261
262     /**
263      * Handle and acknowledge a notice event that's come in through a queue.
264      *
265      * If the queue handler reports failure, the message is requeued for later.
266      * Missing notices or handler classes will drop the message.
267      *
268      * Side effects: in multi-site mode, may reset site configuration to
269      * match the site that queued the event.
270      *
271      * @param StompFrame $frame
272      * @return bool
273      */
274     protected function _handleNotice($frame)
275     {
276         list($site, $queue) = $this->parseDestination($frame->headers['destination']);
277         if ($site != common_config('site', 'server')) {
278             $this->stats('switch');
279             StatusNet::init($site);
280         }
281
282         $id = intval($frame->body);
283         $info = "notice $id posted at {$frame->headers['created']} in queue $queue";
284
285         $notice = Notice::staticGet('id', $id);
286         if (empty($notice)) {
287             $this->_log(LOG_WARNING, "Skipping missing $info");
288             $this->con->ack($frame);
289             $this->stats('badnotice', $queue);
290             return false;
291         }
292
293         $handler = $this->getHandler($queue);
294         if (!$handler) {
295             $this->_log(LOG_ERROR, "Missing handler class; skipping $info");
296             $this->con->ack($frame);
297             $this->stats('badhandler', $queue);
298             return false;
299         }
300
301         $ok = $handler->handle_notice($notice);
302
303         if (!$ok) {
304             $this->_log(LOG_WARNING, "Failed handling $info");
305             // FIXME we probably shouldn't have to do
306             // this kind of queue management ourselves;
307             // if we don't ack, it should resend...
308             $this->con->ack($frame);
309             $this->enqueue($notice, $queue);
310             $this->stats('requeued', $queue);
311             return false;
312         }
313
314         $this->_log(LOG_INFO, "Successfully handled $info");
315         $this->con->ack($frame);
316         $this->stats('handled', $queue);
317         return true;
318     }
319
320     /**
321      * Combines the queue_basename from configuration with the
322      * site server name and queue name to give eg:
323      *
324      * /queue/statusnet/identi.ca/sms
325      *
326      * @param string $queue
327      * @return string
328      */
329     protected function queueName($queue)
330     {
331         return common_config('queue', 'queue_basename') .
332             common_config('site', 'server') . '/' . $queue;
333     }
334
335     /**
336      * Returns the site and queue name from the server-side queue.
337      *
338      * @param string queue destination (eg '/queue/statusnet/identi.ca/sms')
339      * @return array of site and queue: ('identi.ca','sms') or false if unrecognized
340      */
341     protected function parseDestination($dest)
342     {
343         $prefix = common_config('queue', 'queue_basename');
344         if (substr($dest, 0, strlen($prefix)) == $prefix) {
345             $rest = substr($dest, strlen($prefix));
346             return explode("/", $rest, 2);
347         } else {
348             common_log(LOG_ERR, "Got a message from unrecognized stomp queue: $dest");
349             return array(false, false);
350         }
351     }
352
353     function _log($level, $msg)
354     {
355         common_log($level, 'StompQueueManager: '.$msg);
356     }
357 }
358