X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=lib%2Fqueuemanager.php;h=0829c8a8bcb8321d1aba5deaa55d8e4640d4d9eb;hb=69e621a3e882cd060eb4314554aada7167edd897;hp=291174d3c4729a7ac11e5f561fc497ff0ce3b17a;hpb=02a6006bafd663443b512c5c283b64c7dacfbbb1;p=quix0rs-gnu-social.git diff --git a/lib/queuemanager.php b/lib/queuemanager.php index 291174d3c4..0829c8a8bc 100644 --- a/lib/queuemanager.php +++ b/lib/queuemanager.php @@ -39,6 +39,11 @@ abstract class QueueManager extends IoManager { static $qm = null; + protected $master = null; + protected $handlers = array(); + protected $groups = array(); + protected $activeGroups = array(); + /** * Factory function to pull the appropriate QueueManager object * for this site's configuration. It can then be used to queue @@ -96,6 +101,23 @@ abstract class QueueManager extends IoManager $this->initialize(); } + /** + * Optional; ping any running queue handler daemons with a notification + * such as announcing a new site to handle or requesting clean shutdown. + * This avoids having to restart all the daemons manually to update configs + * and such. + * + * Called from scripts/queuectl.php controller utility. + * + * @param string $event event key + * @param string $param optional parameter to append to key + * @return boolean success + */ + public function sendControlSignal($event, $param='') + { + throw new Exception(get_class($this) . " does not support control signals."); + } + /** * Store an object (usually/always a Notice) into the given queue * for later processing. No guarantee is made on when it will be @@ -109,6 +131,78 @@ abstract class QueueManager extends IoManager */ abstract function enqueue($object, $queue); + /** + * Build a representation for an object for logging + * @param mixed + * @return string + */ + function logrep($object) { + if (is_object($object)) { + $class = get_class($object); + if (isset($object->id)) { + return "$class $object->id"; + } + return $class; + } + if (is_string($object)) { + $len = strlen($object); + $fragment = mb_substr($object, 0, 32); + if (mb_strlen($object) > 32) { + $fragment .= '...'; + } + return "string '$fragment' ($len bytes)"; + } + return strval($object); + } + + /** + * Encode an object or variable for queued storage. + * Notice objects are currently stored as an id reference; + * other items are serialized. + * + * @param mixed $item + * @return string + */ + protected function encode($item) + { + if ($item instanceof Notice) { + // Backwards compat + return $item->id; + } else { + return serialize($item); + } + } + + /** + * Decode an object from queued storage. + * Accepts notice reference entries and serialized items. + * + * @param string + * @return mixed + */ + protected function decode($frame) + { + if (is_numeric($frame)) { + // Back-compat for notices... + return Notice::staticGet(intval($frame)); + } elseif (substr($frame, 0, 1) == '<') { + // Back-compat for XML source + return $frame; + } else { + // Deserialize! + #$old = error_reporting(); + #error_reporting($old & ~E_NOTICE); + $out = unserialize($frame); + #error_reporting($old); + + if ($out === false && $frame !== 'b:0;') { + common_log(LOG_ERR, "Couldn't unserialize queued frame: $frame"); + return false; + } + return $out; + } + } + /** * Instantiate the appropriate QueueHandler class for the given queue. * @@ -119,62 +213,77 @@ abstract class QueueManager extends IoManager { if (isset($this->handlers[$queue])) { $class = $this->handlers[$queue]; - if (class_exists($class)) { + if(is_object($class)) { + return $class; + } else if (class_exists($class)) { return new $class(); } else { - common_log(LOG_ERR, "Nonexistent handler class '$class' for queue '$queue'"); + $this->_log(LOG_ERR, "Nonexistent handler class '$class' for queue '$queue'"); } } else { - common_log(LOG_ERR, "Requested handler for unkown queue '$queue'"); + $this->_log(LOG_ERR, "Requested handler for unkown queue '$queue'"); } return null; } /** - * Get a list of all registered queue transport names. + * Get a list of registered queue transport names to be used + * for listening in this daemon. * * @return array of strings */ - function getQueues() + function activeQueues() { - return array_keys($this->handlers); + $queues = array(); + foreach ($this->activeGroups as $group) { + if (isset($this->groups[$group])) { + $queues = array_merge($queues, $this->groups[$group]); + } + } + + return array_keys($queues); } /** - * Initialize the list of queue handlers + * Initialize the list of queue handlers for the current site. * * @event StartInitializeQueueManager * @event EndInitializeQueueManager */ function initialize() { + $this->handlers = array(); + $this->groups = array(); + $this->groupsByTransport = array(); + if (Event::handle('StartInitializeQueueManager', array($this))) { - if (!defined('XMPP_ONLY_FLAG')) { // hack! - $this->connect('plugin', 'PluginQueueHandler'); - $this->connect('omb', 'OmbQueueHandler'); - $this->connect('ping', 'PingQueueHandler'); - if (common_config('sms', 'enabled')) { - $this->connect('sms', 'SmsQueueHandler'); - } + $this->connect('distrib', 'DistribQueueHandler'); + $this->connect('omb', 'OmbQueueHandler'); + $this->connect('ping', 'PingQueueHandler'); + if (common_config('sms', 'enabled')) { + $this->connect('sms', 'SmsQueueHandler'); } + // Background user management tasks... + $this->connect('deluser', 'DelUserQueueHandler'); + + // Broadcasting profile updates to OMB remote subscribers + $this->connect('profile', 'ProfileQueueHandler'); + // XMPP output handlers... - if (common_config('xmpp', 'enabled') && !defined('XMPP_EMERGENCY_FLAG')) { + if (common_config('xmpp', 'enabled')) { + // Delivery prep, read by queuedaemon.php: $this->connect('jabber', 'JabberQueueHandler'); $this->connect('public', 'PublicQueueHandler'); - - // @fixme this should move up a level or should get an actual queue - $this->connect('confirm', 'XmppConfirmHandler'); - } - if (!defined('XMPP_ONLY_FLAG')) { // hack! - // For compat with old plugins not registering their own handlers. - $this->connect('plugin', 'PluginQueueHandler'); + // Raw output, read by xmppdaemon.php: + $this->connect('xmppout', 'XmppOutQueueHandler', 'xmpp'); } + + // For compat with old plugins not registering their own handlers. + $this->connect('plugin', 'PluginQueueHandler'); } - if (!defined('XMPP_ONLY_FLAG')) { // hack! - Event::handle('EndInitializeQueueManager', array($this)); - } + Event::handle('EndInitializeQueueManager', array($this)); } /** @@ -182,11 +291,44 @@ abstract class QueueManager extends IoManager * Only registered transports will be reliably picked up! * * @param string $transport - * @param string $class + * @param string $class class name or object instance + * @param string $group */ - public function connect($transport, $class) + public function connect($transport, $class, $group='main') { $this->handlers[$transport] = $class; + $this->groups[$group][$transport] = $class; + $this->groupsByTransport[$transport] = $group; + } + + /** + * Set the active group which will be used for listening. + * @param string $group + */ + function setActiveGroup($group) + { + $this->activeGroups = array($group); + } + + /** + * Set the active group(s) which will be used for listening. + * @param array $groups + */ + function setActiveGroups($groups) + { + $this->activeGroups = $groups; + } + + /** + * @return string queue group for this queue + */ + function queueGroup($queue) + { + if (isset($this->groupsByTransport[$queue])) { + return $this->groupsByTransport[$queue]; + } else { + throw new Exception("Requested group for unregistered transport $queue"); + } } /** @@ -210,4 +352,15 @@ abstract class QueueManager extends IoManager $monitor->stats($key, $owners); } } + + protected function _log($level, $msg) + { + $class = get_class($this); + if ($this->activeGroups) { + $groups = ' (' . implode(',', $this->activeGroups) . ')'; + } else { + $groups = ''; + } + common_log($level, "$class$groups: $msg"); + } }