From e9848d7b8f67f5ce4983c25cd1d4d956dfe80a19 Mon Sep 17 00:00:00 2001
From: Michael <heluecht@pirati.ca>
Date: Sat, 19 May 2018 03:56:29 +0000
Subject: [PATCH] End subscription for unreachable subscribers

---
 boot.php                     |   2 +-
 database.sql                 |   4 +-
 mod/pubsubhubbub.php         |  31 +--------
 src/Database/DBStructure.php |   2 +-
 src/Model/PushSubscriber.php | 127 +++++++++++++++++++++++++++++++++--
 src/Worker/Notifier.php      |   7 +-
 src/Worker/PubSubPublish.php |  27 +-------
 src/Worker/Queue.php         |   2 +-
 8 files changed, 133 insertions(+), 69 deletions(-)

diff --git a/boot.php b/boot.php
index b4dca49a5c..78bd68f3e4 100644
--- a/boot.php
+++ b/boot.php
@@ -41,7 +41,7 @@ define('FRIENDICA_PLATFORM',     'Friendica');
 define('FRIENDICA_CODENAME',     'The Tazmans Flax-lily');
 define('FRIENDICA_VERSION',      '2018.05-rc');
 define('DFRN_PROTOCOL_VERSION',  '2.23');
-define('DB_UPDATE_VERSION',      1264);
+define('DB_UPDATE_VERSION',      1265);
 define('NEW_UPDATE_ROUTINE_VERSION', 1170);
 
 /**
diff --git a/database.sql b/database.sql
index bc002cfed5..64ac656fd9 100644
--- a/database.sql
+++ b/database.sql
@@ -1,6 +1,6 @@
 -- ------------------------------------------
 -- Friendica 2018.05-rc (The Tazmans Flax-lily)
--- DB_UPDATE_VERSION 1264
+-- DB_UPDATE_VERSION 1265
 -- ------------------------------------------
 
 
@@ -855,7 +855,7 @@ CREATE TABLE IF NOT EXISTS `push_subscriber` (
 	`callback_url` varchar(255) NOT NULL DEFAULT '' COMMENT '',
 	`topic` varchar(255) NOT NULL DEFAULT '' COMMENT '',
 	`nickname` varchar(255) NOT NULL DEFAULT '' COMMENT '',
-	`push` tinyint unsigned NOT NULL DEFAULT 0 COMMENT 'Retrial counter',
+	`push` tinyint NOT NULL DEFAULT 0 COMMENT 'Retrial counter',
 	`last_update` datetime NOT NULL DEFAULT '0001-01-01 00:00:00' COMMENT 'Date of last successful trial',
 	`next_try` datetime NOT NULL DEFAULT '0001-01-01 00:00:00' COMMENT 'Next retrial date',
 	`renewed` datetime NOT NULL DEFAULT '0001-01-01 00:00:00' COMMENT 'Date of last subscription renewal',
diff --git a/mod/pubsubhubbub.php b/mod/pubsubhubbub.php
index 1abf3fbe2b..6ee7955e07 100644
--- a/mod/pubsubhubbub.php
+++ b/mod/pubsubhubbub.php
@@ -4,8 +4,8 @@ use Friendica\App;
 use Friendica\Core\Config;
 use Friendica\Core\System;
 use Friendica\Database\DBM;
-use Friendica\Util\DateTimeFormat;
 use Friendica\Util\Network;
+use Friendica\Model\PushSubscriber;
 
 function post_var($name) {
 	return (x($_POST, $name)) ? notags(trim($_POST[$name])) : '';
@@ -120,35 +120,8 @@ function pubsubhubbub_init(App $a) {
 			System::httpExit(404);
 		}
 
-		// fetch the old subscription if it exists
-		$subscriber = dba::selectFirst('push_subscriber', ['last_update', 'push'], ['callback_url' => $hub_callback]);
+		PushSubscriber::renew($owner['uid'], $nick, $subscribe, $hub_callback, $hub_topic, $hub_secret);
 
-		// delete old subscription if it exists
-		dba::delete('push_subscriber', ['callback_url' => $hub_callback]);
-
-		if ($subscribe) {
-			$last_update = DateTimeFormat::utcNow();
-			$push_flag = 0;
-
-			// if we are just updating an old subscription, keep the
-			// old values for last_update but reset the push
-			if (DBM::is_result($subscriber)) {
-				$last_update = $subscriber['last_update'];
-				$push_flag = min($subscriber['push'], 1);
-			}
-
-			// subscribe means adding the row to the table
-			$fields = ['uid' => $owner['uid'], 'callback_url' => $hub_callback,
-				'topic' => $hub_topic, 'nickname' => $nick, 'push' => $push_flag,
-				'last_update' => $last_update, 'renewed' => DateTimeFormat::utcNow(),
-				'secret' => $hub_secret];
-			dba::insert('push_subscriber', $fields);
-
-			logger("Successfully subscribed [$hub_callback].");
-		} else {
-			logger("Successfully unsubscribed [$hub_callback].");
-			// we do nothing here, since the row was already deleted
-		}
 		System::httpExit(202);
 	}
 	killme();
diff --git a/src/Database/DBStructure.php b/src/Database/DBStructure.php
index 21301947cf..6db786dfc8 100644
--- a/src/Database/DBStructure.php
+++ b/src/Database/DBStructure.php
@@ -1553,7 +1553,7 @@ class DBStructure
 						"callback_url" => ["type" => "varchar(255)", "not null" => "1", "default" => "", "comment" => ""],
 						"topic" => ["type" => "varchar(255)", "not null" => "1", "default" => "", "comment" => ""],
 						"nickname" => ["type" => "varchar(255)", "not null" => "1", "default" => "", "comment" => ""],
-						"push" => ["type" => "tinyint unsigned", "not null" => "1", "default" => "0", "comment" => "Retrial counter"],
+						"push" => ["type" => "tinyint", "not null" => "1", "default" => "0", "comment" => "Retrial counter"],
 						"last_update" => ["type" => "datetime", "not null" => "1", "default" => NULL_DATE, "comment" => "Date of last successful trial"],
 						"next_try" => ["type" => "datetime", "not null" => "1", "default" => NULL_DATE, "comment" => "Next retrial date"],
 						"renewed" => ["type" => "datetime", "not null" => "1", "default" => NULL_DATE, "comment" => "Date of last subscription renewal"],
diff --git a/src/Model/PushSubscriber.php b/src/Model/PushSubscriber.php
index 76a7e06312..925a461e2a 100644
--- a/src/Model/PushSubscriber.php
+++ b/src/Model/PushSubscriber.php
@@ -5,6 +5,8 @@
 namespace Friendica\Model;
 
 use Friendica\Core\Worker;
+use Friendica\Util\DateTimeFormat;
+use Friendica\Database\DBM;
 use dba;
 
 require_once 'include/dba.php';
@@ -12,26 +14,141 @@ require_once 'include/dba.php';
 class PushSubscriber
 {
 	/**
+	 * @brief Send subscription notifications for the given user
+	 *
+	 * @param integer $uid      User ID
+	 * @param string  $priority Priority for push workers
+	 */
+	public static function publishFeed($uid, $default_priority = PRIORITY_HIGH)
+	{
+		$condition = ['push' => 0, 'uid' => $uid];
+		dba::update('push_subscriber', ['push' => 1, 'next_try' => NULL_DATE], $condition);
+
+		self::requeue($default_priority);
+	}
+
+	/**
+	 * @brief start workers to transmit the feed data
+	 *
 	 * @param string $priority Priority for push workers
 	 */
