X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=lib%2Fqueuemanager.php;h=d42e4b4b57e88ac31f7b35ddd6f22e46088ea467;hb=10f2cde0b1d75fa023b00400162cb525e8719514;hp=4eb39bfa8c72ecf75ef76630244b3a513fe42a97;hpb=611924e814320c768771af21a923e615118c1feb;p=quix0rs-gnu-social.git diff --git a/lib/queuemanager.php b/lib/queuemanager.php index 4eb39bfa8c..d42e4b4b57 100644 --- a/lib/queuemanager.php +++ b/lib/queuemanager.php @@ -39,9 +39,10 @@ abstract class QueueManager extends IoManager { static $qm = null; - public $master = null; - public $handlers = array(); - public $groups = array(); + protected $master = null; + protected $handlers = array(); + protected $groups = array(); + protected $activeGroups = array(); /** * Factory function to pull the appropriate QueueManager object @@ -100,6 +101,23 @@ abstract class QueueManager extends IoManager $this->initialize(); } + /** + * Optional; ping any running queue handler daemons with a notification + * such as announcing a new site to handle or requesting clean shutdown. + * This avoids having to restart all the daemons manually to update configs + * and such. + * + * Called from scripts/queuectl.php controller utility. + * + * @param string $event event key + * @param string $param optional parameter to append to key + * @return boolean success + */ + public function sendControlSignal($event, $param='') + { + throw new Exception(get_class($this) . " does not support control signals."); + } + /** * Store an object (usually/always a Notice) into the given queue * for later processing. No guarantee is made on when it will be @@ -125,50 +143,77 @@ abstract class QueueManager extends IoManager return "$class $object->id"; } return $class; - } - if (is_string($object)) { + } elseif (is_string($object)) { $len = strlen($object); $fragment = mb_substr($object, 0, 32); if (mb_strlen($object) > 32) { $fragment .= '...'; } return "string '$fragment' ($len bytes)"; + } elseif (is_array($object)) { + return 'array with ' . count($object) . + ' elements (keys:[' . implode(',', array_keys($object)) . '])'; } return strval($object); } /** * Encode an object for queued storage. - * Next gen may use serialization. * - * @param mixed $object + * @param mixed $item * @return string */ - protected function encode($object) + protected function encode($item) { - if ($object instanceof Notice) { - return $object->id; - } else if (is_string($object)) { - return $object; - } else { - throw new ServerException("Can't queue this type", 500); - } + return serialize($item); } /** * Decode an object from queued storage. - * Accepts back-compat notice reference entries and strings for now. + * Accepts notice reference entries and serialized items. * * @param string * @return mixed */ protected function decode($frame) { - if (is_numeric($frame)) { - return Notice::staticGet(intval($frame)); - } else { - return $frame; + $object = unserialize($frame); + + // If it is a string, we really store a JSON object in there + // except if it begins with '<', because then it is XML. + if (is_string($object) && + substr($object, 0, 1) != '<' && + !is_numeric($object)) + { + $json = json_decode($object); + if ($json === null) { + throw new Exception('Bad frame in queue item'); + } + + // The JSON object has a type parameter which contains the class + if (empty($json->type)) { + throw new Exception('Type not specified for queue item'); + } + if (!is_a($json->type, 'Managed_DataObject', true)) { + throw new Exception('Managed_DataObject class does not exist for queue item'); + } + + // And each of these types should have a unique id (or uri) + if (isset($json->id) && !empty($json->id)) { + $object = call_user_func(array($json->type, 'getKV'), 'id', $json->id); + } elseif (isset($json->uri) && !empty($json->uri)) { + $object = call_user_func(array($json->type, 'getKV'), 'uri', $json->uri); + } + + // But if no object was found, there's nothing we can handle + if (!$object instanceof Managed_DataObject) { + throw new Exception('Queue item frame referenced a non-existant object'); + } } + + // If the frame was not a string, it's either an array or an object. + + return $object; } /** @@ -181,58 +226,65 @@ abstract class QueueManager extends IoManager { if (isset($this->handlers[$queue])) { $class = $this->handlers[$queue]; - if (class_exists($class)) { + if(is_object($class)) { + return $class; + } else if (class_exists($class)) { return new $class(); } else { - common_log(LOG_ERR, "Nonexistent handler class '$class' for queue '$queue'"); + $this->_log(LOG_ERR, "Nonexistent handler class '$class' for queue '$queue'"); } } else { - common_log(LOG_ERR, "Requested handler for unkown queue '$queue'"); + $this->_log(LOG_ERR, "Requested handler for unkown queue '$queue'"); } return null; } /** * Get a list of registered queue transport names to be used - * for this daemon. + * for listening in this daemon. * * @return array of strings */ - function getQueues() + function activeQueues() { - $group = $this->activeGroup(); - return array_keys($this->groups[$group]); + $queues = array(); + foreach ($this->activeGroups as $group) { + if (isset($this->groups[$group])) { + $queues = array_merge($queues, $this->groups[$group]); + } + } + + return array_keys($queues); } /** - * Initialize the list of queue handlers + * Initialize the list of queue handlers for the current site. * * @event StartInitializeQueueManager * @event EndInitializeQueueManager */ function initialize() { - // @fixme we'll want to be able to listen to particular queues... + $this->handlers = array(); + $this->groups = array(); + $this->groupsByTransport = array(); + if (Event::handle('StartInitializeQueueManager', array($this))) { - $this->connect('plugin', 'PluginQueueHandler'); - $this->connect('omb', 'OmbQueueHandler'); + $this->connect('distrib', 'DistribQueueHandler'); $this->connect('ping', 'PingQueueHandler'); if (common_config('sms', 'enabled')) { $this->connect('sms', 'SmsQueueHandler'); } - // XMPP output handlers... - $this->connect('jabber', 'JabberQueueHandler'); - $this->connect('public', 'PublicQueueHandler'); - - // @fixme this should get an actual queue - //$this->connect('confirm', 'XmppConfirmHandler'); + // Background user management tasks... + $this->connect('deluser', 'DelUserQueueHandler'); + $this->connect('feedimp', 'FeedImporter'); + $this->connect('actimp', 'ActivityImporter'); + $this->connect('acctmove', 'AccountMover'); + $this->connect('actmove', 'ActivityMover'); // For compat with old plugins not registering their own handlers. $this->connect('plugin', 'PluginQueueHandler'); - - $this->connect('xmppout', 'XmppOutQueueHandler', 'xmppdaemon'); - } Event::handle('EndInitializeQueueManager', array($this)); } @@ -242,28 +294,44 @@ abstract class QueueManager extends IoManager * Only registered transports will be reliably picked up! * * @param string $transport - * @param string $class + * @param string $class class name or object instance * @param string $group */ - public function connect($transport, $class, $group='queuedaemon') + public function connect($transport, $class, $group='main') { $this->handlers[$transport] = $class; $this->groups[$group][$transport] = $class; + $this->groupsByTransport[$transport] = $group; + } + + /** + * Set the active group which will be used for listening. + * @param string $group + */ + function setActiveGroup($group) + { + $this->activeGroups = array($group); } /** - * @return string queue group to use for this request + * Set the active group(s) which will be used for listening. + * @param array $groups */ - function activeGroup() + function setActiveGroups($groups) { - $group = 'queuedaemon'; - if ($this->master) { - // hack hack - if ($this->master instanceof XmppMaster) { - return 'xmppdaemon'; - } + $this->activeGroups = $groups; + } + + /** + * @return string queue group for this queue + */ + function queueGroup($queue) + { + if (isset($this->groupsByTransport[$queue])) { + return $this->groupsByTransport[$queue]; + } else { + throw new Exception("Requested group for unregistered transport $queue"); } - return $group; } /** @@ -287,4 +355,15 @@ abstract class QueueManager extends IoManager $monitor->stats($key, $owners); } } + + protected function _log($level, $msg) + { + $class = get_class($this); + if ($this->activeGroups) { + $groups = ' (' . implode(',', $this->activeGroups) . ')'; + } else { + $groups = ''; + } + common_log($level, "$class$groups: $msg"); + } }