return "$class $object->id";
}
return $class;
- }
- if (is_string($object)) {
+ } elseif (is_string($object)) {
$len = strlen($object);
$fragment = mb_substr($object, 0, 32);
if (mb_strlen($object) > 32) {
$fragment .= '...';
}
return "string '$fragment' ($len bytes)";
+ } elseif (is_array($object)) {
+ return 'array with ' . count($object) .
+ ' elements (keys:[' . implode(',', array_keys($object)) . '])';
}
return strval($object);
}
*/
protected function decode($frame)
{
- return unserialize($frame);
+ $object = unserialize($frame);
+
+ // If it is a string, we really store a JSON object in there
+ // except if it begins with '<', because then it is XML.
+ if (is_string($object) && substr($object, 0, 1) != '<') {
+ $json = json_decode($object);
+ if ($json === null) {
+ throw new Exception('Bad frame in queue item');
+ }
+
+ // The JSON object has a type parameter which contains the class
+ if (empty($json->type)) {
+ throw new Exception('Type not specified for queue item');
+ }
+ if (!is_a($json->type, 'Managed_DataObject', true)) {
+ throw new Exception('Managed_DataObject class does not exist for queue item');
+ }
+
+ // And each of these types should have a unique id (or uri)
+ if (isset($json->id) && !empty($json->id)) {
+ $object = call_user_func(array($json->type, 'getKV'), 'id', $json->id);
+ } elseif (isset($json->uri) && !empty($json->uri)) {
+ $object = call_user_func(array($json->type, 'getKV'), 'uri', $json->uri);
+ }
+
+ // But if no object was found, there's nothing we can handle
+ if (!$object instanceof Managed_DataObject) {
+ throw new Exception('Queue item frame referenced a non-existant object');
+ }
+ }
+
+ // If the frame was not a string, it's either an array or an object.
+
+ return $object;
}
/**
if (Event::handle('StartInitializeQueueManager', array($this))) {
$this->connect('distrib', 'DistribQueueHandler');
- $this->connect('omb', 'OmbQueueHandler');
$this->connect('ping', 'PingQueueHandler');
if (common_config('sms', 'enabled')) {
$this->connect('sms', 'SmsQueueHandler');
// Background user management tasks...
$this->connect('deluser', 'DelUserQueueHandler');
-
- // Broadcasting profile updates to OMB remote subscribers
- $this->connect('profile', 'ProfileQueueHandler');
+ $this->connect('feedimp', 'FeedImporter');
+ $this->connect('actimp', 'ActivityImporter');
+ $this->connect('acctmove', 'AccountMover');
+ $this->connect('actmove', 'ActivityMover');
// For compat with old plugins not registering their own handlers.
$this->connect('plugin', 'PluginQueueHandler');