-	public static function publishFeed($default_priority = PRIORITY_HIGH)
+	public static function requeue($default_priority = PRIORITY_HIGH)
 	{
 		// We'll push to each subscriber that has push > 0,
 		// i.e. there has been an update (set in notifier.php).
-		$subscribers = dba::select('push_subscriber', ['id', 'push', 'callback_url'], ["`push` > 0 AND `next_try` < UTC_TIMESTAMP()"]);
+		$subscribers = dba::select('push_subscriber', ['id', 'push', 'callback_url', 'nickname'], ["`push` > 0 AND `next_try` < UTC_TIMESTAMP()"]);
 
 		while ($subscriber = dba::fetch($subscribers)) {
 			// We always handle retries with low priority
-			if ($subscriber["push"] > 1) {
+			if ($subscriber['push'] > 1) {
 				$priority = PRIORITY_LOW;
 			} else {
 				$priority = $default_priority;
 			}
 
-			logger("Publish feed to " . $subscriber["callback_url"] . " with priority " . $priority, LOGGER_DEBUG);
-			Worker::add($priority, 'PubSubPublish', (int)$subscriber["id"]);
+			logger('Publish feed to ' . $subscriber['callback_url'] . ' for ' . $subscriber['nickname'] . ' with priority ' . $priority, LOGGER_DEBUG);
+			Worker::add($priority, 'PubSubPublish', (int)$subscriber['id']);
 		}
 
 		dba::close($subscribers);
 	}
+
+	/**
+	 * @brief Renew the feed subscription
+	 *
+	 * @param integer $uid          User ID
+	 * @param string  $nick         Priority for push workers
+	 * @param integer $subscribe    Subscribe (Unsubscribe = false)
+	 * @param string  $hub_callback Callback address
+	 * @param string  $hub_topic    Feed topic
+	 * @param string  $hub_secret   Subscription secret
+	 */
+	public static function renew($uid, $nick, $subscribe, $hub_callback, $hub_topic, $hub_secret)
+	{
+		// fetch the old subscription if it exists
+		$subscriber = dba::selectFirst('push_subscriber', ['last_update', 'push'], ['callback_url' => $hub_callback]);
+
+		// delete old subscription if it exists
+		dba::delete('push_subscriber', ['callback_url' => $hub_callback]);
+
+		if ($subscribe) {
+			// if we are just updating an old subscription, keep the
+			// old values for last_update but reset the push
+			if (DBM::is_result($subscriber)) {
+				$last_update = $subscriber['last_update'];
+				$push_flag = min($subscriber['push'], 1);
+			} else {
+				$last_update = DateTimeFormat::utcNow();
+				$push_flag = 0;
+			}
+
+			// subscribe means adding the row to the table
+			$fields = ['uid' => $uid, 'callback_url' => $hub_callback,
+				'topic' => $hub_topic, 'nickname' => $nick, 'push' => $push_flag,
+				'last_update' => $last_update, 'renewed' => DateTimeFormat::utcNow(),
+				'secret' => $hub_secret];
+			dba::insert('push_subscriber', $fields);
+
+			logger("Successfully subscribed [$hub_callback] for $nick");
+		} else {
+			logger("Successfully unsubscribed [$hub_callback] for $nick");
+			// we do nothing here, since the row was already deleted
+		}
+	}
+
+	/**
+	 * @brief Delay the push subscriber
+	 *
+	 * @param integer $id Subscriber ID
+	 */
+	public static function delay($id)
+	{
+		$subscriber = dba::selectFirst('push_subscriber', ['push', 'callback_url', 'renewed', 'nickname'], ['id' => $id]);
+		if (!DBM::is_result($subscriber)) {
+			return;
+		}
+
+		$retrial = $subscriber['push'];
+
+		if ($retrial > 14) {
+			// End subscriptions if they weren't renewed for more than two months
+			$days = round((time() -  strtotime($subscriber['renewed'])) / (60 * 60 * 24));
+
+			if ($days > 60) {
+				dba::update('push_subscriber', ['push' => -1, 'next_try' => NULL_DATE], ['id' => $id]);
+				logger('Delivery error: Subscription ' . $subscriber['callback_url'] . ' for ' . $subscriber['nickname'] . ' is marked as ended.', LOGGER_DEBUG);
+			} else {
+				dba::update('push_subscriber', ['push' => 0, 'next_try' => NULL_DATE], ['id' => $id]);
+				logger('Delivery error: Giving up ' . $subscriber['callback_url'] . ' for ' . $subscriber['nickname'] . ' for now.', LOGGER_DEBUG);
+			}
+		} else {
+			// Calculate the delay until the next trial
+			$delay = (($retrial + 3) ** 4) + (rand(1, 30) * ($retrial + 1));
+			$next = DateTimeFormat::utc('now + ' . $delay . ' seconds');
+
+			$retrial = $retrial + 1;
+
+			dba::update('push_subscriber', ['push' => $retrial, 'next_try' => $next], ['id' => $id]);
+			logger('Delivery error: Next try (' . $retrial . ') ' . $subscriber['callback_url'] . ' for ' . $subscriber['nickname'] . ' at ' . $next, LOGGER_DEBUG);
+		}
+	}
+
+	/**
+	 * @brief Reset the push subscriber
+	 *
+	 * @param integer $id          Subscriber ID
+	 * @param date    $last_update Date of last transmitted item
+	 */
+	public static function reset($id, $last_update)
+	{
+		$subscriber = dba::selectFirst('push_subscriber', ['callback_url', 'nickname'], ['id' => $id]);
+		if (!DBM::is_result($subscriber)) {
+			return;
+		}
+
+		// set last_update to the 'created' date of the last item, and reset push=0
+		$fields = ['push' => 0, 'next_try' => NULL_DATE, 'last_update' => $last_update];
+		dba::update('push_subscriber', $fields, ['id' => $id]);
+		logger('Subscriber ' . $subscriber['callback_url'] . ' for ' . $subscriber['nickname'] . ' is marked as vital', LOGGER_DEBUG);
+	}
 }
