]> git.mxchange.org Git - friendica.git/commitdiff
Fix delivery counter / archive relay contacts
authorMichael <heluecht@pirati.ca>
Mon, 2 Sep 2019 03:25:05 +0000 (03:25 +0000)
committerMichael <heluecht@pirati.ca>
Mon, 2 Sep 2019 03:25:05 +0000 (03:25 +0000)
src/Core/Worker.php
src/Model/Contact.php
src/Model/ItemDeliveryData.php
src/Protocol/Diaspora.php
src/Worker/APDelivery.php
src/Worker/Delivery.php
src/Worker/Notifier.php

index 3da17d4a6e481d7099b9eb8e4e683477bf7f8f9b..c64b0ebc6bdaa681e631e46e25dddb8526aa5bf2 100644 (file)
@@ -1104,7 +1104,7 @@ class Worker
         * or: Worker::add(PRIORITY_HIGH, "Notifier", Delivery::DELETION, $drop_id);
         * or: Worker::add(array('priority' => PRIORITY_HIGH, 'dont_fork' => true), "CreateShadowEntry", $post_id);
         *
-        * @return boolean "false" if proc_run couldn't be executed
+        * @return boolean "false" if worker queue entry already existed or there had been an error
         * @throws \Friendica\Network\HTTPException\InternalServerErrorException
         * @note $cmd and string args are surrounded with ""
         *
@@ -1154,6 +1154,7 @@ class Worker
 
                $parameters = json_encode($args);
                $found = DBA::exists('workerqueue', ['parameter' => $parameters, 'done' => false]);
+               $added = false;
 
                // Quit if there was a database error - a precaution for the update process to 3.5.3
                if (DBA::errorNo() != 0) {
@@ -1161,19 +1162,22 @@ class Worker
                }
 
                if (!$found) {
-                       DBA::insert('workerqueue', ['parameter' => $parameters, 'created' => $created, 'priority' => $priority]);
+                       $added = DBA::insert('workerqueue', ['parameter' => $parameters, 'created' => $created, 'priority' => $priority]);
+                       if (!$added) {
+                               return false;
+                       }
                } elseif ($force_priority) {
                        DBA::update('workerqueue', ['priority' => $priority], ['parameter' => $parameters, 'done' => false, 'pid' => 0]);
                }
 
                // Should we quit and wait for the worker to be called as a cronjob?
                if ($dont_fork) {
-                       return true;
+                       return $added;
                }
 
                // If there is a lock then we don't have to check for too much worker
                if (!Lock::acquire('worker', 0)) {
-                       return true;
+                       return $added;
                }
 
                // If there are already enough workers running, don't fork another one
@@ -1181,19 +1185,19 @@ class Worker
                Lock::release('worker');
 
                if ($quit) {
-                       return true;
+                       return $added;
                }
 
                // We tell the daemon that a new job entry exists
                if (Config::get('system', 'worker_daemon_mode', false)) {
                        // We don't have to set the IPC flag - this is done in "tooMuchWorkers"
-                       return true;
+                       return $added;
                }
 
                // Now call the worker to execute the jobs that we just added to the queue
                self::spawnWorker();
 
