#!/usr/bin/env php . */ define('INSTALLDIR', realpath(dirname(__FILE__) . '/..')); $shortoptions = 'fi:at:'; $longoptions = array('id=', 'foreground', 'all', 'threads=', 'skip-xmpp', 'xmpp-only'); /** * Attempts to get a count of the processors available on the current system * to fan out multiple threads. * * Recognizes Linux and Mac OS X; others will return default of 1. * * @return intval */ function getProcessorCount() { $cpus = 0; switch (PHP_OS) { case 'Linux': $cpuinfo = file('/proc/cpuinfo'); foreach (file('/proc/cpuinfo') as $line) { if (preg_match('/^processor\s+:\s+(\d+)\s?$/', $line)) { $cpus++; } } break; case 'Darwin': $cpus = intval(shell_exec("/usr/sbin/sysctl -n hw.ncpu 2>/dev/null")); break; } if ($cpus) { return $cpus; } return 1; } $threads = getProcessorCount(); $helptext = << Spawn processing threads (default $threads) END_OF_QUEUE_HELP; require_once INSTALLDIR.'/scripts/commandline.inc'; require_once(INSTALLDIR.'/lib/daemon.php'); require_once(INSTALLDIR.'/classes/Queue_item.php'); require_once(INSTALLDIR.'/classes/Notice.php'); define('CLAIM_TIMEOUT', 1200); /** * Queue handling daemon... * * The queue daemon by default launches in the background, at which point * it'll pass control to the configured QueueManager class to poll for updates. * * We can then pass individual items through the QueueHandler subclasses * they belong to. */ class QueueDaemon extends Daemon { protected $allsites; protected $threads=1; function __construct($id=null, $daemonize=true, $threads=1, $allsites=false) { parent::__construct($daemonize); if ($id) { $this->set_id($id); } $this->all = $allsites; $this->threads = $threads; } /** * How many seconds a polling-based queue manager should wait between * checks for new items to handle. * * Defaults to 60 seconds; override to speed up or slow down. * * @return int timeout in seconds */ function timeout() { return 60; } function name() { return strtolower(get_class($this).'.'.$this->get_id()); } function run() { if ($this->threads > 1) { return $this->runThreads(); } else { return $this->runLoop(); } } function runThreads() { $children = array(); for ($i = 1; $i <= $this->threads; $i++) { $pid = pcntl_fork(); if ($pid < 0) { print "Couldn't fork for thread $i; aborting\n"; exit(1); } else if ($pid == 0) { $this->runChild($i); exit(0); } else { $this->log(LOG_INFO, "Spawned thread $i as pid $pid"); $children[$i] = $pid; } } $this->log(LOG_INFO, "Waiting for children to complete."); while (count($children) > 0) { $status = null; $pid = pcntl_wait($status); if ($pid > 0) { $i = array_search($pid, $children); if ($i === false) { $this->log(LOG_ERR, "Unrecognized child pid $pid exited!"); continue; } unset($children[$i]); $this->log(LOG_INFO, "Thread $i pid $pid exited."); $pid = pcntl_fork(); if ($pid < 0) { print "Couldn't fork to respawn thread $i; aborting thread.\n"; } else if ($pid == 0) { $this->runChild($i); exit(0); } else { $this->log(LOG_INFO, "Respawned thread $i as pid $pid"); $children[$i] = $pid; } } } $this->log(LOG_INFO, "All child processes complete."); return true; } function runChild($thread) { $this->set_id($this->get_id() . "." . $thread); $this->resetDb(); $this->runLoop(); } /** * Reconnect to the database for each child process, * or they'll get very confused trying to use the * same socket. */ function resetDb() { // @fixme do we need to explicitly open the db too // or is this implied? global $_DB_DATAOBJECT; unset($_DB_DATAOBJECT['CONNECTIONS']); // Reconnect main memcached, or threads will stomp on // each other and corrupt their requests. $cache = common_memcache(); if ($cache) { $cache->reconnect(); } // Also reconnect memcached for status_network table. if (!empty(Status_network::$cache)) { Status_network::$cache->close(); Status_network::$cache = null; } } /** * Setup and start of run loop for this queue handler as a daemon. * Most of the heavy lifting is passed on to the QueueManager's service() * method, which passes control on to the QueueHandler's handle_notice() * method for each notice that comes in on the queue. * * Most of the time this won't need to be overridden in a subclass. * * @return boolean true on success, false on failure */ function runLoop() { $this->log(LOG_INFO, 'checking for queued notices'); $master = new IoMaster($this->get_id()); $master->init($this->all); $master->service(); $this->log(LOG_INFO, 'finished servicing the queue'); $this->log(LOG_INFO, 'terminating normally'); return true; } function log($level, $msg) { common_log($level, get_class($this) . ' ('. $this->get_id() .'): '.$msg); } } if (have_option('i')) { $id = get_option_value('i'); } else if (have_option('--id')) { $id = get_option_value('--id'); } else if (count($args) > 0) { $id = $args[0]; } else { $id = null; } if (have_option('t')) { $threads = intval(get_option_value('t')); } else if (have_option('--threads')) { $threads = intval(get_option_value('--threads')); } else { $threads = 0; } if (!$threads) { $threads = getProcessorCount(); } $daemonize = !(have_option('f') || have_option('--foreground')); $all = have_option('a') || have_option('--all'); if (have_option('--skip-xmpp')) { define('XMPP_EMERGENCY_FLAG', true); } if (have_option('--xmpp-only')) { define('XMPP_ONLY_FLAG', true); } $daemon = new QueueDaemon($id, $daemonize, $threads, $all); $daemon->runOnce();