]> git.mxchange.org Git - quix0rs-gnu-social.git/blob - lib/iomaster.php
Merge branch 'master' of git@gitorious.org:statusnet/mainline
[quix0rs-gnu-social.git] / lib / iomaster.php
1 <?php
2 /**
3  * StatusNet, the distributed open-source microblogging tool
4  *
5  * I/O manager to wrap around socket-reading and polling queue & connection managers.
6  *
7  * PHP version 5
8  *
9  * LICENCE: This program is free software: you can redistribute it and/or modify
10  * it under the terms of the GNU Affero General Public License as published by
11  * the Free Software Foundation, either version 3 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU Affero General Public License for more details.
18  *
19  * You should have received a copy of the GNU Affero General Public License
20  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
21  *
22  * @category  QueueManager
23  * @package   StatusNet
24  * @author    Brion Vibber <brion@status.net>
25  * @copyright 2009 StatusNet, Inc.
26  * @license   http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0
27  * @link      http://status.net/
28  */
29
30 class IoMaster
31 {
32     public $id;
33
34     protected $multiSite = false;
35     protected $managers = array();
36     protected $singletons = array();
37
38     protected $pollTimeouts = array();
39     protected $lastPoll = array();
40
41     /**
42      * @param string $id process ID to use in logging/monitoring
43      */
44     public function __construct($id)
45     {
46         $this->id = $id;
47         $this->monitor = new QueueMonitor();
48     }
49
50     public function init($multiSite=null)
51     {
52         if ($multiSite !== null) {
53             $this->multiSite = $multiSite;
54         }
55         if ($this->multiSite) {
56             $this->sites = $this->findAllSites();
57         } else {
58             $this->sites = array(common_config('site', 'server'));
59         }
60
61         if (empty($this->sites)) {
62             throw new Exception("Empty status_network table, cannot init");
63         }
64
65         foreach ($this->sites as $site) {
66             if ($site != common_config('site', 'server')) {
67                 StatusNet::init($site);
68             }
69
70             $classes = array();
71             if (Event::handle('StartIoManagerClasses', array(&$classes))) {
72                 $classes[] = 'QueueManager';
73                 if (common_config('xmpp', 'enabled')) {
74                     $classes[] = 'XmppManager'; // handles pings/reconnects
75                     $classes[] = 'XmppConfirmManager'; // polls for outgoing confirmations
76                 }
77             }
78             Event::handle('EndIoManagerClasses', array(&$classes));
79
80             foreach ($classes as $class) {
81                 $this->instantiate($class);
82             }
83         }
84     }
85
86     /**
87      * Pull all local sites from status_network table.
88      * @return array of hostnames
89      */
90     protected function findAllSites()
91     {
92         $hosts = array();
93         $sn = new Status_network();
94         $sn->find();
95         while ($sn->fetch()) {
96             $hosts[] = $sn->hostname;
97         }
98         return $hosts;
99     }
100
101     /**
102      * Instantiate an i/o manager class for the current site.
103      * If a multi-site capable handler is already present,
104      * we don't need to build a new one.
105      *
106      * @param string $class
107      */
108     protected function instantiate($class)
109     {
110         if (isset($this->singletons[$class])) {
111             // Already instantiated a multi-site-capable handler.
112             // Just let it know it should listen to this site too!
113             $this->singletons[$class]->addSite(common_config('site', 'server'));
114             return;
115         }
116
117         $manager = $this->getManager($class);
118
119         if ($this->multiSite) {
120             $caps = $manager->multiSite();
121             if ($caps == IoManager::SINGLE_ONLY) {
122                 throw new Exception("$class can't run with --all; aborting.");
123             }
124             if ($caps == IoManager::INSTANCE_PER_PROCESS) {
125                 // Save this guy for later!
126                 // We'll only need the one to cover multiple sites.
127                 $this->singletons[$class] = $manager;
128                 $manager->addSite(common_config('site', 'server'));
129             }
130         }
131
132         $this->managers[] = $manager;
133     }
134     
135     protected function getManager($class)
136     {
137         return call_user_func(array($class, 'get'));
138     }
139
140     /**
141      * Basic run loop...
142      *
143      * Initialize all io managers, then sit around waiting for input.
144      * Between events or timeouts, pass control back to idle() method
145      * to allow for any additional background processing.
146      */
147     function service()
148     {
149         $this->logState('init');
150         $this->start();
151
152         while (true) {
153             $timeouts = array_values($this->pollTimeouts);
154             $timeouts[] = 60; // default max timeout
155
156             // Wait for something on one of our sockets
157             $sockets = array();
158             $managers = array();
159             foreach ($this->managers as $manager) {
160                 foreach ($manager->getSockets() as $socket) {
161                     $sockets[] = $socket;
162                     $managers[] = $manager;
163                 }
164                 $timeouts[] = intval($manager->timeout());
165             }
166
167             $timeout = min($timeouts);
168             if ($sockets) {
169                 $read = $sockets;
170                 $write = array();
171                 $except = array();
172                 $this->logState('listening');
173                 common_log(LOG_INFO, "Waiting up to $timeout seconds for socket data...");
174                 $ready = stream_select($read, $write, $except, $timeout, 0);
175
176                 if ($ready === false) {
177                     common_log(LOG_ERR, "Error selecting on sockets");
178                 } else if ($ready > 0) {
179                     foreach ($read as $socket) {
180                         $index = array_search($socket, $sockets, true);
181                         if ($index !== false) {
182                             $this->logState('queue');
183                             $managers[$index]->handleInput($socket);
184                         } else {
185                             common_log(LOG_ERR, "Saw input on a socket we didn't listen to");
186                         }
187                     }
188                 }
189             }
190
191             if ($timeout > 0 && empty($sockets)) {
192                 // If we had no listeners, sleep until the pollers' next requested wakeup.
193                 common_log(LOG_INFO, "Sleeping $timeout seconds until next poll cycle...");
194                 $this->logState('sleep');
195                 sleep($timeout);
196             }
197
198             $this->logState('poll');
199             $this->poll();
200
201             $this->logState('idle');
202             $this->idle();
203
204             $memoryLimit = $this->softMemoryLimit();
205             if ($memoryLimit > 0) {
206                 $usage = memory_get_usage();
207                 if ($usage > $memoryLimit) {
208                     common_log(LOG_INFO, "Queue thread hit soft memory limit ($usage > $memoryLimit); gracefully restarting.");
209                     break;
210                 }
211             }
212         }
213
214         $this->logState('shutdown');
215         $this->finish();
216     }
217
218     /**
219      * Return fully-parsed soft memory limit in bytes.
220      * @return intval 0 or -1 if not set
221      */
222     function softMemoryLimit()
223     {
224         $softLimit = trim(common_config('queue', 'softlimit'));
225         if (substr($softLimit, -1) == '%') {
226             $limit = trim(ini_get('memory_limit'));
227             $limit = $this->parseMemoryLimit($limit);
228             if ($limit > 0) {
229                 return intval(substr($softLimit, 0, -1) * $limit / 100);
230             } else {
231                 return -1;
232             }
233         } else {
234             return $this->parseMemoryLimit($limit);
235         }
236         return $softLimit;
237     }
238
239     /**
240      * Interpret PHP shorthand for memory_limit and friends.
241      * Why don't they just expose the actual numeric value? :P
242      * @param string $mem
243      * @return int
244      */
245     protected function parseMemoryLimit($mem)
246     {
247         // http://www.php.net/manual/en/faq.using.php#faq.using.shorthandbytes
248         $size = array('k' => 1024,
249                       'm' => 1024*1024,
250                       'g' => 1024*1024*1024);
251         if (empty($mem)) {
252             return 0;
253         } else if (is_numeric($mem)) {
254             return intval($mem);
255         } else {
256             $mult = strtolower(substr($mem, -1));
257             if (isset($size[$mult])) {
258                 return substr($mem, 0, -1) * $size[$mult];
259             } else {
260                 return intval($mem);
261             }
262         }
263     }
264
265     function start()
266     {
267         foreach ($this->managers as $index => $manager) {
268             $manager->start($this);
269             // @fixme error check
270             if ($manager->pollInterval()) {
271                 // We'll want to check for input on the first pass
272                 $this->pollTimeouts[$index] = 0;
273                 $this->lastPoll[$index] = 0;
274             }
275         }
276     }
277
278     function finish()
279     {
280         foreach ($this->managers as $manager) {
281             $manager->finish();
282             // @fixme error check
283         }
284     }
285
286     /**
287      * Called during the idle portion of the runloop to see which handlers
288      */
289     function poll()
290     {
291         foreach ($this->managers as $index => $manager) {
292             $interval = $manager->pollInterval();
293             if ($interval <= 0) {
294                 // Not a polling manager.
295                 continue;
296             }
297
298             if (isset($this->pollTimeouts[$index])) {
299                 $timeout = $this->pollTimeouts[$index];
300                 if (time() - $this->lastPoll[$index] < $timeout) {
301                     // Not time to poll yet.
302                     continue;
303                 }
304             } else {
305                 $timeout = 0;
306             }
307             $hit = $manager->poll();
308
309             $this->lastPoll[$index] = time();
310             if ($hit) {
311                 // Do the next poll quickly, there may be more input!
312                 $this->pollTimeouts[$index] = 0;
313             } else {
314                 // Empty queue. Exponential backoff up to the maximum poll interval.
315                 if ($timeout > 0) {
316                     $timeout = min($timeout * 2, $interval);
317                 } else {
318                     $timeout = 1;
319                 }
320                 $this->pollTimeouts[$index] = $timeout;
321             }
322         }
323     }
324
325     /**
326      * Called after each handled item or empty polling cycle.
327      * This is a good time to e.g. service your XMPP connection.
328      */
329     function idle()
330     {
331         foreach ($this->managers as $manager) {
332             $manager->idle();
333         }
334     }
335
336     /**
337      * Send thread state update to the monitoring server, if configured.
338      *
339      * @param string $state ('init', 'queue', 'shutdown' etc)
340      * @param string $substate (optional, eg queue name 'omb' 'sms' etc)
341      */
342     protected function logState($state, $substate='')
343     {
344         $this->monitor->logState($this->id, $state, $substate);
345     }
346
347     /**
348      * Send thread stats.
349      * Thread ID will be implicit; other owners can be listed as well
350      * for per-queue and per-site records.
351      *
352      * @param string $key counter name
353      * @param array $owners list of owner keys like 'queue:jabber' or 'site:stat01'
354      */
355     public function stats($key, $owners=array())
356     {
357         $owners[] = "thread:" . $this->id;
358         $this->monitor->stats($key, $owners);
359     }
360 }
361