]> git.mxchange.org Git - quix0rs-gnu-social.git/blob - lib/iomaster.php
Merge branch 'master' into testing
[quix0rs-gnu-social.git] / lib / iomaster.php
1 <?php
2 /**
3  * StatusNet, the distributed open-source microblogging tool
4  *
5  * I/O manager to wrap around socket-reading and polling queue & connection managers.
6  *
7  * PHP version 5
8  *
9  * LICENCE: This program is free software: you can redistribute it and/or modify
10  * it under the terms of the GNU Affero General Public License as published by
11  * the Free Software Foundation, either version 3 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU Affero General Public License for more details.
18  *
19  * You should have received a copy of the GNU Affero General Public License
20  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
21  *
22  * @category  QueueManager
23  * @package   StatusNet
24  * @author    Brion Vibber <brion@status.net>
25  * @copyright 2009 StatusNet, Inc.
26  * @license   http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0
27  * @link      http://status.net/
28  */
29
30 abstract class IoMaster
31 {
32     public $id;
33
34     protected $multiSite = false;
35     protected $managers = array();
36     protected $singletons = array();
37
38     protected $pollTimeouts = array();
39     protected $lastPoll = array();
40
41     public $shutdown = false; // Did we do a graceful shutdown?
42     public $respawn = true; // Should we respawn after shutdown?
43
44     /**
45      * @param string $id process ID to use in logging/monitoring
46      */
47     public function __construct($id)
48     {
49         $this->id = $id;
50         $this->monitor = new QueueMonitor();
51     }
52
53     public function init($multiSite=null)
54     {
55         if ($multiSite !== null) {
56             $this->multiSite = $multiSite;
57         }
58         if ($this->multiSite) {
59             $this->sites = StatusNet::findAllSites();
60         } else {
61             $this->sites = array(StatusNet::currentSite());
62         }
63
64         if (empty($this->sites)) {
65             throw new Exception("Empty status_network table, cannot init");
66         }
67
68         foreach ($this->sites as $site) {
69             StatusNet::switchSite($site);
70             $this->initManagers();
71         }
72     }
73
74     /**
75      * Initialize IoManagers for the currently configured site
76      * which are appropriate to this instance.
77      *
78      * Pass class names into $this->instantiate()
79      */
80     abstract function initManagers();
81
82     /**
83      * Instantiate an i/o manager class for the current site.
84      * If a multi-site capable handler is already present,
85      * we don't need to build a new one.
86      *
87      * @param mixed $manager class name (to run $class::get()) or object
88      */
89     protected function instantiate($manager)
90     {
91         if (is_string($manager)) {
92             $manager = call_user_func(array($class, 'get'));
93         }
94
95         $caps = $manager->multiSite();
96         if ($caps == IoManager::SINGLE_ONLY) {
97             if ($this->multiSite) {
98                 throw new Exception("$class can't run with --all; aborting.");
99             }
100         } else if ($caps == IoManager::INSTANCE_PER_PROCESS) {
101             $manager->addSite();
102         }
103
104         if (!in_array($manager, $this->managers, true)) {
105             // Only need to save singletons once
106             $this->managers[] = $manager;
107         }
108     }
109
110     /**
111      * Basic run loop...
112      *
113      * Initialize all io managers, then sit around waiting for input.
114      * Between events or timeouts, pass control back to idle() method
115      * to allow for any additional background processing.
116      */
117     function service()
118     {
119         $this->logState('init');
120         $this->start();
121         $this->checkMemory(false);
122
123         while (!$this->shutdown) {
124             $timeouts = array_values($this->pollTimeouts);
125             $timeouts[] = 60; // default max timeout
126
127             // Wait for something on one of our sockets
128             $sockets = array();
129             $managers = array();
130             foreach ($this->managers as $manager) {
131                 foreach ($manager->getSockets() as $socket) {
132                     $sockets[] = $socket;
133                     $managers[] = $manager;
134                 }
135                 $timeouts[] = intval($manager->timeout());
136             }
137
138             $timeout = min($timeouts);
139             if ($sockets) {
140                 $read = $sockets;
141                 $write = array();
142                 $except = array();
143                 $this->logState('listening');
144                 common_log(LOG_DEBUG, "Waiting up to $timeout seconds for socket data...");
145                 $ready = stream_select($read, $write, $except, $timeout, 0);
146
147                 if ($ready === false) {
148                     common_log(LOG_ERR, "Error selecting on sockets");
149                 } else if ($ready > 0) {
150                     foreach ($read as $socket) {
151                         $index = array_search($socket, $sockets, true);
152                         if ($index !== false) {
153                             $this->logState('queue');
154                             $managers[$index]->handleInput($socket);
155                         } else {
156                             common_log(LOG_ERR, "Saw input on a socket we didn't listen to");
157                         }
158                     }
159                 }
160             }
161
162             if ($timeout > 0 && empty($sockets)) {
163                 // If we had no listeners, sleep until the pollers' next requested wakeup.
164                 common_log(LOG_DEBUG, "Sleeping $timeout seconds until next poll cycle...");
165                 $this->logState('sleep');
166                 sleep($timeout);
167             }
168
169             $this->logState('poll');
170             $this->poll();
171
172             $this->logState('idle');
173             $this->idle();
174
175             $this->checkMemory();
176         }
177
178         $this->logState('shutdown');
179         $this->finish();
180     }
181
182     /**
183      * Check runtime memory usage, possibly triggering a graceful shutdown
184      * and thread respawn if we've crossed the soft limit.
185      *
186      * @param boolean $respawn if false we'll shut down instead of respawning
187      */
188     protected function checkMemory($respawn=true)
189     {
190         $memoryLimit = $this->softMemoryLimit();
191         if ($memoryLimit > 0) {
192             $usage = memory_get_usage();
193             if ($usage > $memoryLimit) {
194                 common_log(LOG_INFO, "Queue thread hit soft memory limit ($usage > $memoryLimit); gracefully restarting.");
195                 if ($respawn) {
196                     $this->requestRestart();
197                 } else {
198                     $this->requestShutdown();
199                 }
200             } else if (common_config('queue', 'debug_memory')) {
201                 $fmt = number_format($usage);
202                 common_log(LOG_DEBUG, "Memory usage $fmt");
203             }
204         }
205     }
206
207     /**
208      * Return fully-parsed soft memory limit in bytes.
209      * @return intval 0 or -1 if not set
210      */
211     function softMemoryLimit()
212     {
213         $softLimit = trim(common_config('queue', 'softlimit'));
214         if (substr($softLimit, -1) == '%') {
215             $limit = $this->parseMemoryLimit(ini_get('memory_limit'));
216             if ($limit > 0) {
217                 return intval(substr($softLimit, 0, -1) * $limit / 100);
218             } else {
219                 return -1;
220             }
221         } else {
222             return $this->parseMemoryLimit($softLimit);
223         }
224         return $softLimit;
225     }
226
227     /**
228      * Interpret PHP shorthand for memory_limit and friends.
229      * Why don't they just expose the actual numeric value? :P
230      * @param string $mem
231      * @return int
232      */
233     public function parseMemoryLimit($mem)
234     {
235         // http://www.php.net/manual/en/faq.using.php#faq.using.shorthandbytes
236         $mem = strtolower(trim($mem));
237         $size = array('k' => 1024,
238                       'm' => 1024*1024,
239                       'g' => 1024*1024*1024);
240         if (empty($mem)) {
241             return 0;
242         } else if (is_numeric($mem)) {
243             return intval($mem);
244         } else {
245             $mult = substr($mem, -1);
246             if (isset($size[$mult])) {
247                 return substr($mem, 0, -1) * $size[$mult];
248             } else {
249                 return intval($mem);
250             }
251         }
252     }
253
254     function start()
255     {
256         foreach ($this->managers as $index => $manager) {
257             $manager->start($this);
258             // @fixme error check
259             if ($manager->pollInterval()) {
260                 // We'll want to check for input on the first pass
261                 $this->pollTimeouts[$index] = 0;
262                 $this->lastPoll[$index] = 0;
263             }
264         }
265     }
266
267     function finish()
268     {
269         foreach ($this->managers as $manager) {
270             $manager->finish();
271             // @fixme error check
272         }
273     }
274
275     /**
276      * Called during the idle portion of the runloop to see which handlers
277      */
278     function poll()
279     {
280         foreach ($this->managers as $index => $manager) {
281             $interval = $manager->pollInterval();
282             if ($interval <= 0) {
283                 // Not a polling manager.
284                 continue;
285             }
286
287             if (isset($this->pollTimeouts[$index])) {
288                 $timeout = $this->pollTimeouts[$index];
289                 if (time() - $this->lastPoll[$index] < $timeout) {
290                     // Not time to poll yet.
291                     continue;
292                 }
293             } else {
294                 $timeout = 0;
295             }
296             $hit = $manager->poll();
297
298             $this->lastPoll[$index] = time();
299             if ($hit) {
300                 // Do the next poll quickly, there may be more input!
301                 $this->pollTimeouts[$index] = 0;
302             } else {
303                 // Empty queue. Exponential backoff up to the maximum poll interval.
304                 if ($timeout > 0) {
305                     $timeout = min($timeout * 2, $interval);
306                 } else {
307                     $timeout = 1;
308                 }
309                 $this->pollTimeouts[$index] = $timeout;
310             }
311         }
312     }
313
314     /**
315      * Called after each handled item or empty polling cycle.
316      * This is a good time to e.g. service your XMPP connection.
317      */
318     function idle()
319     {
320         foreach ($this->managers as $manager) {
321             $manager->idle();
322         }
323     }
324
325     /**
326      * Send thread state update to the monitoring server, if configured.
327      *
328      * @param string $state ('init', 'queue', 'shutdown' etc)
329      * @param string $substate (optional, eg queue name 'omb' 'sms' etc)
330      */
331     protected function logState($state, $substate='')
332     {
333         $this->monitor->logState($this->id, $state, $substate);
334     }
335
336     /**
337      * Send thread stats.
338      * Thread ID will be implicit; other owners can be listed as well
339      * for per-queue and per-site records.
340      *
341      * @param string $key counter name
342      * @param array $owners list of owner keys like 'queue:jabber' or 'site:stat01'
343      */
344     public function stats($key, $owners=array())
345     {
346         $owners[] = "thread:" . $this->id;
347         $this->monitor->stats($key, $owners);
348     }
349
350     /**
351      * For IoManagers to request a graceful shutdown at end of event loop.
352      */
353     public function requestShutdown()
354     {
355         $this->shutdown = true;
356         $this->respawn = false;
357     }
358
359     /**
360      * For IoManagers to request a graceful restart at end of event loop.
361      */
362     public function requestRestart()
363     {
364         $this->shutdown = true;
365         $this->respawn = true;
366     }
367
368 }
369