-               return true;
+               return $added;
        }
 
        /**
index 9d9ebe95f11334614c188879451eef9618d0fbf6..4bdaa9c7136c2b4a60ebf10f69837e6ae145bcba 100644 (file)
@@ -906,7 +906,7 @@ class Contact extends BaseObject
                // Always unarchive the relay contact entry
                if (!empty($contact['batch']) && !empty($contact['term-date']) && ($contact['term-date'] > DBA::NULL_DATETIME)) {
                        $fields = ['term-date' => DBA::NULL_DATETIME, 'archive' => false];
-                       $condition = ['batch' => $contact['batch'], 'contact-type' => self::TYPE_RELAY];
+                       $condition = ['batch' => $contact['batch'], 'contact-type' => self::TYPE_RELAY, 'uid' => 0];
                        DBA::update('contact', $fields, $condition);
                }
 
@@ -1620,7 +1620,7 @@ class Contact extends BaseObject
 
                // Check status of Diaspora endpoints
                if (!empty($contact['batch'])) {
-                       return DBA::exists('contact', ['archive' => true, 'batch' => $contact['batch'], 'contact-type' => self::TYPE_RELAY]);
+                       return DBA::exists('contact', ['archive' => true, 'batch' => $contact['batch'], 'contact-type' => self::TYPE_RELAY, 'uid' => 0]);
                 }
 
                return false;
index f007537f99c02d687754ae50af8dfaf24996b779..5a181eb1b188f01e81acbc772e21aa5b602671fe 100644 (file)
@@ -103,6 +103,19 @@ class ItemDeliveryData
                return DBA::e('UPDATE `item-delivery-data` SET `queue_failed` = `queue_failed` + 1 WHERE `iid` = ?', $item_id);
        }
 
+       /**
+        * Increments the queue_count for the given item ID.
+        *
+        * @param integer $item_id
+        * @param integer $increment
+        * @return bool
+        * @throws \Exception
+        */
+       public static function incrementQueueCount($item_id, $increment)
+       {
+               return DBA::e('UPDATE `item-delivery-data` SET `queue_count` = `queue_count` + ? WHERE `iid` = ?', $increment, $item_id);
+       }
+
        /**
         * Insert a new item delivery data entry
         *
index b694ab9172109d1363d51190a860c8a8fea35149..a0bb0d9fa3a7796a3efdac08067a66aa60b4509b 100644 (file)
@@ -27,6 +27,7 @@ use Friendica\Model\Conversation;
 use Friendica\Model\GContact;
 use Friendica\Model\Group;
 use Friendica\Model\Item;
+use Friendica\Model\ItemDeliveryData;
 use Friendica\Model\Mail;
 use Friendica\Model\Profile;
 use Friendica\Model\User;
@@ -46,6 +47,35 @@ use SimpleXMLElement;
  */
 class Diaspora
 {
+       /**
+        * Mark the relay contact of the given contact for archival
+        * This is called whenever there is a communication issue with the server.
+        * It avoids sending stuff to servers who don't exist anymore.
+        * The relay contact is a technical contact entry that exists once per server.
+        *
+        * @param array $contact of the relay contact
+        */
+       public static function markRelayForArchival($contact)
+       {
+               if (!empty($contact['contact-type']) && ($contact['contact-type'] == Contact::TYPE_RELAY)) {
+                       // This is already the relay contact, we don't need to fetch it
+                       $relay_contact = $contact;
+               } elseif (empty($contact['baseurl'])) {
+                       if (!empty($contact['batch'])) {
+                               $relay_contact = DBA::selectFirst('contact', [], ['batch' => $contact['batch'], 'contact-type' => Contact::TYPE_RELAY, 'uid' => 0]);
+                       } else {
+                               return;
+                       }
+               } else {
+                       $relay_contact = self::getRelayContact($contact['baseurl'], []);
+               }
+
+               if (!empty($relay_contact)) {
+                       Logger::info('Relay contact will be marked for archival', ['id' => $relay_contact['id'], 'url' => $relay_contact['url']]);
+                       Contact::markForArchival($relay_contact);
+               }
+       }
+
        /**
         * @brief Return a list of relay servers
         *
@@ -140,13 +170,12 @@ class Diaspora
         * @brief Return a contact for a given server address or creates a dummy entry
         *
         * @param string $server_url The url of the server
+        * @param array $fields Fieldlist
         * @return array with the contact
         * @throws \Exception
         */
-       private static function getRelayContact($server_url)
+       private static function getRelayContact($server_url, $fields = ['batch', 'id', 'name', 'network', 'protocol', 'archive', 'blocked'])
        {
-               $fields = ['batch', 'id', 'name', 'network', 'protocol', 'archive', 'blocked'];
-
                // Fetch the relay contact
                $condition = ['uid' => 0, 'nurl' => Strings::normaliseLink($server_url),
                        'contact-type' => Contact::TYPE_RELAY];
@@ -2208,7 +2237,9 @@ class Diaspora
                        }
 
                        Logger::info('Deliver participation', ['item' => $comment['id'], 'contact' => $contact_id]);
-                       Worker::add(PRIORITY_HIGH, 'Delivery', Delivery::POST, $comment['id'], $contact_id);
+                       if (Worker::add(PRIORITY_HIGH, 'Delivery', Delivery::POST, $comment['id'], $contact_id)) {
+                               ItemDeliveryData::incrementQueueCount($comment['id'], 1);
+                       }
                }
                DBA::close($comments);
 
@@ -3181,8 +3212,6 @@ class Diaspora
 
                $logid = Strings::getRandomHex(4);
 
-               $dest_url = ($public_batch ? $contact["batch"] : $contact["notify"]);
-
                // We always try to use the data from the fcontact table.
                // This is important for transmitting data to Friendica servers.
                if (!empty($contact['addr'])) {
@@ -3192,6 +3221,10 @@ class Diaspora
                        }
                }
 
