git.mxchange.org Git - friendica.git/commitdiff
Improved queue processing
author Michael <heluecht@pirati.ca>
Sun, 7 Aug 2022 19:24:50 +0000 (19:24 +0000)
committer Michael <heluecht@pirati.ca>
Sun, 7 Aug 2022 19:24:50 +0000 (19:24 +0000)
src/Model/Item.php
src/Model/ItemURI.php
src/Protocol/ActivityPub/Processor.php
src/Protocol/ActivityPub/Queue.php
src/Protocol/ActivityPub/Receiver.php
src/Util/ParseUrl.php

index 3bacd9cc6abf002e873b9a13f664383de5ca0b3d..8566643ed2eb8a4127a681b5428e51dd1fb07e6c 100644 (file)
@@ -905,14 +905,7 @@ class Item
                        'photo' => $item['owner-avatar'], 'network' => $item['network']];
                $item['owner-id'] = ($item['owner-id'] ?? 0) ?: Contact::getIdForURL($item['owner-link'], 0, null, $default);
 
-               $actor = ($item['gravity'] == GRAVITY_PARENT) ? $item['owner-id'] : $item['author-id'];
-               if (!$item['origin'] && ($item['uid'] != 0) && Contact::isSharing($actor, $item['uid'])) {
-                       $item['post-reason'] = self::PR_FOLLOWER;
-               }
-
-               if ($item['origin'] && empty($item['post-reason'])) {
-                       $item['post-reason'] = self::PR_LOCAL;
-               }
+               $item['post-reason'] = self::getPostReason($item);
 
                // Ensure that there is an avatar cache
                Contact::checkAvatarCache($item['author-id']);
@@ -1291,6 +1284,27 @@ class Item
                return $post_user_id;
        }
 
+       /**
+        * Fetch the post reason for a given item array
+        *
+        * The checks are evaluated in this order:
+        * 1. PR_FOLLOWER when the item has no "origin" flag, belongs to a user ("uid" != 0)
+        *    and Contact::isSharing() succeeds for the relevant actor and that user.
+        * 2. PR_LOCAL when the item has the "origin" flag and no "post-reason" is set.
+        * 3. Otherwise the "post-reason" that is already present in the array, or PR_NONE when there is none.
+        *
+        * @param array $item Item array. The fields "gravity", "owner-id", "author-id" and "uid" are read
+        *                    unconditionally, "origin" and "post-reason" are optional.
+        *
+        * @return integer One of the PR_* constants (assumes that a provided "post-reason" is an integer - TODO confirm)
+        */
+       public static function getPostReason(array $item): int
+       {
+               // For top level posts the owner is checked, for everything else the author
+               $actor = ($item['gravity'] == GRAVITY_PARENT) ? $item['owner-id'] : $item['author-id'];
+               // This check comes first, so PR_FOLLOWER replaces any "post-reason" that had been set before
+               if (empty($item['origin']) && ($item['uid'] != 0) && Contact::isSharing($actor, $item['uid'])) {
+                       return self::PR_FOLLOWER;
+               }
+
+               // Items with the "origin" flag only get PR_LOCAL when no other reason had been set
+               if (!empty($item['origin']) && empty($item['post-reason'])) {
+                       return self::PR_LOCAL;
+               }
+
+               // Keep the reason that was provided by the caller, fall back to PR_NONE
+               return $item['post-reason'] ?? self::PR_NONE;
+       }
+
        /**
         * Update the display cache
         *
@@ -1495,12 +1509,13 @@ class Item
 
                $item = array_merge($item, $fields);
 
+               $item['post-reason'] = self::getPostReason($item);
+
                $is_reshare = ($item['gravity'] == GRAVITY_ACTIVITY) && ($item['verb'] == Activity::ANNOUNCE);
 
                if ((($item['gravity'] == GRAVITY_PARENT) || $is_reshare) &&
                        DI::pConfig()->get($uid, 'system', 'accept_only_sharer') == self::COMPLETION_NONE &&
-                       !Contact::isSharingByURL($item['author-link'], $uid) &&
-                       !Contact::isSharingByURL($item['owner-link'], $uid)) {
+                       !in_array($item['post-reason'], [self::PR_FOLLOWER, self::PR_TAG, self::PR_TO, self::PR_CC])) {
                        Logger::info('Contact is not a follower, thread will not be stored', ['author' => $item['author-link'], 'uid' => $uid]);
                        return 0;
                }
index 020c468d23a81700bdd2c31e6db56ec2548bd3f6..86d23ed541afd659f12f68d7760384804f30e8d9 100644 (file)
@@ -60,10 +60,13 @@ class ItemURI
         * Searched for an id of a given uri. Adds it, if not existing yet.
         *
         * @param string $uri
+        * @param bool   $insert
+        *
         * @return integer item-uri id
+        *
         * @throws \Exception
         */
