return "$class $object->id";
}
return $class;
- }
- if (is_string($object)) {
+ } elseif (is_string($object)) {
$len = strlen($object);
$fragment = mb_substr($object, 0, 32);
if (mb_strlen($object) > 32) {
$fragment .= '...';
}
return "string '$fragment' ($len bytes)";
+ } elseif (is_array($object)) {
+ return 'array with ' . count($object) .
+ ' elements (keys:[' . implode(',', array_keys($object)) . '])';
}
return strval($object);
}
/**
- * Encode an object or variable for queued storage.
- * Notice objects are currently stored as an id reference;
- * other items are serialized.
+ * Encode an object for queued storage.
*
* @param mixed $item
* @return string
*/
protected function encode($item)
{
- if ($item instanceof Notice) {
- // Backwards compat
- return $item->id;
- } else {
- return serialize($item);
- }
+ return serialize($item);
}
/**
*/
protected function decode($frame)
{
- if (is_numeric($frame)) {
- // Back-compat for notices...
- return Notice::staticGet(intval($frame));
- } elseif (substr($frame, 0, 1) == '<') {
- // Back-compat for XML source
- return $frame;
- } else {
- // Deserialize!
- #$old = error_reporting();
- #error_reporting($old & ~E_NOTICE);
- $out = unserialize($frame);
- #error_reporting($old);
-
- if ($out === false && $frame !== 'b:0;') {
- common_log(LOG_ERR, "Couldn't unserialize queued frame: $frame");
- return false;
+ $object = unserialize($frame);
+
+ // If it is a string, we really store a JSON object in there
+ // except if it begins with '<', because then it is XML.
+ if (is_string($object) && substr($object, 0, 1) != '<') {
+ $json = json_decode($object);
+ if ($json === null) {
+ throw new Exception('Bad frame in queue item');
+ }
+
+ // The JSON object has a type parameter which contains the class
+ if (empty($json->type)) {
+ throw new Exception('Type not specified for queue item');
+ }
+ if (!is_a($json->type, 'Managed_DataObject', true)) {
+ throw new Exception('Managed_DataObject class does not exist for queue item');
+ }
+
+ // And each of these types should have a unique id (or uri)
+ if (isset($json->id) && !empty($json->id)) {
+ $object = call_user_func(array($json->type, 'getKV'), 'id', $json->id);
+ } elseif (isset($json->uri) && !empty($json->uri)) {
+ $object = call_user_func(array($json->type, 'getKV'), 'uri', $json->uri);
+ }
+
+ // But if no object was found, there's nothing we can handle
+ if (!$object instanceof Managed_DataObject) {
+ throw new Exception('Queue item frame referenced a non-existant object');
}
- return $out;
}
+
+ // If the frame was not a string, it's either an array or an object.
+
+ return $object;
}
/**
if (Event::handle('StartInitializeQueueManager', array($this))) {
$this->connect('distrib', 'DistribQueueHandler');
- $this->connect('omb', 'OmbQueueHandler');
$this->connect('ping', 'PingQueueHandler');
if (common_config('sms', 'enabled')) {
$this->connect('sms', 'SmsQueueHandler');
$this->connect('acctmove', 'AccountMover');
$this->connect('actmove', 'ActivityMover');
- // Broadcasting profile updates to OMB remote subscribers
- $this->connect('profile', 'ProfileQueueHandler');
-
- // XMPP output handlers...
- if (common_config('xmpp', 'enabled')) {
- // Delivery prep, read by queuedaemon.php:
- $this->connect('jabber', 'JabberQueueHandler');
- $this->connect('public', 'PublicQueueHandler');
-
- // Raw output, read by xmppdaemon.php:
- $this->connect('xmppout', 'XmppOutQueueHandler', 'xmpp');
- }
-
// For compat with old plugins not registering their own handlers.
$this->connect('plugin', 'PluginQueueHandler');
}