+ $this->_log(LOG_WARNING, "Skipping missing $info");
+ $this->ack($idx, $frame);
+ $this->commit($idx);
+ $this->begin($idx);
+ $this->stats('badnotice', $queue);
+ return false;
+ }
+
+ $item = $notice;
+ } else {
+ // @fixme should we serialize, or json, or what here?
+ $info = "string posted at {$frame->headers['created']} in queue $queue from $host";
+ $item = $frame->body;
+ }
+
+ $handler = $this->getHandler($queue);
+ if (!$handler) {
+ $this->_log(LOG_ERR, "Missing handler class; skipping $info");
+ $this->ack($idx, $frame);
+ $this->commit($idx);
+ $this->begin($idx);
+ $this->stats('badhandler', $queue);
+ return false;
+ }
+
+ // If there's an exception when handling,
+ // log the error and let it get requeued.
+
+ try {
+ $ok = $handler->handle($item);
+ } catch (Exception $e) {
+ $this->_log(LOG_ERR, "Exception on queue $queue: " . $e->getMessage());
+ $ok = false;
+ }
+
+ if (!$ok) {
+ $this->_log(LOG_WARNING, "Failed handling $info");
+ // FIXME we probably shouldn't have to do
+ // this kind of queue management ourselves;
+ // if we don't ack, it should resend...
+ $this->ack($idx, $frame);
+ $this->enqueue($item, $queue);
+ $this->commit($idx);
+ $this->begin($idx);
+ $this->stats('requeued', $queue);
+ return false;
+ }
+
+ $this->_log(LOG_INFO, "Successfully handled $info");
+ $this->ack($idx, $frame);
+ $this->commit($idx);
+ $this->begin($idx);
+ $this->stats('handled', $queue);
+ return true;
+ }
+
+ /**
+ * Process a control signal broadcast.
+ *
+ * @param int $idx connection index
+ * @param array $frame Stomp frame
+ * @return bool true to continue; false to stop further processing.
+ */
+ protected function handleControlSignal($idx, $frame)
+ {
+ $message = trim($frame->body);
+ if (strpos($message, ':') !== false) {
+ list($event, $param) = explode(':', $message, 2);
+ } else {
+ $event = $message;
+ $param = '';
+ }
+
+ $shutdown = false;
+
+ if ($event == 'shutdown') {
+ $this->master->requestShutdown();
+ $shutdown = true;
+ } else if ($event == 'restart') {
+ $this->master->requestRestart();
+ $shutdown = true;
+ } else if ($event == 'update') {
+ $this->updateSiteConfig($param);
+ } else {
+ $this->_log(LOG_ERR, "Ignoring unrecognized control message: $message");
+ }
+
+ $this->ack($idx, $frame);
+ $this->commit($idx);
+ $this->begin($idx);
+ return $shutdown;
+ }
+
+ /**
+ * Set us up with queue subscriptions for a new site added at runtime,
+ * triggered by a broadcast to the 'statusnet-control' topic.
+ *
+ * @param array $frame Stomp frame
+ * @return bool true to continue; false to stop further processing.
+ */
+ protected function updateSiteConfig($nickname)
+ {
+ if (empty($this->sites)) {
+ if ($nickname == common_config('site', 'nickname')) {
+ StatusNet::init(common_config('site', 'server'));
+ $this->doUnsubscribe();
+ $this->doSubscribe();