]> git.mxchange.org Git - quix0rs-gnu-social.git/blobdiff - lib/queuemanager.php
DB updated to 1.8.2
[quix0rs-gnu-social.git] / lib / queuemanager.php
index 43105b7a86ed19ac1fed1aa66b9bf672b15cc5f2..d42e4b4b57e88ac31f7b35ddd6f22e46088ea467 100644 (file)
@@ -2,7 +2,7 @@
 /**
  * StatusNet, the distributed open-source microblogging tool
  *
- * Abstract class for queue managers
+ * Abstract class for i/o managers
  *
  * PHP version 5
  *
  * @package   StatusNet
  * @author    Evan Prodromou <evan@status.net>
  * @author    Sarven Capadisli <csarven@status.net>
- * @copyright 2009 StatusNet, Inc.
+ * @author    Brion Vibber <brion@status.net>
+ * @copyright 2009-2010 StatusNet, Inc.
  * @license   http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0
  * @link      http://status.net/
  */
 
-class QueueManager
+/**
+ * Completed child classes must implement the enqueue() method.
+ *
+ * For background processing, classes should implement either socket-based
+ * input (handleInput(), getSockets()) or idle-loop polling (idle()).
+ */
+abstract class QueueManager extends IoManager
 {
     static $qm = null;
 
-    static function get()
+    protected $master = null;
+    protected $handlers = array();
+    protected $groups = array();
+    protected $activeGroups = array();
+
+    /**
+     * Factory function to pull the appropriate QueueManager object
+     * for this site's configuration. It can then be used to queue
+     * events for later processing or to spawn a processing loop.
+     *
+     * Plugins can add to the built-in types by hooking StartNewQueueManager.
+     *
+     * @return QueueManager
+     */
+    public static function get()
     {
         if (empty(self::$qm)) {
 
@@ -62,13 +83,287 @@ class QueueManager
         return self::$qm;
     }
 
-    function enqueue($object, $queue)
+    /**
+     * @fixme wouldn't necessarily work with other class types.
+     * Better to change the interface...?
+     */
+    public static function multiSite()
+    {
+        if (common_config('queue', 'subsystem') == 'stomp') {
+            return IoManager::INSTANCE_PER_PROCESS;
+        } else {
+            return IoManager::SINGLE_ONLY;
+        }
+    }
+
+    function __construct()
     {
-        throw ServerException("Unimplemented function 'enqueue' called");
+        $this->initialize();
     }
 
-    function service($queue, $handler)
+    /**
+     * Optional; ping any running queue handler daemons with a notification
+     * such as announcing a new site to handle or requesting clean shutdown.
+     * This avoids having to restart all the daemons manually to update configs
+     * and such.
+     *
+     * Called from scripts/queuectl.php controller utility.
+     *
+     * @param string $event event key
+     * @param string $param optional parameter to append to key
+     * @return boolean success
+     */
+    public function sendControlSignal($event, $param='')
     {
-        throw ServerException("Unimplemented function 'service' called");
+        throw new Exception(get_class($this) . " does not support control signals.");
+    }
+
+    /**
+     * Store an object (usually/always a Notice) into the given queue
+     * for later processing. No guarantee is made on when it will be
+     * processed; it could be immediately or at some unspecified point
+     * in the future.
+     *
+     * Must be implemented by any queue manager.
+     *
+     * @param Notice $object
+     * @param string $queue
+     */
+    abstract function enqueue($object, $queue);
+
+    /**
+     * Build a representation for an object for logging
+     * @param mixed
+     * @return string
+     */
+    function logrep($object) {
+        if (is_object($object)) {
+            $class = get_class($object);
+            if (isset($object->id)) {
+                return "$class $object->id";
+            }
+            return $class;
+        } elseif (is_string($object)) {
+            $len = strlen($object);
+            $fragment = mb_substr($object, 0, 32);
+            if (mb_strlen($object) > 32) {
+                $fragment .= '...';
+            }
+            return "string '$fragment' ($len bytes)";
+        } elseif (is_array($object)) {
+            return 'array with ' . count($object) .
+                   ' elements (keys:[' .  implode(',', array_keys($object)) . '])';
+        }
+        return strval($object);
+    }
+
+    /**
+     * Encode an object for queued storage.
+     *
+     * @param mixed $item
+     * @return string
+     */
+    protected function encode($item)
+    {
+        return serialize($item);
+    }
+
+    /**
+     * Decode an object from queued storage.
+     * Accepts notice reference entries and serialized items.
+     *
+     * @param string
+     * @return mixed
+     */
+    protected function decode($frame)
+    {
+        $object = unserialize($frame);
+
+        // If it is a string, we really store a JSON object in there
+        // except if it begins with '<', because then it is XML.
+        if (is_string($object) &&
+            substr($object, 0, 1) != '<' &&
+            !is_numeric($object))
+        {
+            $json = json_decode($object);
+            if ($json === null) {
+                throw new Exception('Bad frame in queue item');
+            }
+
+            // The JSON object has a type parameter which contains the class
+            if (empty($json->type)) {
+                throw new Exception('Type not specified for queue item');
+            }
+            if (!is_a($json->type, 'Managed_DataObject', true)) {
+                throw new Exception('Managed_DataObject class does not exist for queue item');
+            }
+
+            // And each of these types should have a unique id (or uri)
+            if (isset($json->id) && !empty($json->id)) {
+                $object = call_user_func(array($json->type, 'getKV'), 'id', $json->id);
+            } elseif (isset($json->uri) && !empty($json->uri)) {
+                $object = call_user_func(array($json->type, 'getKV'), 'uri', $json->uri);
+            }
+
+            // But if no object was found, there's nothing we can handle
+            if (!$object instanceof Managed_DataObject) {
+                throw new Exception('Queue item frame referenced a non-existant object');
+            }
+        }
+
+        // If the frame was not a string, it's either an array or an object.
+
+        return $object;
+    }
+
+    /**
+     * Instantiate the appropriate QueueHandler class for the given queue.
+     *
+     * @param string $queue
+     * @return mixed QueueHandler or null
+     */
+    function getHandler($queue)
+    {
+        if (isset($this->handlers[$queue])) {
+            $class = $this->handlers[$queue];
+            if(is_object($class)) {
+                return $class;
+            } else if (class_exists($class)) {
+                return new $class();
+            } else {
+                $this->_log(LOG_ERR, "Nonexistent handler class '$class' for queue '$queue'");
+            }
+        } else {
+            $this->_log(LOG_ERR, "Requested handler for unkown queue '$queue'");
+        }
+        return null;
+    }
+
+    /**
+     * Get a list of registered queue transport names to be used
+     * for listening in this daemon.
+     *
+     * @return array of strings
+     */
+    function activeQueues()
+    {
+        $queues = array();
+        foreach ($this->activeGroups as $group) {
+            if (isset($this->groups[$group])) {
+                $queues = array_merge($queues, $this->groups[$group]);
+            }
+        }
+
+        return array_keys($queues);
+    }
+
+    /**
+     * Initialize the list of queue handlers for the current site.
+     *
+     * @event StartInitializeQueueManager
+     * @event EndInitializeQueueManager
+     */
+    function initialize()
+    {
+        $this->handlers = array();
+        $this->groups = array();
+        $this->groupsByTransport = array();
+
+        if (Event::handle('StartInitializeQueueManager', array($this))) {
+            $this->connect('distrib', 'DistribQueueHandler');
+            $this->connect('ping', 'PingQueueHandler');
+            if (common_config('sms', 'enabled')) {
+                $this->connect('sms', 'SmsQueueHandler');
+            }
+
+            // Background user management tasks...
+            $this->connect('deluser', 'DelUserQueueHandler');
+            $this->connect('feedimp', 'FeedImporter');
+            $this->connect('actimp', 'ActivityImporter');
+            $this->connect('acctmove', 'AccountMover');
+            $this->connect('actmove', 'ActivityMover');
+
+            // For compat with old plugins not registering their own handlers.
+            $this->connect('plugin', 'PluginQueueHandler');
+        }
+        Event::handle('EndInitializeQueueManager', array($this));
+    }
+
+    /**
+     * Register a queue transport name and handler class for your plugin.
+     * Only registered transports will be reliably picked up!
+     *
+     * @param string $transport
+     * @param string $class class name or object instance
+     * @param string $group
+     */
+    public function connect($transport, $class, $group='main')
+    {
+        $this->handlers[$transport] = $class;
+        $this->groups[$group][$transport] = $class;
+        $this->groupsByTransport[$transport] = $group;
+    }
+
+    /**
+     * Set the active group which will be used for listening.
+     * @param string $group
+     */
+    function setActiveGroup($group)
+    {
+        $this->activeGroups = array($group);
+    }
+
+    /**
+     * Set the active group(s) which will be used for listening.
+     * @param array $groups
+     */
+    function setActiveGroups($groups)
+    {
+        $this->activeGroups = $groups;
+    }
+
+    /**
+     * @return string queue group for this queue
+     */
+    function queueGroup($queue)
+    {
+        if (isset($this->groupsByTransport[$queue])) {
+            return $this->groupsByTransport[$queue];
+        } else {
+            throw new Exception("Requested group for unregistered transport $queue");
+        }
+    }
+
+    /**
+     * Send a statistic ping to the queue monitoring system,
+     * optionally with a per-queue id.
+     *
+     * @param string $key
+     * @param string $queue
+     */
+    function stats($key, $queue=false)
+    {
+        $owners = array();
+        if ($queue) {
+            $owners[] = "queue:$queue";
+            $owners[] = "site:" . common_config('site', 'server');
+        }
+        if (isset($this->master)) {
+            $this->master->stats($key, $owners);
+        } else {
+            $monitor = new QueueMonitor();
+            $monitor->stats($key, $owners);
+        }
+    }
+
+    protected function _log($level, $msg)
+    {
+        $class = get_class($this);
+        if ($this->activeGroups) {
+            $groups = ' (' . implode(',', $this->activeGroups) . ')';
+        } else {
+            $groups = '';
+        }
+        common_log($level, "$class$groups: $msg");
     }
 }