X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=lib%2Fqueuemanager.php;h=bc18e1fc56fe9e39ca6a769c8eec9dcdd46ff599;hb=33e2f5b449d477e55bda7029f9e826d889e41eb5;hp=0829c8a8bcb8321d1aba5deaa55d8e4640d4d9eb;hpb=cd29d3d646379aa9a1352035973c8e379cc7f42b;p=quix0rs-gnu-social.git diff --git a/lib/queuemanager.php b/lib/queuemanager.php index 0829c8a8bc..bc18e1fc56 100644 --- a/lib/queuemanager.php +++ b/lib/queuemanager.php @@ -143,34 +143,29 @@ abstract class QueueManager extends IoManager return "$class $object->id"; } return $class; - } - if (is_string($object)) { + } elseif (is_string($object)) { $len = strlen($object); $fragment = mb_substr($object, 0, 32); if (mb_strlen($object) > 32) { $fragment .= '...'; } return "string '$fragment' ($len bytes)"; + } elseif (is_array($object)) { + return 'array with ' . count($object) . + ' elements (keys:[' . implode(',', array_keys($object)) . '])'; } return strval($object); } /** - * Encode an object or variable for queued storage. - * Notice objects are currently stored as an id reference; - * other items are serialized. + * Encode an object for queued storage. * * @param mixed $item * @return string */ protected function encode($item) { - if ($item instanceof Notice) { - // Backwards compat - return $item->id; - } else { - return serialize($item); - } + return serialize($item); } /** @@ -182,25 +177,40 @@ abstract class QueueManager extends IoManager */ protected function decode($frame) { - if (is_numeric($frame)) { - // Back-compat for notices... - return Notice::staticGet(intval($frame)); - } elseif (substr($frame, 0, 1) == '<') { - // Back-compat for XML source - return $frame; - } else { - // Deserialize! - #$old = error_reporting(); - #error_reporting($old & ~E_NOTICE); - $out = unserialize($frame); - #error_reporting($old); - - if ($out === false && $frame !== 'b:0;') { - common_log(LOG_ERR, "Couldn't unserialize queued frame: $frame"); - return false; + $object = unserialize($frame); + + // If it is a string, we really store a JSON object in there + // except if it begins with '<', because then it is XML. + if (is_string($object) && substr($object, 0, 1) != '<') { + $json = json_decode($object); + if ($json === null) { + throw new Exception('Bad frame in queue item'); + } + + // The JSON object has a type parameter which contains the class + if (empty($json->type)) { + throw new Exception('Type not specified for queue item'); + } + if (!is_a($json->type, 'Managed_DataObject', true)) { + throw new Exception('Managed_DataObject class does not exist for queue item'); + } + + // And each of these types should have a unique id (or uri) + if (isset($json->id) && !empty($json->id)) { + $object = call_user_func(array($json->type, 'getKV'), 'id', $json->id); + } elseif (isset($json->uri) && !empty($json->uri)) { + $object = call_user_func(array($json->type, 'getKV'), 'uri', $json->uri); + } + + // But if no object was found, there's nothing we can handle + if (!$object instanceof Managed_DataObject) { + throw new Exception('Queue item frame referenced a non-existant object'); } - return $out; } + + // If the frame was not a string, it's either an array or an object. + + return $object; } /** @@ -258,7 +268,6 @@ abstract class QueueManager extends IoManager if (Event::handle('StartInitializeQueueManager', array($this))) { $this->connect('distrib', 'DistribQueueHandler'); - $this->connect('omb', 'OmbQueueHandler'); $this->connect('ping', 'PingQueueHandler'); if (common_config('sms', 'enabled')) { $this->connect('sms', 'SmsQueueHandler'); @@ -266,19 +275,10 @@ abstract class QueueManager extends IoManager // Background user management tasks... $this->connect('deluser', 'DelUserQueueHandler'); - - // Broadcasting profile updates to OMB remote subscribers - $this->connect('profile', 'ProfileQueueHandler'); - - // XMPP output handlers... - if (common_config('xmpp', 'enabled')) { - // Delivery prep, read by queuedaemon.php: - $this->connect('jabber', 'JabberQueueHandler'); - $this->connect('public', 'PublicQueueHandler'); - - // Raw output, read by xmppdaemon.php: - $this->connect('xmppout', 'XmppOutQueueHandler', 'xmpp'); - } + $this->connect('feedimp', 'FeedImporter'); + $this->connect('actimp', 'ActivityImporter'); + $this->connect('acctmove', 'AccountMover'); + $this->connect('actmove', 'ActivityMover'); // For compat with old plugins not registering their own handlers. $this->connect('plugin', 'PluginQueueHandler');