]> git.mxchange.org Git - quix0rs-gnu-social.git/blob - lib/iomaster.php
Merge branch 'testing' of gitorious.org:statusnet/mainline into 0.9.x
[quix0rs-gnu-social.git] / lib / iomaster.php
1 <?php
2 /**
3  * StatusNet, the distributed open-source microblogging tool
4  *
5  * I/O manager to wrap around socket-reading and polling queue & connection managers.
6  *
7  * PHP version 5
8  *
9  * LICENCE: This program is free software: you can redistribute it and/or modify
10  * it under the terms of the GNU Affero General Public License as published by
11  * the Free Software Foundation, either version 3 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU Affero General Public License for more details.
18  *
19  * You should have received a copy of the GNU Affero General Public License
20  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
21  *
22  * @category  QueueManager
23  * @package   StatusNet
24  * @author    Brion Vibber <brion@status.net>
25  * @copyright 2009 StatusNet, Inc.
26  * @license   http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0
27  * @link      http://status.net/
28  */
29
30 abstract class IoMaster
31 {
32     public $id;
33
34     protected $multiSite = false;
35     protected $managers = array();
36     protected $singletons = array();
37
38     protected $pollTimeouts = array();
39     protected $lastPoll = array();
40
41     public $shutdown = false; // Did we do a graceful shutdown?
42     public $respawn = true; // Should we respawn after shutdown?
43
44     /**
45      * @param string $id process ID to use in logging/monitoring
46      */
47     public function __construct($id)
48     {
49         $this->id = $id;
50         $this->monitor = new QueueMonitor();
51     }
52
53     public function init($multiSite=null)
54     {
55         if ($multiSite !== null) {
56             $this->multiSite = $multiSite;
57         }
58         if ($this->multiSite) {
59             $this->sites = $this->findAllSites();
60         } else {
61             $this->sites = array(common_config('site', 'server'));
62         }
63
64         if (empty($this->sites)) {
65             throw new Exception("Empty status_network table, cannot init");
66         }
67
68         foreach ($this->sites as $site) {
69             if ($site != common_config('site', 'server')) {
70                 StatusNet::init($site);
71             }
72             $this->initManagers();
73         }
74     }
75
76     /**
77      * Initialize IoManagers for the currently configured site
78      * which are appropriate to this instance.
79      *
80      * Pass class names into $this->instantiate()
81      */
82     abstract function initManagers();
83
84     /**
85      * Pull all local sites from status_network table.
86      * @return array of hostnames
87      */
88     protected function findAllSites()
89     {
90         $hosts = array();
91         $sn = new Status_network();
92         $sn->find();
93         while ($sn->fetch()) {
94             $hosts[] = $sn->getServerName();
95         }
96         return $hosts;
97     }
98
99     /**
100      * Instantiate an i/o manager class for the current site.
101      * If a multi-site capable handler is already present,
102      * we don't need to build a new one.
103      *
104      * @param string $class
105      */
106     protected function instantiate($class)
107     {
108         if (is_string($class) && isset($this->singletons[$class])) {
109             // Already instantiated a multi-site-capable handler.
110             // Just let it know it should listen to this site too!
111             $this->singletons[$class]->addSite(common_config('site', 'server'));
112             return;
113         }
114
115         $manager = $this->getManager($class);
116
117         if ($this->multiSite) {
118             $caps = $manager->multiSite();
119             if ($caps == IoManager::SINGLE_ONLY) {
120                 throw new Exception("$class can't run with --all; aborting.");
121             }
122             if ($caps == IoManager::INSTANCE_PER_PROCESS) {
123                 // Save this guy for later!
124                 // We'll only need the one to cover multiple sites.
125                 $this->singletons[$class] = $manager;
126                 $manager->addSite(common_config('site', 'server'));
127             }
128         }
129
130         $this->managers[] = $manager;
131     }
132     
133     protected function getManager($class)
134     {
135         if(is_object($class)){
136             return $class;
137         } else {
138             return call_user_func(array($class, 'get'));
139         }
140     }
141
142     /**
143      * Basic run loop...
144      *
145      * Initialize all io managers, then sit around waiting for input.
146      * Between events or timeouts, pass control back to idle() method
147      * to allow for any additional background processing.
148      */
149     function service()
150     {
151         $this->logState('init');
152         $this->start();
153
154         while (!$this->shutdown) {
155             $timeouts = array_values($this->pollTimeouts);
156             $timeouts[] = 60; // default max timeout
157
158             // Wait for something on one of our sockets
159             $sockets = array();
160             $managers = array();
161             foreach ($this->managers as $manager) {
162                 foreach ($manager->getSockets() as $socket) {
163                     $sockets[] = $socket;
164                     $managers[] = $manager;
165                 }
166                 $timeouts[] = intval($manager->timeout());
167             }
168
169             $timeout = min($timeouts);
170             if ($sockets) {
171                 $read = $sockets;
172                 $write = array();
173                 $except = array();
174                 $this->logState('listening');
175                 common_log(LOG_DEBUG, "Waiting up to $timeout seconds for socket data...");
176                 $ready = stream_select($read, $write, $except, $timeout, 0);
177
178                 if ($ready === false) {
179                     common_log(LOG_ERR, "Error selecting on sockets");
180                 } else if ($ready > 0) {
181                     foreach ($read as $socket) {
182                         $index = array_search($socket, $sockets, true);
183                         if ($index !== false) {
184                             $this->logState('queue');
185                             $managers[$index]->handleInput($socket);
186                         } else {
187                             common_log(LOG_ERR, "Saw input on a socket we didn't listen to");
188                         }
189                     }
190                 }
191             }
192
193             if ($timeout > 0 && empty($sockets)) {
194                 // If we had no listeners, sleep until the pollers' next requested wakeup.
195                 common_log(LOG_DEBUG, "Sleeping $timeout seconds until next poll cycle...");
196                 $this->logState('sleep');
197                 sleep($timeout);
198             }
199
200             $this->logState('poll');
201             $this->poll();
202
203             $this->logState('idle');
204             $this->idle();
205
206             $this->checkMemory();
207         }
208
209         $this->logState('shutdown');
210         $this->finish();
211     }
212
213     /**
214      * Check runtime memory usage, possibly triggering a graceful shutdown
215      * and thread respawn if we've crossed the soft limit.
216      */
217     protected function checkMemory()
218     {
219         $memoryLimit = $this->softMemoryLimit();
220         if ($memoryLimit > 0) {
221             $usage = memory_get_usage();
222             if ($usage > $memoryLimit) {
223                 common_log(LOG_INFO, "Queue thread hit soft memory limit ($usage > $memoryLimit); gracefully restarting.");
224                 $this->requestRestart();
225             } else if (common_config('queue', 'debug_memory')) {
226                 common_log(LOG_DEBUG, "Memory usage $usage");
227             }
228         }
229     }
230
231     /**
232      * Return fully-parsed soft memory limit in bytes.
233      * @return intval 0 or -1 if not set
234      */
235     function softMemoryLimit()
236     {
237         $softLimit = trim(common_config('queue', 'softlimit'));
238         if (substr($softLimit, -1) == '%') {
239             $limit = $this->parseMemoryLimit(ini_get('memory_limit'));
240             if ($limit > 0) {
241                 return intval(substr($softLimit, 0, -1) * $limit / 100);
242             } else {
243                 return -1;
244             }
245         } else {
246             return $this->parseMemoryLimit($softLimit);
247         }
248         return $softLimit;
249     }
250
251     /**
252      * Interpret PHP shorthand for memory_limit and friends.
253      * Why don't they just expose the actual numeric value? :P
254      * @param string $mem
255      * @return int
256      */
257     public function parseMemoryLimit($mem)
258     {
259         // http://www.php.net/manual/en/faq.using.php#faq.using.shorthandbytes
260         $mem = strtolower(trim($mem));
261         $size = array('k' => 1024,
262                       'm' => 1024*1024,
263                       'g' => 1024*1024*1024);
264         if (empty($mem)) {
265             return 0;
266         } else if (is_numeric($mem)) {
267             return intval($mem);
268         } else {
269             $mult = substr($mem, -1);
270             if (isset($size[$mult])) {
271                 return substr($mem, 0, -1) * $size[$mult];
272             } else {
273                 return intval($mem);
274             }
275         }
276     }
277
278     function start()
279     {
280         foreach ($this->managers as $index => $manager) {
281             $manager->start($this);
282             // @fixme error check
283             if ($manager->pollInterval()) {
284                 // We'll want to check for input on the first pass
285                 $this->pollTimeouts[$index] = 0;
286                 $this->lastPoll[$index] = 0;
287             }
288         }
289     }
290
291     function finish()
292     {
293         foreach ($this->managers as $manager) {
294             $manager->finish();
295             // @fixme error check
296         }
297     }
298
299     /**
300      * Called during the idle portion of the runloop to see which handlers
301      */
302     function poll()
303     {
304         foreach ($this->managers as $index => $manager) {
305             $interval = $manager->pollInterval();
306             if ($interval <= 0) {
307                 // Not a polling manager.
308                 continue;
309             }
310
311             if (isset($this->pollTimeouts[$index])) {
312                 $timeout = $this->pollTimeouts[$index];
313                 if (time() - $this->lastPoll[$index] < $timeout) {
314                     // Not time to poll yet.
315                     continue;
316                 }
317             } else {
318                 $timeout = 0;
319             }
320             $hit = $manager->poll();
321
322             $this->lastPoll[$index] = time();
323             if ($hit) {
324                 // Do the next poll quickly, there may be more input!
325                 $this->pollTimeouts[$index] = 0;
326             } else {
327                 // Empty queue. Exponential backoff up to the maximum poll interval.
328                 if ($timeout > 0) {
329                     $timeout = min($timeout * 2, $interval);
330                 } else {
331                     $timeout = 1;
332                 }
333                 $this->pollTimeouts[$index] = $timeout;
334             }
335         }
336     }
337
338     /**
339      * Called after each handled item or empty polling cycle.
340      * This is a good time to e.g. service your XMPP connection.
341      */
342     function idle()
343     {
344         foreach ($this->managers as $manager) {
345             $manager->idle();
346         }
347     }
348
349     /**
350      * Send thread state update to the monitoring server, if configured.
351      *
352      * @param string $state ('init', 'queue', 'shutdown' etc)
353      * @param string $substate (optional, eg queue name 'omb' 'sms' etc)
354      */
355     protected function logState($state, $substate='')
356     {
357         $this->monitor->logState($this->id, $state, $substate);
358     }
359
360     /**
361      * Send thread stats.
362      * Thread ID will be implicit; other owners can be listed as well
363      * for per-queue and per-site records.
364      *
365      * @param string $key counter name
366      * @param array $owners list of owner keys like 'queue:xmpp' or 'site:stat01'
367      */
368     public function stats($key, $owners=array())
369     {
370         $owners[] = "thread:" . $this->id;
371         $this->monitor->stats($key, $owners);
372     }
373
374     /**
375      * For IoManagers to request a graceful shutdown at end of event loop.
376      */
377     public function requestShutdown()
378     {
379         $this->shutdown = true;
380         $this->respawn = false;
381     }
382
383     /**
384      * For IoManagers to request a graceful restart at end of event loop.
385      */
386     public function requestRestart()
387     {
388         $this->shutdown = true;
389         $this->respawn = true;
390     }
391
392 }
393