diff --git a/src/Worker/Notifier.php b/src/Worker/Notifier.php
index f7b7aeae14..11a6f6f68a 100644
--- a/src/Worker/Notifier.php
+++ b/src/Worker/Notifier.php
@@ -499,15 +499,10 @@ class Notifier {
 
 		// Notify PuSH subscribers (Used for OStatus distribution of regular posts)
 		if ($push_notify) {
-			// Set push flag for PuSH subscribers to this topic,
-			// they will be notified in queue.php
-			$condition = ['push' => false, 'nickname' => $owner['nickname']];
-			dba::update('push_subscriber', ['push' => true, 'next_try' => NULL_DATE], $condition);
-
 			logger('Activating internal PuSH for item '.$item_id, LOGGER_DEBUG);
 
 			// Handling the pubsubhubbub requests
-			PushSubscriber::publishFeed($a->queue['priority']);
+			PushSubscriber::publishFeed($owner['uid'], $a->queue['priority']);
 		}
 
 		logger('notifier: calling hooks for ' . $cmd . ' ' . $item_id, LOGGER_DEBUG);
diff --git a/src/Worker/PubSubPublish.php b/src/Worker/PubSubPublish.php
index 27b6f3d9a1..0a60e5a599 100644
--- a/src/Worker/PubSubPublish.php
+++ b/src/Worker/PubSubPublish.php
@@ -7,12 +7,10 @@ namespace Friendica\Worker;
 
 use Friendica\App;
 use Friendica\Core\System;
-use Friendica\Core\Config;
-use Friendica\Core\Worker;
 use Friendica\Database\DBM;
 use Friendica\Protocol\OStatus;
 use Friendica\Util\Network;
-use Friendica\Util\DateTimeFormat;
+use Friendica\Model\PushSubscriber;
 use dba;
 
 require_once 'include/items.php';
@@ -65,30 +63,11 @@ class PubSubPublish {
 		if ($ret >= 200 && $ret <= 299) {
 			logger('Successfully pushed to ' . $subscriber['callback_url']);
 
-			// set last_update to the "created" date of the last item, and reset push=0
-			$fields = ['push' => 0, 'next_try' => NULL_DATE, 'last_update' => $last_update];
-			dba::update('push_subscriber', $fields, $condition);
-
+			PushSubscriber::reset($subscriber['id'], $last_update);
 		} else {
 			logger('Delivery error when pushing to ' . $subscriber['callback_url'] . ' HTTP: ' . $ret);
 
-			// we use the push variable also as a counter, if we failed we
-			// increment this until some upper limit where we give up
-			$retrial = $subscriber['push'];
-
-			if ($retrial > 14) {
-				dba::update('push_subscriber', ['push' => 0, 'next_try' => NULL_DATE], $condition);
-				logger('Delivery error: Giving up for ' . $subscriber['callback_url'], LOGGER_DEBUG);
-			} else {
-				// Calculate the delay until the next trial
-				$delay = (($retrial + 3) ** 4) + (rand(1, 30) * ($retrial + 1));
-				$next = DateTimeFormat::utc('now + ' . $delay . ' seconds');
-
-				$retrial = $retrial + 1;
-
-				dba::update('push_subscriber', ['push' => $retrial, 'next_try' => $next], $condition);
-				logger('Delivery error: Next try (' . $retrial . ') for ' . $subscriber['callback_url'] . ' at ' . $next, LOGGER_DEBUG);
-			}
+			PushSubscriber::delay($subscriber['id']);
 		}
 	}
 }
diff --git a/src/Worker/Queue.php b/src/Worker/Queue.php
index 256227fb62..89ec282627 100644
--- a/src/Worker/Queue.php
+++ b/src/Worker/Queue.php
@@ -35,7 +35,7 @@ class Queue
 			logger('filling queue jobs - start');
 
 			// Handling the pubsubhubbub requests
-			PushSubscriber::publishFeed();
+			PushSubscriber::requeue();
 
 			$r = dba::inArray(dba::p("SELECT `id` FROM `queue` WHERE `next` < UTC_TIMESTAMP() ORDER BY `batch`, `cid`"));
 
-- 
2.39.5