From: Michael Date: Thu, 17 May 2018 22:17:03 +0000 (+0000) Subject: Fix to OStatus delivery to be not so blocking to other tasks X-Git-Url: https://git.mxchange.org/?a=commitdiff_plain;h=5a1e1c1ec9a59412eab33234f4b88b5be4f1baae;p=friendica.git Fix to OStatus delivery to be not so blocking to other tasks --- diff --git a/boot.php b/boot.php index bbc4e9d298..ad162705be 100644 --- a/boot.php +++ b/boot.php @@ -41,7 +41,7 @@ define('FRIENDICA_PLATFORM', 'Friendica'); define('FRIENDICA_CODENAME', 'The Tazmans Flax-lily'); define('FRIENDICA_VERSION', '2018.05-rc'); define('DFRN_PROTOCOL_VERSION', '2.23'); -define('DB_UPDATE_VERSION', 1262); +define('DB_UPDATE_VERSION', 1263); define('NEW_UPDATE_ROUTINE_VERSION', 1170); /** diff --git a/src/Database/DBStructure.php b/src/Database/DBStructure.php index d13ba19df5..c94690a4c8 100644 --- a/src/Database/DBStructure.php +++ b/src/Database/DBStructure.php @@ -1553,12 +1553,14 @@ class DBStructure "callback_url" => ["type" => "varchar(255)", "not null" => "1", "default" => "", "comment" => ""], "topic" => ["type" => "varchar(255)", "not null" => "1", "default" => "", "comment" => ""], "nickname" => ["type" => "varchar(255)", "not null" => "1", "default" => "", "comment" => ""], - "push" => ["type" => "tinyint unsigned", "not null" => "1", "default" => "0", "comment" => ""], - "last_update" => ["type" => "datetime", "not null" => "1", "default" => NULL_DATE, "comment" => ""], + "push" => ["type" => "tinyint unsigned", "not null" => "1", "default" => "0", "comment" => "Retrial counter"], + "last_update" => ["type" => "datetime", "not null" => "1", "default" => NULL_DATE, "comment" => "Date of last successful trial"], + "next_try" => ["type" => "datetime", "not null" => "1", "default" => NULL_DATE, "comment" => "Next retrial date"], "secret" => ["type" => "varchar(255)", "not null" => "1", "default" => "", "comment" => ""], ], "indexes" => [ "PRIMARY" => ["id"], + "next_try" => ["next_try"], ] ]; $database["queue"] = [ diff --git a/src/Worker/Notifier.php b/src/Worker/Notifier.php index e22e8a1cd1..f8b37c2f90 100644 --- a/src/Worker/Notifier.php +++ b/src/Worker/Notifier.php @@ -501,12 +501,12 @@ class Notifier { // Set push flag for PuSH subscribers to this topic, // they will be notified in queue.php $condition = ['push' => false, 'nickname' => $owner['nickname']]; - dba::update('push_subscriber', ['push' => true], $condition); + dba::update('push_subscriber', ['push' => true, 'next_try' => NULL_DATE], $condition); logger('Activating internal PuSH for item '.$item_id, LOGGER_DEBUG); // Handling the pubsubhubbub requests - Worker::add(['priority' => PRIORITY_HIGH, 'created' => $a->queue['created'], 'dont_fork' => true], + Worker::add(['priority' => $a->queue['priority'], 'created' => $a->queue['created'], 'dont_fork' => true], 'PubSubPublish'); } diff --git a/src/Worker/PubSubPublish.php b/src/Worker/PubSubPublish.php index bd4aa0390b..1b2dbbef9f 100644 --- a/src/Worker/PubSubPublish.php +++ b/src/Worker/PubSubPublish.php @@ -12,6 +12,7 @@ use Friendica\Core\Worker; use Friendica\Database\DBM; use Friendica\Protocol\OStatus; use Friendica\Util\Network; +use Friendica\Util\DateTimeFormat; use dba; require_once 'include/items.php'; @@ -21,76 +22,86 @@ class PubSubPublish { { global $a; - if ($pubsubpublish_id == 0) { - // We'll push to each subscriber that has push > 0, - // i.e. there has been an update (set in notifier.php). - $r = q("SELECT `id`, `callback_url` FROM `push_subscriber` WHERE `push` > 0 ORDER BY `last_update` DESC"); - - foreach ($r as $rr) { - logger("Publish feed to ".$rr["callback_url"], LOGGER_DEBUG); - Worker::add(['priority' => PRIORITY_HIGH, 'created' => $a->queue['created'], 'dont_fork' => true], - 'PubSubPublish', (int)$rr["id"]); - } + if ($pubsubpublish_id != 0) { + self::publish($pubsubpublish_id); + return; } - self::publish($pubsubpublish_id); + // We'll push to each subscriber that has push > 0, + // i.e. there has been an update (set in notifier.php). + $subscribers = dba::select('push_subscriber', ['id', 'callback_url'], ["`push` > 0 AND `next_try` < UTC_TIMESTAMP()"]); + + while ($subscriber = dba::fetch($subscribers)) { + logger("Publish feed to " . $subscriber["callback_url"], LOGGER_DEBUG); + Worker::add(['priority' => $a->queue['priority'], 'created' => $a->queue['created'], 'dont_fork' => true], + 'PubSubPublish', (int)$subscriber["id"]); + } - return; + dba::close($subscribers); } private static function publish($id) { global $a; - $r = q("SELECT * FROM `push_subscriber` WHERE `id` = %d", intval($id)); - if (!DBM::is_result($r)) { + $subscriber = dba::selectFirst('push_subscriber', [], ['id' => $id]); + if (!DBM::is_result($subscriber)) { return; } - $rr = $r[0]; - /// @todo Check server status with PortableContact::checkServer() // Before this can be done we need a way to safely detect the server url. - logger("Generate feed of user ".$rr['nickname']." to ".$rr['callback_url']." - last updated ".$rr['last_update'], LOGGER_DEBUG); + logger("Generate feed of user " . $subscriber['nickname']. " to " . $subscriber['callback_url']. " - last updated " . $subscriber['last_update'], LOGGER_DEBUG); - $last_update = $rr['last_update']; - $params = OStatus::feed($rr['nickname'], $last_update); + $last_update = $subscriber['last_update']; + $params = OStatus::feed($subscriber['nickname'], $last_update); if (!$params) { return; } - $hmac_sig = hash_hmac("sha1", $params, $rr['secret']); + $hmac_sig = hash_hmac("sha1", $params, $subscriber['secret']); $headers = ["Content-type: application/atom+xml", sprintf("Link: <%s>;rel=hub,<%s>;rel=self", - System::baseUrl().'/pubsubhubbub/'.$rr['nickname'], - $rr['topic']), - "X-Hub-Signature: sha1=".$hmac_sig]; + System::baseUrl() . '/pubsubhubbub/' . $subscriber['nickname'], + $subscriber['topic']), + "X-Hub-Signature: sha1=" . $hmac_sig]; - logger('POST '.print_r($headers, true)."\n".$params, LOGGER_DATA); + logger('POST ' . print_r($headers, true) . "\n" . $params, LOGGER_DATA); - Network::post($rr['callback_url'], $params, $headers); + Network::post($subscriber['callback_url'], $params, $headers); $ret = $a->get_curl_code(); + $condition = ['id' => $subscriber['id']]; + if ($ret >= 200 && $ret <= 299) { - logger('successfully pushed to '.$rr['callback_url']); + logger('Successfully pushed to ' . $subscriber['callback_url']); // set last_update to the "created" date of the last item, and reset push=0 - $fields = ['push' => 0, 'last_update' => $last_update]; - dba::update('push_subscriber', $fields, ['id' => $rr['id']]); + $fields = ['push' => 0, 'next_try' => NULL_DATE, 'last_update' => $last_update]; + dba::update('push_subscriber', $fields, $condition); } else { - logger('error when pushing to '.$rr['callback_url'].' HTTP: '.$ret); + logger('Delivery error when pushing to ' . $subscriber['callback_url'] . ' HTTP: ' . $ret); // we use the push variable also as a counter, if we failed we // increment this until some upper limit where we give up - $new_push = intval($rr['push']) + 1; + $retrial = $subscriber['push']; - if ($new_push > 30) // OK, let's give up - $new_push = 0; + if ($retrial > 14) { + dba::update('push_subscriber', ['push' => 0, 'next_try' => NULL_DATE], $condition); + logger('Delivery error: Giving up for ' . $subscriber['callback_url'], LOGGER_DEBUG); + } else { + // Calculate the delay until the next trial + $delay = (($retrial + 3) ** 4) + (rand(1, 30) * ($retrial + 1)); + $next = DateTimeFormat::utc('now + ' . $delay . ' seconds'); - dba::update('push_subscriber', ['push' => $new_push], ['id' => $rr['id']]); + $retrial = $retrial + 1; + + dba::update('push_subscriber', ['push' => $retrial, 'next_try' => $next], $condition); + logger('Delivery error: Next try (' . $retrial . ') for ' . $subscriber['callback_url'] . ' at ' . $next, LOGGER_DEBUG); + } } } } diff --git a/src/Worker/Queue.php b/src/Worker/Queue.php index 345da39abb..20ade6dfae 100644 --- a/src/Worker/Queue.php +++ b/src/Worker/Queue.php @@ -34,7 +34,7 @@ class Queue logger('filling queue jobs - start'); // Handling the pubsubhubbub requests - Worker::add(['priority' => PRIORITY_HIGH, 'dont_fork' => true], 'PubSubPublish'); + Worker::add(['priority' => PRIORITY_LOW, 'dont_fork' => true], 'PubSubPublish'); $r = dba::inArray(dba::p("SELECT `id` FROM `queue` WHERE `next` < UTC_TIMESTAMP() ORDER BY `batch`, `cid`"));