From ec145b73fc91dd54695dd374c8a71a11e233b8c0 Mon Sep 17 00:00:00 2001 From: Brion Vibber Date: Tue, 12 Jan 2010 19:57:15 -0800 Subject: [PATCH] Major refactoring of queue handlers to support running multiple sites in one daemon. Key changes: * Initialization code moved from common.php to StatusNet class; can now switch configurations during runtime. * As a consequence, configuration files must now be idempotent... Be careful with constant, function or class definitions. * Control structure for daemons/QueueManager/QueueHandler has been refactored; the run loop is now managed by IoMaster run via scripts/queuedaemon.php IoManager subclasses are woken to handle socket input or polling, and may cover multiple sites. * Plugins can implement notice queue handlers more easily by registering a QueueHandler class; no more need to add a daemon. The new QueueDaemon runs from scripts/queuedaemon.php: * This replaces most of the old *handler.php scripts; they've been refactored to the bare handler classes. * Spawns multiple child processes to spread load; defaults to CPU count on Linux and Mac OS X systems, or override with --threads=N * When multithreaded, child processes are automatically respawned on failure. * Threads gracefully shut down and restart when passing a soft memory limit (defaults to 90% of memory_limit), limiting damage from memory leaks. * Support for UDP-based monitoring: http://www.gitorious.org/snqmon Rough control flow diagram: QueueDaemon -> IoMaster -> IoManager QueueManager [listen or poll] -> QueueHandler XmppManager [ping & keepalive] XmppConfirmManager [poll updates] Todo: * Respawning features not currently available running single-threaded. * When running single-site, configuration changes aren't picked up. * New sites or config changes affecting queue subscriptions are not yet handled without a daemon restart. * SNMP monitoring output to integrate with general tools (nagios, ganglia) * Convert XMPP confirmation message sends to use stomp queue instead of polling * Convert xmppdaemon.php to IoManager? * Convert Twitter status, friends import polling daemons to IoManager * Clean up some error reporting and failure modes * May need to adjust queue priorities for best perf in backlog/flood cases Detailed code history available in my daemon-work branch: http://www.gitorious.org/~brion/statusnet/brion-fixes/commits/daemon-work --- classes/Memcached_DataObject.php | 23 ++ classes/Queue_item.php | 9 +- classes/Status_network.php | 23 +- lib/cache.php | 19 + lib/common.php | 194 +--------- lib/dbqueuemanager.php | 161 ++++---- lib/default.php | 2 + lib/event.php | 8 + lib/iomanager.php | 193 ++++++++++ lib/iomaster.php | 361 ++++++++++++++++++ lib/jabber.php | 25 +- {scripts => lib}/jabberqueuehandler.php | 49 +-- lib/liberalstomp.php | 133 +++++++ {scripts => lib}/ombqueuehandler.php | 57 +-- lib/pingqueuehandler.php | 37 ++ {scripts => lib}/pluginqueuehandler.php | 48 +-- {scripts => lib}/publicqueuehandler.php | 49 +-- lib/queuehandler.php | 100 ++--- lib/queuemanager.php | 149 +++++++- lib/queuemonitor.php | 116 ++++++ {scripts => lib}/smsqueuehandler.php | 48 +-- lib/statusnet.php | 295 ++++++++++++++ lib/stompqueuemanager.php | 286 ++++++++++---- lib/unqueuemanager.php | 58 +-- lib/util.php | 5 +- lib/xmppconfirmmanager.php | 168 ++++++++ lib/xmppmanager.php | 273 +++++++++++++ lib/xmppqueuehandler.php | 142 ------- plugins/Enjit/README | 5 + .../Enjit}/enjitqueuehandler.php | 44 +-- plugins/Facebook/FacebookPlugin.php | 46 +-- plugins/Facebook/facebookqueuehandler.php | 52 +-- plugins/MemcachePlugin.php | 17 + plugins/TwitterBridge/TwitterBridgePlugin.php | 50 +-- .../{daemons => }/twitterqueuehandler.php | 40 +- scripts/getvaliddaemons.php | 11 +- ...{pingqueuehandler.php => handlequeued.php} | 59 ++- scripts/queuedaemon.php | 265 +++++++++++++ scripts/xmppconfirmhandler.php | 161 -------- 39 files changed, 2595 insertions(+), 1186 deletions(-) create mode 100644 lib/iomanager.php create mode 100644 lib/iomaster.php rename {scripts => lib}/jabberqueuehandler.php (52%) mode change 100755 => 100644 create mode 100644 lib/liberalstomp.php rename {scripts => lib}/ombqueuehandler.php (57%) mode change 100755 => 100644 create mode 100644 lib/pingqueuehandler.php rename {scripts => lib}/pluginqueuehandler.php (61%) mode change 100755 => 100644 rename {scripts => lib}/publicqueuehandler.php (52%) mode change 100755 => 100644 create mode 100644 lib/queuemonitor.php rename {scripts => lib}/smsqueuehandler.php (54%) mode change 100755 => 100644 create mode 100644 lib/statusnet.php create mode 100644 lib/xmppconfirmmanager.php create mode 100644 lib/xmppmanager.php delete mode 100644 lib/xmppqueuehandler.php create mode 100644 plugins/Enjit/README rename {scripts => plugins/Enjit}/enjitqueuehandler.php (79%) mode change 100755 => 100644 mode change 100755 => 100644 plugins/Facebook/facebookqueuehandler.php rename plugins/TwitterBridge/{daemons => }/twitterqueuehandler.php (57%) mode change 100755 => 100644 rename scripts/{pingqueuehandler.php => handlequeued.php} (52%) create mode 100755 scripts/queuedaemon.php delete mode 100755 scripts/xmppconfirmhandler.php diff --git a/classes/Memcached_DataObject.php b/classes/Memcached_DataObject.php index 9c9994ceac..4ecab9db62 100644 --- a/classes/Memcached_DataObject.php +++ b/classes/Memcached_DataObject.php @@ -331,6 +331,29 @@ class Memcached_DataObject extends DB_DataObject $exists = false; } + // @fixme horrible evil hack! + // + // In multisite configuration we don't want to keep around a separate + // connection for every database; we could end up with thousands of + // connections open per thread. In an ideal world we might keep + // a connection per server and select different databases, but that'd + // be reliant on having the same db username/pass as well. + // + // MySQL connections are cheap enough we're going to try just + // closing out the old connection and reopening when we encounter + // a new DSN. + // + // WARNING WARNING if we end up actually using multiple DBs at a time + // we'll need some fancier logic here. + if (!$exists && !empty($_DB_DATAOBJECT['CONNECTIONS'])) { + foreach ($_DB_DATAOBJECT['CONNECTIONS'] as $index => $conn) { + if (!empty($conn)) { + $conn->disconnect(); + } + unset($_DB_DATAOBJECT['CONNECTIONS'][$index]); + } + } + $result = parent::_connect(); if ($result && !$exists) { diff --git a/classes/Queue_item.php b/classes/Queue_item.php index 9c673540d7..cf805a6060 100644 --- a/classes/Queue_item.php +++ b/classes/Queue_item.php @@ -25,10 +25,12 @@ class Queue_item extends Memcached_DataObject function sequenceKey() { return array(false, false); } - static function top($transport) { + static function top($transport=null) { $qi = new Queue_item(); - $qi->transport = $transport; + if ($transport) { + $qi->transport = $transport; + } $qi->orderBy('created'); $qi->whereAdd('claimed is null'); @@ -40,7 +42,8 @@ class Queue_item extends Memcached_DataObject # XXX: potential race condition # can we force it to only update if claimed is still null # (or old)? - common_log(LOG_INFO, 'claiming queue item = ' . $qi->notice_id . ' for transport ' . $transport); + common_log(LOG_INFO, 'claiming queue item = ' . $qi->notice_id . + ' for transport ' . $qi->transport); $orig = clone($qi); $qi->claimed = common_sql_now(); $result = $qi->update($orig); diff --git a/classes/Status_network.php b/classes/Status_network.php index 776f6abb03..ef8e1ed431 100644 --- a/classes/Status_network.php +++ b/classes/Status_network.php @@ -49,6 +49,13 @@ class Status_network extends DB_DataObject static $cache = null; static $base = null; + /** + * @param string $dbhost + * @param string $dbuser + * @param string $dbpass + * @param string $dbname + * @param array $servers memcached servers to use for caching config info + */ static function setupDB($dbhost, $dbuser, $dbpass, $dbname, $servers) { global $config; @@ -60,12 +67,17 @@ class Status_network extends DB_DataObject if (class_exists('Memcache')) { self::$cache = new Memcache(); + // Can't close persistent connections, making forking painful. + // + // @fixme only do this in *parent* CLI processes. + // single-process and child-processes *should* use persistent. + $persist = php_sapi_name() != 'cli'; if (is_array($servers)) { foreach($servers as $server) { - self::$cache->addServer($server); + self::$cache->addServer($server, 11211, $persist); } } else { - self::$cache->addServer($servers); + self::$cache->addServer($servers, 11211, $persist); } } @@ -89,7 +101,7 @@ class Status_network extends DB_DataObject if (empty($sn)) { $sn = self::staticGet($k, $v); if (!empty($sn)) { - self::$cache->set($ck, $sn); + self::$cache->set($ck, clone($sn)); } } @@ -121,6 +133,11 @@ class Status_network extends DB_DataObject return parent::delete(); } + /** + * @param string $servername hostname + * @param string $pathname URL base path + * @param string $wildcard hostname suffix to match wildcard config + */ static function setupSite($servername, $pathname, $wildcard) { global $config; diff --git a/lib/cache.php b/lib/cache.php index b7b34c0500..635c96ad4c 100644 --- a/lib/cache.php +++ b/lib/cache.php @@ -179,4 +179,23 @@ class Cache return $success; } + + /** + * Close or reconnect any remote connections, such as to give + * daemon processes a chance to reconnect on a fresh socket. + * + * @return boolean success flag + */ + + function reconnect() + { + $success = false; + + if (Event::handle('StartCacheReconnect', array(&$success))) { + $success = true; + Event::handle('EndCacheReconnect', array()); + } + + return $success; + } } diff --git a/lib/common.php b/lib/common.php index b280afec02..61decebb7a 100644 --- a/lib/common.php +++ b/lib/common.php @@ -76,159 +76,14 @@ require_once(INSTALLDIR.'/lib/language.php'); require_once(INSTALLDIR.'/lib/event.php'); require_once(INSTALLDIR.'/lib/plugin.php'); -function _sn_to_path($sn) -{ - $past_root = substr($sn, 1); - $last_slash = strrpos($past_root, '/'); - if ($last_slash > 0) { - $p = substr($past_root, 0, $last_slash); - } else { - $p = ''; - } - return $p; -} - -// Save our sanity when code gets loaded through subroutines such as PHPUnit tests -global $default, $config, $_server, $_path; - -// try to figure out where we are. $server and $path -// can be set by including module, else we guess based -// on HTTP info. - -if (isset($server)) { - $_server = $server; -} else { - $_server = array_key_exists('SERVER_NAME', $_SERVER) ? - strtolower($_SERVER['SERVER_NAME']) : - null; -} - -if (isset($path)) { - $_path = $path; -} else { - $_path = (array_key_exists('SERVER_NAME', $_SERVER) && array_key_exists('SCRIPT_NAME', $_SERVER)) ? - _sn_to_path($_SERVER['SCRIPT_NAME']) : - null; -} - -require_once(INSTALLDIR.'/lib/default.php'); - -// Set config values initially to default values - -$config = $default; - -// default configuration, overwritten in config.php - -$config['db'] = &PEAR::getStaticProperty('DB_DataObject','options'); - -$config['db'] = $default['db']; - -// Backward compatibility - -$config['site']['design'] =& $config['design']; - -if (function_exists('date_default_timezone_set')) { - /* Work internally in UTC */ - date_default_timezone_set('UTC'); -} - function addPlugin($name, $attrs = null) { - $name = ucfirst($name); - $pluginclass = "{$name}Plugin"; - - if (!class_exists($pluginclass)) { - - $files = array("local/plugins/{$pluginclass}.php", - "local/plugins/{$name}/{$pluginclass}.php", - "local/{$pluginclass}.php", - "local/{$name}/{$pluginclass}.php", - "plugins/{$pluginclass}.php", - "plugins/{$name}/{$pluginclass}.php"); - - foreach ($files as $file) { - $fullpath = INSTALLDIR.'/'.$file; - if (@file_exists($fullpath)) { - include_once($fullpath); - break; - } - } - } - - $inst = new $pluginclass(); - - if (!empty($attrs)) { - foreach ($attrs as $aname => $avalue) { - $inst->$aname = $avalue; - } - } - return $inst; -} - -// From most general to most specific: -// server-wide, then vhost-wide, then for a path, -// finally for a dir (usually only need one of the last two). - -if (isset($conffile)) { - $_config_files = array($conffile); -} else { - $_config_files = array('/etc/statusnet/statusnet.php', - '/etc/statusnet/laconica.php', - '/etc/laconica/laconica.php', - '/etc/statusnet/'.$_server.'.php', - '/etc/laconica/'.$_server.'.php'); - - if (strlen($_path) > 0) { - $_config_files[] = '/etc/statusnet/'.$_server.'_'.$_path.'.php'; - $_config_files[] = '/etc/laconica/'.$_server.'_'.$_path.'.php'; - } - - $_config_files[] = INSTALLDIR.'/config.php'; -} - -global $_have_a_config; -$_have_a_config = false; - -foreach ($_config_files as $_config_file) { - if (@file_exists($_config_file)) { - include_once($_config_file); - $_have_a_config = true; - } + return StatusNet::addPlugin($name, $attrs); } function _have_config() { - global $_have_a_config; - return $_have_a_config; -} - -// XXX: Throw a conniption if database not installed -// XXX: Find a way to use htmlwriter for this instead of handcoded markup -if (!_have_config()) { - echo '

'. _('No configuration file found. ') .'

'; - echo '

'. _('I looked for configuration files in the following places: ') .'
'. implode($_config_files, '
'); - echo '

'. _('You may wish to run the installer to fix this.') .'

