protected $handlers = array();
protected $groups = array();
protected $activeGroups = array();
+ protected $ignoredTransports = array();
/**
* Factory function to pull the appropriate QueueManager object
*/
protected function decode($frame)
{
- return unserialize($frame);
+ $object = unserialize($frame);
+
+ // If it is a string, we really store a JSON object in there
+ // except if it begins with '<', because then it is XML.
+ if (is_string($object) &&
+ substr($object, 0, 1) != '<' &&
+ !is_numeric($object))
+ {
+ $json = json_decode($object);
+ if ($json === null) {
+ throw new Exception('Bad frame in queue item');
+ }
+
+ // The JSON object has a type parameter which contains the class
+ if (empty($json->type)) {
+ throw new Exception('Type not specified for queue item');
+ }
+ if (!is_a($json->type, 'Managed_DataObject', true)) {
+ throw new Exception('Managed_DataObject class does not exist for queue item');
+ }
+
+ // And each of these types should have a unique id (or uri)
+ if (isset($json->id) && !empty($json->id)) {
+ $object = call_user_func(array($json->type, 'getKV'), 'id', $json->id);
+ } elseif (isset($json->uri) && !empty($json->uri)) {
+ $object = call_user_func(array($json->type, 'getKV'), 'uri', $json->uri);
+ }
+
+ // But if no object was found, there's nothing we can handle
+ if (!$object instanceof Managed_DataObject) {
+ throw new Exception('Queue item frame referenced a non-existant object');
+ }
+ }
+
+ // If the frame was not a string, it's either an array or an object.
+
+ return $object;
}
/**
} else {
$this->_log(LOG_ERR, "Nonexistent handler class '$class' for queue '$queue'");
}
- } else {
- $this->_log(LOG_ERR, "Requested handler for unkown queue '$queue'");
}
- return null;
+ throw new NoQueueHandlerException($queue);
}
/**
return array_keys($queues);
}
+ function getIgnoredTransports()
+ {
+ return array_keys($this->ignoredTransports);
+ }
+
+ function ignoreTransport($transport)
+ {
+ // key is used for uniqueness, value doesn't mean anything
+ $this->ignoredTransports[$transport] = true;
+ }
+
/**
* Initialize the list of queue handlers for the current site.
*