]> git.mxchange.org Git - quix0rs-gnu-social.git/blobdiff - lib/queuemanager.php
Allow for instances as well as class names to be passed as queue handlers and iomanagers.
[quix0rs-gnu-social.git] / lib / queuemanager.php
index 291174d3c4729a7ac11e5f561fc497ff0ce3b17a..b2e86b127ec9efdb9e04eb31594420f09766d079 100644 (file)
@@ -39,6 +39,10 @@ abstract class QueueManager extends IoManager
 {
     static $qm = null;
 
+    public $master = null;
+    public $handlers = array();
+    public $groups = array();
+
     /**
      * Factory function to pull the appropriate QueueManager object
      * for this site's configuration. It can then be used to queue
@@ -109,6 +113,64 @@ abstract class QueueManager extends IoManager
      */
     abstract function enqueue($object, $queue);
 
+    /**
+     * Build a representation for an object for logging
+     * @param mixed
+     * @return string
+     */
+    function logrep($object) {
+        if (is_object($object)) {
+            $class = get_class($object);
+            if (isset($object->id)) {
+                return "$class $object->id";
+            }
+            return $class;
+        }
+        if (is_string($object)) {
+            $len = strlen($object);
+            $fragment = mb_substr($object, 0, 32);
+            if (mb_strlen($object) > 32) {
+                $fragment .= '...';
+            }
+            return "string '$fragment' ($len bytes)";
+        }
+        return strval($object);
+    }
+
+    /**
+     * Encode an object for queued storage.
+     * Next gen may use serialization.
+     *
+     * @param mixed $object
+     * @return string
+     */
+    protected function encode($object)
+    {
+        if ($object instanceof Notice) {
+            return $object->id;
+        } else if (is_string($object)) {
+            return $object;
+        } else {
+            throw new ServerException("Can't queue this type", 500);
+        }
+    }
+
+    /**
+     * Decode an object from queued storage.
+     * Accepts back-compat notice reference entries and strings for now.
+     *
+     * @param string
+     * @return mixed
+     */
+    protected function decode($frame)
+    {
+        if (is_numeric($frame)) {
+            return Notice::staticGet(intval($frame));
+        } else {
+            return $frame;
+        }
+    }
+
     /**
      * Instantiate the appropriate QueueHandler class for the given queue.
      *
@@ -119,7 +181,9 @@ abstract class QueueManager extends IoManager
     {
         if (isset($this->handlers[$queue])) {
             $class = $this->handlers[$queue];
-            if (class_exists($class)) {
+            if(is_object($class)) {
+                return $class;
+            } else if (class_exists($class)) {
                 return new $class();
             } else {
                 common_log(LOG_ERR, "Nonexistent handler class '$class' for queue '$queue'");
@@ -131,13 +195,15 @@ abstract class QueueManager extends IoManager
     }
 
     /**
-     * Get a list of all registered queue transport names.
+     * Get a list of registered queue transport names to be used
+     * for this daemon.
      *
      * @return array of strings
      */
     function getQueues()
     {
-        return array_keys($this->handlers);
+        $group = $this->activeGroup();
+        return array_keys($this->groups[$group]);
     }
 
     /**
@@ -148,33 +214,29 @@ abstract class QueueManager extends IoManager
      */
     function initialize()
     {
+        // @fixme we'll want to be able to listen to particular queues...
         if (Event::handle('StartInitializeQueueManager', array($this))) {
-            if (!defined('XMPP_ONLY_FLAG')) { // hack!
-                $this->connect('plugin', 'PluginQueueHandler');
-                $this->connect('omb', 'OmbQueueHandler');
-                $this->connect('ping', 'PingQueueHandler');
-                if (common_config('sms', 'enabled')) {
-                    $this->connect('sms', 'SmsQueueHandler');
-                }
+            $this->connect('plugin', 'PluginQueueHandler');
+            $this->connect('omb', 'OmbQueueHandler');
+            $this->connect('ping', 'PingQueueHandler');
+            if (common_config('sms', 'enabled')) {
+                $this->connect('sms', 'SmsQueueHandler');
             }
 
             // XMPP output handlers...
-            if (common_config('xmpp', 'enabled') && !defined('XMPP_EMERGENCY_FLAG')) {
-                $this->connect('jabber', 'JabberQueueHandler');
-                $this->connect('public', 'PublicQueueHandler');
-                
-                // @fixme this should move up a level or should get an actual queue
-                $this->connect('confirm', 'XmppConfirmHandler');
-            }
+            $this->connect('jabber', 'JabberQueueHandler');
+            $this->connect('public', 'PublicQueueHandler');
+            
+            // @fixme this should get an actual queue
+            //$this->connect('confirm', 'XmppConfirmHandler');
+
+            // For compat with old plugins not registering their own handlers.
+            $this->connect('plugin', 'PluginQueueHandler');
+
+            $this->connect('xmppout', 'XmppOutQueueHandler', 'xmppdaemon');
 
-            if (!defined('XMPP_ONLY_FLAG')) { // hack!
-                // For compat with old plugins not registering their own handlers.
-                $this->connect('plugin', 'PluginQueueHandler');
-            }
-        }
-        if (!defined('XMPP_ONLY_FLAG')) { // hack!
-            Event::handle('EndInitializeQueueManager', array($this));
         }
+        Event::handle('EndInitializeQueueManager', array($this));
     }
 
     /**
@@ -182,11 +244,28 @@ abstract class QueueManager extends IoManager
      * Only registered transports will be reliably picked up!
      *
      * @param string $transport
-     * @param string $class
+     * @param string $class class name or object instance
+     * @param string $group
      */
-    public function connect($transport, $class)
+    public function connect($transport, $class, $group='queuedaemon')
     {
         $this->handlers[$transport] = $class;
+        $this->groups[$group][$transport] = $class;
+    }
+
+    /**
+     * @return string queue group to use for this request
+     */
+    function activeGroup()
+    {
+        $group = 'queuedaemon';
+        if ($this->master) {
+            // hack hack
+            if ($this->master instanceof XmppMaster) {
+                return 'xmppdaemon';
+            }
+        }
+        return $group;
     }
 
     /**