'; - echo ''. _('Go to the installer.') .''; - exit; -} -// Fixup for statusnet.ini - -$_db_name = substr($config['db']['database'], strrpos($config['db']['database'], '/') + 1); - -if ($_db_name != 'statusnet' && !array_key_exists('ini_'.$_db_name, $config['db'])) { - $config['db']['ini_'.$_db_name] = INSTALLDIR.'/classes/statusnet.ini'; -} - -// Backwards compatibility - -if (array_key_exists('memcached', $config)) { - if ($config['memcached']['enabled']) { - addPlugin('Memcache', array('servers' => $config['memcached']['server'])); - } - - if (!empty($config['memcached']['base'])) { - $config['cache']['base'] = $config['memcached']['base']; - } + return StatusNet::haveConfig(); } function __autoload($cls) @@ -247,27 +102,6 @@ function __autoload($cls) } } -// Load default plugins - -foreach ($config['plugins']['default'] as $name => $params) { - if (is_null($params)) { - addPlugin($name); - } else if (is_array($params)) { - if (count($params) == 0) { - addPlugin($name); - } else { - $keys = array_keys($params); - if (is_string($keys[0])) { - addPlugin($name, $params); - } else { - foreach ($params as $paramset) { - addPlugin($name, $paramset); - } - } - } - } -} - // XXX: how many of these could be auto-loaded on use? // XXX: note that these files should not use config options // at compile time since DB config options are not yet loaded. @@ -283,20 +117,20 @@ require_once INSTALLDIR.'/lib/subs.php'; require_once INSTALLDIR.'/lib/clientexception.php'; require_once INSTALLDIR.'/lib/serverexception.php'; -// Load settings from database; note we need autoload for this - -Config::loadSettings(); - -// XXX: if plugins should check the schema at runtime, do that here. - -if ($config['db']['schemacheck'] == 'runtime') { - Event::handle('CheckSchema'); +try { + StatusNet::init(@$server, @$path, @$conffile); +} catch (NoConfigException $e) { + // XXX: Throw a conniption if database not installed + // XXX: Find a way to use htmlwriter for this instead of handcoded markup + echo '

'. _('No configuration file found. ') .'

'; + echo '

'. _('I looked for configuration files in the following places: ') .'
'; + echo implode($e->configFiles, '
'); + echo '

'. _('You may wish to run the installer to fix this.') .'

