+ return $ok;
+ }
+
+ /**
+ * Check if a redelivered message has been run through enough
+ * that we're going to give up on it.
+ *
+ * @param StompFrame $frame
+ * @param array $message unserialized message body
+ * @return boolean true if we should discard
+ */
+ protected function isDeadLetter($frame, $message)
+ {
+ if (isset($frame->headers['redelivered']) && $frame->headers['redelivered'] == 'true') {
+ // Message was redelivered, possibly indicating a previous failure.
+ $msgId = $frame->headers['message-id'];
+ $site = $message['site'];
+ $queue = $message['handler'];
+ $msgInfo = "message $msgId for $site in queue $queue";
+
+ $deliveries = $this->incDeliveryCount($msgId);
+ if ($deliveries > common_config('queue', 'max_retries')) {
+ $info = "DEAD-LETTER FILE: Gave up after retry $deliveries on $msgInfo";
+
+ $outdir = common_config('queue', 'dead_letter_dir');
+ if ($outdir) {
+ $filename = $outdir . "/$site-$queue-" . rawurlencode($msgId);
+ $info .= ": dumping to $filename";
+ file_put_contents($filename, $message['payload']);
+ }
+
+ common_log(LOG_ERR, $info);
+ return true;
+ } else {
+ common_log(LOG_INFO, "retry $deliveries on $msgInfo");
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Update count of times we've re-encountered this message recently,
+ * triggered when we get a message marked as 'redelivered'.
+ *
+ * Requires a CLI-friendly cache configuration.
+ *
+ * @param string $msgId message-id header from message
+ * @return int number of retries recorded
+ */
+ function incDeliveryCount($msgId)
+ {
+ $count = 0;
+ $cache = common_memcache();
+ if ($cache) {
+ $key = 'statusnet:stomp:message-retries:' . $msgId;
+ $count = $cache->increment($key);
+ if (!$count) {
+ $count = 1;
+ $cache->set($key, $count, null, 3600);
+ $got = $cache->get($key);
+ }
+ }
+ return $count;
+ }
+
+ /**
+ * Process a control signal broadcast.
+ *
+ * @param int $idx connection index
+ * @param array $frame Stomp frame
+ * @return bool true to continue; false to stop further processing.
+ */
+ protected function handleControlSignal($idx, $frame)
+ {
+ $message = trim($frame->body);
+ if (strpos($message, ':') !== false) {
+ list($event, $param) = explode(':', $message, 2);
+ } else {
+ $event = $message;
+ $param = '';
+ }
+
+ $shutdown = false;
+
+ if ($event == 'shutdown') {
+ $this->master->requestShutdown();
+ $shutdown = true;
+ } else if ($event == 'restart') {
+ $this->master->requestRestart();
+ $shutdown = true;
+ } else if ($event == 'update') {
+ $this->updateSiteConfig($param);
+ } else {
+ $this->_log(LOG_ERR, "Ignoring unrecognized control message: $message");
+ }
+ return $shutdown;
+ }
+
+ /**
+ * Switch site, if necessary, and reset current handler assignments
+ * @param string $site
+ */
+ function switchSite($site)
+ {
+ if ($site != StatusNet::currentSite()) {
+ $this->stats('switch');
+ StatusNet::switchSite($site);
+ $this->initialize();
+ }
+ }
+
+ /**
+ * (Re)load runtime configuration for a given site by nickname,
+ * triggered by a broadcast to the 'statusnet-control' topic.
+ *
+ * Configuration changes in database should update, but config
+ * files might not.
+ *
+ * @param array $frame Stomp frame
+ * @return bool true to continue; false to stop further processing.
+ */
+ protected function updateSiteConfig($nickname)
+ {
+ $sn = Status_network::staticGet('nickname', $nickname);
+ if ($sn) {
+ $this->switchSite($nickname);
+ if (!in_array($nickname, $this->sites)) {
+ $this->addSite();
+ }
+ $this->stats('siteupdate');
+ } else {
+ $this->_log(LOG_ERR, "Ignoring ping for unrecognized new site $nickname");
+ }