+ $message = trim($frame->body);
+ if (strpos($message, ':') !== false) {
+ list($event, $param) = explode(':', $message, 2);
+ } else {
+ $event = $message;
+ $param = '';
+ }
+
+ $shutdown = false;
+
+ if ($event == 'shutdown') {
+ $this->master->requestShutdown();
+ $shutdown = true;
+ } else if ($event == 'restart') {
+ $this->master->requestRestart();
+ $shutdown = true;
+ } else if ($event == 'update') {
+ $this->updateSiteConfig($param);
+ } else {
+ $this->_log(LOG_ERR, "Ignoring unrecognized control message: $message");
+ }
+ return $shutdown;
+ }
+
+ /**
+ * Switch site, if necessary, and reset current handler assignments
+ * @param string $site
+ */
+ function switchSite($site)
+ {
+ if ($site != StatusNet::currentSite()) {
+ $this->stats('switch');
+ StatusNet::switchSite($site);
+ $this->initialize();
+ }
+ }
+
+ /**
+ * (Re)load runtime configuration for a given site by nickname,
+ * triggered by a broadcast to the 'statusnet-control' topic.
+ *
+ * Configuration changes in database should update, but config
+ * files might not.
+ *
+ * @param array $frame Stomp frame
+ * @return bool true to continue; false to stop further processing.
+ */
+ protected function updateSiteConfig($nickname)
+ {
+ $sn = Status_network::staticGet($nickname);
+ if ($sn) {
+ $this->switchSite($nickname);
+ if (!in_array($nickname, $this->sites)) {
+ $this->addSite();
+ }
+ $this->stats('siteupdate');
+ } else {
+ $this->_log(LOG_ERR, "Ignoring ping for unrecognized new site $nickname");
+ }
+ }
+
+ /**
+ * Combines the queue_basename from configuration with the
+ * group name for this queue to give eg:
+ *
+ * /queue/statusnet/main
+ * /queue/statusnet/main/distrib
+ * /queue/statusnet/xmpp/xmppout/site01
+ *
+ * @param string $queue
+ * @return string
+ */
+ protected function queueName($queue)
+ {
+ $group = $this->queueGroup($queue);
+ $site = StatusNet::currentSite();
+
+ $specs = array("$group/$queue/$site",
+ "$group/$queue");
+ foreach ($specs as $spec) {
+ if (in_array($spec, $this->breakout)) {
+ return $this->base . $spec;
+ }
+ }
+ return $this->base . $group;
+ }
+
+ /**
+ * Get the breakout mode for the given queue on the current site.
+ *
+ * @param string $queue
+ * @return string one of 'shared', 'handler', 'site'
+ */
+ protected function breakoutMode($queue)
+ {
+ $breakout = common_config('queue', 'breakout');
+ if (isset($breakout[$queue])) {
+ return $breakout[$queue];
+ } else if (isset($breakout['*'])) {
+ return $breakout['*'];
+ } else {
+ return 'shared';
+ }
+ }
+
+ protected function begin($idx)
+ {
+ if ($this->useTransactions) {
+ if (!empty($this->transaction[$idx])) {
+ throw new Exception("Tried to start transaction in the middle of a transaction");
+ }
+ $this->transactionCount[$idx]++;
+ $this->transaction[$idx] = $this->master->id . '-' . $this->transactionCount[$idx] . '-' . time();
+ $this->cons[$idx]->begin($this->transaction[$idx]);
+ }
+ }
+
+ protected function ack($idx, $frame)
+ {
+ if ($this->useTransactions) {
+ if (empty($this->transaction[$idx])) {
+ throw new Exception("Tried to ack but not in a transaction");
+ }
+ $this->cons[$idx]->ack($frame, $this->transaction[$idx]);
+ } else {
+ $this->cons[$idx]->ack($frame);
+ }
+ }
+
+ protected function commit($idx)
+ {
+ if ($this->useTransactions) {
+ if (empty($this->transaction[$idx])) {
+ throw new Exception("Tried to commit but not in a transaction");
+ }
+ $this->cons[$idx]->commit($this->transaction[$idx]);
+ $this->transaction[$idx] = null;
+ }
+ }
+
+ protected function rollback($idx)
+ {
+ if ($this->useTransactions) {
+ if (empty($this->transaction[$idx])) {
+ throw new Exception("Tried to rollback but not in a transaction");
+ }
+ $this->cons[$idx]->commit($this->transaction[$idx]);
+ $this->transaction[$idx] = null;
+ }