+               if (empty($dest_url)) {
+                       $dest_url = ($public_batch ? $contact["batch"] : $contact["notify"]);
+               }
+
                if (!$dest_url) {
                        Logger::log("no url for contact: ".$contact["id"]." batch mode =".$public_batch);
                        return 0;
index 3c12e45491813eab51952da2647e91d625937c52..812db57013c7154d5a519d63be342b206f5e503a 100644 (file)
@@ -48,14 +48,13 @@ class APDelivery extends BaseObject
                        $data = ActivityPub\Transmitter::createCachedActivityFromItem($target_id);
                        if (!empty($data)) {
                                $success = HTTPSignature::transmit($data, $inbox, $uid);
-                               if ($success && in_array($cmd, [Delivery::POST])) {
-                                       ItemDeliveryData::incrementQueueDone($target_id, ItemDeliveryData::ACTIVITYPUB);
-                               }
                        }
                }
 
                if (!$success && !Worker::defer() && in_array($cmd, [Delivery::POST])) {
                        ItemDeliveryData::incrementQueueFailed($target_id);
+               } elseif ($success && in_array($cmd, [Delivery::POST])) {
+                       ItemDeliveryData::incrementQueueDone($target_id, ItemDeliveryData::ACTIVITYPUB);
                }
        }
 }
index eb86f910dd9dcb03eca57b66c2a21ae7a7cf234d..99731544884f7d002cdfc494471f4aae9b009a2b 100644 (file)
@@ -43,12 +43,14 @@ class Delivery extends BaseObject
                if ($cmd == self::MAIL) {
                        $target_item = DBA::selectFirst('mail', [], ['id' => $target_id]);
                        if (!DBA::isResult($target_item)) {
+                               self::setFailedQueue($cmd, $target_id);
                                return;
                        }
                        $uid = $target_item['uid'];
                } elseif ($cmd == self::SUGGESTION) {
                        $target_item = DBA::selectFirst('fsuggest', [], ['id' => $target_id]);
                        if (!DBA::isResult($target_item)) {
+                               self::setFailedQueue($cmd, $target_id);
                                return;
                        }
                        $uid = $target_item['uid'];
@@ -58,6 +60,7 @@ class Delivery extends BaseObject
                } else {
                        $item = Model\Item::selectFirst(['parent'], ['id' => $target_id]);
                        if (!DBA::isResult($item) || empty($item['parent'])) {
+                               self::setFailedQueue($cmd, $target_id);
                                return;
                        }
                        $parent_id = intval($item['parent']);
@@ -79,11 +82,13 @@ class Delivery extends BaseObject
 
                        if (empty($target_item)) {
                                Logger::log('Item ' . $target_id . "wasn't found. Quitting here.");
+                               self::setFailedQueue($cmd, $target_id);
                                return;
                        }
 
                        if (empty($parent)) {
                                Logger::log('Parent ' . $parent_id . ' for item ' . $target_id . "wasn't found. Quitting here.");
+                               self::setFailedQueue($cmd, $target_id);
                                return;
                        }
 
@@ -93,14 +98,13 @@ class Delivery extends BaseObject
                                $uid = $target_item['uid'];
                        } else {
                                Logger::log('Only public users for item ' . $target_id, Logger::DEBUG);
+                               self::setFailedQueue($cmd, $target_id);
                                return;
                        }
 
                        if (!empty($contact_id) && Model\Contact::isArchived($contact_id)) {
                                Logger::info('Contact is archived', ['id' => $contact_id, 'cmd' => $cmd, 'item' => $target_item['id']]);
-                               if (in_array($cmd, [Delivery::POST, Delivery::POKE])) {
-                                       Model\ItemDeliveryData::incrementQueueFailed($target_item['id']);
-                               }
+                               self::setFailedQueue($cmd, $target_id);
                                return;
                        }
 
@@ -160,6 +164,7 @@ class Delivery extends BaseObject
 
                $owner = Model\User::getOwnerDataById($uid);
                if (!DBA::isResult($owner)) {
+                       self::setFailedQueue($cmd, $target_id);
                        return;
                }
 
@@ -168,10 +173,12 @@ class Delivery extends BaseObject
                        ['id' => $contact_id, 'blocked' => false, 'pending' => false, 'self' => false]
                );
                if (!DBA::isResult($contact)) {
+                       self::setFailedQueue($cmd, $target_id);
                        return;
                }
 
                if (Network::isUrlBlocked($contact['url'])) {
+                       self::setFailedQueue($cmd, $target_id);
                        return;
                }
 
@@ -204,6 +211,15 @@ class Delivery extends BaseObject
                return;
        }
 
