]> git.mxchange.org Git - quix0rs-gnu-social.git/blob - lib/iomaster.php
Use new StatusNetwork->serverName() to get full domain for wildcard config until...
[quix0rs-gnu-social.git] / lib / iomaster.php
1 <?php
2 /**
3  * StatusNet, the distributed open-source microblogging tool
4  *
5  * I/O manager to wrap around socket-reading and polling queue & connection managers.
6  *
7  * PHP version 5
8  *
9  * LICENCE: This program is free software: you can redistribute it and/or modify
10  * it under the terms of the GNU Affero General Public License as published by
11  * the Free Software Foundation, either version 3 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU Affero General Public License for more details.
18  *
19  * You should have received a copy of the GNU Affero General Public License
20  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
21  *
22  * @category  QueueManager
23  * @package   StatusNet
24  * @author    Brion Vibber <brion@status.net>
25  * @copyright 2009 StatusNet, Inc.
26  * @license   http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0
27  * @link      http://status.net/
28  */
29
30 abstract class IoMaster
31 {
32     public $id;
33
34     protected $multiSite = false;
35     protected $managers = array();
36     protected $singletons = array();
37
38     protected $pollTimeouts = array();
39     protected $lastPoll = array();
40
41     /**
42      * @param string $id process ID to use in logging/monitoring
43      */
44     public function __construct($id)
45     {
46         $this->id = $id;
47         $this->monitor = new QueueMonitor();
48     }
49
50     public function init($multiSite=null)
51     {
52         if ($multiSite !== null) {
53             $this->multiSite = $multiSite;
54         }
55         if ($this->multiSite) {
56             $this->sites = $this->findAllSites();
57         } else {
58             $this->sites = array(common_config('site', 'server'));
59         }
60
61         if (empty($this->sites)) {
62             throw new Exception("Empty status_network table, cannot init");
63         }
64
65         foreach ($this->sites as $site) {
66             if ($site != common_config('site', 'server')) {
67                 StatusNet::init($site);
68             }
69             $this->initManagers();
70         }
71     }
72
73     /**
74      * Initialize IoManagers for the currently configured site
75      * which are appropriate to this instance.
76      *
77      * Pass class names into $this->instantiate()
78      */
79     abstract function initManagers();
80
81     /**
82      * Pull all local sites from status_network table.
83      * @return array of hostnames
84      */
85     protected function findAllSites()
86     {
87         $hosts = array();
88         $sn = new Status_network();
89         $sn->find();
90         while ($sn->fetch()) {
91             $hosts[] = $sn->getServerName();
92         }
93         return $hosts;
94     }
95
96     /**
97      * Instantiate an i/o manager class for the current site.
98      * If a multi-site capable handler is already present,
99      * we don't need to build a new one.
100      *
101      * @param string $class
102      */
103     protected function instantiate($class)
104     {
105         if (isset($this->singletons[$class])) {
106             // Already instantiated a multi-site-capable handler.
107             // Just let it know it should listen to this site too!
108             $this->singletons[$class]->addSite(common_config('site', 'server'));
109             return;
110         }
111
112         $manager = $this->getManager($class);
113
114         if ($this->multiSite) {
115             $caps = $manager->multiSite();
116             if ($caps == IoManager::SINGLE_ONLY) {
117                 throw new Exception("$class can't run with --all; aborting.");
118             }
119             if ($caps == IoManager::INSTANCE_PER_PROCESS) {
120                 // Save this guy for later!
121                 // We'll only need the one to cover multiple sites.
122                 $this->singletons[$class] = $manager;
123                 $manager->addSite(common_config('site', 'server'));
124             }
125         }
126
127         $this->managers[] = $manager;
128     }
129     
130     protected function getManager($class)
131     {
132         return call_user_func(array($class, 'get'));
133     }
134
135     /**
136      * Basic run loop...
137      *
138      * Initialize all io managers, then sit around waiting for input.
139      * Between events or timeouts, pass control back to idle() method
140      * to allow for any additional background processing.
141      */
142     function service()
143     {
144         $this->logState('init');
145         $this->start();
146
147         while (true) {
148             $timeouts = array_values($this->pollTimeouts);
149             $timeouts[] = 60; // default max timeout
150
151             // Wait for something on one of our sockets
152             $sockets = array();
153             $managers = array();
154             foreach ($this->managers as $manager) {
155                 foreach ($manager->getSockets() as $socket) {
156                     $sockets[] = $socket;
157                     $managers[] = $manager;
158                 }
159                 $timeouts[] = intval($manager->timeout());
160             }
161
162             $timeout = min($timeouts);
163             if ($sockets) {
164                 $read = $sockets;
165                 $write = array();
166                 $except = array();
167                 $this->logState('listening');
168                 common_log(LOG_DEBUG, "Waiting up to $timeout seconds for socket data...");
169                 $ready = stream_select($read, $write, $except, $timeout, 0);
170
171                 if ($ready === false) {
172                     common_log(LOG_ERR, "Error selecting on sockets");
173                 } else if ($ready > 0) {
174                     foreach ($read as $socket) {
175                         $index = array_search($socket, $sockets, true);
176                         if ($index !== false) {
177                             $this->logState('queue');
178                             $managers[$index]->handleInput($socket);
179                         } else {
180                             common_log(LOG_ERR, "Saw input on a socket we didn't listen to");
181                         }
182                     }
183                 }
184             }
185
186             if ($timeout > 0 && empty($sockets)) {
187                 // If we had no listeners, sleep until the pollers' next requested wakeup.
188                 common_log(LOG_DEBUG, "Sleeping $timeout seconds until next poll cycle...");
189                 $this->logState('sleep');
190                 sleep($timeout);
191             }
192
193             $this->logState('poll');
194             $this->poll();
195
196             $this->logState('idle');
197             $this->idle();
198
199             $memoryLimit = $this->softMemoryLimit();
200             if ($memoryLimit > 0) {
201                 $usage = memory_get_usage();
202                 if ($usage > $memoryLimit) {
203                     common_log(LOG_INFO, "Queue thread hit soft memory limit ($usage > $memoryLimit); gracefully restarting.");
204                     break;
205                 } else if (common_config('queue', 'debug_memory')) {
206                     common_log(LOG_DEBUG, "Memory usage $usage");
207                 }
208             }
209         }
210
211         $this->logState('shutdown');
212         $this->finish();
213     }
214
215     /**
216      * Return fully-parsed soft memory limit in bytes.
217      * @return intval 0 or -1 if not set
218      */
219     function softMemoryLimit()
220     {
221         $softLimit = trim(common_config('queue', 'softlimit'));
222         if (substr($softLimit, -1) == '%') {
223             $limit = $this->parseMemoryLimit(ini_get('memory_limit'));
224             if ($limit > 0) {
225                 return intval(substr($softLimit, 0, -1) * $limit / 100);
226             } else {
227                 return -1;
228             }
229         } else {
230             return $this->parseMemoryLimit($softLimit);
231         }
232         return $softLimit;
233     }
234
235     /**
236      * Interpret PHP shorthand for memory_limit and friends.
237      * Why don't they just expose the actual numeric value? :P
238      * @param string $mem
239      * @return int
240      */
241     public function parseMemoryLimit($mem)
242     {
243         // http://www.php.net/manual/en/faq.using.php#faq.using.shorthandbytes
244         $mem = strtolower(trim($mem));
245         $size = array('k' => 1024,
246                       'm' => 1024*1024,
247                       'g' => 1024*1024*1024);
248         if (empty($mem)) {
249             return 0;
250         } else if (is_numeric($mem)) {
251             return intval($mem);
252         } else {
253             $mult = substr($mem, -1);
254             if (isset($size[$mult])) {
255                 return substr($mem, 0, -1) * $size[$mult];
256             } else {
257                 return intval($mem);
258             }
259         }
260     }
261
262     function start()
263     {
264         foreach ($this->managers as $index => $manager) {
265             $manager->start($this);
266             // @fixme error check
267             if ($manager->pollInterval()) {
268                 // We'll want to check for input on the first pass
269                 $this->pollTimeouts[$index] = 0;
270                 $this->lastPoll[$index] = 0;
271             }
272         }
273     }
274
275     function finish()
276     {
277         foreach ($this->managers as $manager) {
278             $manager->finish();
279             // @fixme error check
280         }
281     }
282
283     /**
284      * Called during the idle portion of the runloop to see which handlers
285      */
286     function poll()
287     {
288         foreach ($this->managers as $index => $manager) {
289             $interval = $manager->pollInterval();
290             if ($interval <= 0) {
291                 // Not a polling manager.
292                 continue;
293             }
294
295             if (isset($this->pollTimeouts[$index])) {
296                 $timeout = $this->pollTimeouts[$index];
297                 if (time() - $this->lastPoll[$index] < $timeout) {
298                     // Not time to poll yet.
299                     continue;
300                 }
301             } else {
302                 $timeout = 0;
303             }
304             $hit = $manager->poll();
305
306             $this->lastPoll[$index] = time();
307             if ($hit) {
308                 // Do the next poll quickly, there may be more input!
309                 $this->pollTimeouts[$index] = 0;
310             } else {
311                 // Empty queue. Exponential backoff up to the maximum poll interval.
312                 if ($timeout > 0) {
313                     $timeout = min($timeout * 2, $interval);
314                 } else {
315                     $timeout = 1;
316                 }
317                 $this->pollTimeouts[$index] = $timeout;
318             }
319         }
320     }
321
322     /**
323      * Called after each handled item or empty polling cycle.
324      * This is a good time to e.g. service your XMPP connection.
325      */
326     function idle()
327     {
328         foreach ($this->managers as $manager) {
329             $manager->idle();
330         }
331     }
332
333     /**
334      * Send thread state update to the monitoring server, if configured.
335      *
336      * @param string $state ('init', 'queue', 'shutdown' etc)
337      * @param string $substate (optional, eg queue name 'omb' 'sms' etc)
338      */
339     protected function logState($state, $substate='')
340     {
341         $this->monitor->logState($this->id, $state, $substate);
342     }
343
344     /**
345      * Send thread stats.
346      * Thread ID will be implicit; other owners can be listed as well
347      * for per-queue and per-site records.
348      *
349      * @param string $key counter name
350      * @param array $owners list of owner keys like 'queue:jabber' or 'site:stat01'
351      */
352     public function stats($key, $owners=array())
353     {
354         $owners[] = "thread:" . $this->id;
355         $this->monitor->stats($key, $owners);
356     }
357 }
358