]> git.mxchange.org Git - quix0rs-gnu-social.git/blobdiff - lib/queuemanager.php
- break OMB profile update pings to a background queue
[quix0rs-gnu-social.git] / lib / queuemanager.php
index 291174d3c4729a7ac11e5f561fc497ff0ce3b17a..9fdc801100f46f58df9e9ab22c71529da55af6bd 100644 (file)
@@ -39,6 +39,11 @@ abstract class QueueManager extends IoManager
 {
     static $qm = null;
 
+    protected $master = null;
+    protected $handlers = array();
+    protected $groups = array();
+    protected $activeGroups = array();
+
     /**
      * Factory function to pull the appropriate QueueManager object
      * for this site's configuration. It can then be used to queue
@@ -96,6 +101,23 @@ abstract class QueueManager extends IoManager
         $this->initialize();
     }
 
+    /**
+     * Optional; ping any running queue handler daemons with a notification
+     * such as announcing a new site to handle or requesting clean shutdown.
+     * This avoids having to restart all the daemons manually to update configs
+     * and such.
+     *
+     * Called from scripts/queuectl.php controller utility.
+     *
+     * @param string $event event key
+     * @param string $param optional parameter to append to key
+     * @return boolean success
+     */
+    public function sendControlSignal($event, $param='')
+    {
+        throw new Exception(get_class($this) . " does not support control signals.");
+    }
+
     /**
      * Store an object (usually/always a Notice) into the given queue
      * for later processing. No guarantee is made on when it will be
@@ -109,6 +131,78 @@ abstract class QueueManager extends IoManager
      */
     abstract function enqueue($object, $queue);
 
+    /**
+     * Build a representation for an object for logging
+     * @param mixed
+     * @return string
+     */
+    function logrep($object) {
+        if (is_object($object)) {
+            $class = get_class($object);
+            if (isset($object->id)) {
+                return "$class $object->id";
+            }
+            return $class;
+        }
+        if (is_string($object)) {
+            $len = strlen($object);
+            $fragment = mb_substr($object, 0, 32);
+            if (mb_strlen($object) > 32) {
+                $fragment .= '...';
+            }
+            return "string '$fragment' ($len bytes)";
+        }
+        return strval($object);
+    }
+
+    /**
+     * Encode an object or variable for queued storage.
+     * Notice objects are currently stored as an id reference;
+     * other items are serialized.
+     *
+     * @param mixed $item
+     * @return string
+     */
+    protected function encode($item)
+    {
+        if ($item instanceof Notice) {
+            // Backwards compat
+            return $item->id;
+        } else {
+            return serialize($item);
+        }
+    }
+
+    /**
+     * Decode an object from queued storage.
+     * Accepts notice reference entries and serialized items.
+     *
+     * @param string
+     * @return mixed
+     */
+    protected function decode($frame)
+    {
+        if (is_numeric($frame)) {
+            // Back-compat for notices...
+            return Notice::staticGet(intval($frame));
+        } elseif (substr($frame, 0, 1) == '<') {
+            // Back-compat for XML source
+            return $frame;
+        } else {
+            // Deserialize!
+            #$old = error_reporting();
+            #error_reporting($old & ~E_NOTICE);
+            $out = unserialize($frame);
+            #error_reporting($old);
+
+            if ($out === false && $frame !== 'b:0;') {
+                common_log(LOG_ERR, "Couldn't unserialize queued frame: $frame");
+                return false;
+            }
+            return $out;
+        }
+    }
+
     /**
      * Instantiate the appropriate QueueHandler class for the given queue.
      *
@@ -122,59 +216,69 @@ abstract class QueueManager extends IoManager
             if (class_exists($class)) {
                 return new $class();
             } else {
-                common_log(LOG_ERR, "Nonexistent handler class '$class' for queue '$queue'");
+                $this->_log(LOG_ERR, "Nonexistent handler class '$class' for queue '$queue'");
             }
         } else {
-            common_log(LOG_ERR, "Requested handler for unkown queue '$queue'");
+            $this->_log(LOG_ERR, "Requested handler for unkown queue '$queue'");
         }
         return null;
     }
 
     /**
-     * Get a list of all registered queue transport names.
+     * Get a list of registered queue transport names to be used
+     * for listening in this daemon.
      *
      * @return array of strings
      */
-    function getQueues()
+    function activeQueues()
     {
-        return array_keys($this->handlers);
+        $queues = array();
+        foreach ($this->activeGroups as $group) {
+            if (isset($this->groups[$group])) {
+                $queues = array_merge($queues, $this->groups[$group]);
+            }
+        }
+
+        return array_keys($queues);
     }
 
     /**
-     * Initialize the list of queue handlers
+     * Initialize the list of queue handlers for the current site.
      *
      * @event StartInitializeQueueManager
      * @event EndInitializeQueueManager
      */
     function initialize()
     {
+        $this->handlers = array();
+        $this->groups = array();
+        $this->groupsByTransport = array();
+
         if (Event::handle('StartInitializeQueueManager', array($this))) {
-            if (!defined('XMPP_ONLY_FLAG')) { // hack!
-                $this->connect('plugin', 'PluginQueueHandler');
-                $this->connect('omb', 'OmbQueueHandler');
-                $this->connect('ping', 'PingQueueHandler');
-                if (common_config('sms', 'enabled')) {
-                    $this->connect('sms', 'SmsQueueHandler');
-                }
+            $this->connect('distrib', 'DistribQueueHandler');
+            $this->connect('omb', 'OmbQueueHandler');
+            $this->connect('ping', 'PingQueueHandler');
+            if (common_config('sms', 'enabled')) {
+                $this->connect('sms', 'SmsQueueHandler');
             }
 
+            // Broadcasting profile updates to OMB remote subscribers
+            $this->connect('profile', 'ProfileQueueHandler');
+
             // XMPP output handlers...
-            if (common_config('xmpp', 'enabled') && !defined('XMPP_EMERGENCY_FLAG')) {
+            if (common_config('xmpp', 'enabled')) {
+                // Delivery prep, read by queuedaemon.php:
                 $this->connect('jabber', 'JabberQueueHandler');
                 $this->connect('public', 'PublicQueueHandler');
-                
-                // @fixme this should move up a level or should get an actual queue
-                $this->connect('confirm', 'XmppConfirmHandler');
-            }
 
-            if (!defined('XMPP_ONLY_FLAG')) { // hack!
-                // For compat with old plugins not registering their own handlers.
-                $this->connect('plugin', 'PluginQueueHandler');
+                // Raw output, read by xmppdaemon.php:
+                $this->connect('xmppout', 'XmppOutQueueHandler', 'xmpp');
             }
+
+            // For compat with old plugins not registering their own handlers.
+            $this->connect('plugin', 'PluginQueueHandler');
         }
-        if (!defined('XMPP_ONLY_FLAG')) { // hack!
-            Event::handle('EndInitializeQueueManager', array($this));
-        }
+        Event::handle('EndInitializeQueueManager', array($this));
     }
 
     /**
@@ -183,10 +287,43 @@ abstract class QueueManager extends IoManager
      *
      * @param string $transport
      * @param string $class
+     * @param string $group
      */
-    public function connect($transport, $class)
+    public function connect($transport, $class, $group='main')
     {
         $this->handlers[$transport] = $class;
+        $this->groups[$group][$transport] = $class;
+        $this->groupsByTransport[$transport] = $group;
+    }
+
+    /**
+     * Set the active group which will be used for listening.
+     * @param string $group
+     */
+    function setActiveGroup($group)
+    {
+        $this->activeGroups = array($group);
+    }
+
+    /**
+     * Set the active group(s) which will be used for listening.
+     * @param array $groups
+     */
+    function setActiveGroups($groups)
+    {
+        $this->activeGroups = $groups;
+    }
+
+    /**
+     * @return string queue group for this queue
+     */
+    function queueGroup($queue)
+    {
+        if (isset($this->groupsByTransport[$queue])) {
+            return $this->groupsByTransport[$queue];
+        } else {
+            throw new Exception("Requested group for unregistered transport $queue");
+        }
     }
 
     /**
@@ -210,4 +347,15 @@ abstract class QueueManager extends IoManager
             $monitor->stats($key, $owners);
         }
     }
+
+    protected function _log($level, $msg)
+    {
+        $class = get_class($this);
+        if ($this->activeGroups) {
+            $groups = ' (' . implode(',', $this->activeGroups) . ')';
+        } else {
+            $groups = '';
+        }
+        common_log($level, "$class$groups: $msg");
+    }
 }