-- ------------------------------------------
-- Friendica 2022.09-dev (Giant Rhubarb)
--- DB_UPDATE_VERSION 1477
+-- DB_UPDATE_VERSION 1478
-- ------------------------------------------
FOREIGN KEY (`uri-id`) REFERENCES `item-uri` (`id`) ON UPDATE RESTRICT ON DELETE CASCADE
) DEFAULT COLLATE utf8mb4_general_ci COMMENT='User specific public contact data';
+--
+-- TABLE arrived-activity
+--
+CREATE TABLE IF NOT EXISTS `arrived-activity` (
+ `object-id` varbinary(255) NOT NULL COMMENT 'object id of the incoming activity',
+ `received` datetime COMMENT 'Receiving date',
+ PRIMARY KEY(`object-id`)
+) ENGINE=MEMORY DEFAULT COLLATE utf8mb4_general_ci COMMENT='Id of arrived activities';
+
+--
+-- TABLE processed-activity
+--
+CREATE TABLE IF NOT EXISTS `processed-activity` (
+ `object-id` varbinary(255) NOT NULL COMMENT 'object id of the incoming activity',
+ `received` datetime COMMENT 'Receiving date',
+ PRIMARY KEY(`object-id`)
+) ENGINE=MEMORY DEFAULT COLLATE utf8mb4_general_ci COMMENT='Id of processed activities';
+
--
-- TABLE worker-ipc
--
| [application](help/database/db_application) | OAuth application |
| [application-marker](help/database/db_application-marker) | Timeline marker |
| [application-token](help/database/db_application-token) | OAuth user token |
+| [arrived-activity](help/database/db_arrived-activity) | Id of arrived activities |
| [attach](help/database/db_attach) | file attachments |
| [cache](help/database/db_cache) | Stores temporary data |
| [config](help/database/db_config) | main configuration storage |
| [post-user](help/database/db_post-user) | User specific post data |
| [post-user-notification](help/database/db_post-user-notification) | User post notifications |
| [process](help/database/db_process) | Currently running system processes |
+| [processed-activity](help/database/db_processed-activity) | Id of processed activities |
| [profile](help/database/db_profile) | user profiles data |
| [profile_field](help/database/db_profile_field) | Custom profile fields |
| [push_subscriber](help/database/db_push_subscriber) | Used for OStatus: Contains feed subscribers |
--- /dev/null
+Table arrived-activity
+===========
+
+Id of arrived activities
+
+Fields
+------
+
+| Field | Description | Type | Null | Key | Default | Extra |
+| --------- | ---------------------------------- | -------------- | ---- | --- | ------- | ----- |
+| object-id | object id of the incoming activity | varbinary(255) | NO | PRI | NULL | |
+| received | Receiving date | datetime | YES | | NULL | |
+
+Indexes
+------------
+
+| Name | Fields |
+| ------- | --------- |
+| PRIMARY | object-id |
+
+
+Return to [database documentation](help/database)
--- /dev/null
+Table processed-activity
+===========
+
+Id of processed activities
+
+Fields
+------
+
+| Field | Description | Type | Null | Key | Default | Extra |
+| --------- | ---------------------------------- | -------------- | ---- | --- | ------- | ----- |
+| object-id | object id of the incoming activity | varbinary(255) | NO | PRI | NULL | |
+| received | Receiving date | datetime | YES | | NULL | |
+
+Indexes
+------------
+
+| Name | Fields |
+| ------- | --------- |
+| PRIMARY | object-id |
+
+
+Return to [database documentation](help/database)
namespace Friendica\Module\Debug;
use Friendica\BaseModule;
-use Friendica\Content\Text;
-use Friendica\Core\Logger;
use Friendica\Core\Renderer;
use Friendica\DI;
-use Friendica\Model\Item;
-use Friendica\Model\Tag;
use Friendica\Protocol\ActivityPub;
use Friendica\Util\JsonLD;
-use Friendica\Util\XML;
class ActivityPubConversion extends BaseModule
{
'content' => visible_whitespace(var_export($object_data, true))
];
- $item = ActivityPub\Processor::createItem($object_data);
+ $item = ActivityPub\Processor::createItem($object_data, true);
$results[] = [
'title' => DI::l10n()->t('Result Item'),
const CACHEKEY_FETCH_ACTIVITY = 'processor:fetchMissingActivity:';
const CACHEKEY_JUST_FETCHED = 'processor:isJustFetched:';
- static $processed = [];
-
/**
- * Add an activity id to the list of processed ids
+ * Add an object id to the list of processed ids
*
* @param string $id
*
* @return void
*/
- public static function addActivityId(string $id)
+ private static function addActivityId(string $id)
{
- self::$processed[] = $id;
- if (count(self::$processed) > 100) {
- self::$processed = array_slice(self::$processed, 1);
- }
+ DBA::delete('processed-activity', ["`received` < ?", DateTimeFormat::utc('now - 5 minutes')]);
+ DBA::insert('processed-activity', ['object-id' => $id, 'received' => DateTimeFormat::utcNow()]);
}
/**
- * Checks if the given has just been processed
+ * Checks if the given object id has just been processed
*
* @param string $id
*
* @return boolean
*/
- public static function isProcessed(string $id): bool
+ private static function isProcessed(string $id): bool
{
- return in_array($id, self::$processed);
+ return DBA::exists('processed-activity', ['object-id' => $id]);
}
/**
$item = Post::selectFirst(['uri', 'uri-id', 'thr-parent', 'gravity', 'post-type'], ['uri' => $activity['id']]);
if (!DBA::isResult($item)) {
Logger::warning('No existing item, item will be created', ['uri' => $activity['id']]);
- $item = self::createItem($activity);
+ $item = self::createItem($activity, false);
if (empty($item)) {
return;
}
* @throws \Friendica\Network\HTTPException\InternalServerErrorException
* @throws \ImagickException
*/
- public static function createItem(array $activity, bool $fetch_parents = true): array
+ public static function createItem(array $activity, bool $fetch_parents): array
{
if (self::isProcessed($activity['id'])) {
Logger::info('Id is already processed', ['id' => $activity['id']]);
$item['object-type'] = Activity\ObjectType::COMMENT;
}
- if (!empty($activity['context'])) {
- $item['conversation'] = $activity['context'];
- } elseif (!empty($activity['conversation'])) {
+ if (!empty($activity['conversation'])) {
$item['conversation'] = $activity['conversation'];
+ } elseif (!empty($activity['context'])) {
+ $item['conversation'] = $activity['context'];
}
if (!empty($item['conversation'])) {
$conversation = [];
}
+ Logger::debug('Create Item', ['id' => $activity['id'], 'conversation' => $item['conversation'] ?? '']);
if (empty($activity['author']) && empty($activity['actor'])) {
Logger::notice('Missing author and actor. We quit here.', ['activity' => $activity]);
return [];
*/
private static function fetchParent(array $activity): string
{
- if (self::hasJustBeenFetched($activity['reply-to-id'])) {
- Logger::notice('We just have tried to fetch this activity. We don\'t try it again.', ['parent' => $activity['reply-to-id']]);
- return '';
- }
-
$recursion_depth = $activity['recursion-depth'] ?? 0;
if ($recursion_depth < DI::config()->get('system', 'max_recursion_depth')) {
return '';
}
- /**
- * Check if a given activity has recently been fetched
- *
- * @param string $url
- * @return boolean
- */
- private static function hasJustBeenFetched(string $url): bool
- {
- $cachekey = self::CACHEKEY_JUST_FETCHED . $url;
- $time = DI::cache()->get($cachekey);
- if (is_null($time)) {
- DI::cache()->set($cachekey, time(), Duration::FIVE_MINUTES);
- return false;
- }
- return ($time + 300) > time();
- }
-
/**
* Check if a given activity is no longer available
*
public static function createActivity(array $activity, string $verb)
{
$activity['reply-to-id'] = $activity['object_id'];
- $item = self::createItem($activity);
+ $item = self::createItem($activity, false);
if (empty($item)) {
return;
}
$ldactivity = JsonLD::compact($activity);
- $ldactivity['recursion-depth'] = !empty($child['recursion-depth']) ? $child['recursion-depth'] + 1 : 1;
+ $ldactivity['recursion-depth'] = !empty($child['recursion-depth']) ? $child['recursion-depth'] + 1 : 0;
if (!empty($relay_actor)) {
$ldactivity['thread-completion'] = $ldactivity['from-relay'] = Contact::getIdForURL($relay_actor);
$object_data['object_activity'] = $activity;
}
- if (($type == 'as:Create') && Queue::exists($object_data['object_id'], $type)) {
- Logger::info('The activity is already added.', ['id' => $object_data['object_id']]);
- return true;
- }
+ if (($type == 'as:Create') && $trust_source) {
+ if (self::hasArrived($object_data['object_id'])) {
+ Logger::info('The activity already arrived.', ['id' => $object_data['object_id']]);
+ return true;
+ }
+ self::addArrivedId($object_data['object_id']);
+ if (Queue::exists($object_data['object_id'], $type)) {
+ Logger::info('The activity is already added.', ['id' => $object_data['object_id']]);
+ return true;
+ }
+ }
+
if (DI::config()->get('system', 'decoupled_receiver') && ($trust_source || DI::config()->get('debug', 'ap_inbox_store_untrusted'))) {
$object_data = Queue::add($object_data, $type, $uid, $http_signer, $push, $trust_source);
}
return $object_data;
}
+
+ /**
+ * Add an object id to the list of arrived activities
+ *
+ * @param string $id
+ *
+ * @return void
+ */
+ private static function addArrivedId(string $id)
+ {
+ DBA::delete('arrived-activity', ["`received` < ?", DateTimeFormat::utc('now - 5 minutes')]);
+ DBA::insert('arrived-activity', ['object-id' => $id, 'received' => DateTimeFormat::utcNow()]);
+ }
+
+ /**
+ * Checks if the given object already arrived before
+ *
+ * @param string $id
+ *
+ * @return boolean
+ */
+ private static function hasArrived(string $id): bool
+ {
+ return DBA::exists('arrived-activity', ['object-id' => $id]);
+ }
}
use Friendica\Database\DBA;
if (!defined('DB_UPDATE_VERSION')) {
- define('DB_UPDATE_VERSION', 1477);
+ define('DB_UPDATE_VERSION', 1478);
}
return [
"uri-id_uid" => ["UNIQUE", "uri-id", "uid"],
]
],
+ "arrived-activity" => [
+ "comment" => "Id of arrived activities",
+ "fields" => [
+ "object-id" => ["type" => "varbinary(255)", "not null" => "1", "primary" => "1", "comment" => "object id of the incoming activity"],
+ "received" => ["type" => "datetime", "comment" => "Receiving date"],
+ ],
+ "indexes" => [
+ "PRIMARY" => ["object-id"],
+ ],
+ "engine" => "MEMORY",
+ ],
+ "processed-activity" => [
+ "comment" => "Id of processed activities",
+ "fields" => [
+ "object-id" => ["type" => "varbinary(255)", "not null" => "1", "primary" => "1", "comment" => "object id of the incoming activity"],
+ "received" => ["type" => "datetime", "comment" => "Receiving date"],
+ ],
+ "indexes" => [
+ "PRIMARY" => ["object-id"],
+ ],
+ "engine" => "MEMORY",
+ ],
"worker-ipc" => [
"comment" => "Inter process communication between the frontend and the worker",
"fields" => [