+ if (!empty($this->subscriptions[$site])) {
+ foreach ($this->subscriptions[$site] as $queue => $rawqueue) {
+ $this->_log(LOG_INFO, "Unsubscribing from $rawqueue");
+ $this->con->unsubscribe($rawqueue);
+ unset($this->subscriptions[$site][$queue]);
+ }
+ }
+ }
+
+ /**
+ * Handle and acknowledge an event that's come in through a queue.
+ *
+ * If the queue handler reports failure, the message is requeued for later.
+ * Missing notices or handler classes will drop the message.
+ *
+ * Side effects: in multi-site mode, may reset site configuration to
+ * match the site that queued the event.
+ *
+ * @param StompFrame $frame
+ * @return bool
+ */
+ protected function handleItem($frame)
+ {
+ list($site, $queue) = $this->parseDestination($frame->headers['destination']);
+ if ($site != $this->currentSite()) {
+ $this->stats('switch');
+ StatusNet::init($site);
+ }
+
+ if (is_numeric($frame->body)) {
+ $id = intval($frame->body);
+ $info = "notice $id posted at {$frame->headers['created']} in queue $queue";
+
+ $notice = Notice::staticGet('id', $id);
+ if (empty($notice)) {
+ $this->_log(LOG_WARNING, "Skipping missing $info");
+ $this->ack($frame);
+ $this->commit();
+ $this->begin();
+ $this->stats('badnotice', $queue);
+ return false;
+ }
+
+ $item = $notice;
+ } else {
+ // @fixme should we serialize, or json, or what here?
+ $info = "string posted at {$frame->headers['created']} in queue $queue";
+ $item = $frame->body;
+ }
+
+ $handler = $this->getHandler($queue);
+ if (!$handler) {
+ $this->_log(LOG_ERR, "Missing handler class; skipping $info");
+ $this->ack($frame);
+ $this->commit();
+ $this->begin();
+ $this->stats('badhandler', $queue);
+ return false;
+ }
+
+ $ok = $handler->handle($item);
+
+ if (!$ok) {
+ $this->_log(LOG_WARNING, "Failed handling $info");
+ // FIXME we probably shouldn't have to do
+ // this kind of queue management ourselves;
+ // if we don't ack, it should resend...
+ $this->ack($frame);
+ $this->enqueue($item, $queue);
+ $this->commit();
+ $this->begin();
+ $this->stats('requeued', $queue);
+ return false;
+ }
+
+ $this->_log(LOG_INFO, "Successfully handled $info");
+ $this->ack($frame);
+ $this->commit();
+ $this->begin();
+ $this->stats('handled', $queue);
+ return true;
+ }
+
+ /**
+ * Process a control signal broadcast.
+ *
+ * @param array $frame Stomp frame
+ * @return bool true to continue; false to stop further processing.
+ */
+ protected function handleControlSignal($frame)
+ {
+ $message = trim($frame->body);
+ if (strpos($message, ':') !== false) {
+ list($event, $param) = explode(':', $message, 2);
+ } else {
+ $event = $message;
+ $param = '';
+ }