X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=plugins%2FOStatus%2Fclasses%2FHubSub.php;h=e7ac23af5821ee5dfdfa4e480d0ddc1c4b301b94;hb=4101de7dd7cf059816c29c666c816f260a84c252;hp=c420b3eef8414826101910e08ec9f8e07bb1a642;hpb=d644f4148b5856c55ecc3770552c330d5f33348c;p=quix0rs-gnu-social.git diff --git a/plugins/OStatus/classes/HubSub.php b/plugins/OStatus/classes/HubSub.php index c420b3eef8..e7ac23af58 100644 --- a/plugins/OStatus/classes/HubSub.php +++ b/plugins/OStatus/classes/HubSub.php @@ -17,6 +17,10 @@ * along with this program. If not, see . */ +if (!defined('STATUSNET')) { + exit(1); +} + /** * PuSH feed subscription record * @package Hub @@ -54,7 +58,6 @@ class HubSub extends Memcached_DataObject * * @return array array of column definitions */ - function table() { return array('hashkey' => DB_DATAOBJECT_STR + DB_DATAOBJECT_NOTNULL, @@ -77,7 +80,7 @@ class HubSub extends Memcached_DataObject new ColumnDef('topic', 'varchar', /*size*/255, /*nullable*/false, - /*key*/'KEY'), + /*key*/'MUL'), new ColumnDef('callback', 'varchar', 255, false), new ColumnDef('secret', 'text', @@ -112,7 +115,6 @@ class HubSub extends Memcached_DataObject * * @return array key definitions */ - function keyTypes() { return array('hashkey' => 'K'); @@ -206,7 +208,8 @@ class HubSub extends Memcached_DataObject if ($status >= 200 && $status < 300) { common_log(LOG_INFO, "Verified $mode of $this->callback:$this->topic"); } else { - throw new ClientException("Hub subscriber verification returned HTTP $status"); + // TRANS: Client exception. %s is a HTTP status code. + throw new ClientException(sprintf(_m('Hub subscriber verification returned HTTP %s.'),$status)); } $old = HubSub::staticGet($this->topic, $this->callback); @@ -260,6 +263,37 @@ class HubSub extends Memcached_DataObject $retries = intval(common_config('ostatus', 'hub_retries')); } + if (common_config('ostatus', 'local_push_bypass')) { + // If target is a local site, bypass the web server and drop the + // item directly into the target's input queue. + $url = parse_url($this->callback); + $wildcard = common_config('ostatus', 'local_wildcard'); + $site = Status_network::getFromHostname($url['host'], $wildcard); + + if ($site) { + if ($this->secret) { + $hmac = 'sha1=' . hash_hmac('sha1', $atom, $this->secret); + } else { + $hmac = ''; + } + + // Hack: at the moment we stick the subscription ID in the callback + // URL so we don't have to look inside the Atom to route the subscription. + // For now this means we need to extract that from the target URL + // so we can include it in the data. + $parts = explode('/', $url['path']); + $subId = intval(array_pop($parts)); + + $data = array('feedsub_id' => $subId, + 'post' => $atom, + 'hmac' => $hmac); + common_log(LOG_DEBUG, "Cross-site PuSH bypass enqueueing straight to $site->nickname feed $subId"); + $qm = QueueManager::get(); + $qm->enqueue($data, 'pushin', $site->nickname); + return; + } + } + // We dare not clone() as when the clone is discarded it'll // destroy the result data for the parent query. // @fixme use clone() again when it's safe to copy an @@ -273,6 +307,26 @@ class HubSub extends Memcached_DataObject $qm->enqueue($data, 'hubout'); } + /** + * Queue up a large batch of pushes to multiple subscribers + * for this same topic update. + * + * If queues are disabled, this will run immediately. + * + * @param string $atom well-formed Atom feed + * @param array $pushCallbacks list of callback URLs + */ + function bulkDistribute($atom, $pushCallbacks) + { + $data = array('atom' => $atom, + 'topic' => $this->topic, + 'pushCallbacks' => $pushCallbacks); + common_log(LOG_INFO, "Queuing PuSH batch: $this->topic to " . + count($pushCallbacks) . " sites"); + $qm = QueueManager::get(); + $qm->enqueue($data, 'hubprep'); + } + /** * Send a 'fat ping' to the subscriber's callback endpoint * containing the given Atom feed chunk. @@ -301,11 +355,9 @@ class HubSub extends Memcached_DataObject if ($response->isOk()) { return true; } else { - throw new Exception("Callback returned status: " . - $response->getStatus() . - "; body: " . - trim($response->getBody())); + // TRANS: Exception. %1$s is a response status code, %2$s is the body of the response. + throw new Exception(sprintf(_m('Callback returned status: %1$s. Body: %2$s'), + $response->getStatus(),trim($response->getBody()))); } } } -