-       public static function getIdByURI(string $uri): int
+       public static function getIdByURI(string $uri, bool $insert = true): int
        {
                if (empty($uri)) {
                        return 0;
@@ -74,12 +77,13 @@ class ItemURI
 
                $itemuri = DBA::selectFirst('item-uri', ['id'], ['uri' => $uri]);
 
-               if (!DBA::isResult($itemuri)) {
+               if (!DBA::isResult($itemuri) && $insert) {
                        return self::insert(['uri' => $uri]);
                }
 
-               return $itemuri['id'];
+               return $itemuri['id'] ?? 0;
        }
+
        /**
         * Searched for an id of a given guid.
         *
index 1f336b214d4dcc0629d437313f21682bca95406c..7b2f2ab0c80bf3ea44818398bec2c6c3afccee0f 100644 (file)
@@ -231,6 +231,7 @@ class Processor
                        Logger::warning('No existing item, item will be created', ['uri' => $activity['id']]);
                        $item = self::createItem($activity, false);
                        if (empty($item)) {
+                               Queue::remove($activity);
                                return;
                        }
 
@@ -243,6 +244,7 @@ class Processor
 
                $item = self::processContent($activity, $item);
                if (empty($item)) {
+                       Queue::remove($activity);
                        return;
                }
 
@@ -301,7 +303,7 @@ class Processor
         */
        public static function createItem(array $activity, bool $fetch_parents): array
        {
-               if (self::isProcessed($activity['id'])) {
+               if (self::isProcessed($activity['id']) && !Post::exists(['uri' => $activity['id']])) {
                        Logger::info('Id is already processed', ['id' => $activity['id']]);
                        return [];
                }
@@ -339,6 +341,7 @@ class Processor
                Logger::debug('Create Item', ['id' => $activity['id'], 'conversation' => $item['conversation'] ?? '']);
                if (empty($activity['author']) && empty($activity['actor'])) {
                        Logger::notice('Missing author and actor. We quit here.', ['activity' => $activity]);
+                       Queue::remove($activity);
                        return [];
                }
 
@@ -361,6 +364,9 @@ class Processor
 
                if (empty($conversation) && empty($activity['directmessage']) && ($item['gravity'] != GRAVITY_PARENT) && !Post::exists(['uri' => $item['thr-parent']])) {
                        Logger::info('Parent not found, message will be discarded.', ['thr-parent' => $item['thr-parent']]);
+                       if (!$fetch_parents) {
+                               Queue::remove($activity);
+                       }
                        return [];
                }
 
@@ -454,6 +460,7 @@ class Processor
                $item = self::processContent($activity, $item);
                if (empty($item)) {
                        Logger::info('Message was not processed');
+                       Queue::remove($activity);
                        return [];
                }
 
@@ -560,16 +567,23 @@ class Processor
                }
 
                // @todo To ensure that the remote system is working correctly, we can check if the "Content-Type" contains JSON
-               if (in_array($curlResult->getReturnCode(), [404])) {
+               if (in_array($curlResult->getReturnCode(), [401, 404])) {
                        return true;
                }
 
-               $object = json_decode($curlResult->getBody(), true);
-               if (!empty($object)) {
-                       $activity = JsonLD::compact($object);
-                       if (JsonLD::fetchElement($activity, '@type') == 'as:Tombstone') {
+               if ($curlResult->isSuccess()) {
+                       $object = json_decode($curlResult->getBody(), true);
+                       if (!empty($object)) {
+                               $activity = JsonLD::compact($object);
+                               if (JsonLD::fetchElement($activity, '@type') == 'as:Tombstone') {
+                                       return true;
+                               }
+                       }
+               } elseif ($curlResult->getReturnCode() == 0) {
+                       $host = parse_url($url, PHP_URL_HOST);
+                       if (!(filter_var($host, FILTER_VALIDATE_IP) || @dns_get_record($host . '.', DNS_A + DNS_AAAA))) {
                                return true;
-                       }                       
+                       }
                }
 
                return false;
@@ -814,7 +828,7 @@ class Processor
                                        Logger::warning('Unknown parent item.', ['uri' => $parent_uri]);
                                        return false;
                                }
-                               if (($item['private'] == Item::PRIVATE) && ($parent['private'] != Item::PRIVATE)) {
+                               if (!empty($activity['type']) && in_array($activity['type'], Receiver::CONTENT_TYPES) && ($item['private'] == Item::PRIVATE) && ($parent['private'] != Item::PRIVATE)) {
                                        Logger::warning('Item is private but the parent is not. Dropping.', ['item-uri' => $item['uri'], 'thr-parent' => $item['thr-parent']]);
                                        return false;
                                }
@@ -944,6 +958,9 @@ class Processor
 
                if (!self::isSolicitedMessage($activity, $item)) {
                        DBA::delete('item-uri', ['id' => $item['uri-id']]);
+                       if (!empty($activity['entry-id'])) {
+                               Queue::deleteById($activity['entry-id']);
+                       }
                        return;
                }
 
@@ -981,12 +998,16 @@ class Processor
                                        $item['post-reason'] = Item::PR_NONE;
                        }
 
-                       if (!empty($activity['from-relay'])) {
-                               $item['post-reason'] = Item::PR_RELAY;
-                       } elseif (!empty($activity['thread-completion'])) {
-                               $item['post-reason'] = Item::PR_FETCHED;
-                       } elseif (in_array($item['post-reason'], [Item::PR_GLOBAL, Item::PR_NONE]) && !empty($activity['push'])) {
-                               $item['post-reason'] = Item::PR_PUSHED;
+                       $item['post-reason'] = Item::getPostReason($item);
+
+                       if (in_array($item['post-reason'], [Item::PR_GLOBAL, Item::PR_NONE])) {
+                               if (!empty($activity['from-relay'])) {
+                                       $item['post-reason'] = Item::PR_RELAY;
+                               } elseif (!empty($activity['thread-completion'])) {
+                                       $item['post-reason'] = Item::PR_FETCHED;
+                               } elseif (!empty($activity['push'])) {
+                                       $item['post-reason'] = Item::PR_PUSHED;
+                               }
                        }
 
                        if ($item['isForum'] ?? false) {
@@ -1004,41 +1025,31 @@ class Processor
                                continue;
                        }
 
-                       if (!($item['isForum'] ?? false) && ($receiver != 0) && ($item['gravity'] == GRAVITY_PARENT) && !Contact::isSharingByURL($activity['author'], $receiver)) {
-                               if ($item['post-reason'] == Item::PR_BCC) {
-                                       Logger::info('Top level post via BCC from a non sharer, ignoring', ['uid' => $receiver, 'contact' => $item['contact-id']]);
-                                       continue;
-                               }
+                       if (($receiver != 0) && ($item['gravity'] == GRAVITY_PARENT) && !in_array($item['post-reason'], [Item::PR_FOLLOWER, Item::PR_TAG, item::PR_TO, Item::PR_CC])) {
+                               if (!($item['isForum'] ?? false)) {
+                                       if ($item['post-reason'] == Item::PR_BCC) {
+                                               Logger::info('Top level post via BCC from a non sharer, ignoring', ['uid' => $receiver, 'contact' => $item['contact-id'], 'url' => $item['uri']]);
+                                               continue;
+                                       }
 
-                               if (
-                                       !empty($activity['thread-children-type'])
-                                       && in_array($activity['thread-children-type'], Receiver::ACTIVITY_TYPES)
-                                       && DI::pConfig()->get($receiver, 'system', 'accept_only_sharer') != Item::COMPLETION_LIKE
-                               ) {
-                                       Logger::info('Top level post from thread completion from a non sharer had been initiated via an activity, ignoring',
-                                               ['type' => $activity['thread-children-type'], 'user' => $item['uid'], 'causer' => $item['causer-link'], 'author' => $activity['author'], 'url' => $item['uri']]);
-                                       continue;
+                                       if ((DI::pConfig()->get($receiver, 'system', 'accept_only_sharer') != Item::COMPLETION_LIKE)
+                                               && in_array($activity['thread-children-type'] ?? '', Receiver::ACTIVITY_TYPES)) {
+                                               Logger::info('Top level post from thread completion from a non sharer had been initiated via an activity, ignoring',
+                                                       ['type' => $activity['thread-children-type'], 'user' => $item['uid'], 'causer' => $item['causer-link'], 'author' => $activity['author'], 'url' => $item['uri']]);
+                                               continue;
+                                       }
                                }
-                       }
-
-                       $is_forum = false;
 
-                       if ($receiver != 0) {
+                               $is_forum = false;
                                $user = User::getById($receiver, ['account-type']);
                                if (!empty($user['account-type'])) {
                                        $is_forum = ($user['account-type'] == User::ACCOUNT_TYPE_COMMUNITY);
                                }
-                       }
-
-                       if (!$is_forum && DI::pConfig()->get($receiver, 'system', 'accept_only_sharer') == Item::COMPLETION_NONE && ($receiver != 0) && ($item['gravity'] == GRAVITY_PARENT)) {
-                               $skip = !Contact::isSharingByURL($activity['author'], $receiver);
-
-                               if ($skip && (($activity['type'] == 'as:Announce') || ($item['isForum'] ?? false))) {
-                                       $skip = !Contact::isSharingByURL($activity['actor'], $receiver);
-                               }
 
-                               if ($skip) {
-                                       Logger::info('Skipping post', ['uid' => $receiver, 'url' => $item['uri']]);
+                               if ((DI::pConfig()->get($receiver, 'system', 'accept_only_sharer') == Item::COMPLETION_NONE)
+                                       && ((!$is_forum && !($item['isForum'] ?? false) && ($activity['type'] != 'as:Announce'))
+                                       || !Contact::isSharingByURL($activity['actor'], $receiver))) {
+                                       Logger::info('Actor is a non sharer, is no forum or it is no announce', ['uid' => $receiver, 'actor' => $activity['actor'], 'url' => $item['uri'], 'type' => $activity['type']]);
                                        continue;
                                }
 
@@ -1333,13 +1344,7 @@ class Processor
         */
        public static function fetchMissingActivity(string $url, array $child = [], string $relay_actor = '', int $completion = Receiver::COMPLETION_MANUAL): string
        {
-               if (!empty($child['receiver'])) {
-                       $uid = ActivityPub\Receiver::getFirstUserFromReceivers($child['receiver']);
-               } else {
-                       $uid = 0;
-               }
-
-               $object = self::fetchCachedActivity($url, $uid);
+               $object = self::fetchCachedActivity($url, 0);
                if (empty($object)) {
                        return '';
                }
@@ -1352,7 +1357,7 @@ class Processor
                                $compacted = JsonLD::compact($object);
                                $attributed_to = JsonLD::fetchElement($compacted, 'as:attributedTo', '@id');
                        }
-                       $signer[] = $attributed_to;     
+                       $signer[] = $attributed_to;
                }
 
                if (!empty($object['actor'])) {
@@ -1407,8 +1412,12 @@ class Processor
                        $ldactivity['completion-mode']   = $completion;
                }
 
-               if (!empty($child['type'])) {
+               if (!empty($child['thread-children-type'])) {
+                       $ldactivity['thread-children-type'] = $child['thread-children-type'];
+               } elseif (!empty($child['type'])) {
                        $ldactivity['thread-children-type'] = $child['type'];
+               } else {
+                       $ldactivity['thread-children-type'] = 'as:Create';
                }
 
                if (!empty($relay_actor) && !self::acceptIncomingMessage($ldactivity, $object['id'])) {
@@ -1417,7 +1426,7 @@ class Processor
 
                if (($completion == Receiver::COMPLETION_RELAY) && Queue::exists($url, 'as:Create')) {
                        Logger::notice('Activity has already been queued.', ['url' => $url, 'object' => $activity['id']]);
-               } elseif (ActivityPub\Receiver::processActivity($ldactivity, json_encode($activity), $uid, true, false, $signer, '', $completion)) {
+               } elseif (ActivityPub\Receiver::processActivity($ldactivity, json_encode($activity), 0, true, false, $signer, '', $completion)) {
                        Logger::notice('Activity had been fetched and processed.', ['url' => $url, 'entry' => $child['entry-id'] ?? 0, 'completion' => $completion, 'object' => $activity['id']]);
                } else {
                        Logger::notice('Activity had been fetched and will be processed later.', ['url' => $url, 'entry' => $child['entry-id'] ?? 0, 'completion' => $completion, 'object' => $activity['id']]);
index df4236b5bdbdf97df0db57e50810124ff2af1b26..a95226d10d81783d5d1874d85f7e617eb669ceca 100644 (file)
@@ -26,6 +26,7 @@ use Friendica\Core\System;
 use Friendica\Database\Database;
 use Friendica\Database\DBA;
 use Friendica\DI;
+use Friendica\Model\ItemURI;
 use Friendica\Model\Post;
 use Friendica\Util\DateTimeFormat;
 use Friendica\Util\JsonLD;
@@ -140,7 +141,15 @@ class Queue
                        return;
                }
 
+               Logger::debug('Delete inbox-entry', ['id' => $entry['id']]);
+
                DBA::delete('inbox-entry', ['id' => $entry['id']]);
+
+               $children = DBA::select('inbox-entry', ['id'], ['in-reply-to-id' => $entry['object-id']]);
+               while ($child = DBA::fetch($children)) {
+                       self::deleteById($child['id']);
+               }
+               DBA::close($children);
        }
 
        /**
@@ -188,6 +197,11 @@ class Queue
                        return false;
                }
 
+               if (!self::isProcessable($id)) {
+                       Logger::debug('Other queue entries need to be processed first.', ['id' => $id]);
+                       return false;
+               }
+
                if (!empty($entry['wid'])) {
                        $worker = DI::app()->getQueue();
                        $wid = $worker['id'] ?? 0;
@@ -210,6 +224,10 @@ class Queue
                $activity['worker-id']       = $entry['wid'];
                $activity['recursion-depth'] = 0;
 
+               if (empty($activity['thread-children-type'])) {
+                       $activity['thread-children-type'] = $type;
+               }
+
                $receivers = DBA::select('inbox-entry-receiver', ['uid'], ["`queue-id` = ? AND `uid` != ?", $entry['id'], 0]);
                while ($receiver = DBA::fetch($receivers)) {
                        if (!in_array($receiver['uid'], $activity['receiver'])) {
@@ -248,6 +266,34 @@ class Queue
                DBA::close($entries);
        }
 
+       /**
+        * Check if the inbox entry with the given id can be processed now
+        *
+        * An entry is only held back when it refers via "in-reply-to-id" to a different object
+        * that is itself still stored in the "inbox-entry" table. Entries whose object or
+        * conversation is already known to the system are always processable.
+        *
+        * @param int $id Id of the "inbox-entry" row
+        *
+        * @return bool "false" when the entry doesn't exist or when another queue entry has to be processed first
+        */
+       public static function isProcessable(int $id): bool
+       {
+               $entry = DBA::selectFirst('inbox-entry', [], ['id' => $id]);
+               if (empty($entry)) {
+                       // Unknown entries cannot be processed
+                       return false;
+               }
+
+               if (!empty($entry['object-id']) && Post::exists(['uri' => $entry['object-id']])) {
+                       // The object already exists, so processing can be done
+                       return true;
+               }
+
+               if (!empty($entry['conversation'])) {
+                       // The second parameter is "false", so no item-uri entry is created by this lookup.
+                       // NOTE(review): for unknown URIs the lookup appears to return 0, so the query below
+                       // would then search for a conversation-id of 0 - confirm that no such rows exist.
+                       $conv_id = ItemURI::getIdByURI($entry['conversation'], false);
+                       if (DBA::exists('post-thread', ['conversation-id' => $conv_id])) {
+                               // We have got the conversation in the system, so the post can be processed
+                               return true;
+                       }
+               }
+
+               // The comparison of "object-id" and "in-reply-to-id" ensures that an entry never waits for itself
+               if (!empty($entry['object-id']) && !empty($entry['in-reply-to-id']) && ($entry['object-id'] != $entry['in-reply-to-id']) && DBA::exists('inbox-entry', ['object-id' => $entry['in-reply-to-id']])) {
+                       // This entry belongs to some other entry that should be processed first
+                       return false;
+               }
+
+               // Nothing in the queue blocks this entry
+               return true;
+       }
+
        /**
         * Clear old activities
         *
@@ -257,7 +303,11 @@ class Queue
        {
                // We delete all entries that aren't associated with a worker entry after seven days.
                // The other entries are deleted when the worker deferred for too long.
-               DBA::delete('inbox-entry', ["`wid` IS NULL AND `received` < ?", DateTimeFormat::utc('now - 7 days')]);
+               $entries = DBA::select('inbox-entry', ['id'], ["`wid` IS NULL AND `received` < ?", DateTimeFormat::utc('now - 7 days')]);
+               while ($entry = DBA::fetch($entries)) {
+                       self::deleteById($entry['id']);
+               }
+               DBA::close($entries);
 
                // Optimizing this table only last seconds
                if (DI::config()->get('system', 'optimize_tables')) {
index e8267b0eaa903becf68397a1f2aa6d07321699e3..b1ed57b26ae3eaf3ba0e4f3be24dd8639daf6889 100644 (file)
@@ -598,7 +598,7 @@ class Receiver
                                return true;
                        }
                }
-       
+
                if (DI::config()->get('system', 'decoupled_receiver') && ($trust_source || DI::config()->get('debug', 'ap_inbox_store_untrusted'))) {
                        $object_data = Queue::add($object_data, $type, $uid, $http_signer, $push, $trust_source);
                }
@@ -609,11 +609,15 @@ class Receiver
                }
 
                if (!empty($object_data['entry-id']) && DI::config()->get('system', 'decoupled_receiver') && ($push || ($completion == self::COMPLETION_RELAY))) {
-                       // We delay by 5 seconds to allow to accumulate all receivers
-                       $delayed = date(DateTimeFormat::MYSQL, time() + 5);
-                       Logger::debug('Initiate processing', ['id' => $object_data['entry-id'], 'uri' => $object_data['object_id']]);
-                       $wid = Worker::add(['priority' => PRIORITY_HIGH, 'delayed' => $delayed], 'ProcessQueue', $object_data['entry-id']);
-                       Queue::setWorkerId($object_data['entry-id'], $wid);
+                       if (Queue::isProcessable($object_data['entry-id'])) {
+                               // We delay by 5 seconds to allow to accumulate all receivers
+                               $delayed = date(DateTimeFormat::MYSQL, time() + 5);
+                               Logger::debug('Initiate processing', ['id' => $object_data['entry-id'], 'uri' => $object_data['object_id']]);
+                               $wid = Worker::add(['priority' => PRIORITY_HIGH, 'delayed' => $delayed], 'ProcessQueue', $object_data['entry-id']);
+                               Queue::setWorkerId($object_data['entry-id'], $wid);
+                       } else {
+                               Logger::debug('Other queue entries need to be processed first.', ['id' => $object_data['entry-id']]);
+                       }
                        return false;
                }
 
@@ -687,13 +691,18 @@ class Receiver
                                        $object_data['thread-completion'] = Contact::getIdForURL($actor);
                                        $object_data['completion-mode']   = self::COMPLETION_ANNOUCE;
 
-                                       $item = ActivityPub\Processor::createItem($object_data, $fetch_parents);
-                                       if (empty($item)) {
-                                               return false;
-                                       }
+                                       if (!Post::exists(['uri' => $object_data['id'], 'uid' => 0])) {
+                                               $item = ActivityPub\Processor::createItem($object_data, $fetch_parents);
+                                               if (empty($item)) {
+                                                       return false;
+                                               }
 
-                                       $item['post-reason'] = Item::PR_ANNOUNCEMENT;
-                                       ActivityPub\Processor::postItem($object_data, $item);
+                                               $item['post-reason'] = Item::PR_ANNOUNCEMENT;
+                                               ActivityPub\Processor::postItem($object_data, $item);
+                                       } else {
+                                               Logger::info('Announced id already exists', ['id' => $object_data['id']]);
+                                               Queue::remove($object_data);
+                                       }
 
                                        if (!empty($activity)) {
                                                $announce_object_data = self::processObject($activity);
index 0183d6b14ae5be6785f90eaf5c16af01e6974de9..1fba29ec3bfcefb4491b4735bccf99774666ea27 100644 (file)
@@ -315,6 +315,10 @@ class ParseUrl
 
                $body = mb_convert_encoding($body, 'HTML-ENTITIES', 'UTF-8');
 
+               if (empty($body)) {
+                       return $siteinfo;
+               }
+
                $doc = new DOMDocument();
                @$doc->loadHTML($body);