+       private static function setFailedQueue($cmd, $id)
+       {
+               if (!in_array($cmd, [Delivery::POST, Delivery::POKE])) {
+                       return;
+               }
+
+               Model\ItemDeliveryData::incrementQueueFailed($id);
+       }
+
        /**
         * @brief Deliver content via DFRN
         *
@@ -300,6 +316,10 @@ class Delivery extends BaseObject
                        // We never spool failed relay deliveries
                        if ($public_dfrn) {
                                Logger::log('Relay delivery to ' . $contact["url"] . ' with guid ' . $target_item["guid"] . ' returns ' . $deliver_status);
+
+                               if (in_array($cmd, [Delivery::POST, Delivery::POKE])) {
+                                       Model\ItemDeliveryData::incrementQueueDone($target_item['id'], $protocol);
+                               }
                                return;
                        }
 
@@ -419,6 +439,11 @@ class Delivery extends BaseObject
                        // The message could not be delivered. We mark the contact as "dead"
                        Model\Contact::markForArchival($contact);
 
+                       // When it is delivered to the public endpoint, we do mark the relay contact for archival as well
+                       if ($public_message) {
+                               Diaspora::markRelayForArchival($contact);
+                       }
+
                        if (empty($contact['contact-type']) || ($contact['contact-type'] != Model\Contact::TYPE_RELAY)) {
                                Logger::info('Delivery failed: defer message', ['id' => defaults($target_item, 'guid', $target_item['id'])]);
                                // defer message for redelivery
index 20023792d51925ca550b1b3685a7f49f284ab535..102b24d0ad64234787e1903443553422e1564cd5 100644 (file)
@@ -446,8 +446,6 @@ class Notifier
 
                                        $conversants[] = $rr['id'];
 
-                                       $delivery_queue_count++;
-
                                        Logger::log('Public delivery of item ' . $target_item["guid"] . ' (' . $target_id . ') to ' . json_encode($rr), Logger::DEBUG);
 
                                        // Ensure that posts with our own protocol arrives before Diaspora posts arrive.
@@ -459,7 +457,10 @@ class Notifier
                                        } else {
                                                $deliver_options = ['priority' => $a->queue['priority'], 'created' => $a->queue['created'], 'dont_fork' => true];
                                        }
-                                       Worker::add($deliver_options, 'Delivery', $cmd, $target_id, (int)$rr['id']);
+
+                                       if (Worker::add($deliver_options, 'Delivery', $cmd, $target_id, (int)$rr['id'])) {
+                                               $delivery_queue_count++;
+                                       }
                                }
                        }
 
@@ -495,8 +496,6 @@ class Notifier
                                continue;
                        }
 
-                       $delivery_queue_count++;
-
                        Logger::log('Delivery of item ' . $target_id . ' to ' . json_encode($contact), Logger::DEBUG);
 
                        // Ensure that posts with our own protocol arrives before Diaspora posts arrive.
@@ -507,7 +506,10 @@ class Notifier
                        } else {
                                $deliver_options = ['priority' => $a->queue['priority'], 'created' => $a->queue['created'], 'dont_fork' => true];
                        }
-                       Worker::add($deliver_options, 'Delivery', $cmd, $target_id, (int)$contact['id']);
+
+                       if (Worker::add($deliver_options, 'Delivery', $cmd, $target_id, (int)$contact['id'])) {
+                               $delivery_queue_count++;
+                       }
                }
                DBA::close($delivery_contacts_stmt);
 
@@ -515,11 +517,11 @@ class Notifier
                // send salmon slaps to mentioned remote tags (@foo@example.com) in OStatus posts
                // They are especially used for notifications to OStatus users that don't follow us.
                if (!Config::get('system', 'dfrn_only') && count($url_recipients) && ($public_message || $push_notify) && !empty($target_item)) {
-                       $delivery_queue_count += count($url_recipients);
                        $slap = OStatus::salmon($target_item, $owner);
                        foreach ($url_recipients as $url) {
                                Logger::log('Salmon delivery of item ' . $target_id . ' to ' . $url);
                                /// @TODO Redeliver/queue these items on failure, though there is no contact record
+                               $delivery_queue_count++;
                                Salmon::slapper($owner, $url, $slap);
                                ItemDeliveryData::incrementQueueDone($target_id, ItemDeliveryData::OSTATUS);
                        }
@@ -677,13 +679,17 @@ class Notifier
                // Fill the item cache
                ActivityPub\Transmitter::createCachedActivityFromItem($target_item['id'], true);
 
+               $delivery_queue_count = 0;
+
                foreach ($inboxes as $inbox) {
                        Logger::info('Delivery via ActivityPub', ['cmd' => $cmd, 'id' => $target_item['id'], 'inbox' => $inbox]);
 
-                       Worker::add(['priority' => $priority, 'created' => $created, 'dont_fork' => true],
-                                       'APDelivery', $cmd, $target_item['id'], $inbox, $uid);
+                       if (Worker::add(['priority' => $priority, 'created' => $created, 'dont_fork' => true],
+                                       'APDelivery', $cmd, $target_item['id'], $inbox, $uid)) {
+                               $delivery_queue_count++;
+                       }
                }
 
-               return count($inboxes);
+               return $delivery_queue_count;
        }
 }