3 * StatusNet, the distributed open-source microblogging tool
5 * I/O manager to wrap around socket-reading and polling queue & connection managers.
9 * LICENCE: This program is free software: you can redistribute it and/or modify
10 * it under the terms of the GNU Affero General Public License as published by
11 * the Free Software Foundation, either version 3 of the License, or
12 * (at your option) any later version.
14 * This program is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 * GNU Affero General Public License for more details.
19 * You should have received a copy of the GNU Affero General Public License
20 * along with this program. If not, see <http://www.gnu.org/licenses/>.
22 * @category QueueManager
24 * @author Brion Vibber <brion@status.net>
25 * @copyright 2009 StatusNet, Inc.
26 * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0
27 * @link http://status.net/
30 abstract class IoMaster
34 protected $multiSite = false;
35 protected $managers = array();
36 protected $singletons = array();
38 protected $pollTimeouts = array();
39 protected $lastPoll = array();
41 public $shutdown = false; // Did we do a graceful shutdown?
42 public $respawn = true; // Should we respawn after shutdown?
45 * @param string $id process ID to use in logging/monitoring
47 public function __construct($id)
50 $this->monitor = new QueueMonitor();
53 public function init($multiSite=null)
55 if ($multiSite !== null) {
56 $this->multiSite = $multiSite;
58 if ($this->multiSite) {
59 $this->sites = StatusNet::findAllSites();
61 $this->sites = array(StatusNet::currentSite());
64 if (empty($this->sites)) {
65 throw new Exception("Empty status_network table, cannot init");
68 foreach ($this->sites as $site) {
69 StatusNet::switchSite($site);
70 $this->initManagers();
75 * Initialize IoManagers for the currently configured site
76 * which are appropriate to this instance.
78 * Pass class names into $this->instantiate()
80 abstract function initManagers();
83 * Instantiate an i/o manager class for the current site.
84 * If a multi-site capable handler is already present,
85 * we don't need to build a new one.
87 * @param mixed $manager class name (to run $class::get()) or object
89 protected function instantiate($manager)
91 if (is_string($manager)) {
92 $manager = call_user_func(array($class, 'get'));
95 $caps = $manager->multiSite();
96 if ($caps == IoManager::SINGLE_ONLY) {
97 if ($this->multiSite) {
98 throw new Exception("$class can't run with --all; aborting.");
100 } else if ($caps == IoManager::INSTANCE_PER_PROCESS) {
104 if (!in_array($manager, $this->managers, true)) {
105 // Only need to save singletons once
106 $this->managers[] = $manager;
113 * Initialize all io managers, then sit around waiting for input.
114 * Between events or timeouts, pass control back to idle() method
115 * to allow for any additional background processing.
119 $this->logState('init');
121 $this->checkMemory(false);
123 while (!$this->shutdown) {
124 $timeouts = array_values($this->pollTimeouts);
125 $timeouts[] = 60; // default max timeout
127 // Wait for something on one of our sockets
130 foreach ($this->managers as $manager) {
131 foreach ($manager->getSockets() as $socket) {
132 $sockets[] = $socket;
133 $managers[] = $manager;
135 $timeouts[] = intval($manager->timeout());
138 $timeout = min($timeouts);
143 $this->logState('listening');
144 common_log(LOG_DEBUG, "Waiting up to $timeout seconds for socket data...");
145 $ready = stream_select($read, $write, $except, $timeout, 0);
147 if ($ready === false) {
148 common_log(LOG_ERR, "Error selecting on sockets");
149 } else if ($ready > 0) {
150 foreach ($read as $socket) {
151 $index = array_search($socket, $sockets, true);
152 if ($index !== false) {
153 $this->logState('queue');
154 $managers[$index]->handleInput($socket);
156 common_log(LOG_ERR, "Saw input on a socket we didn't listen to");
162 if ($timeout > 0 && empty($sockets)) {
163 // If we had no listeners, sleep until the pollers' next requested wakeup.
164 common_log(LOG_DEBUG, "Sleeping $timeout seconds until next poll cycle...");
165 $this->logState('sleep');
169 $this->logState('poll');
172 $this->logState('idle');
175 $this->checkMemory();
178 $this->logState('shutdown');
183 * Check runtime memory usage, possibly triggering a graceful shutdown
184 * and thread respawn if we've crossed the soft limit.
186 * @param boolean $respawn if false we'll shut down instead of respawning
188 protected function checkMemory($respawn=true)
190 $memoryLimit = $this->softMemoryLimit();
191 if ($memoryLimit > 0) {
192 $usage = memory_get_usage();
193 if ($usage > $memoryLimit) {
194 common_log(LOG_INFO, "Queue thread hit soft memory limit ($usage > $memoryLimit); gracefully restarting.");
196 $this->requestRestart();
198 $this->requestShutdown();
200 } else if (common_config('queue', 'debug_memory')) {
201 $fmt = number_format($usage);
202 common_log(LOG_DEBUG, "Memory usage $fmt");
208 * Return fully-parsed soft memory limit in bytes.
209 * @return intval 0 or -1 if not set
211 function softMemoryLimit()
213 $softLimit = trim(common_config('queue', 'softlimit'));
214 if (substr($softLimit, -1) == '%') {
215 $limit = $this->parseMemoryLimit(ini_get('memory_limit'));
217 return intval(substr($softLimit, 0, -1) * $limit / 100);
222 return $this->parseMemoryLimit($softLimit);
228 * Interpret PHP shorthand for memory_limit and friends.
229 * Why don't they just expose the actual numeric value? :P
233 public function parseMemoryLimit($mem)
235 // http://www.php.net/manual/en/faq.using.php#faq.using.shorthandbytes
236 $mem = strtolower(trim($mem));
237 $size = array('k' => 1024,
239 'g' => 1024*1024*1024);
242 } else if (is_numeric($mem)) {
245 $mult = substr($mem, -1);
246 if (isset($size[$mult])) {
247 return substr($mem, 0, -1) * $size[$mult];
256 foreach ($this->managers as $index => $manager) {
257 $manager->start($this);
258 // @fixme error check
259 if ($manager->pollInterval()) {
260 // We'll want to check for input on the first pass
261 $this->pollTimeouts[$index] = 0;
262 $this->lastPoll[$index] = 0;
269 foreach ($this->managers as $manager) {
271 // @fixme error check
276 * Called during the idle portion of the runloop to see which handlers
280 foreach ($this->managers as $index => $manager) {
281 $interval = $manager->pollInterval();
282 if ($interval <= 0) {
283 // Not a polling manager.
287 if (isset($this->pollTimeouts[$index])) {
288 $timeout = $this->pollTimeouts[$index];
289 if (time() - $this->lastPoll[$index] < $timeout) {
290 // Not time to poll yet.
296 $hit = $manager->poll();
298 $this->lastPoll[$index] = time();
300 // Do the next poll quickly, there may be more input!
301 $this->pollTimeouts[$index] = 0;
303 // Empty queue. Exponential backoff up to the maximum poll interval.
305 $timeout = min($timeout * 2, $interval);
309 $this->pollTimeouts[$index] = $timeout;
315 * Called after each handled item or empty polling cycle.
316 * This is a good time to e.g. service your XMPP connection.
320 foreach ($this->managers as $manager) {
326 * Send thread state update to the monitoring server, if configured.
328 * @param string $state ('init', 'queue', 'shutdown' etc)
329 * @param string $substate (optional, eg queue name 'omb' 'sms' etc)
331 protected function logState($state, $substate='')
333 $this->monitor->logState($this->id, $state, $substate);
338 * Thread ID will be implicit; other owners can be listed as well
339 * for per-queue and per-site records.
341 * @param string $key counter name
342 * @param array $owners list of owner keys like 'queue:jabber' or 'site:stat01'
344 public function stats($key, $owners=array())
346 $owners[] = "thread:" . $this->id;
347 $this->monitor->stats($key, $owners);
351 * For IoManagers to request a graceful shutdown at end of event loop.
353 public function requestShutdown()
355 $this->shutdown = true;
356 $this->respawn = false;
360 * For IoManagers to request a graceful restart at end of event loop.
362 public function requestRestart()
364 $this->shutdown = true;
365 $this->respawn = true;