3 * StatusNet, the distributed open-source microblogging tool
5 * I/O manager to wrap around socket-reading and polling queue & connection managers.
9 * LICENCE: This program is free software: you can redistribute it and/or modify
10 * it under the terms of the GNU Affero General Public License as published by
11 * the Free Software Foundation, either version 3 of the License, or
12 * (at your option) any later version.
14 * This program is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 * GNU Affero General Public License for more details.
19 * You should have received a copy of the GNU Affero General Public License
20 * along with this program. If not, see <http://www.gnu.org/licenses/>.
22 * @category QueueManager
24 * @author Brion Vibber <brion@status.net>
25 * @copyright 2009 StatusNet, Inc.
26 * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0
27 * @link http://status.net/
// Configuration flags and bookkeeping state for this I/O master.
// True when serving multiple sites; assigned in init() and checked in instantiate().
34 protected $multiSite = false;
// When false, instantiate() leaves GLOBAL_SINGLE_ONLY managers out of $managers.
35 protected $includeGlobalSingletons = true;
// Manager objects that the run loop iterates over.
36 protected $managers = array();
// Managers shared between sites, keyed by class name (filled in instantiate()).
37 protected $singletons = array();
// Current poll back-off in seconds per manager, keyed by the manager's index in $managers.
39 protected $pollTimeouts = array();
// time() of the most recent poll per manager, keyed by the same index.
40 protected $lastPoll = array();
43 * @param string $id process ID to use in logging/monitoring
// Constructor: creates the QueueMonitor that logState() and stats() forward to.
// NOTE(review): parts of the body are not visible in this excerpt. $this->id is
// read by logState() and stats(), so it is presumably stored from $id here —
// confirm against the full file.
45 public function __construct($id)
48 $this->monitor = new QueueMonitor();
// Work out which sites to serve and instantiate the I/O manager classes.
//
// $multiSite: when null the current $this->multiSite setting is kept; any
//             other value replaces it.
// $includeGlobalSingletons: stored as-is; consulted later by instantiate().
// Throws Exception when the resulting site list is empty.
51 public function init($multiSite=null, $includeGlobalSingletons = true)
53 $this->includeGlobalSingletons = $includeGlobalSingletons;
54 if ($multiSite !== null) {
55 $this->multiSite = $multiSite;
// Multi-site mode takes the host list from findAllSites(); otherwise only the
// currently configured server is served.
57 if ($this->multiSite) {
58 $this->sites = $this->findAllSites();
60 $this->sites = array(common_config('site', 'server'));
63 if (empty($this->sites)) {
64 throw new Exception("Empty status_network table, cannot init");
67 foreach ($this->sites as $site) {
// Only switch site context when $site differs from the one already configured.
68 if ($site != common_config('site', 'server')) {
69 StatusNet::init($site);
// Event handlers receive $classes by reference and may add to it; the default
// entries are only appended when the Start event handler returns true.
// NOTE(review): the initialisation of $classes is not visible in this excerpt.
73 if (Event::handle('StartIoManagerClasses', array(&$classes))) {
74 $classes[] = 'QueueManager';
75 if (common_config('xmpp', 'enabled') && !defined('XMPP_EMERGENCY_FLAG')) {
76 $classes[] = 'XmppManager'; // handles pings/reconnects
77 $classes[] = 'XmppConfirmManager'; // polls for outgoing confirmations
80 Event::handle('EndIoManagerClasses', array(&$classes));
// NOTE(review): presumably this loop runs once per site (instantiate() reads the
// current site from common_config) — the nesting is not visible here; confirm.
82 foreach ($classes as $class) {
83 $this->instantiate($class);
89 * Pull all local sites from status_network table.
90 * @return array of hostnames
// Collect the hostname of every row fetched from a Status_network object.
// NOTE(review): the query that precedes fetch(), the initialisation of $hosts
// and the return statement are not visible in this excerpt.
92 protected function findAllSites()
95 $sn = new Status_network();
97 while ($sn->fetch()) {
98 $hosts[] = $sn->hostname;
104 * Instantiate an i/o manager class for the current site.
105 * If a multi-site capable handler is already present,
106 * we don't need to build a new one.
108 * @param string $class
// Set up the manager for $class on the currently configured site.
// $class is either a class name or (see getManager()) an already-built object;
// only string class names take part in the $singletons cache.
// Throws Exception in multi-site mode when the manager reports SINGLE_ONLY.
110 protected function instantiate($class)
112 if (is_string($class) && isset($this->singletons[$class])) {
113 // Already instantiated a multi-site-capable handler.
114 // Just let it know it should listen to this site too!
115 $this->singletons[$class]->addSite(common_config('site', 'server'));
// NOTE(review): presumably the method returns after the addSite() call above;
// that line is not visible in this excerpt.
119 $manager = $this->getManager($class);
// The manager reports its own multi-site capability as an IoManager constant.
121 $caps = $manager->multiSite();
122 if ($this->multiSite) {
123 if ($caps == IoManager::SINGLE_ONLY) {
124 throw new Exception("$class can't run with --all; aborting.");
// Shareable managers: one per process, or a global singleton when those are
// enabled for this master.
126 if ($caps == IoManager::INSTANCE_PER_PROCESS ||
127 ( $this->includeGlobalSingletons && $caps == IoManager::GLOBAL_SINGLE_ONLY )) {
128 // Save this guy for later!
129 // We'll only need the one to cover multiple sites.
130 if (is_string($class)){
131 $this->singletons[$class] = $manager;
133 $manager->addSite(common_config('site', 'server'));
// GLOBAL_SINGLE_ONLY managers are left out of the run loop when global
// singletons are disabled for this master.
137 if( $this->includeGlobalSingletons || $caps != IoManager::GLOBAL_SINGLE_ONLY ) {
138 $this->managers[] = $manager;
// Resolve $class to a manager. For a non-object value the manager is obtained
// by calling the static get() method on the named class.
// NOTE(review): the body of the is_object() branch is not visible in this
// excerpt; presumably the object itself is returned — confirm.
142 protected function getManager($class)
144 if(is_object($class)){
147 return call_user_func(array($class, 'get'));
154 * Initialize all io managers, then sit around waiting for input.
155 * Between events or timeouts, pass control back to idle() method
156 * to allow for any additional background processing.
// Body of the main run loop. The method header, the loop construct and several
// statements are not visible in this excerpt, so the notes below only describe
// the lines that are shown.
160 $this->logState('init');
// Candidate wait times: every pending poll back-off plus a 60-second ceiling.
164 $timeouts = array_values($this->pollTimeouts);
165 $timeouts[] = 60; // default max timeout
167 // Wait for something on one of our sockets
// $sockets and the local $managers are filled in lock-step, so the position of
// a ready socket in $sockets identifies the manager that owns it.
170 foreach ($this->managers as $manager) {
171 foreach ($manager->getSockets() as $socket) {
172 $sockets[] = $socket;
173 $managers[] = $manager;
// Each manager may also request its own upper bound on the wait.
175 $timeouts[] = intval($manager->timeout());
// Never wait longer than the smallest requested timeout.
178 $timeout = min($timeouts);
183 $this->logState('listening');
184 common_log(LOG_INFO, "Waiting up to $timeout seconds for socket data...");
// stream_select() takes whole seconds followed by microseconds (0 here).
// NOTE(review): the setup of $read/$write/$except is not visible in this excerpt.
185 $ready = stream_select($read, $write, $except, $timeout, 0);
// false means the select call failed; a positive value is the number of
// streams that changed state.
187 if ($ready === false) {
188 common_log(LOG_ERR, "Error selecting on sockets");
189 } else if ($ready > 0) {
190 foreach ($read as $socket) {
// Strict search so the socket is matched by identity, not loose equality.
191 $index = array_search($socket, $sockets, true);
192 if ($index !== false) {
193 $this->logState('queue');
194 $managers[$index]->handleInput($socket);
196 common_log(LOG_ERR, "Saw input on a socket we didn't listen to");
202 if ($timeout > 0 && empty($sockets)) {
203 // If we had no listeners, sleep until the pollers' next requested wakeup.
204 common_log(LOG_INFO, "Sleeping $timeout seconds until next poll cycle...");
205 $this->logState('sleep');
// NOTE(review): the sleep call itself and the poll/idle calls that follow the
// two state updates below are not visible in this excerpt.
209 $this->logState('poll');
212 $this->logState('idle');
// Soft memory limit check; a limit of 0 or less disables it.
215 $memoryLimit = $this->softMemoryLimit();
216 if ($memoryLimit > 0) {
217 $usage = memory_get_usage();
218 if ($usage > $memoryLimit) {
219 common_log(LOG_INFO, "Queue thread hit soft memory limit ($usage > $memoryLimit); gracefully restarting.");
// NOTE(review): how the loop is left after the message above is not visible here.
225 $this->logState('shutdown');
230 * Return fully-parsed soft memory limit in bytes.
231 * @return int 0 or -1 if not set
// Resolve the configured queue soft limit ('queue' / 'softlimit') to bytes.
// A value ending in '%' is a percentage of PHP's own memory_limit; any other
// value is handed to parseMemoryLimit() directly.
233 function softMemoryLimit()
235 $softLimit = trim(common_config('queue', 'softlimit'));
236 if (substr($softLimit, -1) == '%') {
237 $limit = trim(ini_get('memory_limit'));
238 $limit = $this->parseMemoryLimit($limit);
// Percentage form: strip the trailing '%' and scale the parsed memory_limit.
// NOTE(review): the condition guarding this return, and the alternative
// branch, are not visible in this excerpt.
240 return intval(substr($softLimit, 0, -1) * $limit / 100);
245 return $this->parseMemoryLimit($softLimit);
251 * Interpret PHP shorthand for memory_limit and friends.
252 * Why don't they just expose the actual numeric value? :P
// Convert a memory size given in PHP shorthand notation to a number of bytes.
// NOTE(review): several lines of this method (including one entry of the
// multiplier table and some return statements) are not visible in this excerpt.
256 protected function parseMemoryLimit($mem)
258 // http://www.php.net/manual/en/faq.using.php#faq.using.shorthandbytes
// Multiplier for each recognised unit suffix, keyed by the lower-cased letter.
259 $size = array('k' => 1024,
261 'g' => 1024*1024*1024);
// Plain numeric values carry no suffix.
264 } else if (is_numeric($mem)) {
// Otherwise treat the last character as the unit, compared case-insensitively.
267 $mult = strtolower(substr($mem, -1));
268 if (isset($size[$mult])) {
// Everything before the suffix is the count, scaled by the unit multiplier.
269 return substr($mem, 0, -1) * $size[$mult];
// Start every manager, passing this master to it, and prime the polling state
// for managers that report a poll interval.
// NOTE(review): the enclosing method header is not visible in this excerpt.
278 foreach ($this->managers as $index => $manager) {
279 $manager->start($this);
280 // @fixme error check
281 if ($manager->pollInterval()) {
282 // We'll want to check for input on the first pass
// A zero back-off and a zero last-poll time make the first poll pass due at once.
283 $this->pollTimeouts[$index] = 0;
284 $this->lastPoll[$index] = 0;
// Loop over every registered manager.
// NOTE(review): the enclosing method header and the per-manager call made in
// this loop are not visible in this excerpt.
291 foreach ($this->managers as $manager) {
293 // @fixme error check
298 * Called during the idle portion of the runloop to see which handlers are due for a poll, and to poll them.
// Poll each manager that has a positive poll interval and whose back-off has
// elapsed, then update that manager's back-off.
// NOTE(review): the enclosing method header and some branch lines are not
// visible in this excerpt.
302 foreach ($this->managers as $index => $manager) {
303 $interval = $manager->pollInterval();
304 if ($interval <= 0) {
305 // Not a polling manager.
309 if (isset($this->pollTimeouts[$index])) {
310 $timeout = $this->pollTimeouts[$index];
// Compare the seconds since this manager's last poll with its current back-off.
311 if (time() - $this->lastPoll[$index] < $timeout) {
312 // Not time to poll yet.
// $hit holds the manager's poll() result.
// NOTE(review): the test on $hit that selects between the two branches below
// is not visible in this excerpt.
318 $hit = $manager->poll();
320 $this->lastPoll[$index] = time();
322 // Do the next poll quickly, there may be more input!
323 $this->pollTimeouts[$index] = 0;
325 // Empty queue. Exponential backoff up to the maximum poll interval.
// Doubling is capped at the manager's own poll interval.
327 $timeout = min($timeout * 2, $interval);
331 $this->pollTimeouts[$index] = $timeout;
337 * Called after each handled item or empty polling cycle.
338 * This is a good time to e.g. service your XMPP connection.
// Loop over every registered manager.
// NOTE(review): the enclosing method header and the per-manager call made in
// this loop are not visible in this excerpt.
342 foreach ($this->managers as $manager) {
348 * Send thread state update to the monitoring server, if configured.
350 * @param string $state ('init', 'queue', 'shutdown' etc)
351 * @param string $substate (optional, eg queue name 'omb' 'sms' etc)
// Forward a state change for this master to the monitor object created in the
// constructor, passing $this->id along with $state and the optional $substate.
353 protected function logState($state, $substate='')
355 $this->monitor->logState($this->id, $state, $substate);
360 * Thread ID will be implicit; other owners can be listed as well
361 * for per-queue and per-site records.
363 * @param string $key counter name
364 * @param array $owners list of owner keys like 'queue:jabber' or 'site:stat01'
// Forward a counter to the monitor object. This master's own "thread:<id>"
// owner key is appended to the caller's $owners list before forwarding, so
// callers do not need to supply it.
366 public function stats($key, $owners=array())
368 $owners[] = "thread:" . $this->id;
369 $this->monitor->stats($key, $owners);