]> git.mxchange.org Git - quix0rs-gnu-social.git/blob - lib/iomaster.php
979f73e75381800ad67aaf4e3c3685cc3d7af4d1
[quix0rs-gnu-social.git] / lib / iomaster.php
1 <?php
2 /**
3  * StatusNet, the distributed open-source microblogging tool
4  *
5  * I/O manager to wrap around socket-reading and polling queue & connection managers.
6  *
7  * PHP version 5
8  *
9  * LICENCE: This program is free software: you can redistribute it and/or modify
10  * it under the terms of the GNU Affero General Public License as published by
11  * the Free Software Foundation, either version 3 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU Affero General Public License for more details.
18  *
19  * You should have received a copy of the GNU Affero General Public License
20  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
21  *
22  * @category  QueueManager
23  * @package   StatusNet
24  * @author    Brion Vibber <brion@status.net>
25  * @copyright 2009 StatusNet, Inc.
26  * @license   http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0
27  * @link      http://status.net/
28  */
29
30 class IoMaster
31 {
32     public $id;
33
34     protected $multiSite = false;
35     protected $includeGlobalSingletons = true;
36     protected $managers = array();
37     protected $singletons = array();
38
39     protected $pollTimeouts = array();
40     protected $lastPoll = array();
41
42     /**
43      * @param string $id process ID to use in logging/monitoring
44      */
45     public function __construct($id)
46     {
47         $this->id = $id;
48         $this->monitor = new QueueMonitor();
49     }
50
51     public function init($multiSite=null, $includeGlobalSingletons = true)
52     {
53         $this->includeGlobalSingletons = $includeGlobalSingletons;
54         if ($multiSite !== null) {
55             $this->multiSite = $multiSite;
56         }
57         if ($this->multiSite) {
58             $this->sites = $this->findAllSites();
59         } else {
60             $this->sites = array(common_config('site', 'server'));
61         }
62
63         if (empty($this->sites)) {
64             throw new Exception("Empty status_network table, cannot init");
65         }
66
67         foreach ($this->sites as $site) {
68             if ($site != common_config('site', 'server')) {
69                 StatusNet::init($site);
70             }
71
72             $classes = array();
73             if (Event::handle('StartIoManagerClasses', array(&$classes))) {
74                 $classes[] = 'QueueManager';
75                 if (common_config('xmpp', 'enabled') && !defined('XMPP_EMERGENCY_FLAG')) {
76                     $classes[] = 'XmppManager'; // handles pings/reconnects
77                     $classes[] = 'XmppConfirmManager'; // polls for outgoing confirmations
78                 }
79             }
80             Event::handle('EndIoManagerClasses', array(&$classes));
81
82             foreach ($classes as $class) {
83                 $this->instantiate($class);
84             }
85         }
86     }
87
88     /**
89      * Pull all local sites from status_network table.
90      * @return array of hostnames
91      */
92     protected function findAllSites()
93     {
94         $hosts = array();
95         $sn = new Status_network();
96         $sn->find();
97         while ($sn->fetch()) {
98             $hosts[] = $sn->hostname;
99         }
100         return $hosts;
101     }
102
103     /**
104      * Instantiate an i/o manager class for the current site.
105      * If a multi-site capable handler is already present,
106      * we don't need to build a new one.
107      *
108      * @param string $class
109      */
110     protected function instantiate($class)
111     {
112         if (is_string($class) && isset($this->singletons[$class])) {
113             // Already instantiated a multi-site-capable handler.
114             // Just let it know it should listen to this site too!
115             $this->singletons[$class]->addSite(common_config('site', 'server'));
116             return;
117         }
118
119         $manager = $this->getManager($class);
120
121         $caps = $manager->multiSite();
122         if ($this->multiSite) {
123             if ($caps == IoManager::SINGLE_ONLY) {
124                 throw new Exception("$class can't run with --all; aborting.");
125             }
126             if ($caps == IoManager::INSTANCE_PER_PROCESS ||
127                ( $this->includeGlobalSingletons && $caps == IoManager::GLOBAL_SINGLE_ONLY )) {
128                 // Save this guy for later!
129                 // We'll only need the one to cover multiple sites.
130                 if (is_string($class)){
131                     $this->singletons[$class] = $manager;
132                 }
133                 $manager->addSite(common_config('site', 'server'));
134             }
135         }
136
137         if( $this->includeGlobalSingletons || $caps != IoManager::GLOBAL_SINGLE_ONLY ) {
138             $this->managers[] = $manager;
139         }
140     }
141     
142     protected function getManager($class)
143     {
144         if(is_object($class)){
145             return $class;
146         }else{
147             return call_user_func(array($class, 'get'));
148         }
149     }
150
151     /**
152      * Basic run loop...
153      *
154      * Initialize all io managers, then sit around waiting for input.
155      * Between events or timeouts, pass control back to idle() method
156      * to allow for any additional background processing.
157      */
158     function service()
159     {
160         $this->logState('init');
161         $this->start();
162
163         while (true) {
164             $timeouts = array_values($this->pollTimeouts);
165             $timeouts[] = 60; // default max timeout
166
167             // Wait for something on one of our sockets
168             $sockets = array();
169             $managers = array();
170             foreach ($this->managers as $manager) {
171                 foreach ($manager->getSockets() as $socket) {
172                     $sockets[] = $socket;
173                     $managers[] = $manager;
174                 }
175                 $timeouts[] = intval($manager->timeout());
176             }
177
178             $timeout = min($timeouts);
179             if ($sockets) {
180                 $read = $sockets;
181                 $write = array();
182                 $except = array();
183                 $this->logState('listening');
184                 common_log(LOG_INFO, "Waiting up to $timeout seconds for socket data...");
185                 $ready = stream_select($read, $write, $except, $timeout, 0);
186
187                 if ($ready === false) {
188                     common_log(LOG_ERR, "Error selecting on sockets");
189                 } else if ($ready > 0) {
190                     foreach ($read as $socket) {
191                         $index = array_search($socket, $sockets, true);
192                         if ($index !== false) {
193                             $this->logState('queue');
194                             $managers[$index]->handleInput($socket);
195                         } else {
196                             common_log(LOG_ERR, "Saw input on a socket we didn't listen to");
197                         }
198                     }
199                 }
200             }
201
202             if ($timeout > 0 && empty($sockets)) {
203                 // If we had no listeners, sleep until the pollers' next requested wakeup.
204                 common_log(LOG_INFO, "Sleeping $timeout seconds until next poll cycle...");
205                 $this->logState('sleep');
206                 sleep($timeout);
207             }
208
209             $this->logState('poll');
210             $this->poll();
211
212             $this->logState('idle');
213             $this->idle();
214
215             $memoryLimit = $this->softMemoryLimit();
216             if ($memoryLimit > 0) {
217                 $usage = memory_get_usage();
218                 if ($usage > $memoryLimit) {
219                     common_log(LOG_INFO, "Queue thread hit soft memory limit ($usage > $memoryLimit); gracefully restarting.");
220                     break;
221                 }
222             }
223         }
224
225         $this->logState('shutdown');
226         $this->finish();
227     }
228
229     /**
230      * Return fully-parsed soft memory limit in bytes.
231      * @return intval 0 or -1 if not set
232      */
233     function softMemoryLimit()
234     {
235         $softLimit = trim(common_config('queue', 'softlimit'));
236         if (substr($softLimit, -1) == '%') {
237             $limit = trim(ini_get('memory_limit'));
238             $limit = $this->parseMemoryLimit($limit);
239             if ($limit > 0) {
240                 return intval(substr($softLimit, 0, -1) * $limit / 100);
241             } else {
242                 return -1;
243             }
244         } else {
245             return $this->parseMemoryLimit($softLimit);
246         }
247         return $softLimit;
248     }
249
250     /**
251      * Interpret PHP shorthand for memory_limit and friends.
252      * Why don't they just expose the actual numeric value? :P
253      * @param string $mem
254      * @return int
255      */
256     protected function parseMemoryLimit($mem)
257     {
258         // http://www.php.net/manual/en/faq.using.php#faq.using.shorthandbytes
259         $size = array('k' => 1024,
260                       'm' => 1024*1024,
261                       'g' => 1024*1024*1024);
262         if (empty($mem)) {
263             return 0;
264         } else if (is_numeric($mem)) {
265             return intval($mem);
266         } else {
267             $mult = strtolower(substr($mem, -1));
268             if (isset($size[$mult])) {
269                 return substr($mem, 0, -1) * $size[$mult];
270             } else {
271                 return intval($mem);
272             }
273         }
274     }
275
276     function start()
277     {
278         foreach ($this->managers as $index => $manager) {
279             $manager->start($this);
280             // @fixme error check
281             if ($manager->pollInterval()) {
282                 // We'll want to check for input on the first pass
283                 $this->pollTimeouts[$index] = 0;
284                 $this->lastPoll[$index] = 0;
285             }
286         }
287     }
288
289     function finish()
290     {
291         foreach ($this->managers as $manager) {
292             $manager->finish();
293             // @fixme error check
294         }
295     }
296
297     /**
298      * Called during the idle portion of the runloop to see which handlers
299      */
300     function poll()
301     {
302         foreach ($this->managers as $index => $manager) {
303             $interval = $manager->pollInterval();
304             if ($interval <= 0) {
305                 // Not a polling manager.
306                 continue;
307             }
308
309             if (isset($this->pollTimeouts[$index])) {
310                 $timeout = $this->pollTimeouts[$index];
311                 if (time() - $this->lastPoll[$index] < $timeout) {
312                     // Not time to poll yet.
313                     continue;
314                 }
315             } else {
316                 $timeout = 0;
317             }
318             $hit = $manager->poll();
319
320             $this->lastPoll[$index] = time();
321             if ($hit) {
322                 // Do the next poll quickly, there may be more input!
323                 $this->pollTimeouts[$index] = 0;
324             } else {
325                 // Empty queue. Exponential backoff up to the maximum poll interval.
326                 if ($timeout > 0) {
327                     $timeout = min($timeout * 2, $interval);
328                 } else {
329                     $timeout = 1;
330                 }
331                 $this->pollTimeouts[$index] = $timeout;
332             }
333         }
334     }
335
336     /**
337      * Called after each handled item or empty polling cycle.
338      * This is a good time to e.g. service your XMPP connection.
339      */
340     function idle()
341     {
342         foreach ($this->managers as $manager) {
343             $manager->idle();
344         }
345     }
346
347     /**
348      * Send thread state update to the monitoring server, if configured.
349      *
350      * @param string $state ('init', 'queue', 'shutdown' etc)
351      * @param string $substate (optional, eg queue name 'omb' 'sms' etc)
352      */
353     protected function logState($state, $substate='')
354     {
355         $this->monitor->logState($this->id, $state, $substate);
356     }
357
358     /**
359      * Send thread stats.
360      * Thread ID will be implicit; other owners can be listed as well
361      * for per-queue and per-site records.
362      *
363      * @param string $key counter name
364      * @param array $owners list of owner keys like 'queue:jabber' or 'site:stat01'
365      */
366     public function stats($key, $owners=array())
367     {
368         $owners[] = "thread:" . $this->id;
369         $this->monitor->stats($key, $owners);
370     }
371 }
372