{
$headers = array('ack' => 'client');
$headers['activemq.prefetchSize'] = $this->prefetchSize;
- if ($this->clientId != null) {
+ $headers['prefetch-count'] = '1';
+
+ if ($this->clientId != null) {
$headers["activemq.subcriptionName"] = $this->clientId;
}
if (isset($properties)) {
$envelope = array('site' => $siteNickname ? $siteNickname : common_config('site', 'nickname'),
'handler' => $queue,
'payload' => $this->encode($object));
- $msg = serialize($envelope);
+ $msg = base64_encode(serialize($envelope));
$props = array('created' => common_sql_now());
if ($this->isPersistent($queue)) {
protected function handleItem($frame)
{
$host = $this->cons[$this->defaultIdx]->getServer();
- $message = unserialize($frame->body);
+ $message = unserialize(base64_decode($frame->body));
if ($message === false) {
$this->_log(LOG_ERR, "Can't unserialize frame: {$frame->body}");
$site = $message['site'];
$queue = $message['handler'];
- if ($this->isDeadletter($frame, $message)) {
+ if ($this->isDeadLetter($frame, $message)) {
$this->stats('deadletter', $queue);
return false;
}