}
$queue = $qi->transport;
- $item = $this->decode($qi->frame);
+ try {
+ $item = $this->decode($qi->frame);
+ } catch (Exception $e) {
+ $this->_log(LOG_INFO, "[$queue] Discarding: ".$e->getMessage());
+ $this->_done($qi);
+ return true;
+ }
- if ($item) {
- $rep = $this->logrep($item);
- $this->_log(LOG_DEBUG, "Got $rep for transport $queue");
-
- $handler = $this->getHandler($queue);
- if ($handler) {
- if ($handler->handle($item)) {
- $this->_log(LOG_INFO, "[$queue:$rep] Successfully handled item");
- $this->_done($qi);
- } else {
- $this->_log(LOG_INFO, "[$queue:$rep] Failed to handle item");
- $this->_fail($qi);
- }
- } else {
- $this->_log(LOG_INFO, "[$queue:$rep] No handler for queue $queue; discarding.");
+ $rep = $this->logrep($item);
+ $this->_log(LOG_DEBUG, "Got $rep for transport $queue");
+
+ $handler = $this->getHandler($queue);
+ if ($handler) {
+ if ($handler->handle($item)) {
+ $this->_log(LOG_INFO, "[$queue:$rep] Successfully handled item");
$this->_done($qi);
+ } else {
+ $this->_log(LOG_INFO, "[$queue:$rep] Failed to handle item");
+ $this->_fail($qi);
}
} else {
- $this->_log(LOG_INFO, "[$queue] Got empty/deleted item, discarding");
+ $this->_log(LOG_INFO, "[$queue:$rep] No handler for queue $queue; discarding.");
$this->_done($qi);
}
return true;
*/
protected function decode($frame)
{
- return unserialize($frame);
+ $object = unserialize($frame);
+
+ // If it is a string, we really store a JSON object in there
+ if (is_string($object)) {
+ $json = json_decode($object);
+ if ($json === null) {
+ throw new Exception('Bad frame in queue item');
+ }
+
+ // The JSON object has a type parameter which contains the class
+ if (empty($json->type)) {
+ throw new Exception('Type not specified for queue item');
+ }
+ if (!is_a($json->type, 'Managed_DataObject', true)) {
+ throw new Exception('Managed_DataObject class does not exist for queue item');
+ }
+
+ // And each of these types should have a unique id (or uri)
+ if (isset($json->id) && !empty($json->id)) {
+ $object = call_user_func(array($json->type, 'getKV'), 'id', $json->id);
+ } elseif (isset($json->uri) && !empty($json->uri)) {
+ $object = call_user_func(array($json->type, 'getKV'), 'uri', $json->uri);
+ }
+
+ // But if no object was found, there's nothing we can handle
+ if (!$object instanceof Managed_DataObject) {
+ throw new Exception('Queue item frame referenced a non-existant object');
+ }
+ }
+
+ // If the frame was not a string, it's either an array or an object.
+
+ return $object;
}
/**
// @fixme detect failing site switches
$this->switchSite($site);
- $item = $this->decode($message['payload']);
- if (empty($item)) {
+ try {
+ $item = $this->decode($message['payload']);
+ } catch (Exception $e) {
$this->_log(LOG_ERR, "Skipping empty or deleted item in queue $queue from $host");
$this->stats('baditem', $queue);
return false;