]> git.mxchange.org Git - quix0rs-gnu-social.git/blob - lib/stompqueuemanager.php
3090e0bfb6f48a45641c142984ad2082c1c4c19a
[quix0rs-gnu-social.git] / lib / stompqueuemanager.php
1 <?php
2 /**
3  * StatusNet, the distributed open-source microblogging tool
4  *
5  * Abstract class for queue managers
6  *
7  * PHP version 5
8  *
9  * LICENCE: This program is free software: you can redistribute it and/or modify
10  * it under the terms of the GNU Affero General Public License as published by
11  * the Free Software Foundation, either version 3 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU Affero General Public License for more details.
18  *
19  * You should have received a copy of the GNU Affero General Public License
20  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
21  *
22  * @category  QueueManager
23  * @package   StatusNet
24  * @author    Evan Prodromou <evan@status.net>
25  * @author    Sarven Capadisli <csarven@status.net>
26  * @copyright 2009 StatusNet, Inc.
27  * @license   http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0
28  * @link      http://status.net/
29  */
30
31 require_once 'Stomp.php';
32
33
34 class StompQueueManager extends QueueManager
35 {
36     var $server = null;
37     var $username = null;
38     var $password = null;
39     var $base = null;
40     var $con = null;
41     
42     protected $master = null;
43     protected $sites = array();
44
45     function __construct()
46     {
47         parent::__construct();
48         $this->server   = common_config('queue', 'stomp_server');
49         $this->username = common_config('queue', 'stomp_username');
50         $this->password = common_config('queue', 'stomp_password');
51         $this->base     = common_config('queue', 'queue_basename');
52     }
53
54     /**
55      * Tell the i/o master we only need a single instance to cover
56      * all sites running in this process.
57      */
58     public static function multiSite()
59     {
60         return IoManager::INSTANCE_PER_PROCESS;
61     }
62
63     /**
64      * Record each site we'll be handling input for in this process,
65      * so we can listen to the necessary queues for it.
66      *
67      * @fixme possibly actually do subscription here to save another
68      *        loop over all sites later?
69      */
70     public function addSite($server)
71     {
72         $this->sites[] = $server;
73     }
74
75     /**
76      * Saves a notice object reference into the queue item table.
77      * @return boolean true on success
78      */
79     public function enqueue($object, $queue)
80     {
81         $notice = $object;
82
83         $this->_connect();
84
85         // XXX: serialize and send entire notice
86
87         $result = $this->con->send($this->queueName($queue),
88                                    $notice->id,                 // BODY of the message
89                                    array ('created' => $notice->created));
90
91         if (!$result) {
92             common_log(LOG_ERR, 'Error sending to '.$queue.' queue');
93             return false;
94         }
95
96         common_log(LOG_DEBUG, 'complete remote queueing notice ID = '
97                    . $notice->id . ' for ' . $queue);
98         $this->stats('enqueued', $queue);
99     }
100
101     /**
102      * Send any sockets we're listening on to the IO manager
103      * to wait for input.
104      *
105      * @return array of resources
106      */
107     public function getSockets()
108     {
109         return array($this->con->getSocket());
110     }
111
112     /**
113      * We've got input to handle on our socket!
114      * Read any waiting Stomp frame(s) and process them.
115      *
116      * @param resource $socket
117      * @return boolean ok on success
118      */
119     public function handleInput($socket)
120     {
121         assert($socket === $this->con->getSocket());
122         $ok = true;
123         $frames = $this->con->readFrames();
124         foreach ($frames as $frame) {
125             $ok = $ok && $this->_handleNotice($frame);
126         }
127         return $ok;
128     }
129
130     /**
131      * Initialize our connection and subscribe to all the queues
132      * we're going to need to handle...
133      *
134      * Side effects: in multi-site mode, may reset site configuration.
135      *
136      * @param IoMaster $master process/event controller
137      * @return bool return false on failure
138      */
139     public function start($master)
140     {
141         parent::start($master);
142         if ($this->sites) {
143             foreach ($this->sites as $server) {
144                 StatusNet::init($server);
145                 $this->doSubscribe();
146             }
147         } else {
148             $this->doSubscribe();
149         }
150         return true;
151     }
152     
153     /**
154      * Subscribe to all the queues we're going to need to handle...
155      *
156      * Side effects: in multi-site mode, may reset site configuration.
157      *
158      * @return bool return false on failure
159      */
160     public function finish()
161     {
162         if ($this->sites) {
163             foreach ($this->sites as $server) {
164                 StatusNet::init($server);
165                 $this->doUnsubscribe();
166             }
167         } else {
168             $this->doUnsubscribe();
169         }
170         return true;
171     }
172     
173     /**
174      * Lazy open connection to Stomp queue server.
175      */
176     protected function _connect()
177     {
178         if (empty($this->con)) {
179             $this->_log(LOG_INFO, "Connecting to '$this->server' as '$this->username'...");
180             $this->con = new LiberalStomp($this->server);
181
182             if ($this->con->connect($this->username, $this->password)) {
183                 $this->_log(LOG_INFO, "Connected.");
184             } else {
185                 $this->_log(LOG_ERR, 'Failed to connect to queue server');
186                 throw new ServerException('Failed to connect to queue server');
187             }
188         }
189     }
190
191     /**
192      * Subscribe to all enabled notice queues for the current site.
193      */
194     protected function doSubscribe()
195     {
196         $this->_connect();
197         foreach ($this->getQueues() as $queue) {
198             $rawqueue = $this->queueName($queue);
199             $this->_log(LOG_INFO, "Subscribing to $rawqueue");
200             $this->con->subscribe($rawqueue);
201         }
202     }
203     
204     /**
205      * Subscribe from all enabled notice queues for the current site.
206      */
207     protected function doUnsubscribe()
208     {
209         $this->_connect();
210         foreach ($this->getQueues() as $queue) {
211             $this->con->unsubscribe($this->queueName($queue));
212         }
213     }
214
215     /**
216      * Handle and acknowledge a notice event that's come in through a queue.
217      *
218      * If the queue handler reports failure, the message is requeued for later.
219      * Missing notices or handler classes will drop the message.
220      *
221      * Side effects: in multi-site mode, may reset site configuration to
222      * match the site that queued the event.
223      *
224      * @param StompFrame $frame
225      * @return bool
226      */
227     protected function _handleNotice($frame)
228     {
229         list($site, $queue) = $this->parseDestination($frame->headers['destination']);
230         if ($site != common_config('site', 'server')) {
231             $this->stats('switch');
232             StatusNet::init($site);
233         }
234
235         $id = intval($frame->body);
236         $info = "notice $id posted at {$frame->headers['created']} in queue $queue";
237
238         $notice = Notice::staticGet('id', $id);
239         if (empty($notice)) {
240             $this->_log(LOG_WARNING, "Skipping missing $info");
241             $this->con->ack($frame);
242             $this->stats('badnotice', $queue);
243             return false;
244         }
245
246         $handler = $this->getHandler($queue);
247         if (!$handler) {
248             $this->_log(LOG_ERROR, "Missing handler class; skipping $info");
249             $this->con->ack($frame);
250             $this->stats('badhandler', $queue);
251             return false;
252         }
253
254         $ok = $handler->handle_notice($notice);
255
256         if (!$ok) {
257             $this->_log(LOG_WARNING, "Failed handling $info");
258             // FIXME we probably shouldn't have to do
259             // this kind of queue management ourselves;
260             // if we don't ack, it should resend...
261             $this->con->ack($frame);
262             $this->enqueue($notice, $queue);
263             $this->stats('requeued', $queue);
264             return false;
265         }
266
267         $this->_log(LOG_INFO, "Successfully handled $info");
268         $this->con->ack($frame);
269         $this->stats('handled', $queue);
270         return true;
271     }
272
273     /**
274      * Combines the queue_basename from configuration with the
275      * site server name and queue name to give eg:
276      *
277      * /queue/statusnet/identi.ca/sms
278      *
279      * @param string $queue
280      * @return string
281      */
282     protected function queueName($queue)
283     {
284         return common_config('queue', 'queue_basename') .
285             common_config('site', 'server') . '/' . $queue;
286     }
287
288     /**
289      * Returns the site and queue name from the server-side queue.
290      *
291      * @param string queue destination (eg '/queue/statusnet/identi.ca/sms')
292      * @return array of site and queue: ('identi.ca','sms') or false if unrecognized
293      */
294     protected function parseDestination($dest)
295     {
296         $prefix = common_config('queue', 'queue_basename');
297         if (substr($dest, 0, strlen($prefix)) == $prefix) {
298             $rest = substr($dest, strlen($prefix));
299             return explode("/", $rest, 2);
300         } else {
301             common_log(LOG_ERR, "Got a message from unrecognized stomp queue: $dest");
302             return array(false, false);
303         }
304     }
305
306     function _log($level, $msg)
307     {
308         common_log($level, 'StompQueueManager: '.$msg);
309     }
310 }
311