'; + echo ''. _('Go to the installer.') .''; + exit; } + // XXX: other formats here define('NICKNAME_FMT', VALIDATE_NUM.VALIDATE_ALPHA_LOWER); - -// Give plugins a chance to initialize in a fully-prepared environment - -Event::handle('InitializePlugin'); diff --git a/lib/dbqueuemanager.php b/lib/dbqueuemanager.php index 750300928e..a5c6fd28b4 100644 --- a/lib/dbqueuemanager.php +++ b/lib/dbqueuemanager.php @@ -22,16 +22,20 @@ * @category QueueManager * @package StatusNet * @author Evan Prodromou - * @copyright 2009 StatusNet, Inc. + * @author Brion Vibber + * @copyright 2009-2010 StatusNet, Inc. * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0 * @link http://status.net/ */ class DBQueueManager extends QueueManager { - var $qis = array(); - - function enqueue($object, $queue) + /** + * Saves a notice object reference into the queue item table. + * @return boolean true on success + * @throws ServerException on failure + */ + public function enqueue($object, $queue) { $notice = $object; @@ -47,70 +51,95 @@ class DBQueueManager extends QueueManager throw new ServerException('DB error inserting queue item'); } + $this->stats('enqueued', $queue); + return true; } - function service($queue, $handler) + /** + * Poll every minute for new events during idle periods. + * We'll look in more often when there's data available. + * + * @return int seconds + */ + public function pollInterval() + { + return 60; + } + + /** + * Run a polling cycle during idle processing in the input loop. + * @return boolean true if we had a hit + */ + public function poll() { - while (true) { - $this->_log(LOG_DEBUG, 'Checking for notices...'); - $timeout = $handler->timeout(); - $notice = $this->_nextItem($queue, $timeout); - if (empty($notice)) { - $this->_log(LOG_DEBUG, 'No notices waiting; idling.'); - // Nothing in the queue. Do you - // have other tasks, like servicing your - // XMPP connection, to do? - $handler->idle(QUEUE_HANDLER_MISS_IDLE); + $this->_log(LOG_DEBUG, 'Checking for notices...'); + $item = $this->_nextItem(); + if ($item === false) { + $this->_log(LOG_DEBUG, 'No notices waiting; idling.'); + return false; + } + if ($item === true) { + // We dequeued an entry for a deleted or invalid notice. + // Consider it a hit for poll rate purposes. + return true; + } + + list($queue, $notice) = $item; + $this->_log(LOG_INFO, 'Got notice '. $notice->id . ' for transport ' . $queue); + + // Yay! Got one! + $handler = $this->getHandler($queue); + if ($handler) { + if ($handler->handle_notice($notice)) { + $this->_log(LOG_INFO, "[$queue:notice $notice->id] Successfully handled notice"); + $this->_done($notice, $queue); } else { - $this->_log(LOG_INFO, 'Got notice '. $notice->id); - // Yay! Got one! - if ($handler->handle_notice($notice)) { - $this->_log(LOG_INFO, 'Successfully handled notice '. $notice->id); - $this->_done($notice, $queue); - } else { - $this->_log(LOG_INFO, 'Failed to handle notice '. $notice->id); - $this->_fail($notice, $queue); - } - // Chance to e.g. service your XMPP connection - $this->_log(LOG_DEBUG, 'Idling after success.'); - $handler->idle(QUEUE_HANDLER_HIT_IDLE); + $this->_log(LOG_INFO, "[$queue:notice $notice->id] Failed to handle notice"); + $this->_fail($notice, $queue); } - // XXX: when do we give up? + } else { + $this->_log(LOG_INFO, "[$queue:notice $notice->id] No handler for queue $queue"); + $this->_fail($notice, $queue); } + return true; } - function _nextItem($queue, $timeout=null) + /** + * Pop the oldest unclaimed item off the queue set and claim it. + * + * @return mixed false if no items; true if bogus hit; otherwise array(string, Notice) + * giving the queue transport name. + */ + protected function _nextItem() { $start = time(); $result = null; - $sleeptime = 1; + $qi = Queue_item::top(); + if (empty($qi)) { + return false; + } - do { - $qi = Queue_item::top($queue); - if (empty($qi)) { - $this->_log(LOG_DEBUG, "No new queue items, sleeping $sleeptime seconds."); - sleep($sleeptime); - $sleeptime *= 2; - } else { - $notice = Notice::staticGet('id', $qi->notice_id); - if (!empty($notice)) { - $result = $notice; - } else { - $this->_log(LOG_INFO, 'dequeued non-existent notice ' . $notice->id); - $qi->delete(); - $qi->free(); - $qi = null; - } - $sleeptime = 1; - } - } while (empty($result) && (is_null($timeout) || (time() - $start) < $timeout)); + $queue = $qi->transport; + $notice = Notice::staticGet('id', $qi->notice_id); + if (empty($notice)) { + $this->_log(LOG_INFO, "[$queue:notice $notice->id] dequeued non-existent notice"); + $qi->delete(); + return true; + } - return $result; + $result = $notice; + return array($queue, $notice); } - function _done($object, $queue) + /** + * Delete our claimed item from the queue after successful processing. + * + * @param Notice $object + * @param string $queue + */ + protected function _done($object, $queue) { // XXX: right now, we only handle notices @@ -120,24 +149,29 @@ class DBQueueManager extends QueueManager 'transport' => $queue)); if (empty($qi)) { - $this->_log(LOG_INFO, 'Cannot find queue item for notice '.$notice->id.', queue '.$queue); + $this->_log(LOG_INFO, "[$queue:notice $notice->id] Cannot find queue item"); } else { if (empty($qi->claimed)) { - $this->_log(LOG_WARNING, 'Reluctantly releasing unclaimed queue item '. - 'for '.$notice->id.', queue '.$queue); + $this->_log(LOG_WARNING, "[$queue:notice $notice->id] Reluctantly releasing unclaimed queue item"); } $qi->delete(); $qi->free(); - $qi = null; } - $this->_log(LOG_INFO, 'done with notice ID = ' . $notice->id); + $this->_log(LOG_INFO, "[$queue:notice $notice->id] done with item"); + $this->stats('handled', $queue); $notice->free(); - $notice = null; } - function _fail($object, $queue) + /** + * Free our claimed queue item for later reprocessing in case of + * temporary failure. + * + * @param Notice $object + * @param string $queue + */ + protected function _fail($object, $queue) { // XXX: right now, we only handle notices @@ -147,11 +181,10 @@ class DBQueueManager extends QueueManager 'transport' => $queue)); if (empty($qi)) { - $this->_log(LOG_INFO, 'Cannot find queue item for notice '.$notice->id.', queue '.$queue); + $this->_log(LOG_INFO, "[$queue:notice $notice->id] Cannot find queue item"); } else { if (empty($qi->claimed)) { - $this->_log(LOG_WARNING, 'Ignoring failure for unclaimed queue item '. - 'for '.$notice->id.', queue '.$queue); + $this->_log(LOG_WARNING, "[$queue:notice $notice->id] Ignoring failure for unclaimed queue item"); } else { $orig = clone($qi); $qi->claimed = null; @@ -160,13 +193,13 @@ class DBQueueManager extends QueueManager } } - $this->_log(LOG_INFO, 'done with notice ID = ' . $notice->id); + $this->_log(LOG_INFO, "[$queue:notice $notice->id] done with queue item"); + $this->stats('error', $queue); $notice->free(); - $notice = null; } - function _log($level, $msg) + protected function _log($level, $msg) { common_log($level, 'DBQueueManager: '.$msg); } diff --git a/lib/default.php b/lib/default.php index fa862f3ff1..f7f4777a2e 100644 --- a/lib/default.php +++ b/lib/default.php @@ -79,6 +79,8 @@ $default = 'queue_basename' => '/queue/statusnet/', 'stomp_username' => null, 'stomp_password' => null, + 'monitor' => null, // URL to monitor ping endpoint (work in progress) + 'softlimit' => '90%', // total size or % of memory_limit at which to restart queue threads gracefully ), 'license' => array('url' => 'http://creativecommons.org/licenses/by/3.0/', diff --git a/lib/event.php b/lib/event.php index 4819b71b4c..41fb53ffe9 100644 --- a/lib/event.php +++ b/lib/event.php @@ -138,4 +138,12 @@ class Event { } return false; } + + /** + * Disables any and all handlers that have been set up so far; + * use only if you know it's safe to reinitialize all plugins. + */ + public static function clearHandlers() { + Event::$_handlers = array(); + } } diff --git a/lib/iomanager.php b/lib/iomanager.php new file mode 100644 index 0000000000..ee2ff958b9 --- /dev/null +++ b/lib/iomanager.php @@ -0,0 +1,193 @@ +. + * + * @category QueueManager + * @package StatusNet + * @author Evan Prodromou + * @author Sarven Capadisli + * @author Brion Vibber + * @copyright 2009-2010 StatusNet, Inc. + * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0 + * @link http://status.net/ + */ + +abstract class IoManager +{ + const SINGLE_ONLY = 0; + const INSTANCE_PER_SITE = 1; + const INSTANCE_PER_PROCESS = 2; + + /** + * Factory function to get an appropriate subclass. + */ + public abstract static function get(); + + /** + * Tell the i/o queue master if and how we can handle multi-site + * processes. + * + * Return one of: + * IoManager::SINGLE_ONLY + * IoManager::INSTANCE_PER_SITE + * IoManager::INSTANCE_PER_PROCESS + */ + public static function multiSite() + { + return IoManager::SINGLE_ONLY; + } + + /** + * If in a multisite configuration, the i/o master will tell + * your manager about each site you'll have to handle so you + * can do any necessary per-site setup. + * + * @param string $site target site server name + */ + public function addSite($site) + { + /* no-op */ + } + + /** + * This method is called when data is available on one of your + * i/o manager's sockets. The socket with data is passed in, + * in case you have multiple sockets. + * + * If your i/o manager is based on polling during idle processing, + * you don't need to implement this. + * + * @param resource $socket + * @return boolean true on success, false on failure + */ + public function handleInput($socket) + { + return true; + } + + /** + * Return any open sockets that the run loop should listen + * for input on. If input comes in on a listed socket, + * the matching manager's handleInput method will be called. + * + * @return array of resources + */ + function getSockets() + { + return array(); + } + + /** + * Maximum planned time between poll() calls when input isn't waiting. + * Actual time may vary! + * + * When we get a polling hit, the timeout will be cut down to 0 while + * input is coming in, then will back off to this amount if no further + * input shows up. + * + * By default polling is disabled; you must override this to enable + * polling for this manager. + * + * @return int max poll interval in seconds, or 0 to disable polling + */ + function pollInterval() + { + return 0; + } + + /** + * Request a maximum timeout for listeners before the next idle period. + * Actual wait may be shorter, so don't go crazy in your idle()! + * Wait could be longer if other handlers performed some slow activity. + * + * Return 0 to request that listeners return immediately if there's no + * i/o and speed up the idle as much as possible; but don't do that all + * the time as this will burn CPU. + * + * @return int seconds + */ + function timeout() + { + return 60; + } + + /** + * Called by IoManager after each handled item or empty polling cycle. + * This is a good time to e.g. service your XMPP connection. + * + * Doesn't need to be overridden if there's no maintenance to do. + */ + function idle() + { + return true; + } + + /** + * The meat of a polling manager... check for something to do + * and do it! Note that you should not take too long, as other + * i/o managers may need to do some work too! + * + * On a successful hit, the next poll() call will come as soon + * as possible followed by exponential backoff up to pollInterval() + * if no more data is available. + * + * @return boolean true if events were hit + */ + public function poll() + { + return false; + } + + /** + * Initialization, run when the queue manager starts. + * If this function indicates failure, the handler run will be aborted. + * + * @param IoMaster $master process/event controller + * @return boolean true on success, false on failure + */ + public function start($master) + { + $this->master = $master; + return true; + } + + /** + * Cleanup, run when the queue manager ends. + * If this function indicates failure, a warning will be logged. + * + * @return boolean true on success, false on failure + */ + public function finish() + { + return true; + } + + /** + * Ping iomaster's queue status monitor with a stats update. + * Only valid during input loop! + * + * @param string $counter keyword for counter to increment + */ + public function stats($counter, $owners=array()) + { + $this->master->stats($counter, $owners); + } +} + diff --git a/lib/iomaster.php b/lib/iomaster.php new file mode 100644 index 0000000000..aff5b145c2 --- /dev/null +++ b/lib/iomaster.php @@ -0,0 +1,361 @@ +. + * + * @category QueueManager + * @package StatusNet + * @author Brion Vibber + * @copyright 2009 StatusNet, Inc. + * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0 + * @link http://status.net/ + */ + +class IoMaster +{ + public $id; + + protected $multiSite = false; + protected $managers = array(); + protected $singletons = array(); + + protected $pollTimeouts = array(); + protected $lastPoll = array(); + + /** + * @param string $id process ID to use in logging/monitoring + */ + public function __construct($id) + { + $this->id = $id; + $this->monitor = new QueueMonitor(); + } + + public function init($multiSite=null) + { + if ($multiSite !== null) { + $this->multiSite = $multiSite; + } + if ($this->multiSite) { + $this->sites = $this->findAllSites(); + } else { + $this->sites = array(common_config('site', 'server')); + } + + if (empty($this->sites)) { + throw new Exception("Empty status_network table, cannot init"); + } + + foreach ($this->sites as $site) { + if ($site != common_config('site', 'server')) { + StatusNet::init($site); + } + + $classes = array(); + if (Event::handle('StartIoManagerClasses', array(&$classes))) { + $classes[] = 'QueueManager'; + if (common_config('xmpp', 'enabled')) { + $classes[] = 'XmppManager'; // handles pings/reconnects + $classes[] = 'XmppConfirmManager'; // polls for outgoing confirmations + } + } + Event::handle('EndIoManagerClasses', array(&$classes)); + + foreach ($classes as $class) { + $this->instantiate($class); + } + } + } + + /** + * Pull all local sites from status_network table. + * @return array of hostnames + */ + protected function findAllSites() + { + $hosts = array(); + $sn = new Status_network(); + $sn->find(); + while ($sn->fetch()) { + $hosts[] = $sn->hostname; + } + return $hosts; + } + + /** + * Instantiate an i/o manager class for the current site. + * If a multi-site capable handler is already present, + * we don't need to build a new one. + * + * @param string $class + */ + protected function instantiate($class) + { + if (isset($this->singletons[$class])) { + // Already instantiated a multi-site-capable handler. + // Just let it know it should listen to this site too! + $this->singletons[$class]->addSite(common_config('site', 'server')); + return; + } + + $manager = $this->getManager($class); + + if ($this->multiSite) { + $caps = $manager->multiSite(); + if ($caps == IoManager::SINGLE_ONLY) { + throw new Exception("$class can't run with --all; aborting."); + } + if ($caps == IoManager::INSTANCE_PER_PROCESS) { + // Save this guy for later! + // We'll only need the one to cover multiple sites. + $this->singletons[$class] = $manager; + $manager->addSite(common_config('site', 'server')); + } + } + + $this->managers[] = $manager; + } + + protected function getManager($class) + { + return call_user_func(array($class, 'get')); + } + + /** + * Basic run loop... + * + * Initialize all io managers, then sit around waiting for input. + * Between events or timeouts, pass control back to idle() method + * to allow for any additional background processing. + */ + function service() + { + $this->logState('init'); + $this->start(); + + while (true) { + $timeouts = array_values($this->pollTimeouts); + $timeouts[] = 60; // default max timeout + + // Wait for something on one of our sockets + $sockets = array(); + $managers = array(); + foreach ($this->managers as $manager) { + foreach ($manager->getSockets() as $socket) { + $sockets[] = $socket; + $managers[] = $manager; + } + $timeouts[] = intval($manager->timeout()); + } + + $timeout = min($timeouts); + if ($sockets) { + $read = $sockets; + $write = array(); + $except = array(); + $this->logState('listening'); + common_log(LOG_INFO, "Waiting up to $timeout seconds for socket data..."); + $ready = stream_select($read, $write, $except, $timeout, 0); + + if ($ready === false) { + common_log(LOG_ERR, "Error selecting on sockets"); + } else if ($ready > 0) { + foreach ($read as $socket) { + $index = array_search($socket, $sockets, true); + if ($index !== false) { + $this->logState('queue'); + $managers[$index]->handleInput($socket); + } else { + common_log(LOG_ERR, "Saw input on a socket we didn't listen to"); + } + } + } + } + + if ($timeout > 0 && empty($sockets)) { + // If we had no listeners, sleep until the pollers' next requested wakeup. + common_log(LOG_INFO, "Sleeping $timeout seconds until next poll cycle..."); + $this->logState('sleep'); + sleep($timeout); + } + + $this->logState('poll'); + $this->poll(); + + $this->logState('idle'); + $this->idle(); + + $memoryLimit = $this->softMemoryLimit(); + if ($memoryLimit > 0) { + $usage = memory_get_usage(); + if ($usage > $memoryLimit) { + common_log(LOG_INFO, "Queue thread hit soft memory limit ($usage > $memoryLimit); gracefully restarting."); + break; + } + } + } + + $this->logState('shutdown'); + $this->finish(); + } + + /** + * Return fully-parsed soft memory limit in bytes. + * @return intval 0 or -1 if not set + */ + function softMemoryLimit() + { + $softLimit = trim(common_config('queue', 'softlimit')); + if (substr($softLimit, -1) == '%') { + $limit = trim(ini_get('memory_limit')); + $limit = $this->parseMemoryLimit($limit); + if ($limit > 0) { + return intval(substr($softLimit, 0, -1) * $limit / 100); + } else { + return -1; + } + } else { + return $this->parseMemoryLimit($limit); + } + return $softLimit; + } + + /** + * Interpret PHP shorthand for memory_limit and friends. + * Why don't they just expose the actual numeric value? :P + * @param string $mem + * @return int + */ + protected function parseMemoryLimit($mem) + { + // http://www.php.net/manual/en/faq.using.php#faq.using.shorthandbytes + $size = array('k' => 1024, + 'm' => 1024*1024, + 'g' => 1024*1024*1024); + if (empty($mem)) { + return 0; + } else if (is_numeric($mem)) { + return intval($mem); + } else { + $mult = strtolower(substr($mem, -1)); + if (isset($size[$mult])) { + return substr($mem, 0, -1) * $size[$mult]; + } else { + return intval($mem); + } + } + } + + function start() + { + foreach ($this->managers as $index => $manager) { + $manager->start($this); + // @fixme error check + if ($manager->pollInterval()) { + // We'll want to check for input on the first pass + $this->pollTimeouts[$index] = 0; + $this->lastPoll[$index] = 0; + } + } + } + + function finish() + { + foreach ($this->managers as $manager) { + $manager->finish(); + // @fixme error check + } + } + + /** + * Called during the idle portion of the runloop to see which handlers + */ + function poll() + { + foreach ($this->managers as $index => $manager) { + $interval = $manager->pollInterval(); + if ($interval <= 0) { + // Not a polling manager. + continue; + } + + if (isset($this->pollTimeouts[$index])) { + $timeout = $this->pollTimeouts[$index]; + if (time() - $this->lastPoll[$index] < $timeout) { + // Not time to poll yet. + continue; + } + } else { + $timeout = 0; + } + $hit = $manager->poll(); + + $this->lastPoll[$index] = time(); + if ($hit) { + // Do the next poll quickly, there may be more input! + $this->pollTimeouts[$index] = 0; + } else { + // Empty queue. Exponential backoff up to the maximum poll interval. + if ($timeout > 0) { + $timeout = min($timeout * 2, $interval); + } else { + $timeout = 1; + } + $this->pollTimeouts[$index] = $timeout; + } + } + } + + /** + * Called after each handled item or empty polling cycle. + * This is a good time to e.g. service your XMPP connection. + */ + function idle() + { + foreach ($this->managers as $manager) { + $manager->idle(); + } + } + + /** + * Send thread state update to the monitoring server, if configured. + * + * @param string $state ('init', 'queue', 'shutdown' etc) + * @param string $substate (optional, eg queue name 'omb' 'sms' etc) + */ + protected function logState($state, $substate='') + { + $this->monitor->logState($this->id, $state, $substate); + } + + /** + * Send thread stats. + * Thread ID will be implicit; other owners can be listed as well + * for per-queue and per-site records. + * + * @param string $key counter name + * @param array $owners list of owner keys like 'queue:jabber' or 'site:stat01' + */ + public function stats($key, $owners=array()) + { + $owners[] = "thread:" . $this->id; + $this->monitor->stats($key, $owners); + } +} + diff --git a/lib/jabber.php b/lib/jabber.php index a821856a8b..1d0bb94231 100644 --- a/lib/jabber.php +++ b/lib/jabber.php @@ -86,7 +86,11 @@ class Sharing_XMPP extends XMPPHP_XMPP } /** - * connect the configured Jabber account to the configured server + * Lazy-connect the configured Jabber account to the configured server; + * if already opened, the same connection will be returned. + * + * In a multi-site background process, each site configuration + * will get its own connection. * * @param string $resource Resource to connect (defaults to configured resource) * @@ -95,16 +99,19 @@ class Sharing_XMPP extends XMPPHP_XMPP function jabber_connect($resource=null) { - static $conn = null; - if (!$conn) { + static $connections = array(); + $site = common_config('site', 'server'); + if (empty($connections[$site])) { + if (empty($resource)) { + $resource = common_config('xmpp', 'resource'); + } $conn = new Sharing_XMPP(common_config('xmpp', 'host') ? common_config('xmpp', 'host') : common_config('xmpp', 'server'), common_config('xmpp', 'port'), common_config('xmpp', 'user'), common_config('xmpp', 'password'), - ($resource) ? $resource : - common_config('xmpp', 'resource'), + $resource, common_config('xmpp', 'server'), common_config('xmpp', 'debug') ? true : false, @@ -115,12 +122,16 @@ function jabber_connect($resource=null) if (!$conn) { return false; } + $connections[$site] = $conn; $conn->autoSubscribe(); $conn->useEncryption(common_config('xmpp', 'encryption')); try { - $conn->connect(true); // true = persistent connection + common_log(LOG_INFO, __METHOD__ . ": connecting " . + common_config('xmpp', 'user') . '/' . $resource); + //$conn->connect(true); // true = persistent connection + $conn->connect(); // persistent connections break multisite } catch (XMPPHP_Exception $e) { common_log(LOG_ERR, $e->getMessage()); return false; @@ -128,7 +139,7 @@ function jabber_connect($resource=null) $conn->processUntil('session_start'); } - return $conn; + return $connections[$site]; } /** diff --git a/scripts/jabberqueuehandler.php b/lib/jabberqueuehandler.php old mode 100755 new mode 100644 similarity index 52% rename from scripts/jabberqueuehandler.php rename to lib/jabberqueuehandler.php index 8f3a56944d..b1518866d7 --- a/scripts/jabberqueuehandler.php +++ b/lib/jabberqueuehandler.php @@ -1,4 +1,3 @@ -#!/usr/bin/env php . */ -define('INSTALLDIR', realpath(dirname(__FILE__) . '/..')); - -$shortoptions = 'i::'; -$longoptions = array('id::'); - -$helptext = << 0) { - $id = $args[0]; -} else { - $id = null; -} - -$handler = new JabberQueueHandler($id); - -$handler->runOnce(); diff --git a/lib/liberalstomp.php b/lib/liberalstomp.php new file mode 100644 index 0000000000..c9233843a4 --- /dev/null +++ b/lib/liberalstomp.php @@ -0,0 +1,133 @@ + + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +class LiberalStomp extends Stomp +{ + /** + * We need to be able to get the socket so advanced daemons can + * do a select() waiting for input both from the queue and from + * other sources such as an XMPP connection. + * + * @return resource + */ + function getSocket() + { + return $this->_socket; + } + + /** + * Make socket connection to the server + * We also set the stream to non-blocking mode, since we'll be + * select'ing to wait for updates. In blocking mode it seems + * to get confused sometimes. + * + * @throws StompException + */ + protected function _makeConnection () + { + parent::_makeConnection(); + stream_set_blocking($this->_socket, 0); + } + + /** + * Version 1.0.0 of the Stomp library gets confused if messages + * come in too fast over the connection. This version will read + * out as many frames as are ready to be read from the socket. + * + * Modified from Stomp::readFrame() + * + * @return StompFrame False when no frame to read + */ + public function readFrames () + { + if (!$this->hasFrameToRead()) { + return false; + } + + $rb = 1024; + $data = ''; + $end = false; + $frames = array(); + + do { + // @fixme this sometimes hangs in blocking mode... + // shouldn't we have been idle until we found there's more data? + $read = fread($this->_socket, $rb); + if ($read === false) { + $this->_reconnect(); + // @fixme this will lose prior items + return $this->readFrames(); + } + $data .= $read; + if (strpos($data, "\x00") !== false) { + // Frames are null-delimited, but some servers + // may append an extra \n according to old bug reports. + $data = str_replace("\x00\n", "\x00", $data); + $chunks = explode("\x00", $data); + + $data = array_pop($chunks); + $frames = array_merge($frames, $chunks); + if ($data == '') { + // We're at the end of a frame; stop reading. + break; + } else { + // In the middle of a frame; keep going. + } + } + // @fixme find out why this len < 2 check was there + //$len = strlen($data); + } while (true);//$len < 2 || $end == false); + + return array_map(array($this, 'parseFrame'), $frames); + } + + /** + * Parse a raw Stomp frame into an object. + * Extracted from Stomp::readFrame() + * + * @param string $data + * @return StompFrame + */ + function parseFrame($data) + { + list ($header, $body) = explode("\n\n", $data, 2); + $header = explode("\n", $header); + $headers = array(); + $command = null; + foreach ($header as $v) { + if (isset($command)) { + list ($name, $value) = explode(':', $v, 2); + $headers[$name] = $value; + } else { + $command = $v; + } + } + $frame = new StompFrame($command, $headers, trim($body)); + if (isset($frame->headers['transformation']) && $frame->headers['transformation'] == 'jms-map-json') { + require_once 'Stomp/Message/Map.php'; + return new StompMessageMap($frame); + } else { + return $frame; + } + return $frame; + } +} + diff --git a/scripts/ombqueuehandler.php b/lib/ombqueuehandler.php old mode 100755 new mode 100644 similarity index 57% rename from scripts/ombqueuehandler.php rename to lib/ombqueuehandler.php index be33b98218..3ffc1313bc --- a/scripts/ombqueuehandler.php +++ b/lib/ombqueuehandler.php @@ -1,4 +1,3 @@ -#!/usr/bin/env php . */ -define('INSTALLDIR', realpath(dirname(__FILE__) . '/..')); - -$shortoptions = 'i::'; -$longoptions = array('id::'); - -$helptext = <<log(LOG_INFO, "INITIALIZE"); - return true; - } - + /** + * @fixme doesn't currently report failure back to the queue manager + * because omb_broadcast_notice() doesn't report it to us + */ function handle_notice($notice) { if ($this->is_remote($notice)) { $this->log(LOG_DEBUG, 'Ignoring remote notice ' . $notice->id); return true; } else { - return omb_broadcast_notice($notice); + require_once(INSTALLDIR.'/lib/omb.php'); + omb_broadcast_notice($notice); + return true; } } - function finish() - { - } - function is_remote($notice) { $user = User::staticGet($notice->profile_id); return is_null($user); } } - -if (have_option('i')) { - $id = get_option_value('i'); -} else if (have_option('--id')) { - $id = get_option_value('--id'); -} else if (count($args) > 0) { - $id = $args[0]; -} else { - $id = null; -} - -$handler = new OmbQueueHandler($id); - -$handler->runOnce(); diff --git a/lib/pingqueuehandler.php b/lib/pingqueuehandler.php new file mode 100644 index 0000000000..8bb2180786 --- /dev/null +++ b/lib/pingqueuehandler.php @@ -0,0 +1,37 @@ +. + */ + +if (!defined('STATUSNET') && !defined('LACONICA')) { + exit(1); +} + +/** + * Queue handler for pushing new notices to ping servers. + */ +class PingQueueHandler extends QueueHandler { + + function transport() { + return 'ping'; + } + + function handle_notice($notice) { + require_once INSTALLDIR . '/lib/ping.php'; + return ping_broadcast_notice($notice); + } +} diff --git a/scripts/pluginqueuehandler.php b/lib/pluginqueuehandler.php old mode 100755 new mode 100644 similarity index 61% rename from scripts/pluginqueuehandler.php rename to lib/pluginqueuehandler.php index fa39bdda6a..24d5046997 --- a/scripts/pluginqueuehandler.php +++ b/lib/pluginqueuehandler.php @@ -1,4 +1,3 @@ -#!/usr/bin/env php . */ -define('INSTALLDIR', realpath(dirname(__FILE__) . '/..')); - -$shortoptions = 'i::'; -$longoptions = array('id::'); - -$helptext = <<log(LOG_INFO, "INITIALIZE"); - return true; - } - function handle_notice($notice) { Event::handle('HandleQueuedNotice', array(&$notice)); return true; } } - -if (have_option('i', 'id')) { - $id = get_option_value('i', 'id'); -} else { - $id = null; -} - -$handler = new PluginQueueHandler($id); -$handler->runOnce(); diff --git a/scripts/publicqueuehandler.php b/lib/publicqueuehandler.php old mode 100755 new mode 100644 similarity index 52% rename from scripts/publicqueuehandler.php rename to lib/publicqueuehandler.php index 50a11bcba0..9ea9ee73a3 --- a/scripts/publicqueuehandler.php +++ b/lib/publicqueuehandler.php @@ -1,4 +1,3 @@ -#!/usr/bin/env php . */ -define('INSTALLDIR', realpath(dirname(__FILE__) . '/..')); - -$shortoptions = 'i::'; -$longoptions = array('id::'); - -$helptext = <<log(LOG_ERR, "Got an XMPPHP_Exception: " . $e->getMessage()); die($e->getMessage()); } + return true; } } - -// Abort immediately if xmpp is not enabled, otherwise the daemon chews up -// lots of CPU trying to connect to unconfigured servers -if (common_config('xmpp','enabled')==false) { - print "Aborting daemon - xmpp is disabled\n"; - exit(); -} - -if (have_option('i')) { - $id = get_option_value('i'); -} else if (have_option('--id')) { - $id = get_option_value('--id'); -} else if (count($args) > 0) { - $id = $args[0]; -} else { - $id = null; -} - -$handler = new PublicQueueHandler($id); - -$handler->runOnce(); diff --git a/lib/queuehandler.php b/lib/queuehandler.php index cd43b1e09a..613be6e330 100644 --- a/lib/queuehandler.php +++ b/lib/queuehandler.php @@ -19,14 +19,6 @@ if (!defined('STATUSNET') && !defined('LACONICA')) { exit(1); } -require_once(INSTALLDIR.'/lib/daemon.php'); -require_once(INSTALLDIR.'/classes/Queue_item.php'); -require_once(INSTALLDIR.'/classes/Notice.php'); - -define('CLAIM_TIMEOUT', 1200); -define('QUEUE_HANDLER_MISS_IDLE', 10); -define('QUEUE_HANDLER_HIT_IDLE', 0); - /** * Base class for queue handlers. * @@ -36,24 +28,20 @@ define('QUEUE_HANDLER_HIT_IDLE', 0); * * Subclasses must override at least the following methods: * - transport - * - start - * - finish * - handle_notice - * - * Some subclasses will also want to override the idle handler: - * - idle */ -class QueueHandler extends Daemon +#class QueueHandler extends Daemon +class QueueHandler { - function __construct($id=null, $daemonize=true) - { - parent::__construct($daemonize); - - if ($id) { - $this->set_id($id); - } - } +# function __construct($id=null, $daemonize=true) +# { +# parent::__construct($daemonize); +# +# if ($id) { +# $this->set_id($id); +# } +# } /** * How many seconds a polling-based queue manager should wait between @@ -61,22 +49,23 @@ class QueueHandler extends Daemon * * Defaults to 60 seconds; override to speed up or slow down. * + * @fixme not really compatible with global queue manager * @return int timeout in seconds */ - function timeout() - { - return 60; - } +# function timeout() +# { +# return 60; +# } - function class_name() - { - return ucfirst($this->transport()) . 'Handler'; - } +# function class_name() +# { +# return ucfirst($this->transport()) . 'Handler'; +# } - function name() - { - return strtolower($this->class_name().'.'.$this->get_id()); - } +# function name() +# { +# return strtolower($this->class_name().'.'.$this->get_id()); +# } /** * Return transport keyword which identifies items this queue handler @@ -92,30 +81,6 @@ class QueueHandler extends Daemon return null; } - /** - * Initialization, run when the queue handler starts. - * If this function indicates failure, the handler run will be aborted. - * - * @fixme run() will abort if this doesn't return true, - * but some subclasses don't bother. - * @return boolean true on success, false on failure - */ - function start() - { - } - - /** - * Cleanup, run when the queue handler ends. - * If this function indicates failure, a warning will be logged. - * - * @fixme run() will throw warnings if this doesn't return true, - * but many subclasses don't bother. - * @return boolean true on success, false on failure - */ - function finish() - { - } - /** * Here's the meat of your queue handler -- you're handed a Notice * object, which you may do as you will with. @@ -169,29 +134,10 @@ class QueueHandler extends Daemon return true; } - /** - * Called by QueueHandler after each handled item or empty polling cycle. - * This is a good time to e.g. service your XMPP connection. - * - * Doesn't need to be overridden if there's no maintenance to do. - * - * @param int $timeout seconds to sleep if there's nothing to do - */ - function idle($timeout=0) - { - if ($timeout > 0) { - sleep($timeout); - } - } function log($level, $msg) { common_log($level, $this->class_name() . ' ('. $this->get_id() .'): '.$msg); } - - function getSockets() - { - return array(); - } } diff --git a/lib/queuemanager.php b/lib/queuemanager.php index 43105b7a86..a98c0efffb 100644 --- a/lib/queuemanager.php +++ b/lib/queuemanager.php @@ -2,7 +2,7 @@ /** * StatusNet, the distributed open-source microblogging tool * - * Abstract class for queue managers + * Abstract class for i/o managers * * PHP version 5 * @@ -23,16 +23,32 @@ * @package StatusNet * @author Evan Prodromou * @author Sarven Capadisli - * @copyright 2009 StatusNet, Inc. + * @author Brion Vibber + * @copyright 2009-2010 StatusNet, Inc. * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0 * @link http://status.net/ */ -class QueueManager +/** + * Completed child classes must implement the enqueue() method. + * + * For background processing, classes should implement either socket-based + * input (handleInput(), getSockets()) or idle-loop polling (idle()). + */ +abstract class QueueManager extends IoManager { static $qm = null; - static function get() + /** + * Factory function to pull the appropriate QueueManager object + * for this site's configuration. It can then be used to queue + * events for later processing or to spawn a processing loop. + * + * Plugins can add to the built-in types by hooking StartNewQueueManager. + * + * @return QueueManager + */ + public static function get() { if (empty(self::$qm)) { @@ -62,13 +78,130 @@ class QueueManager return self::$qm; } - function enqueue($object, $queue) + /** + * @fixme wouldn't necessarily work with other class types. + * Better to change the interface...? + */ + public static function multiSite() + { + if (common_config('queue', 'subsystem') == 'stomp') { + return IoManager::INSTANCE_PER_PROCESS; + } else { + return IoManager::SINGLE_ONLY; + } + } + + function __construct() { - throw ServerException("Unimplemented function 'enqueue' called"); + $this->initialize(); } - function service($queue, $handler) + /** + * Store an object (usually/always a Notice) into the given queue + * for later processing. No guarantee is made on when it will be + * processed; it could be immediately or at some unspecified point + * in the future. + * + * Must be implemented by any queue manager. + * + * @param Notice $object + * @param string $queue + */ + abstract function enqueue($object, $queue); + + /** + * Instantiate the appropriate QueueHandler class for the given queue. + * + * @param string $queue + * @return mixed QueueHandler or null + */ + function getHandler($queue) { - throw ServerException("Unimplemented function 'service' called"); + if (isset($this->handlers[$queue])) { + $class = $this->handlers[$queue]; + if (class_exists($class)) { + return new $class(); + } else { + common_log(LOG_ERR, "Nonexistent handler class '$class' for queue '$queue'"); + } + } else { + common_log(LOG_ERR, "Requested handler for unkown queue '$queue'"); + } + return null; + } + + /** + * Get a list of all registered queue transport names. + * + * @return array of strings + */ + function getQueues() + { + return array_keys($this->handlers); + } + + /** + * Initialize the list of queue handlers + * + * @event StartInitializeQueueManager + * @event EndInitializeQueueManager + */ + function initialize() + { + if (Event::handle('StartInitializeQueueManager', array($this))) { + $this->connect('plugin', 'PluginQueueHandler'); + $this->connect('omb', 'OmbQueueHandler'); + $this->connect('ping', 'PingQueueHandler'); + if (common_config('sms', 'enabled')) { + $this->connect('sms', 'SmsQueueHandler'); + } + + // XMPP output handlers... + if (common_config('xmpp', 'enabled')) { + $this->connect('jabber', 'JabberQueueHandler'); + $this->connect('public', 'PublicQueueHandler'); + + // @fixme this should move up a level or should get an actual queue + $this->connect('confirm', 'XmppConfirmHandler'); + } + + // For compat with old plugins not registering their own handlers. + $this->connect('plugin', 'PluginQueueHandler'); + } + Event::handle('EndInitializeQueueManager', array($this)); + } + + /** + * Register a queue transport name and handler class for your plugin. + * Only registered transports will be reliably picked up! + * + * @param string $transport + * @param string $class + */ + public function connect($transport, $class) + { + $this->handlers[$transport] = $class; + } + + /** + * Send a statistic ping to the queue monitoring system, + * optionally with a per-queue id. + * + * @param string $key + * @param string $queue + */ + function stats($key, $queue=false) + { + $owners = array(); + if ($queue) { + $owners[] = "queue:$queue"; + $owners[] = "site:" . common_config('site', 'server'); + } + if (isset($this->master)) { + $this->master->stats($key, $owners); + } else { + $monitor = new QueueMonitor(); + $monitor->stats($key, $owners); + } } } diff --git a/lib/queuemonitor.php b/lib/queuemonitor.php new file mode 100644 index 0000000000..1c306a6298 --- /dev/null +++ b/lib/queuemonitor.php @@ -0,0 +1,116 @@ +. + * + * @category QueueManager + * @package StatusNet + * @author Brion Vibber + * @copyright 2010 StatusNet, Inc. + * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0 + * @link http://status.net/ + */ + +class QueueMonitor +{ + protected $monSocket = null; + + /** + * Increment monitoring statistics for a given counter, if configured. + * Only explicitly listed thread/site/queue owners will be incremented. + * + * @param string $key counter name + * @param array $owners list of owner keys like 'queue:jabber' or 'site:stat01' + */ + public function stats($key, $owners=array()) + { + $this->ping(array('counter' => $key, + 'owners' => $owners)); + } + + /** + * Send thread state update to the monitoring server, if configured. + * + * @param string $thread ID (eg 'generic.1') + * @param string $state ('init', 'queue', 'shutdown' etc) + * @param string $substate (optional, eg queue name 'omb' 'sms' etc) + */ + public function logState($threadId, $state, $substate='') + { + $this->ping(array('thread_id' => $threadId, + 'state' => $state, + 'substate' => $substate, + 'ts' => microtime(true))); + } + + /** + * General call to the monitoring server + */ + protected function ping($data) + { + $target = common_config('queue', 'monitor'); + if (empty($target)) { + return; + } + + $data = $this->prepMonitorData($data); + + if (substr($target, 0, 4) == 'udp:') { + $this->pingUdp($target, $data); + } else if (substr($target, 0, 5) == 'http:') { + $this->pingHttp($target, $data); + } else { + common_log(LOG_ERR, __METHOD__ . ' unknown monitor target type ' . $target); + } + } + + protected function pingUdp($target, $data) + { + if (!$this->monSocket) { + $this->monSocket = stream_socket_client($target, $errno, $errstr); + } + if ($this->monSocket) { + $post = http_build_query($data, '', '&'); + stream_socket_sendto($this->monSocket, $post); + } else { + common_log(LOG_ERR, __METHOD__ . " UDP logging fail: $errstr"); + } + } + + protected function pingHttp($target, $data) + { + $client = new HTTPClient(); + $result = $client->post($target, array(), $data); + + if (!$result->isOk()) { + common_log(LOG_ERR, __METHOD__ . ' HTTP ' . $result->getStatus() . + ': ' . $result->getBody()); + } + } + + protected function prepMonitorData($data) + { + #asort($data); + #$macdata = http_build_query($data, '', '&'); + #$key = 'This is a nice old key'; + #$data['hmac'] = hash_hmac('sha256', $macdata, $key); + return $data; + } + +} diff --git a/scripts/smsqueuehandler.php b/lib/smsqueuehandler.php old mode 100755 new mode 100644 similarity index 54% rename from scripts/smsqueuehandler.php rename to lib/smsqueuehandler.php index 6583a77da8..48a96409d0 --- a/scripts/smsqueuehandler.php +++ b/lib/smsqueuehandler.php @@ -1,4 +1,3 @@ -#!/usr/bin/env php . */ -define('INSTALLDIR', realpath(dirname(__FILE__) . '/..')); - -$shortoptions = 'i::'; -$longoptions = array('id::'); - -$helptext = <<log(LOG_INFO, "INITIALIZE"); - return true; - } - function handle_notice($notice) { + require_once(INSTALLDIR.'/lib/mail.php'); return mail_broadcast_notice_sms($notice); } - - function finish() - { - } } - -if (have_option('i')) { - $id = get_option_value('i'); -} else if (have_option('--id')) { - $id = get_option_value('--id'); -} else if (count($args) > 0) { - $id = $args[0]; -} else { - $id = null; -} - -$handler = new SmsQueueHandler($id); - -$handler->runOnce(); diff --git a/lib/statusnet.php b/lib/statusnet.php new file mode 100644 index 0000000000..0c5807d7b9 --- /dev/null +++ b/lib/statusnet.php @@ -0,0 +1,295 @@ +. + */ + +if (!defined('STATUSNET') && !defined('LACONICA')) { exit(1); } + +global $config, $_server, $_path; + +/** + * Global configuration setup and management. + */ +class StatusNet +{ + protected static $have_config; + + /** + * Configure and instantiate a plugin into the current configuration. + * Class definitions will be loaded from standard paths if necessary. + * Note that initialization events won't be fired until later. + * + * @param string $name class name & plugin file/subdir name + * @param array $attrs key/value pairs of public attributes to set on plugin instance + * + * @throws ServerException if plugin can't be found + */ + public static function addPlugin($name, $attrs = null) + { + $name = ucfirst($name); + $pluginclass = "{$name}Plugin"; + + if (!class_exists($pluginclass)) { + + $files = array("local/plugins/{$pluginclass}.php", + "local/plugins/{$name}/{$pluginclass}.php", + "local/{$pluginclass}.php", + "local/{$name}/{$pluginclass}.php", + "plugins/{$pluginclass}.php", + "plugins/{$name}/{$pluginclass}.php"); + + foreach ($files as $file) { + $fullpath = INSTALLDIR.'/'.$file; + if (@file_exists($fullpath)) { + include_once($fullpath); + break; + } + } + if (!class_exists($pluginclass)) { + throw new ServerException(500, "Plugin $name not found."); + } + } + + $inst = new $pluginclass(); + if (!empty($attrs)) { + foreach ($attrs as $aname => $avalue) { + $inst->$aname = $avalue; + } + } + return true; + } + + /** + * Initialize, or re-initialize, StatusNet global configuration + * and plugins. + * + * If switching site configurations during script execution, be + * careful when working with leftover objects -- global settings + * affect many things and they may not behave as you expected. + * + * @param $server optional web server hostname for picking config + * @param $path optional URL path for picking config + * @param $conffile optional configuration file path + * + * @throws NoConfigException if config file can't be found + */ + public static function init($server=null, $path=null, $conffile=null) + { + StatusNet::initDefaults($server, $path); + StatusNet::loadConfigFile($conffile); + + // Load settings from database; note we need autoload for this + Config::loadSettings(); + + self::initPlugins(); + } + + /** + * Fire initialization events for all instantiated plugins. + */ + protected static function initPlugins() + { + // Load default plugins + foreach (common_config('plugins', 'default') as $name => $params) { + if (is_null($params)) { + addPlugin($name); + } else if (is_array($params)) { + if (count($params) == 0) { + addPlugin($name); + } else { + $keys = array_keys($params); + if (is_string($keys[0])) { + addPlugin($name, $params); + } else { + foreach ($params as $paramset) { + addPlugin($name, $paramset); + } + } + } + } + } + + // XXX: if plugins should check the schema at runtime, do that here. + if (common_config('db', 'schemacheck') == 'runtime') { + Event::handle('CheckSchema'); + } + + // Give plugins a chance to initialize in a fully-prepared environment + Event::handle('InitializePlugin'); + } + + /** + * Quick-check if configuration has been established. + * Useful for functions which may get used partway through + * initialization to back off from fancier things. + * + * @return bool + */ + public function haveConfig() + { + return self::$have_config; + } + + /** + * Build default configuration array + * @return array + */ + protected static function defaultConfig() + { + global $_server, $_path; + require(INSTALLDIR.'/lib/default.php'); + return $default; + } + + /** + * Establish default configuration based on given or default server and path + * Sets global $_server, $_path, and $config + */ + protected static function initDefaults($server, $path) + { + global $_server, $_path, $config; + + Event::clearHandlers(); + + // try to figure out where we are. $server and $path + // can be set by including module, else we guess based + // on HTTP info. + + if (isset($server)) { + $_server = $server; + } else { + $_server = array_key_exists('SERVER_NAME', $_SERVER) ? + strtolower($_SERVER['SERVER_NAME']) : + null; + } + + if (isset($path)) { + $_path = $path; + } else { + $_path = (array_key_exists('SERVER_NAME', $_SERVER) && array_key_exists('SCRIPT_NAME', $_SERVER)) ? + self::_sn_to_path($_SERVER['SCRIPT_NAME']) : + null; + } + + // Set config values initially to default values + $default = self::defaultConfig(); + $config = $default; + + // default configuration, overwritten in config.php + // Keep DB_DataObject's db config synced to ours... + + $config['db'] = &PEAR::getStaticProperty('DB_DataObject','options'); + + $config['db'] = $default['db']; + + // Backward compatibility + + $config['site']['design'] =& $config['design']; + + if (function_exists('date_default_timezone_set')) { + /* Work internally in UTC */ + date_default_timezone_set('UTC'); + } + } + + protected function _sn_to_path($sn) + { + $past_root = substr($sn, 1); + $last_slash = strrpos($past_root, '/'); + if ($last_slash > 0) { + $p = substr($past_root, 0, $last_slash); + } else { + $p = ''; + } + return $p; + } + + /** + * Load the default or specified configuration file. + * Modifies global $config and may establish plugins. + * + * @throws NoConfigException + */ + protected function loadConfigFile($conffile=null) + { + global $_server, $_path, $config; + + // From most general to most specific: + // server-wide, then vhost-wide, then for a path, + // finally for a dir (usually only need one of the last two). + + if (isset($conffile)) { + $config_files = array($conffile); + } else { + $config_files = array('/etc/statusnet/statusnet.php', + '/etc/statusnet/laconica.php', + '/etc/laconica/laconica.php', + '/etc/statusnet/'.$_server.'.php', + '/etc/laconica/'.$_server.'.php'); + + if (strlen($_path) > 0) { + $config_files[] = '/etc/statusnet/'.$_server.'_'.$_path.'.php'; + $config_files[] = '/etc/laconica/'.$_server.'_'.$_path.'.php'; + } + + $config_files[] = INSTALLDIR.'/config.php'; + } + + self::$have_config = false; + + foreach ($config_files as $_config_file) { + if (@file_exists($_config_file)) { + include($_config_file); + self::$have_config = true; + } + } + + if (!self::$have_config) { + throw new NoConfigException("No configuration file found.", + $config_files); + } + + // Fixup for statusnet.ini + $_db_name = substr($config['db']['database'], strrpos($config['db']['database'], '/') + 1); + + if ($_db_name != 'statusnet' && !array_key_exists('ini_'.$_db_name, $config['db'])) { + $config['db']['ini_'.$_db_name] = INSTALLDIR.'/classes/statusnet.ini'; + } + + // Backwards compatibility + + if (array_key_exists('memcached', $config)) { + if ($config['memcached']['enabled']) { + addPlugin('Memcache', array('servers' => $config['memcached']['server'])); + } + + if (!empty($config['memcached']['base'])) { + $config['cache']['base'] = $config['memcached']['base']; + } + } + } +} + +class NoConfigException extends Exception +{ + public $config_files; + + function __construct($msg, $config_files) { + parent::__construct($msg); + $this->config_files = $config_files; + } +} diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php index f059b42f00..3090e0bfb6 100644 --- a/lib/stompqueuemanager.php +++ b/lib/stompqueuemanager.php @@ -30,46 +30,53 @@ require_once 'Stomp.php'; -class LiberalStomp extends Stomp -{ - function getSocket() - { - return $this->_socket; - } -} -class StompQueueManager +class StompQueueManager extends QueueManager { var $server = null; var $username = null; var $password = null; var $base = null; var $con = null; + + protected $master = null; + protected $sites = array(); function __construct() { + parent::__construct(); $this->server = common_config('queue', 'stomp_server'); $this->username = common_config('queue', 'stomp_username'); $this->password = common_config('queue', 'stomp_password'); $this->base = common_config('queue', 'queue_basename'); } - function _connect() + /** + * Tell the i/o master we only need a single instance to cover + * all sites running in this process. + */ + public static function multiSite() { - if (empty($this->con)) { - $this->_log(LOG_INFO, "Connecting to '$this->server' as '$this->username'..."); - $this->con = new LiberalStomp($this->server); + return IoManager::INSTANCE_PER_PROCESS; + } - if ($this->con->connect($this->username, $this->password)) { - $this->_log(LOG_INFO, "Connected."); - } else { - $this->_log(LOG_ERR, 'Failed to connect to queue server'); - throw new ServerException('Failed to connect to queue server'); - } - } + /** + * Record each site we'll be handling input for in this process, + * so we can listen to the necessary queues for it. + * + * @fixme possibly actually do subscription here to save another + * loop over all sites later? + */ + public function addSite($server) + { + $this->sites[] = $server; } - function enqueue($object, $queue) + /** + * Saves a notice object reference into the queue item table. + * @return boolean true on success + */ + public function enqueue($object, $queue) { $notice = $object; @@ -77,7 +84,7 @@ class StompQueueManager // XXX: serialize and send entire notice - $result = $this->con->send($this->_queueName($queue), + $result = $this->con->send($this->queueName($queue), $notice->id, // BODY of the message array ('created' => $notice->created)); @@ -88,78 +95,212 @@ class StompQueueManager common_log(LOG_DEBUG, 'complete remote queueing notice ID = ' . $notice->id . ' for ' . $queue); + $this->stats('enqueued', $queue); } - function service($queue, $handler) + /** + * Send any sockets we're listening on to the IO manager + * to wait for input. + * + * @return array of resources + */ + public function getSockets() { - $result = null; - - $this->_connect(); + return array($this->con->getSocket()); + } - $this->con->setReadTimeout($handler->timeout()); + /** + * We've got input to handle on our socket! + * Read any waiting Stomp frame(s) and process them. + * + * @param resource $socket + * @return boolean ok on success + */ + public function handleInput($socket) + { + assert($socket === $this->con->getSocket()); + $ok = true; + $frames = $this->con->readFrames(); + foreach ($frames as $frame) { + $ok = $ok && $this->_handleNotice($frame); + } + return $ok; + } - $this->con->subscribe($this->_queueName($queue)); + /** + * Initialize our connection and subscribe to all the queues + * we're going to need to handle... + * + * Side effects: in multi-site mode, may reset site configuration. + * + * @param IoMaster $master process/event controller + * @return bool return false on failure + */ + public function start($master) + { + parent::start($master); + if ($this->sites) { + foreach ($this->sites as $server) { + StatusNet::init($server); + $this->doSubscribe(); + } + } else { + $this->doSubscribe(); + } + return true; + } + + /** + * Subscribe to all the queues we're going to need to handle... + * + * Side effects: in multi-site mode, may reset site configuration. + * + * @return bool return false on failure + */ + public function finish() + { + if ($this->sites) { + foreach ($this->sites as $server) { + StatusNet::init($server); + $this->doUnsubscribe(); + } + } else { + $this->doUnsubscribe(); + } + return true; + } + + /** + * Lazy open connection to Stomp queue server. + */ + protected function _connect() + { + if (empty($this->con)) { + $this->_log(LOG_INFO, "Connecting to '$this->server' as '$this->username'..."); + $this->con = new LiberalStomp($this->server); - while (true) { + if ($this->con->connect($this->username, $this->password)) { + $this->_log(LOG_INFO, "Connected."); + } else { + $this->_log(LOG_ERR, 'Failed to connect to queue server'); + throw new ServerException('Failed to connect to queue server'); + } + } + } - // Wait for something on one of our sockets + /** + * Subscribe to all enabled notice queues for the current site. + */ + protected function doSubscribe() + { + $this->_connect(); + foreach ($this->getQueues() as $queue) { + $rawqueue = $this->queueName($queue); + $this->_log(LOG_INFO, "Subscribing to $rawqueue"); + $this->con->subscribe($rawqueue); + } + } + + /** + * Subscribe from all enabled notice queues for the current site. + */ + protected function doUnsubscribe() + { + $this->_connect(); + foreach ($this->getQueues() as $queue) { + $this->con->unsubscribe($this->queueName($queue)); + } + } - $stompsock = $this->con->getSocket(); + /** + * Handle and acknowledge a notice event that's come in through a queue. + * + * If the queue handler reports failure, the message is requeued for later. + * Missing notices or handler classes will drop the message. + * + * Side effects: in multi-site mode, may reset site configuration to + * match the site that queued the event. + * + * @param StompFrame $frame + * @return bool + */ + protected function _handleNotice($frame) + { + list($site, $queue) = $this->parseDestination($frame->headers['destination']); + if ($site != common_config('site', 'server')) { + $this->stats('switch'); + StatusNet::init($site); + } - $handsocks = $handler->getSockets(); + $id = intval($frame->body); + $info = "notice $id posted at {$frame->headers['created']} in queue $queue"; - $socks = array_merge(array($stompsock), $handsocks); + $notice = Notice::staticGet('id', $id); + if (empty($notice)) { + $this->_log(LOG_WARNING, "Skipping missing $info"); + $this->con->ack($frame); + $this->stats('badnotice', $queue); + return false; + } - $read = $socks; - $write = array(); - $except = array(); + $handler = $this->getHandler($queue); + if (!$handler) { + $this->_log(LOG_ERROR, "Missing handler class; skipping $info"); + $this->con->ack($frame); + $this->stats('badhandler', $queue); + return false; + } - $ready = stream_select($read, $write, $except, $handler->timeout(), 0); + $ok = $handler->handle_notice($notice); - if ($ready === false) { - $this->_log(LOG_ERR, "Error selecting on sockets"); - } else if ($ready > 0) { - if (in_array($stompsock, $read)) { - $this->_handleNotice($queue, $handler); - } - $handler->idle(QUEUE_HANDLER_HIT_IDLE); - } + if (!$ok) { + $this->_log(LOG_WARNING, "Failed handling $info"); + // FIXME we probably shouldn't have to do + // this kind of queue management ourselves; + // if we don't ack, it should resend... + $this->con->ack($frame); + $this->enqueue($notice, $queue); + $this->stats('requeued', $queue); + return false; } - $this->con->unsubscribe($this->_queueName($queue)); + $this->_log(LOG_INFO, "Successfully handled $info"); + $this->con->ack($frame); + $this->stats('handled', $queue); + return true; } - function _handleNotice($queue, $handler) + /** + * Combines the queue_basename from configuration with the + * site server name and queue name to give eg: + * + * /queue/statusnet/identi.ca/sms + * + * @param string $queue + * @return string + */ + protected function queueName($queue) { - $frame = $this->con->readFrame(); - - if (!empty($frame)) { - $notice = Notice::staticGet('id', $frame->body); - - if (empty($notice)) { - $this->_log(LOG_WARNING, 'Got ID '. $frame->body .' for non-existent notice in queue '. $queue); - $this->con->ack($frame); - } else { - if ($handler->handle_notice($notice)) { - $this->_log(LOG_INFO, 'Successfully handled notice '. $notice->id .' posted at ' . $frame->headers['created'] . ' in queue '. $queue); - $this->con->ack($frame); - } else { - $this->_log(LOG_WARNING, 'Failed handling notice '. $notice->id .' posted at ' . $frame->headers['created'] . ' in queue '. $queue); - // FIXME we probably shouldn't have to do - // this kind of queue management ourselves - $this->con->ack($frame); - $this->enqueue($notice, $queue); - } - unset($notice); - } - - unset($frame); - } + return common_config('queue', 'queue_basename') . + common_config('site', 'server') . '/' . $queue; } - function _queueName($queue) + /** + * Returns the site and queue name from the server-side queue. + * + * @param string queue destination (eg '/queue/statusnet/identi.ca/sms') + * @return array of site and queue: ('identi.ca','sms') or false if unrecognized + */ + protected function parseDestination($dest) { - return common_config('queue', 'queue_basename') . $queue; + $prefix = common_config('queue', 'queue_basename'); + if (substr($dest, 0, strlen($prefix)) == $prefix) { + $rest = substr($dest, strlen($prefix)); + return explode("/", $rest, 2); + } else { + common_log(LOG_ERR, "Got a message from unrecognized stomp queue: $dest"); + return array(false, false); + } } function _log($level, $msg) @@ -167,3 +308,4 @@ class StompQueueManager common_log($level, 'StompQueueManager: '.$msg); } } + diff --git a/lib/unqueuemanager.php b/lib/unqueuemanager.php index 72dbc4eede..5595eac052 100644 --- a/lib/unqueuemanager.php +++ b/lib/unqueuemanager.php @@ -23,57 +23,35 @@ * @package StatusNet * @author Evan Prodromou * @author Sarven Capadisli + * @author Brion Vibber * @copyright 2009 StatusNet, Inc. * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0 * @link http://status.net/ */ -class UnQueueManager +class UnQueueManager extends QueueManager { + + /** + * Dummy queue storage manager: instead of saving events for later, + * we just process them immediately. This is only suitable for events + * that can be processed quickly and don't need polling or long-running + * connections to another server such as XMPP. + * + * @param Notice $object + * @param string $queue + */ function enqueue($object, $queue) { $notice = $object; - - switch ($queue) - { - case 'omb': - if ($this->_isLocal($notice)) { - require_once(INSTALLDIR.'/lib/omb.php'); - omb_broadcast_notice($notice); - } - break; - case 'public': - if ($this->_isLocal($notice)) { - require_once(INSTALLDIR.'/lib/jabber.php'); - jabber_public_notice($notice); - } - break; - case 'ping': - if ($this->_isLocal($notice)) { - require_once INSTALLDIR . '/lib/ping.php'; - return ping_broadcast_notice($notice); - } - case 'sms': - require_once(INSTALLDIR.'/lib/mail.php'); - mail_broadcast_notice_sms($notice); - break; - case 'jabber': - require_once(INSTALLDIR.'/lib/jabber.php'); - jabber_broadcast_notice($notice); - break; - case 'plugin': - Event::handle('HandleQueuedNotice', array(&$notice)); - break; - default: + + $handler = $this->getHandler($queue); + if ($handler) { + $handler->handle_notice($notice); + } else { if (Event::handle('UnqueueHandleNotice', array(&$notice, $queue))) { throw new ServerException("UnQueueManager: Unknown queue: $queue"); } } } - - function _isLocal($notice) - { - return ($notice->is_local == Notice::LOCAL_PUBLIC || - $notice->is_local == Notice::LOCAL_NONPUBLIC); - } -} \ No newline at end of file +} diff --git a/lib/util.php b/lib/util.php index 1237d718bd..f161794e38 100644 --- a/lib/util.php +++ b/lib/util.php @@ -1132,8 +1132,9 @@ function common_log_line($priority, $msg) function common_request_id() { $pid = getmypid(); + $server = common_config('site', 'server'); if (php_sapi_name() == 'cli') { - return $pid; + return "$server:$pid"; } else { static $req_id = null; if (!isset($req_id)) { @@ -1143,7 +1144,7 @@ function common_request_id() $url = $_SERVER['REQUEST_URI']; } $method = $_SERVER['REQUEST_METHOD']; - return "$pid.$req_id $method $url"; + return "$server:$pid.$req_id $method $url"; } } diff --git a/lib/xmppconfirmmanager.php b/lib/xmppconfirmmanager.php new file mode 100644 index 0000000000..ee4e294fd4 --- /dev/null +++ b/lib/xmppconfirmmanager.php @@ -0,0 +1,168 @@ +. + */ + +if (!defined('STATUSNET') && !defined('LACONICA')) { + exit(1); +} + +/** + * Event handler for pushing new confirmations to Jabber users. + * @fixme recommend redoing this on a queue-trigger model + * @fixme expiration of old items got dropped in the past, put it back? + */ +class XmppConfirmManager extends IoManager +{ + + /** + * @return mixed XmppConfirmManager, or false if unneeded + */ + public static function get() + { + if (common_config('xmpp', 'enabled')) { + $site = common_config('site', 'server'); + return new XmppConfirmManager(); + } else { + return false; + } + } + + /** + * Tell the i/o master we need one instance for each supporting site + * being handled in this process. + */ + public static function multiSite() + { + return IoManager::INSTANCE_PER_SITE; + } + + function __construct() + { + $this->site = common_config('site', 'server'); + } + + /** + * 10 seconds? Really? That seems a bit frequent. + */ + function pollInterval() + { + return 10; + } + + /** + * Ping! + * @return boolean true if we found something + */ + function poll() + { + $this->switchSite(); + $confirm = $this->next_confirm(); + if ($confirm) { + $this->handle_confirm($confirm); + return true; + } else { + return false; + } + } + + protected function handle_confirm($confirm) + { + require_once INSTALLDIR . '/lib/jabber.php'; + + common_log(LOG_INFO, 'Sending confirmation for ' . $confirm->address); + $user = User::staticGet($confirm->user_id); + if (!$user) { + common_log(LOG_WARNING, 'Confirmation for unknown user ' . $confirm->user_id); + return; + } + $success = jabber_confirm_address($confirm->code, + $user->nickname, + $confirm->address); + if (!$success) { + common_log(LOG_ERR, 'Confirmation failed for ' . $confirm->address); + # Just let the claim age out; hopefully things work then + return; + } else { + common_log(LOG_INFO, 'Confirmation sent for ' . $confirm->address); + # Mark confirmation sent; need a dupe so we don't have the WHERE clause + $dupe = Confirm_address::staticGet('code', $confirm->code); + if (!$dupe) { + common_log(LOG_WARNING, 'Could not refetch confirm', __FILE__); + return; + } + $orig = clone($dupe); + $dupe->sent = $dupe->claimed; + $result = $dupe->update($orig); + if (!$result) { + common_log_db_error($dupe, 'UPDATE', __FILE__); + # Just let the claim age out; hopefully things work then + return; + } + } + return true; + } + + protected function next_confirm() + { + $confirm = new Confirm_address(); + $confirm->whereAdd('claimed IS null'); + $confirm->whereAdd('sent IS null'); + # XXX: eventually we could do other confirmations in the queue, too + $confirm->address_type = 'jabber'; + $confirm->orderBy('modified DESC'); + $confirm->limit(1); + if ($confirm->find(true)) { + common_log(LOG_INFO, 'Claiming confirmation for ' . $confirm->address); + # working around some weird DB_DataObject behaviour + $confirm->whereAdd(''); # clears where stuff + $original = clone($confirm); + $confirm->claimed = common_sql_now(); + $result = $confirm->update($original); + if ($result) { + common_log(LOG_INFO, 'Succeeded in claim! '. $result); + return $confirm; + } else { + common_log(LOG_INFO, 'Failed in claim!'); + return false; + } + } + return null; + } + + protected function clear_old_confirm_claims() + { + $confirm = new Confirm(); + $confirm->claimed = null; + $confirm->whereAdd('now() - claimed > '.CLAIM_TIMEOUT); + $confirm->update(DB_DATAOBJECT_WHEREADD_ONLY); + $confirm->free(); + unset($confirm); + } + + /** + * Make sure we're on the right site configuration + */ + protected function switchSite() + { + if ($this->site != common_config('site', 'server')) { + common_log(LOG_DEBUG, __METHOD__ . ": switching to site $this->site"); + $this->stats('switch'); + StatusNet::init($this->site); + } + } +} diff --git a/lib/xmppmanager.php b/lib/xmppmanager.php new file mode 100644 index 0000000000..9662e97d15 --- /dev/null +++ b/lib/xmppmanager.php @@ -0,0 +1,273 @@ +. + */ + +if (!defined('STATUSNET') && !defined('LACONICA')) { exit(1); } + +/** + * XMPP background connection manager for XMPP-using queue handlers, + * allowing them to send outgoing messages on the right connection. + * + * Input is handled during socket select loop, keepalive pings during idle. + * Any incoming messages will be forwarded to the main XmppDaemon process, + * which handles direct user interaction. + * + * In a multi-site queuedaemon.php run, one connection will be instantiated + * for each site being handled by the current process that has XMPP enabled. + */ + +class XmppManager extends IoManager +{ + protected $site = null; + protected $pingid = 0; + protected $lastping = null; + + static protected $singletons = array(); + + const PING_INTERVAL = 120; + + /** + * Fetch the singleton XmppManager for the current site. + * @return mixed XmppManager, or false if unneeded + */ + public static function get() + { + if (common_config('xmpp', 'enabled')) { + $site = common_config('site', 'server'); + if (empty(self::$singletons[$site])) { + self::$singletons[$site] = new XmppManager(); + } + return self::$singletons[$site]; + } else { + return false; + } + } + + /** + * Tell the i/o master we need one instance for each supporting site + * being handled in this process. + */ + public static function multiSite() + { + return IoManager::INSTANCE_PER_SITE; + } + + function __construct() + { + $this->site = common_config('site', 'server'); + } + + /** + * Initialize connection to server. + * @return boolean true on success + */ + public function start($master) + { + parent::start($master); + $this->switchSite(); + + require_once "lib/jabber.php"; + + # Low priority; we don't want to receive messages + + common_log(LOG_INFO, "INITIALIZE"); + $this->conn = jabber_connect($this->resource()); + + if (empty($this->conn)) { + common_log(LOG_ERR, "Couldn't connect to server."); + return false; + } + + $this->conn->addEventHandler('message', 'forward_message', $this); + $this->conn->addEventHandler('reconnect', 'handle_reconnect', $this); + $this->conn->setReconnectTimeout(600); + jabber_send_presence("Send me a message to post a notice", 'available', null, 'available', -1); + + return !is_null($this->conn); + } + + /** + * Message pump is triggered on socket input, so we only need an idle() + * call often enough to trigger our outgoing pings. + */ + function timeout() + { + return self::PING_INTERVAL; + } + + /** + * Lists the XMPP connection socket to allow i/o master to wake + * when input comes in here as well as from the queue source. + * + * @return array of resources + */ + public function getSockets() + { + return array($this->conn->getSocket()); + } + + /** + * Process XMPP events that have come in over the wire. + * Side effects: may switch site configuration + * @fixme may kill process on XMPP error + * @param resource $socket + */ + public function handleInput($socket) + { + $this->switchSite(); + + # Process the queue for as long as needed + try { + if ($this->conn) { + assert($socket === $this->conn->getSocket()); + + common_log(LOG_DEBUG, "Servicing the XMPP queue."); + $this->stats('xmpp_process'); + $this->conn->processTime(0); + } + } catch (XMPPHP_Exception $e) { + common_log(LOG_ERR, "Got an XMPPHP_Exception: " . $e->getMessage()); + die($e->getMessage()); + } + } + + /** + * Idle processing for io manager's execution loop. + * Send keepalive pings to server. + * + * Side effect: kills process on exception from XMPP library. + * + * @fixme non-dying error handling + */ + public function idle($timeout=0) + { + if ($this->conn) { + $now = time(); + if (empty($this->lastping) || $now - $this->lastping > self::PING_INTERVAL) { + $this->switchSite(); + try { + $this->sendPing(); + $this->lastping = $now; + } catch (XMPPHP_Exception $e) { + common_log(LOG_ERR, "Got an XMPPHP_Exception: " . $e->getMessage()); + die($e->getMessage()); + } + } + } + } + + /** + * Send a keepalive ping to the XMPP server. + */ + protected function sendPing() + { + $jid = jabber_daemon_address().'/'.$this->resource(); + $server = common_config('xmpp', 'server'); + + if (!isset($this->pingid)) { + $this->pingid = 0; + } else { + $this->pingid++; + } + + common_log(LOG_DEBUG, "Sending ping #{$this->pingid}"); + + $this->conn->send(""); + } + + /** + * Callback for Jabber reconnect event + * @param $pl + */ + function handle_reconnect(&$pl) + { + common_log(LOG_NOTICE, 'XMPP reconnected'); + + $this->conn->processUntil('session_start'); + $this->conn->presence(null, 'available', null, 'available', -1); + } + + /** + * Callback for Jabber message event. + * + * This connection handles output; if we get a message straight to us, + * forward it on to our XmppDaemon listener for processing. + * + * @param $pl + */ + function forward_message(&$pl) + { + if ($pl['type'] != 'chat') { + common_log(LOG_DEBUG, 'Ignoring message of type ' . $pl['type'] . ' from ' . $pl['from']); + return; + } + $listener = $this->listener(); + if (strtolower($listener) == strtolower($pl['from'])) { + common_log(LOG_WARNING, 'Ignoring loop message.'); + return; + } + common_log(LOG_INFO, 'Forwarding message from ' . $pl['from'] . ' to ' . $listener); + $this->conn->message($this->listener(), $pl['body'], 'chat', null, $this->ofrom($pl['from'])); + } + + /** + * Build an block with an ofrom entry for forwarded messages + * + * @param string $from Jabber ID of original sender + * @return string XML fragment + */ + protected function ofrom($from) + { + $address = "\n"; + $address .= "
\n"; + $address .= "\n"; + return $address; + } + + /** + * Build the complete JID of the XmppDaemon process which + * handles primary XMPP input for this site. + * + * @return string Jabber ID + */ + protected function listener() + { + if (common_config('xmpp', 'listener')) { + return common_config('xmpp', 'listener'); + } else { + return jabber_daemon_address() . '/' . common_config('xmpp','resource') . 'daemon'; + } + } + + protected function resource() + { + return 'queue' . posix_getpid(); // @fixme PIDs won't be host-unique + } + + /** + * Make sure we're on the right site configuration + */ + protected function switchSite() + { + if ($this->site != common_config('site', 'server')) { + common_log(LOG_DEBUG, __METHOD__ . ": switching to site $this->site"); + $this->stats('switch'); + StatusNet::init($this->site); + } + } +} diff --git a/lib/xmppqueuehandler.php b/lib/xmppqueuehandler.php deleted file mode 100644 index f28fc9088c..0000000000 --- a/lib/xmppqueuehandler.php +++ /dev/null @@ -1,142 +0,0 @@ -. - */ - -if (!defined('STATUSNET') && !defined('LACONICA')) { exit(1); } - -require_once(INSTALLDIR.'/lib/queuehandler.php'); - -define('PING_INTERVAL', 120); - -/** - * Common superclass for all XMPP-using queue handlers. They all need to - * service their message queues on idle, and forward any incoming messages - * to the XMPP listener connection. So, we abstract out common code to a - * superclass. - */ - -class XmppQueueHandler extends QueueHandler -{ - var $pingid = 0; - var $lastping = null; - - function start() - { - # Low priority; we don't want to receive messages - - $this->log(LOG_INFO, "INITIALIZE"); - $this->conn = jabber_connect($this->_id.$this->transport()); - - if (empty($this->conn)) { - $this->log(LOG_ERR, "Couldn't connect to server."); - return false; - } - - $this->conn->addEventHandler('message', 'forward_message', $this); - $this->conn->addEventHandler('reconnect', 'handle_reconnect', $this); - $this->conn->setReconnectTimeout(600); - jabber_send_presence("Send me a message to post a notice", 'available', null, 'available', -1); - - return !is_null($this->conn); - } - - function timeout() - { - return 10; - } - - function handle_reconnect(&$pl) - { - $this->log(LOG_NOTICE, 'reconnected'); - - $this->conn->processUntil('session_start'); - $this->conn->presence(null, 'available', null, 'available', -1); - } - - function idle($timeout=0) - { - # Process the queue for as long as needed - try { - if ($this->conn) { - $this->log(LOG_DEBUG, "Servicing the XMPP queue."); - $this->conn->processTime($timeout); - $now = time(); - if (empty($this->lastping) || $now - $this->lastping > PING_INTERVAL) { - $this->sendPing(); - $this->lastping = $now; - } - } - } catch (XMPPHP_Exception $e) { - $this->log(LOG_ERR, "Got an XMPPHP_Exception: " . $e->getMessage()); - die($e->getMessage()); - } - } - - function sendPing() - { - $jid = jabber_daemon_address().'/'.$this->_id.$this->transport(); - $server = common_config('xmpp', 'server'); - - if (!isset($this->pingid)) { - $this->pingid = 0; - } else { - $this->pingid++; - } - - $this->log(LOG_DEBUG, "Sending ping #{$this->pingid}"); - - $this->conn->send(""); - } - - function forward_message(&$pl) - { - if ($pl['type'] != 'chat') { - $this->log(LOG_DEBUG, 'Ignoring message of type ' . $pl['type'] . ' from ' . $pl['from']); - return; - } - $listener = $this->listener(); - if (strtolower($listener) == strtolower($pl['from'])) { - $this->log(LOG_WARNING, 'Ignoring loop message.'); - return; - } - $this->log(LOG_INFO, 'Forwarding message from ' . $pl['from'] . ' to ' . $listener); - $this->conn->message($this->listener(), $pl['body'], 'chat', null, $this->ofrom($pl['from'])); - } - - function ofrom($from) - { - $address = "\n"; - $address .= "
\n"; - $address .= "\n"; - return $address; - } - - function listener() - { - if (common_config('xmpp', 'listener')) { - return common_config('xmpp', 'listener'); - } else { - return jabber_daemon_address() . '/' . common_config('xmpp','resource') . 'daemon'; - } - } - - function getSockets() - { - return array($this->conn->getSocket()); - } -} diff --git a/plugins/Enjit/README b/plugins/Enjit/README new file mode 100644 index 0000000000..03f989490b --- /dev/null +++ b/plugins/Enjit/README @@ -0,0 +1,5 @@ +This doesn't seem to have been functional for a while; can't find other references +to the enjit configuration or transport enqueuing. Keeping it in case someone +wants to bring it up to date. + +-- brion vibber 2009-12-03 diff --git a/scripts/enjitqueuehandler.php b/plugins/Enjit/enjitqueuehandler.php old mode 100755 new mode 100644 similarity index 79% rename from scripts/enjitqueuehandler.php rename to plugins/Enjit/enjitqueuehandler.php index afcac539a6..f0e706b929 --- a/scripts/enjitqueuehandler.php +++ b/plugins/Enjit/enjitqueuehandler.php @@ -1,4 +1,3 @@ -#!/usr/bin/env php . */ -define('INSTALLDIR', realpath(dirname(__FILE__) . '/..')); - -$shortoptions = 'i::'; -$longoptions = array('id::'); - -$helptext = << 0) { - $id = $args[0]; -} else { - $id = null; -} - -$handler = new EnjitQueueHandler($id); - -if ($handler->start()) { - $handler->handle_queue(); -} - -$handler->finish(); diff --git a/plugins/Facebook/FacebookPlugin.php b/plugins/Facebook/FacebookPlugin.php index de91bf24a1..4266b886d9 100644 --- a/plugins/Facebook/FacebookPlugin.php +++ b/plugins/Facebook/FacebookPlugin.php @@ -114,6 +114,9 @@ class FacebookPlugin extends Plugin case 'FBCSettingsNav': include_once INSTALLDIR . '/plugins/Facebook/FBCSettingsNav.php'; return false; + case 'FacebookQueueHandler': + include_once INSTALLDIR . '/plugins/Facebook/facebookqueuehandler.php'; + return false; default: return true; } @@ -508,50 +511,15 @@ class FacebookPlugin extends Plugin } /** - * broadcast the message when not using queuehandler + * Register Facebook notice queue handler * - * @param Notice &$notice the notice - * @param array $queue destination queue + * @param QueueManager $manager * * @return boolean hook return */ - - function onUnqueueHandleNotice(&$notice, $queue) - { - if (($queue == 'facebook') && ($this->_isLocal($notice))) { - facebookBroadcastNotice($notice); - return false; - } - return true; - } - - /** - * Determine whether the notice was locally created - * - * @param Notice $notice the notice - * - * @return boolean locality - */ - - function _isLocal($notice) - { - return ($notice->is_local == Notice::LOCAL_PUBLIC || - $notice->is_local == Notice::LOCAL_NONPUBLIC); - } - - /** - * Add Facebook queuehandler to the list of daemons to start - * - * @param array $daemons the list fo daemons to run - * - * @return boolean hook return - * - */ - - function onGetValidDaemons($daemons) + function onEndInitializeQueueManager($manager) { - array_push($daemons, INSTALLDIR . - '/plugins/Facebook/facebookqueuehandler.php'); + $manager->connect('facebook', 'FacebookQueueHandler'); return true; } diff --git a/plugins/Facebook/facebookqueuehandler.php b/plugins/Facebook/facebookqueuehandler.php old mode 100755 new mode 100644 index e4ae7d4ee7..1778690e5b --- a/plugins/Facebook/facebookqueuehandler.php +++ b/plugins/Facebook/facebookqueuehandler.php @@ -1,4 +1,3 @@ -#!/usr/bin/env php . */ -define('INSTALLDIR', realpath(dirname(__FILE__) . '/../..')); +if (!defined('STATUSNET') && !defined('LACONICA')) { exit(1); } -$shortoptions = 'i::'; -$longoptions = array('id::'); - -$helptext = <<log(LOG_INFO, "INITIALIZE"); - return true; - } - function handle_notice($notice) { - return facebookBroadcastNotice($notice); + if ($this->_isLocal($notice)) { + return facebookBroadcastNotice($notice); + } + return true; } - function finish() + /** + * Determine whether the notice was locally created + * + * @param Notice $notice the notice + * + * @return boolean locality + */ + function _isLocal($notice) { + return ($notice->is_local == Notice::LOCAL_PUBLIC || + $notice->is_local == Notice::LOCAL_NONPUBLIC); } - } - -if (have_option('i')) { - $id = get_option_value('i'); -} else if (have_option('--id')) { - $id = get_option_value('--id'); -} else if (count($args) > 0) { - $id = $args[0]; -} else { - $id = null; -} - -$handler = new FacebookQueueHandler($id); - -$handler->runOnce(); diff --git a/plugins/MemcachePlugin.php b/plugins/MemcachePlugin.php index 5f93e9a836..fbc2802f78 100644 --- a/plugins/MemcachePlugin.php +++ b/plugins/MemcachePlugin.php @@ -133,6 +133,23 @@ class MemcachePlugin extends Plugin return false; } + function onStartCacheReconnect(&$success) + { + if (empty($this->_conn)) { + // nothing to do + return true; + } + if ($this->persistent) { + common_log(LOG_ERR, "Cannot close persistent memcached connection"); + $success = false; + } else { + common_log(LOG_INFO, "Closing memcached connection"); + $success = $this->_conn->close(); + $this->_conn = null; + } + return false; + } + /** * Ensure that a connection exists * diff --git a/plugins/TwitterBridge/TwitterBridgePlugin.php b/plugins/TwitterBridge/TwitterBridgePlugin.php index a87ee2894a..57b3c1c995 100644 --- a/plugins/TwitterBridge/TwitterBridgePlugin.php +++ b/plugins/TwitterBridge/TwitterBridgePlugin.php @@ -112,7 +112,9 @@ class TwitterBridgePlugin extends Plugin strtolower(mb_substr($cls, 0, -6)) . '.php'; return false; case 'TwitterOAuthClient': - include_once INSTALLDIR . '/plugins/TwitterBridge/twitteroauthclient.php'; + case 'TwitterQueueHandler': + include_once INSTALLDIR . '/plugins/TwitterBridge/' . + strtolower($cls) . '.php'; return false; default: return true; @@ -138,48 +140,15 @@ class TwitterBridgePlugin extends Plugin return true; } - /** - * broadcast the message when not using queuehandler - * - * @param Notice &$notice the notice - * @param array $queue destination queue - * - * @return boolean hook return - */ - function onUnqueueHandleNotice(&$notice, $queue) - { - if (($queue == 'twitter') && ($this->_isLocal($notice))) { - broadcast_twitter($notice); - return false; - } - return true; - } - - /** - * Determine whether the notice was locally created - * - * @param Notice $notice - * - * @return boolean locality - */ - function _isLocal($notice) - { - return ($notice->is_local == Notice::LOCAL_PUBLIC || - $notice->is_local == Notice::LOCAL_NONPUBLIC); - } - /** * Add Twitter bridge daemons to the list of daemons to start * * @param array $daemons the list fo daemons to run * * @return boolean hook return - * */ function onGetValidDaemons($daemons) { - array_push($daemons, INSTALLDIR . - '/plugins/TwitterBridge/daemons/twitterqueuehandler.php'); array_push($daemons, INSTALLDIR . '/plugins/TwitterBridge/daemons/synctwitterfriends.php'); @@ -191,6 +160,19 @@ class TwitterBridgePlugin extends Plugin return true; } + /** + * Register Twitter notice queue handler + * + * @param QueueManager $manager + * + * @return boolean hook return + */ + function onEndInitializeQueueManager($manager) + { + $manager->connect('twitter', 'TwitterQueueHandler'); + return true; + } + function onPluginVersion(&$versions) { $versions[] = array('name' => 'TwitterBridge', diff --git a/plugins/TwitterBridge/daemons/twitterqueuehandler.php b/plugins/TwitterBridge/twitterqueuehandler.php old mode 100755 new mode 100644 similarity index 57% rename from plugins/TwitterBridge/daemons/twitterqueuehandler.php rename to plugins/TwitterBridge/twitterqueuehandler.php index f0e76bb745..5089ca7b74 --- a/plugins/TwitterBridge/daemons/twitterqueuehandler.php +++ b/plugins/TwitterBridge/twitterqueuehandler.php @@ -1,4 +1,3 @@ -#!/usr/bin/env php . */ -define('INSTALLDIR', realpath(dirname(__FILE__) . '/../../..')); +if (!defined('STATUSNET') && !defined('LACONICA')) { exit(1); } -$shortoptions = 'i::'; -$longoptions = array('id::'); - -$helptext = <<log(LOG_INFO, "INITIALIZE"); - return true; - } - function handle_notice($notice) { return broadcast_twitter($notice); } - - function finish() - { - } - } - -if (have_option('i')) { - $id = get_option_value('i'); -} else if (have_option('--id')) { - $id = get_option_value('--id'); -} else if (count($args) > 0) { - $id = $args[0]; -} else { - $id = null; -} - -$handler = new TwitterQueueHandler($id); - -$handler->runOnce(); diff --git a/scripts/getvaliddaemons.php b/scripts/getvaliddaemons.php index 99ad41b374..a332e06b58 100755 --- a/scripts/getvaliddaemons.php +++ b/scripts/getvaliddaemons.php @@ -37,19 +37,10 @@ require_once INSTALLDIR.'/scripts/commandline.inc'; $daemons = array(); -$daemons[] = INSTALLDIR.'/scripts/pluginqueuehandler.php'; -$daemons[] = INSTALLDIR.'/scripts/ombqueuehandler.php'; -$daemons[] = INSTALLDIR.'/scripts/pingqueuehandler.php'; +$daemons[] = INSTALLDIR.'/scripts/queuedaemon.php'; if(common_config('xmpp','enabled')) { $daemons[] = INSTALLDIR.'/scripts/xmppdaemon.php'; - $daemons[] = INSTALLDIR.'/scripts/jabberqueuehandler.php'; - $daemons[] = INSTALLDIR.'/scripts/publicqueuehandler.php'; - $daemons[] = INSTALLDIR.'/scripts/xmppconfirmhandler.php'; -} - -if (common_config('sms', 'enabled')) { - $daemons[] = INSTALLDIR.'/scripts/smsqueuehandler.php'; } if (Event::handle('GetValidDaemons', array(&$daemons))) { diff --git a/scripts/pingqueuehandler.php b/scripts/handlequeued.php similarity index 52% rename from scripts/pingqueuehandler.php rename to scripts/handlequeued.php index c92337e36c..9031437aac 100755 --- a/scripts/pingqueuehandler.php +++ b/scripts/handlequeued.php @@ -20,50 +20,37 @@ define('INSTALLDIR', realpath(dirname(__FILE__) . '/..')); -$shortoptions = 'i::'; -$longoptions = array('id::'); +$helptext = << +Run a single queued notice through background processing +as if it were being run through the queue. -$helptext = <<log(LOG_INFO, "INITIALIZE"); - return true; - } +if (count($args) != 2) { + show_help(); +} - function handle_notice($notice) { - return ping_broadcast_notice($notice); - } +$queue = trim($args[0]); +$noticeId = intval($args[1]); - function finish() { - } +$qm = QueueManager::get(); +$handler = $qm->getHandler($queue); +if (!$handler) { + print "No handler for queue '$queue'.\n"; + exit(1); } -if (have_option('i')) { - $id = get_option_value('i'); -} else if (have_option('--id')) { - $id = get_option_value('--id'); -} else if (count($args) > 0) { - $id = $args[0]; -} else { - $id = null; +$notice = Notice::staticGet('id', $noticeId); +if (empty($notice)) { + print "Invalid notice id $noticeId\n"; + exit(1); } -$handler = new PingQueueHandler($id); - -$handler->runOnce(); +if (!$handler->handle_notice($notice)) { + print "Failed to handle notice id $noticeId on queue '$queue'.\n"; + exit(1); +} diff --git a/scripts/queuedaemon.php b/scripts/queuedaemon.php new file mode 100755 index 0000000000..8ef364fe7b --- /dev/null +++ b/scripts/queuedaemon.php @@ -0,0 +1,265 @@ +#!/usr/bin/env php +. + */ + +define('INSTALLDIR', realpath(dirname(__FILE__) . '/..')); + +$shortoptions = 'fi:at:'; +$longoptions = array('id=', 'foreground', 'all', 'threads='); + +/** + * Attempts to get a count of the processors available on the current system + * to fan out multiple threads. + * + * Recognizes Linux and Mac OS X; others will return default of 1. + * + * @return intval + */ +function getProcessorCount() +{ + $cpus = 0; + switch (PHP_OS) { + case 'Linux': + $cpuinfo = file('/proc/cpuinfo'); + foreach (file('/proc/cpuinfo') as $line) { + if (preg_match('/^processor\s+:\s+(\d+)\s?$/', $line)) { + $cpus++; + } + } + break; + case 'Darwin': + $cpus = intval(shell_exec("/usr/sbin/sysctl -n hw.ncpu 2>/dev/null")); + break; + } + if ($cpus) { + return $cpus; + } + return 1; +} + +$threads = getProcessorCount(); +$helptext = << Spawn processing threads (default $threads) + + +END_OF_QUEUE_HELP; + +require_once INSTALLDIR.'/scripts/commandline.inc'; + +require_once(INSTALLDIR.'/lib/daemon.php'); +require_once(INSTALLDIR.'/classes/Queue_item.php'); +require_once(INSTALLDIR.'/classes/Notice.php'); + +define('CLAIM_TIMEOUT', 1200); + +/** + * Queue handling daemon... + * + * The queue daemon by default launches in the background, at which point + * it'll pass control to the configured QueueManager class to poll for updates. + * + * We can then pass individual items through the QueueHandler subclasses + * they belong to. + */ +class QueueDaemon extends Daemon +{ + protected $allsites; + protected $threads=1; + + function __construct($id=null, $daemonize=true, $threads=1, $allsites=false) + { + parent::__construct($daemonize); + + if ($id) { + $this->set_id($id); + } + $this->all = $allsites; + $this->threads = $threads; + } + + /** + * How many seconds a polling-based queue manager should wait between + * checks for new items to handle. + * + * Defaults to 60 seconds; override to speed up or slow down. + * + * @return int timeout in seconds + */ + function timeout() + { + return 60; + } + + function name() + { + return strtolower(get_class($this).'.'.$this->get_id()); + } + + function run() + { + if ($this->threads > 1) { + return $this->runThreads(); + } else { + return $this->runLoop(); + } + } + + function runThreads() + { + $children = array(); + for ($i = 1; $i <= $this->threads; $i++) { + $pid = pcntl_fork(); + if ($pid < 0) { + print "Couldn't fork for thread $i; aborting\n"; + exit(1); + } else if ($pid == 0) { + $this->runChild($i); + exit(0); + } else { + $this->log(LOG_INFO, "Spawned thread $i as pid $pid"); + $children[$i] = $pid; + } + } + + $this->log(LOG_INFO, "Waiting for children to complete."); + while (count($children) > 0) { + $status = null; + $pid = pcntl_wait($status); + if ($pid > 0) { + $i = array_search($pid, $children); + if ($i === false) { + $this->log(LOG_ERR, "Unrecognized child pid $pid exited!"); + continue; + } + unset($children[$i]); + $this->log(LOG_INFO, "Thread $i pid $pid exited."); + + $pid = pcntl_fork(); + if ($pid < 0) { + print "Couldn't fork to respawn thread $i; aborting thread.\n"; + } else if ($pid == 0) { + $this->runChild($i); + exit(0); + } else { + $this->log(LOG_INFO, "Respawned thread $i as pid $pid"); + $children[$i] = $pid; + } + } + } + $this->log(LOG_INFO, "All child processes complete."); + return true; + } + + function runChild($thread) + { + $this->set_id($this->get_id() . "." . $thread); + $this->resetDb(); + $this->runLoop(); + } + + /** + * Reconnect to the database for each child process, + * or they'll get very confused trying to use the + * same socket. + */ + function resetDb() + { + // @fixme do we need to explicitly open the db too + // or is this implied? + global $_DB_DATAOBJECT; + unset($_DB_DATAOBJECT['CONNECTIONS']); + + // Reconnect main memcached, or threads will stomp on + // each other and corrupt their requests. + $cache = common_memcache(); + if ($cache) { + $cache->reconnect(); + } + + // Also reconnect memcached for status_network table. + if (!empty(Status_network::$cache)) { + Status_network::$cache->close(); + Status_network::$cache = null; + } + } + + /** + * Setup and start of run loop for this queue handler as a daemon. + * Most of the heavy lifting is passed on to the QueueManager's service() + * method, which passes control on to the QueueHandler's handle_notice() + * method for each notice that comes in on the queue. + * + * Most of the time this won't need to be overridden in a subclass. + * + * @return boolean true on success, false on failure + */ + function runLoop() + { + $this->log(LOG_INFO, 'checking for queued notices'); + + $master = new IoMaster($this->get_id()); + $master->init($this->all); + $master->service(); + + $this->log(LOG_INFO, 'finished servicing the queue'); + + $this->log(LOG_INFO, 'terminating normally'); + + return true; + } + + function log($level, $msg) + { + common_log($level, get_class($this) . ' ('. $this->get_id() .'): '.$msg); + } +} + +if (have_option('i')) { + $id = get_option_value('i'); +} else if (have_option('--id')) { + $id = get_option_value('--id'); +} else if (count($args) > 0) { + $id = $args[0]; +} else { + $id = null; +} + +if (have_option('t')) { + $threads = intval(get_option_value('t')); +} else if (have_option('--threads')) { + $threads = intval(get_option_value('--threads')); +} else { + $threads = 0; +} +if (!$threads) { + $threads = getProcessorCount(); +} + +$daemonize = !(have_option('f') || have_option('--foreground')); +$all = have_option('a') || have_option('--all'); + +$daemon = new QueueDaemon($id, $daemonize, $threads, $all); +$daemon->runOnce(); + diff --git a/scripts/xmppconfirmhandler.php b/scripts/xmppconfirmhandler.php deleted file mode 100755 index 2e39741369..0000000000 --- a/scripts/xmppconfirmhandler.php +++ /dev/null @@ -1,161 +0,0 @@ -#!/usr/bin/env php -. - */ - -define('INSTALLDIR', realpath(dirname(__FILE__) . '/..')); - -$shortoptions = 'i::'; -$longoptions = array('id::'); - -$helptext = <<start()) { - return false; - } - $this->log(LOG_INFO, 'checking for queued confirmations'); - do { - $confirm = $this->next_confirm(); - if ($confirm) { - $this->log(LOG_INFO, 'Sending confirmation for ' . $confirm->address); - $user = User::staticGet($confirm->user_id); - if (!$user) { - $this->log(LOG_WARNING, 'Confirmation for unknown user ' . $confirm->user_id); - continue; - } - $success = jabber_confirm_address($confirm->code, - $user->nickname, - $confirm->address); - if (!$success) { - $this->log(LOG_ERR, 'Confirmation failed for ' . $confirm->address); - # Just let the claim age out; hopefully things work then - continue; - } else { - $this->log(LOG_INFO, 'Confirmation sent for ' . $confirm->address); - # Mark confirmation sent; need a dupe so we don't have the WHERE clause - $dupe = Confirm_address::staticGet('code', $confirm->code); - if (!$dupe) { - common_log(LOG_WARNING, 'Could not refetch confirm', __FILE__); - continue; - } - $orig = clone($dupe); - $dupe->sent = $dupe->claimed; - $result = $dupe->update($orig); - if (!$result) { - common_log_db_error($dupe, 'UPDATE', __FILE__); - # Just let the claim age out; hopefully things work then - continue; - } - $dupe->free(); - unset($dupe); - } - $user->free(); - unset($user); - $confirm->free(); - unset($confirm); - $this->idle(0); - } else { -# $this->clear_old_confirm_claims(); - $this->idle(10); - } - } while (true); - if (!$this->finish()) { - return false; - } - return true; - } - - function next_confirm() - { - $confirm = new Confirm_address(); - $confirm->whereAdd('claimed IS null'); - $confirm->whereAdd('sent IS null'); - # XXX: eventually we could do other confirmations in the queue, too - $confirm->address_type = 'jabber'; - $confirm->orderBy('modified DESC'); - $confirm->limit(1); - if ($confirm->find(true)) { - $this->log(LOG_INFO, 'Claiming confirmation for ' . $confirm->address); - # working around some weird DB_DataObject behaviour - $confirm->whereAdd(''); # clears where stuff - $original = clone($confirm); - $confirm->claimed = common_sql_now(); - $result = $confirm->update($original); - if ($result) { - $this->log(LOG_INFO, 'Succeeded in claim! '. $result); - return $confirm; - } else { - $this->log(LOG_INFO, 'Failed in claim!'); - return false; - } - } - return null; - } - - function clear_old_confirm_claims() - { - $confirm = new Confirm(); - $confirm->claimed = null; - $confirm->whereAdd('now() - claimed > '.CLAIM_TIMEOUT); - $confirm->update(DB_DATAOBJECT_WHEREADD_ONLY); - $confirm->free(); - unset($confirm); - } -} - -// Abort immediately if xmpp is not enabled, otherwise the daemon chews up -// lots of CPU trying to connect to unconfigured servers -if (common_config('xmpp','enabled')==false) { - print "Aborting daemon - xmpp is disabled\n"; - exit(); -} - -if (have_option('i')) { - $id = get_option_value('i'); -} else if (have_option('--id')) { - $id = get_option_value('--id'); -} else if (count($args) > 0) { - $id = $args[0]; -} else { - $id = null; -} - -$handler = new XmppConfirmHandler($id); - -$handler->runOnce(); - -- 2.39.5