X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=lib%2Fqueuemanager.php;h=149617eb508f1139c2502a110f57756b1a251744;hb=9465a4d5c6b77bd77e78bc277a817e2b5724ec76;hp=43105b7a86ed19ac1fed1aa66b9bf672b15cc5f2;hpb=bbb830e14c718c687f0636710a1827c90b11f4cc;p=quix0rs-gnu-social.git diff --git a/lib/queuemanager.php b/lib/queuemanager.php index 43105b7a86..149617eb50 100644 --- a/lib/queuemanager.php +++ b/lib/queuemanager.php @@ -2,7 +2,7 @@ /** * StatusNet, the distributed open-source microblogging tool * - * Abstract class for queue managers + * Abstract class for i/o managers * * PHP version 5 * @@ -23,16 +23,36 @@ * @package StatusNet * @author Evan Prodromou * @author Sarven Capadisli - * @copyright 2009 StatusNet, Inc. + * @author Brion Vibber + * @copyright 2009-2010 StatusNet, Inc. * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0 * @link http://status.net/ */ -class QueueManager +/** + * Completed child classes must implement the enqueue() method. + * + * For background processing, classes should implement either socket-based + * input (handleInput(), getSockets()) or idle-loop polling (idle()). + */ +abstract class QueueManager extends IoManager { static $qm = null; - static function get() + public $master = null; + public $handlers = array(); + public $groups = array(); + + /** + * Factory function to pull the appropriate QueueManager object + * for this site's configuration. It can then be used to queue + * events for later processing or to spawn a processing loop. + * + * Plugins can add to the built-in types by hooking StartNewQueueManager. + * + * @return QueueManager + */ + public static function get() { if (empty(self::$qm)) { @@ -62,13 +82,240 @@ class QueueManager return self::$qm; } - function enqueue($object, $queue) + /** + * @fixme wouldn't necessarily work with other class types. + * Better to change the interface...? + */ + public static function multiSite() + { + if (common_config('queue', 'subsystem') == 'stomp') { + return IoManager::INSTANCE_PER_PROCESS; + } else { + return IoManager::SINGLE_ONLY; + } + } + + function __construct() + { + $this->initialize(); + } + + /** + * Optional; ping any running queue handler daemons with a notification + * such as announcing a new site to handle or requesting clean shutdown. + * This avoids having to restart all the daemons manually to update configs + * and such. + * + * Called from scripts/queuectl.php controller utility. + * + * @param string $event event key + * @param string $param optional parameter to append to key + * @return boolean success + */ + public function sendControlSignal($event, $param='') + { + throw new Exception(get_class($this) . " does not support control signals."); + } + + /** + * Store an object (usually/always a Notice) into the given queue + * for later processing. No guarantee is made on when it will be + * processed; it could be immediately or at some unspecified point + * in the future. + * + * Must be implemented by any queue manager. + * + * @param Notice $object + * @param string $queue + */ + abstract function enqueue($object, $queue); + + /** + * Build a representation for an object for logging + * @param mixed + * @return string + */ + function logrep($object) { + if (is_object($object)) { + $class = get_class($object); + if (isset($object->id)) { + return "$class $object->id"; + } + return $class; + } + if (is_string($object)) { + $len = strlen($object); + $fragment = mb_substr($object, 0, 32); + if (mb_strlen($object) > 32) { + $fragment .= '...'; + } + return "string '$fragment' ($len bytes)"; + } + return strval($object); + } + + /** + * Encode an object or variable for queued storage. + * Notice objects are currently stored as an id reference; + * other items are serialized. + * + * @param mixed $item + * @return string + */ + protected function encode($item) { - throw ServerException("Unimplemented function 'enqueue' called"); + if ($item instanceof Notice) { + // Backwards compat + return $item->id; + } else { + return serialize($item); + } + } + + /** + * Decode an object from queued storage. + * Accepts notice reference entries and serialized items. + * + * @param string + * @return mixed + */ + protected function decode($frame) + { + if (is_numeric($frame)) { + // Back-compat for notices... + return Notice::staticGet(intval($frame)); + } elseif (substr($frame, 0, 1) == '<') { + // Back-compat for XML source + return $frame; + } else { + // Deserialize! + #$old = error_reporting(); + #error_reporting($old & ~E_NOTICE); + $out = unserialize($frame); + #error_reporting($old); + + if ($out === false && $frame !== 'b:0;') { + common_log(LOG_ERR, "Couldn't unserialize queued frame: $frame"); + return false; + } + return $out; + } } - function service($queue, $handler) + /** + * Instantiate the appropriate QueueHandler class for the given queue. + * + * @param string $queue + * @return mixed QueueHandler or null + */ + function getHandler($queue) { - throw ServerException("Unimplemented function 'service' called"); + if (isset($this->handlers[$queue])) { + $class = $this->handlers[$queue]; + if (class_exists($class)) { + return new $class(); + } else { + common_log(LOG_ERR, "Nonexistent handler class '$class' for queue '$queue'"); + } + } else { + common_log(LOG_ERR, "Requested handler for unkown queue '$queue'"); + } + return null; + } + + /** + * Get a list of registered queue transport names to be used + * for this daemon. + * + * @return array of strings + */ + function getQueues() + { + $group = $this->activeGroup(); + return array_keys($this->groups[$group]); + } + + /** + * Initialize the list of queue handlers + * + * @event StartInitializeQueueManager + * @event EndInitializeQueueManager + */ + function initialize() + { + // @fixme we'll want to be able to listen to particular queues... + if (Event::handle('StartInitializeQueueManager', array($this))) { + $this->connect('plugin', 'PluginQueueHandler'); + $this->connect('omb', 'OmbQueueHandler'); + $this->connect('ping', 'PingQueueHandler'); + $this->connect('distrib', 'DistribQueueHandler'); + if (common_config('sms', 'enabled')) { + $this->connect('sms', 'SmsQueueHandler'); + } + + // XMPP output handlers... + $this->connect('jabber', 'JabberQueueHandler'); + $this->connect('public', 'PublicQueueHandler'); + // @fixme this should get an actual queue + //$this->connect('confirm', 'XmppConfirmHandler'); + + // For compat with old plugins not registering their own handlers. + $this->connect('plugin', 'PluginQueueHandler'); + + $this->connect('xmppout', 'XmppOutQueueHandler', 'xmppdaemon'); + + } + Event::handle('EndInitializeQueueManager', array($this)); + } + + /** + * Register a queue transport name and handler class for your plugin. + * Only registered transports will be reliably picked up! + * + * @param string $transport + * @param string $class + * @param string $group + */ + public function connect($transport, $class, $group='queuedaemon') + { + $this->handlers[$transport] = $class; + $this->groups[$group][$transport] = $class; + } + + /** + * @return string queue group to use for this request + */ + function activeGroup() + { + $group = 'queuedaemon'; + if ($this->master) { + // hack hack + if ($this->master instanceof XmppMaster) { + return 'xmppdaemon'; + } + } + return $group; + } + + /** + * Send a statistic ping to the queue monitoring system, + * optionally with a per-queue id. + * + * @param string $key + * @param string $queue + */ + function stats($key, $queue=false) + { + $owners = array(); + if ($queue) { + $owners[] = "queue:$queue"; + $owners[] = "site:" . common_config('site', 'server'); + } + if (isset($this->master)) { + $this->master->stats($key, $owners); + } else { + $monitor = new QueueMonitor(); + $monitor->stats($key, $owners); + } } }