]> git.mxchange.org Git - quix0rs-gnu-social.git/blobdiff - lib/queuemanager.php
DB updated to 1.8.2
[quix0rs-gnu-social.git] / lib / queuemanager.php
index a0b13fe556f6a93596c91580084ab9352080d1dc..d42e4b4b57e88ac31f7b35ddd6f22e46088ea467 100644 (file)
@@ -39,9 +39,10 @@ abstract class QueueManager extends IoManager
 {
     static $qm = null;
 
-    public $master = null;
-    public $handlers = array();
-    public $groups = array();
+    protected $master = null;
+    protected $handlers = array();
+    protected $groups = array();
+    protected $activeGroups = array();
 
     /**
      * Factory function to pull the appropriate QueueManager object
@@ -142,14 +143,16 @@ abstract class QueueManager extends IoManager
                 return "$class $object->id";
             }
             return $class;
-        }
-        if (is_string($object)) {
+        } elseif (is_string($object)) {
             $len = strlen($object);
             $fragment = mb_substr($object, 0, 32);
             if (mb_strlen($object) > 32) {
                 $fragment .= '...';
             }
             return "string '$fragment' ($len bytes)";
+        } elseif (is_array($object)) {
+            return 'array with ' . count($object) .
+                   ' elements (keys:[' .  implode(',', array_keys($object)) . '])';
         }
         return strval($object);
     }
@@ -157,24 +160,60 @@ abstract class QueueManager extends IoManager
     /**
      * Encode an object for queued storage.
      *
-     * @param mixed $object
+     * @param mixed $item
      * @return string
      */
-    protected function encode($object)
+    protected function encode($item)
     {
-        return serialize($object);
+        return serialize($item);
     }
 
     /**
      * Decode an object from queued storage.
-     * Accepts back-compat notice reference entries and strings for now.
+     * Accepts notice reference entries and serialized items.
      *
      * @param string
      * @return mixed
      */
     protected function decode($frame)
     {
-        return unserialize($frame);
+        $object = unserialize($frame);
+
+        // If it is a string, we really store a JSON object in there
+        // except if it begins with '<', because then it is XML.
+        if (is_string($object) &&
+            substr($object, 0, 1) != '<' &&
+            !is_numeric($object))
+        {
+            $json = json_decode($object);
+            if ($json === null) {
+                throw new Exception('Bad frame in queue item');
+            }
+
+            // The JSON object has a type parameter which contains the class
+            if (empty($json->type)) {
+                throw new Exception('Type not specified for queue item');
+            }
+            if (!is_a($json->type, 'Managed_DataObject', true)) {
+                throw new Exception('Managed_DataObject class does not exist for queue item');
+            }
+
+            // And each of these types should have a unique id (or uri)
+            if (isset($json->id) && !empty($json->id)) {
+                $object = call_user_func(array($json->type, 'getKV'), 'id', $json->id);
+            } elseif (isset($json->uri) && !empty($json->uri)) {
+                $object = call_user_func(array($json->type, 'getKV'), 'uri', $json->uri);
+            }
+
+            // But if no object was found, there's nothing we can handle
+            if (!$object instanceof Managed_DataObject) {
+                throw new Exception('Queue item frame referenced a non-existant object');
+            }
+        }
+
+        // If the frame was not a string, it's either an array or an object.
+
+        return $object;
     }
 
     /**
@@ -192,44 +231,58 @@ abstract class QueueManager extends IoManager
             } else if (class_exists($class)) {
                 return new $class();
             } else {
-                common_log(LOG_ERR, "Nonexistent handler class '$class' for queue '$queue'");
+                $this->_log(LOG_ERR, "Nonexistent handler class '$class' for queue '$queue'");
             }
         } else {
-            common_log(LOG_ERR, "Requested handler for unkown queue '$queue'");
+            $this->_log(LOG_ERR, "Requested handler for unkown queue '$queue'");
         }
         return null;
     }
 
     /**
      * Get a list of registered queue transport names to be used
-     * for this daemon.
+     * for listening in this daemon.
      *
      * @return array of strings
      */
-    function getQueues()
+    function activeQueues()
     {
-        $group = $this->activeGroup();
-        return array_keys($this->groups[$group]);
+        $queues = array();
+        foreach ($this->activeGroups as $group) {
+            if (isset($this->groups[$group])) {
+                $queues = array_merge($queues, $this->groups[$group]);
+            }
+        }
+
+        return array_keys($queues);
     }
 
     /**
-     * Initialize the list of queue handlers
+     * Initialize the list of queue handlers for the current site.
      *
      * @event StartInitializeQueueManager
      * @event EndInitializeQueueManager
      */
     function initialize()
     {
-        // @fixme we'll want to be able to listen to particular queues...
+        $this->handlers = array();
+        $this->groups = array();
+        $this->groupsByTransport = array();
+
         if (Event::handle('StartInitializeQueueManager', array($this))) {
-            $this->connect('plugin', 'PluginQueueHandler');
-            $this->connect('omb', 'OmbQueueHandler');
-            $this->connect('ping', 'PingQueueHandler');
             $this->connect('distrib', 'DistribQueueHandler');
+            $this->connect('ping', 'PingQueueHandler');
             if (common_config('sms', 'enabled')) {
                 $this->connect('sms', 'SmsQueueHandler');
             }
 
+            // Background user management tasks...
+            $this->connect('deluser', 'DelUserQueueHandler');
+            $this->connect('feedimp', 'FeedImporter');
+            $this->connect('actimp', 'ActivityImporter');
+            $this->connect('acctmove', 'AccountMover');
+            $this->connect('actmove', 'ActivityMover');
+
             // For compat with old plugins not registering their own handlers.
             $this->connect('plugin', 'PluginQueueHandler');
         }
@@ -244,25 +297,41 @@ abstract class QueueManager extends IoManager
      * @param string $class class name or object instance
      * @param string $group
      */
-    public function connect($transport, $class, $group='queuedaemon')
+    public function connect($transport, $class, $group='main')
     {
         $this->handlers[$transport] = $class;
         $this->groups[$group][$transport] = $class;
+        $this->groupsByTransport[$transport] = $group;
     }
 
     /**
-     * @return string queue group to use for this request
+     * Set the active group which will be used for listening.
+     * @param string $group
      */
-    function activeGroup()
+    function setActiveGroup($group)
     {
-        $group = 'queuedaemon';
-        if ($this->master) {
-            // hack hack
-            if ($this->master instanceof ImMaster) {
-                return 'imdaemon';
-            }
+        $this->activeGroups = array($group);
+    }
+
+    /**
+     * Set the active group(s) which will be used for listening.
+     * @param array $groups
+     */
+    function setActiveGroups($groups)
+    {
+        $this->activeGroups = $groups;
+    }
+
+    /**
+     * @return string queue group for this queue
+     */
+    function queueGroup($queue)
+    {
+        if (isset($this->groupsByTransport[$queue])) {
+            return $this->groupsByTransport[$queue];
+        } else {
+            throw new Exception("Requested group for unregistered transport $queue");
         }
-        return $group;
     }
 
     /**
@@ -286,4 +355,15 @@ abstract class QueueManager extends IoManager
             $monitor->stats($key, $owners);
         }
     }
+
+    protected function _log($level, $msg)
+    {
+        $class = get_class($this);
+        if ($this->activeGroups) {
+            $groups = ' (' . implode(',', $this->activeGroups) . ')';
+        } else {
+            $groups = '';
+        }
+        common_log($level, "$class$groups: $msg");
+    }
 }