. * * @category QueueManager * @package StatusNet * @author Evan Prodromou * @author Sarven Capadisli * @author Brion Vibber * @copyright 2009-2010 StatusNet, Inc. * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0 * @link http://status.net/ */ /** * Completed child classes must implement the enqueue() method. * * For background processing, classes should implement either socket-based * input (handleInput(), getSockets()) or idle-loop polling (idle()). */ abstract class QueueManager extends IoManager { static $qm = null; protected $master = null; protected $handlers = array(); protected $groups = array(); protected $activeGroups = array(); /** * Factory function to pull the appropriate QueueManager object * for this site's configuration. It can then be used to queue * events for later processing or to spawn a processing loop. * * Plugins can add to the built-in types by hooking StartNewQueueManager. * * @return QueueManager */ public static function get() { if (empty(self::$qm)) { if (Event::handle('StartNewQueueManager', array(&self::$qm))) { $enabled = common_config('queue', 'enabled'); $type = common_config('queue', 'subsystem'); if (!$enabled) { // does everything immediately self::$qm = new UnQueueManager(); } else { switch ($type) { case 'db': self::$qm = new DBQueueManager(); break; case 'stomp': self::$qm = new StompQueueManager(); break; default: throw new ServerException("No queue manager class for type '$type'"); } } } } return self::$qm; } /** * @fixme wouldn't necessarily work with other class types. * Better to change the interface...? */ public static function multiSite() { if (common_config('queue', 'subsystem') == 'stomp') { return IoManager::INSTANCE_PER_PROCESS; } else { return IoManager::SINGLE_ONLY; } } function __construct() { $this->initialize(); } /** * Optional; ping any running queue handler daemons with a notification * such as announcing a new site to handle or requesting clean shutdown. * This avoids having to restart all the daemons manually to update configs * and such. * * Called from scripts/queuectl.php controller utility. * * @param string $event event key * @param string $param optional parameter to append to key * @return boolean success */ public function sendControlSignal($event, $param='') { throw new Exception(get_class($this) . " does not support control signals."); } /** * Store an object (usually/always a Notice) into the given queue * for later processing. No guarantee is made on when it will be * processed; it could be immediately or at some unspecified point * in the future. * * Must be implemented by any queue manager. * * @param Notice $object * @param string $queue */ abstract function enqueue($object, $queue); /** * Build a representation for an object for logging * @param mixed * @return string */ function logrep($object) { if (is_object($object)) { $class = get_class($object); if (isset($object->id)) { return "$class $object->id"; } return $class; } if (is_string($object)) { $len = strlen($object); $fragment = mb_substr($object, 0, 32); if (mb_strlen($object) > 32) { $fragment .= '...'; } return "string '$fragment' ($len bytes)"; } return strval($object); } /** * Encode an object or variable for queued storage. * Notice objects are currently stored as an id reference; * other items are serialized. * * @param mixed $item * @return string */ protected function encode($item) { if ($item instanceof Notice) { // Backwards compat return $item->id; } else { return serialize($item); } } /** * Decode an object from queued storage. * Accepts notice reference entries and serialized items. * * @param string * @return mixed */ protected function decode($frame) { if (is_numeric($frame)) { // Back-compat for notices... return Notice::staticGet(intval($frame)); } elseif (substr($frame, 0, 1) == '<') { // Back-compat for XML source return $frame; } else { // Deserialize! #$old = error_reporting(); #error_reporting($old & ~E_NOTICE); $out = unserialize($frame); #error_reporting($old); if ($out === false && $frame !== 'b:0;') { common_log(LOG_ERR, "Couldn't unserialize queued frame: $frame"); return false; } return $out; } } /** * Instantiate the appropriate QueueHandler class for the given queue. * * @param string $queue * @return mixed QueueHandler or null */ function getHandler($queue) { if (isset($this->handlers[$queue])) { $class = $this->handlers[$queue]; if(is_object($class)) { return $class; } else if (class_exists($class)) { return new $class(); } else { $this->_log(LOG_ERR, "Nonexistent handler class '$class' for queue '$queue'"); } } else { $this->_log(LOG_ERR, "Requested handler for unkown queue '$queue'"); } return null; } /** * Get a list of registered queue transport names to be used * for listening in this daemon. * * @return array of strings */ function activeQueues() { $queues = array(); foreach ($this->activeGroups as $group) { if (isset($this->groups[$group])) { $queues = array_merge($queues, $this->groups[$group]); } } return array_keys($queues); } /** * Initialize the list of queue handlers for the current site. * * @event StartInitializeQueueManager * @event EndInitializeQueueManager */ function initialize() { $this->handlers = array(); $this->groups = array(); $this->groupsByTransport = array(); if (Event::handle('StartInitializeQueueManager', array($this))) { $this->connect('distrib', 'DistribQueueHandler'); $this->connect('omb', 'OmbQueueHandler'); $this->connect('ping', 'PingQueueHandler'); if (common_config('sms', 'enabled')) { $this->connect('sms', 'SmsQueueHandler'); } // Broadcasting profile updates to OMB remote subscribers $this->connect('profile', 'ProfileQueueHandler'); // XMPP output handlers... if (common_config('xmpp', 'enabled')) { // Delivery prep, read by queuedaemon.php: $this->connect('jabber', 'JabberQueueHandler'); $this->connect('public', 'PublicQueueHandler'); // Raw output, read by xmppdaemon.php: $this->connect('xmppout', 'XmppOutQueueHandler', 'xmpp'); } // For compat with old plugins not registering their own handlers. $this->connect('plugin', 'PluginQueueHandler'); } Event::handle('EndInitializeQueueManager', array($this)); } /** * Register a queue transport name and handler class for your plugin. * Only registered transports will be reliably picked up! * * @param string $transport * @param string $class class name or object instance * @param string $group */ public function connect($transport, $class, $group='main') { $this->handlers[$transport] = $class; $this->groups[$group][$transport] = $class; $this->groupsByTransport[$transport] = $group; } /** * Set the active group which will be used for listening. * @param string $group */ function setActiveGroup($group) { $this->activeGroups = array($group); } /** * Set the active group(s) which will be used for listening. * @param array $groups */ function setActiveGroups($groups) { $this->activeGroups = $groups; } /** * @return string queue group for this queue */ function queueGroup($queue) { if (isset($this->groupsByTransport[$queue])) { return $this->groupsByTransport[$queue]; } else { throw new Exception("Requested group for unregistered transport $queue"); } } /** * Send a statistic ping to the queue monitoring system, * optionally with a per-queue id. * * @param string $key * @param string $queue */ function stats($key, $queue=false) { $owners = array(); if ($queue) { $owners[] = "queue:$queue"; $owners[] = "site:" . common_config('site', 'server'); } if (isset($this->master)) { $this->master->stats($key, $owners); } else { $monitor = new QueueMonitor(); $monitor->stats($key, $owners); } } protected function _log($level, $msg) { $class = get_class($this); if ($this->activeGroups) { $groups = ' (' . implode(',', $this->activeGroups) . ')'; } else { $groups = ''; } common_log($level, "$class$groups: $msg"); } }