]> git.mxchange.org Git - quix0rs-gnu-social.git/blob - lib/queuehandler.php
Merge commit 'jeff-themovie/small-fixes' into 0.8.x
[quix0rs-gnu-social.git] / lib / queuehandler.php
1 <?php
2 /*
3  * Laconica - a distributed open-source microblogging tool
4  * Copyright (C) 2008, 2009, Control Yourself, Inc.
5  *
6  * This program is free software: you can redistribute it and/or modify
7  * it under the terms of the GNU Affero General Public License as published by
8  * the Free Software Foundation, either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU Affero General Public License for more details.
15  *
16  * You should have received a copy of the GNU Affero General Public License
17  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
18  */
19
20 define('CLAIM_TIMEOUT', 1200);
21
22 if (!defined('LACONICA')) { exit(1); }
23
24 require_once(INSTALLDIR.'/lib/daemon.php');
25 require_once(INSTALLDIR.'/classes/Queue_item.php');
26 require_once(INSTALLDIR.'/classes/Notice.php');
27
28 class QueueHandler extends Daemon
29 {
30     var $_id = 'generic';
31
32     function __construct($id=null, $daemonize=true)
33     {
34         parent::__construct($daemonize);
35
36         if ($id) {
37             $this->set_id($id);
38         }
39     }
40
41     function class_name()
42     {
43         return ucfirst($this->transport()) . 'Handler';
44     }
45
46     function name()
47     {
48         return strtolower($this->class_name().'.'.$this->get_id());
49     }
50
51     function get_id()
52     {
53         return $this->_id;
54     }
55
56     function set_id($id)
57     {
58         $this->_id = $id;
59     }
60
61     function transport()
62     {
63         return null;
64     }
65
66     function start()
67     {
68     }
69
70     function finish()
71     {
72     }
73
74     function handle_notice($notice)
75     {
76         return true;
77     }
78
79     function db_dispatch() {
80         do {
81             $qi = Queue_item::top($this->transport());
82             if ($qi) {
83                 $this->log(LOG_INFO, 'Got item enqueued '.common_exact_date($qi->created));
84                 $notice = Notice::staticGet($qi->notice_id);
85                 if ($notice) {
86                     $this->log(LOG_INFO, 'broadcasting notice ID = ' . $notice->id);
87                     # XXX: what to do if broadcast fails?
88                     $result = $this->handle_notice($notice);
89                     if (!$result) {
90                         $this->log(LOG_WARNING, 'Failed broadcast for notice ID = ' . $notice->id);
91                         $orig = $qi;
92                         $qi->claimed = null;
93                         $qi->update($orig);
94                         $this->log(LOG_WARNING, 'Abandoned claim for notice ID = ' . $notice->id);
95                         continue;
96                     }
97                     $this->log(LOG_INFO, 'finished broadcasting notice ID = ' . $notice->id);
98                     $notice->free();
99                     unset($notice);
100                     $notice = null;
101                 } else {
102                     $this->log(LOG_WARNING, 'queue item for notice that does not exist');
103                 }
104                 $qi->delete();
105                 $qi->free();
106                 unset($qi);
107                 $this->idle(0);
108             } else {
109                 $this->clear_old_claims();
110                 $this->idle(5);
111             }
112         } while (true);
113     }
114
115     function stomp_dispatch() {
116
117         // use an external message queue system via STOMP
118         require_once("Stomp.php");
119
120         $server = common_config('queue','stomp_server');
121         $username = common_config('queue', 'stomp_username');
122         $password = common_config('queue', 'stomp_password');
123
124         $con = new Stomp($server);
125
126         if (!$con->connect($username, $password)) {
127             $this->log(LOG_ERR, 'Failed to connect to queue server');
128             return false;
129         }
130
131         $queue_basename = common_config('queue','queue_basename');
132         // subscribe to the relevant queue (format: basename-transport)
133         $con->subscribe('/queue/'.$queue_basename.'-'.$this->transport());
134
135         do {
136             $frame = $con->readFrame();
137             if ($frame) {
138                 $this->log(LOG_INFO, 'Got item enqueued '.common_exact_date($frame->headers['created']));
139
140                 // XXX: Now the queue handler receives only the ID of the
141                 // notice, and it has to get it from the DB
142                 // A massive improvement would be avoid DB query by transmitting
143                 // all the notice details via queue server...
144                 $notice = Notice::staticGet($frame->body);
145
146                 if ($notice) {
147                     $this->log(LOG_INFO, 'broadcasting notice ID = ' . $notice->id);
148                     $result = $this->handle_notice($notice);
149                     if ($result) {
150                         // if the msg has been handled positively, ack it
151                         // and the queue server will remove it from the queue
152                         $con->ack($frame);
153                         $this->log(LOG_INFO, 'finished broadcasting notice ID = ' . $notice->id);
154                     }
155                     else {
156                         // no ack
157                         $this->log(LOG_WARNING, 'Failed broadcast for notice ID = ' . $notice->id);
158                     }
159                     $notice->free();
160                     unset($notice);
161                     $notice = null;
162                 } else {
163                     $this->log(LOG_WARNING, 'queue item for notice that does not exist');
164                 }
165             }
166         } while (true);
167
168         $con->disconnect();
169     }
170
171     function run()
172     {
173         if (!$this->start()) {
174             return false;
175         }
176         $this->log(LOG_INFO, 'checking for queued notices');
177         if (common_config('queue','subsystem') == 'stomp') {
178             $this->stomp_dispatch();
179         }
180         else {
181             $this->db_dispatch();
182         }
183         if (!$this->finish()) {
184             return false;
185         }
186         return true;
187     }
188
189     function idle($timeout=0)
190     {
191         if ($timeout>0) {
192             sleep($timeout);
193         }
194     }
195
196     function clear_old_claims()
197     {
198         $qi = new Queue_item();
199         $qi->transport = $this->transport();
200         $qi->whereAdd('now() - claimed > '.CLAIM_TIMEOUT);
201         $qi->update(DB_DATAOBJECT_WHEREADD_ONLY);
202         $qi->free();
203         unset($qi);
204     }
205
206     function log($level, $msg)
207     {
208         common_log($level, $this->class_name() . ' ('. $this->get_id() .'): '.$msg);
209     }
210 }
211