return is_numeric($hookData['item_id']) ? $hookData['item_id'] : 0;
}
- $fetchQueue = new ActivityPub\FetchQueue();
- $fetched_uri = ActivityPub\Processor::fetchMissingActivity($fetchQueue, $uri);
- $fetchQueue->process();
+ $fetched_uri = ActivityPub\Processor::fetchMissingActivity($uri);
if ($fetched_uri) {
$item_id = self::searchByLink($fetched_uri, $uid);
'content' => visible_whitespace(var_export($object_data, true))
];
- $item = ActivityPub\Processor::createItem(new ActivityPub\FetchQueue(), $object_data);
+ $item = ActivityPub\Processor::createItem($object_data);
$results[] = [
'title' => DI::l10n()->t('Result Item'),
use Friendica\Core\Protocol;
use Friendica\Model\APContact;
use Friendica\Model\User;
-use Friendica\Protocol\ActivityPub\FetchQueue;
use Friendica\Util\HTTPSignature;
use Friendica\Util\JsonLD;
$items = [];
}
- $fetchQueue = new FetchQueue();
-
foreach ($items as $activity) {
$ldactivity = JsonLD::compact($activity);
- ActivityPub\Receiver::processActivity($fetchQueue, $ldactivity, '', $uid, true);
+ ActivityPub\Receiver::processActivity($ldactivity, '', $uid, true);
}
-
- $fetchQueue->process();
}
/**
+++ /dev/null
-<?php
-/**
- * @copyright Copyright (C) 2010-2022, the Friendica project
- *
- * @license GNU AGPL version 3 or any later version
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as
- * published by the Free Software Foundation, either version 3 of the
- * License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <https://www.gnu.org/licenses/>.
- *
- */
-
-namespace Friendica\Protocol\ActivityPub;
-
-/**
- * This class prevents maximum function nesting errors by flattening recursive calls to Processor::fetchMissingActivity
- */
-class FetchQueue
-{
- /** @var FetchQueueItem[] */
- protected $queue = [];
-
- public function push(FetchQueueItem $item)
- {
- array_push($this->queue, $item);
- }
-
- /**
- * Processes missing activities one by one. It is possible that a processing call will add additional missing
- * activities, they will be processed in subsequent iterations of the loop.
- *
- * Since this process is self-contained, it isn't suitable to retrieve the URI of a single activity.
- *
- * The simplest way to get the URI of the first activity and ensures all the parents are fetched is this way:
- *
- * $fetchQueue = new ActivityPub\FetchQueue();
- * $fetchedUri = ActivityPub\Processor::fetchMissingActivity($fetchQueue, $activityUri);
- * $fetchQueue->process();
- */
- public function process()
- {
- while (count($this->queue)) {
- $fetchQueueItem = array_pop($this->queue);
-
- call_user_func_array([Processor::class, 'fetchMissingActivity'], array_merge([$this], $fetchQueueItem->toParameters()));
- }
- }
-}
+++ /dev/null
-<?php
-/**
- * @copyright Copyright (C) 2010-2022, the Friendica project
- *
- * @license GNU AGPL version 3 or any later version
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as
- * published by the Free Software Foundation, either version 3 of the
- * License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <https://www.gnu.org/licenses/>.
- *
- */
-
-namespace Friendica\Protocol\ActivityPub;
-
-class FetchQueueItem
-{
- /** @var string */
- private $url;
- /** @var array */
- private $child;
- /** @var string */
- private $relay_actor;
- /** @var int */
- private $completion;
-
- /**
- * This constructor matches the signature of Processor::fetchMissingActivity except for the default $completion value
- *
- * @param string $url
- * @param array $child
- * @param string $relay_actor
- * @param int $completion
- */
- public function __construct(string $url, array $child = [], string $relay_actor = '', int $completion = Receiver::COMPLETION_AUTO)
- {
- $this->url = $url;
- $this->child = $child;
- $this->relay_actor = $relay_actor;
- $this->completion = $completion;
- }
-
- /**
- * Array meant to be used in call_user_function_array([Processor::class, 'fetchMissingActivity']). Caller needs to
- * provide an instance of a FetchQueue that isn't included in these parameters.
- *
- * @see FetchQueue::process()
- * @return array
- */
- public function toParameters(): array
- {
- return [$this->url, $this->child, $this->relay_actor, $this->completion];
- }
-}
/**
* Updates a message
*
- * @param FetchQueue $fetchQueue
* @param array $activity Activity array
* @throws \Friendica\Network\HTTPException\InternalServerErrorException
* @throws \ImagickException
*/
- public static function updateItem(FetchQueue $fetchQueue, array $activity)
+ public static function updateItem(array $activity)
{
$item = Post::selectFirst(['uri', 'uri-id', 'thr-parent', 'gravity', 'post-type'], ['uri' => $activity['id']]);
if (!DBA::isResult($item)) {
Logger::warning('No existing item, item will be created', ['uri' => $activity['id']]);
- $item = self::createItem($fetchQueue, $activity);
+ $item = self::createItem($activity);
if (empty($item)) {
return;
}
Post\History::add($item['uri-id'], $item);
Item::update($item, ['uri' => $activity['id']]);
- Receiver::removeFromQueue($activity);
+ Queue::remove($activity);
if ($activity['object_type'] == 'as:Event') {
$posts = Post::select(['event-id', 'uid'], ["`uri` = ? AND `event-id` > ?", $activity['id'], 0]);
/**
* Prepares data for a message
*
- * @param FetchQueue $fetchQueue
* @param array $activity Activity array
* @return array Internal item
* @throws \Friendica\Network\HTTPException\InternalServerErrorException
* @throws \ImagickException
*/
- public static function createItem(FetchQueue $fetchQueue, array $activity): array
+ public static function createItem(array $activity): array
{
$item = [];
$item['verb'] = Activity::POST;
}
if (empty($activity['directmessage']) && ($activity['id'] != $activity['reply-to-id']) && !Post::exists(['uri' => $activity['reply-to-id']])) {
- Logger::notice('Parent not found. Try to refetch it.', ['parent' => $activity['reply-to-id']]);
- /**
- * Instead of calling recursively self::fetchMissingActivity which can hit PHP's default function nesting
- * limit of 256 recursive calls, we push the parent activity fetch parameters in this queue. The initial
- * caller is responsible for processing the remaining queue once the original activity has been processed.
- */
- $fetchQueue->push(new FetchQueueItem($activity['reply-to-id'], $activity));
+ $recursion_depth = $activity['recursion-depth'] ?? 0;
+ Logger::notice('Parent not found. Try to refetch it.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]);
+ if ($recursion_depth < 10) {
+ self::fetchMissingActivity($activity['reply-to-id'], $activity, '', Receiver::COMPLETION_AUTO);
+ } else {
+ Logger::notice('Recursion level is too high, fetching is done by worker.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]);
+ Worker::add(PRIORITY_HIGH, 'FetchMissingActivity', $activity['reply-to-id'], $activity, '', Receiver::COMPLETION_AUTO);
+ return [];
+ }
}
$item['diaspora_signed_text'] = $activity['diaspora:comment'] ?? '';
Logger::info('Deleting item', ['object' => $activity['object_id'], 'owner' => $owner]);
Item::markForDeletion(['uri' => $activity['object_id'], 'owner-id' => $owner]);
- Receiver::removeFromQueue($activity);
+ Queue::remove($activity);
}
/**
/**
* Prepare the item array for an activity
*
- * @param FetchQueue $fetchQueue
* @param array $activity Activity array
* @param string $verb Activity verb
* @throws \Friendica\Network\HTTPException\InternalServerErrorException
* @throws \ImagickException
*/
- public static function createActivity(FetchQueue $fetchQueue, array $activity, string $verb)
+ public static function createActivity(array $activity, string $verb)
{
- $item = self::createItem($fetchQueue, $activity);
+ $item = self::createItem($activity);
if (empty($item)) {
return;
}
Logger::debug('Add post to featured collection', ['uri-id' => $uriid]);
Post\Collection::add($uriid, Post\Collection::FEATURED);
- Receiver::removeFromQueue($activity);
+ Queue::remove($activity);
}
/**
Logger::debug('Remove post from featured collection', ['uri-id' => $uriid]);
Post\Collection::remove($uriid, Post\Collection::FEATURED);
- Receiver::removeFromQueue($activity);
+ Queue::remove($activity);
}
/**
$item['body'] = Item::improveSharedDataInBody($item);
} else {
if (empty($activity['directmessage']) && ($item['thr-parent'] != $item['uri']) && ($item['gravity'] == GRAVITY_COMMENT)) {
- $item_private = !in_array(0, $activity['item_receiver']);
$parent = Post::selectFirst(['id', 'uri-id', 'private', 'author-link', 'alias'], ['uri' => $item['thr-parent']]);
if (!DBA::isResult($parent)) {
Logger::warning('Unknown parent item.', ['uri' => $item['thr-parent']]);
return false;
}
- if ($item_private && ($parent['private'] != Item::PRIVATE)) {
+ if (($parent['private'] == Item::PRIVATE) && ($parent['private'] != Item::PRIVATE)) {
Logger::warning('Item is private but the parent is not. Dropping.', ['item-uri' => $item['uri'], 'thr-parent' => $item['thr-parent']]);
return false;
}
}
$stored = false;
+ $success = false;
ksort($activity['receiver']);
if (!self::isSolicitedMessage($activity, $item)) {
$item_id = Item::insert($item);
if ($item_id) {
Logger::info('Item insertion successful', ['user' => $item['uid'], 'item_id' => $item_id]);
- Receiver::removeFromQueue($activity);
+ $success = true;
} else {
Logger::notice('Item insertion aborted', ['user' => $item['uid']]);
}
}
}
+ if ($success) {
+ Queue::remove($activity);
+ Queue::processReplyByUri($item['uri']);
+ }
+
// Store send a follow request for every reshare - but only when the item had been stored
if ($stored && ($item['private'] != Item::PRIVATE) && ($item['gravity'] == GRAVITY_PARENT) && !empty($item['author-link']) && ($item['author-link'] != $item['owner-link'])) {
$author = APContact::getByURL($item['owner-link'], false);
/**
* Fetches missing posts
*
- * @param FetchQueue $fetchQueue
* @param string $url message URL
* @param array $child activity array with the child of this message
* @param string $relay_actor Relay actor
* @throws \Friendica\Network\HTTPException\InternalServerErrorException
* @throws \ImagickException
*/
- public static function fetchMissingActivity(FetchQueue $fetchQueue, string $url, array $child = [], string $relay_actor = '', int $completion = Receiver::COMPLETION_MANUAL): string
+ public static function fetchMissingActivity(string $url, array $child = [], string $relay_actor = '', int $completion = Receiver::COMPLETION_MANUAL): string
{
if (!empty($child['receiver'])) {
$uid = ActivityPub\Receiver::getFirstUserFromReceivers($child['receiver']);
$object = ActivityPub::fetchContent($url, $uid);
if (empty($object)) {
- Logger::notice('Activity was not fetchable, aborting.', ['url' => $url]);
+ Logger::notice('Activity was not fetchable, aborting.', ['url' => $url, 'uid' => $uid]);
return '';
}
$ldactivity = JsonLD::compact($activity);
+ $ldactivity['recursion-depth'] = !empty($child['recursion-depth']) ? $child['recursion-depth'] + 1 : 1;
+
if (!empty($relay_actor)) {
$ldactivity['thread-completion'] = $ldactivity['from-relay'] = Contact::getIdForURL($relay_actor);
$ldactivity['completion-mode'] = Receiver::COMPLETION_RELAY;
return '';
}
- ActivityPub\Receiver::processActivity($fetchQueue, $ldactivity, json_encode($activity), $uid, true, false, $signer);
+ ActivityPub\Receiver::processActivity($ldactivity, json_encode($activity), $uid, true, false, $signer);
Logger::notice('Activity had been fetched and processed.', ['url' => $url, 'object' => $activity['id']]);
Logger::info('Updating profile', ['object' => $activity['object_id']]);
Contact::updateFromProbeByURL($activity['object_id']);
- Receiver::removeFromQueue($activity);
+ Queue::remove($activity);
}
/**
DBA::close($contacts);
Logger::info('Deleted contact', ['object' => $activity['object_id']]);
- Receiver::removeFromQueue($activity);
+ Queue::remove($activity);
}
/**
$condition = ['id' => $cid];
Contact::update($fields, $condition);
Logger::info('Accept contact request', ['contact' => $cid, 'user' => $uid]);
- Receiver::removeFromQueue($activity);
+ Queue::remove($activity);
}
/**
} else {
Logger::info('Rejected contact request', ['contact' => $cid, 'user' => $uid]);
}
- Receiver::removeFromQueue($activity);
+ Queue::remove($activity);
}
/**
}
Item::markForDeletion(['uri' => $activity['object_id'], 'author-id' => $author_id, 'gravity' => GRAVITY_ACTIVITY]);
- Receiver::removeFromQueue($activity);
+ Queue::remove($activity);
}
/**
Contact::removeFollower($contact);
Logger::info('Undo following request', ['contact' => $cid, 'user' => $uid]);
- Receiver::removeFromQueue($activity);
+ Queue::remove($activity);
}
/**
--- /dev/null
+<?php
+/**
+ * @copyright Copyright (C) 2010-2022, the Friendica project
+ *
+ * @license GNU AGPL version 3 or any later version
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+
+namespace Friendica\Protocol\ActivityPub;
+
+use Friendica\Core\Logger;
+use Friendica\Database\Database;
+use Friendica\Database\DBA;
+use Friendica\Util\DateTimeFormat;
+
+/**
+ * This class handles the processing of incoming posts
+ */
+class Queue
+{
+ public static function add(array $activity, string $type, int $uid, string $http_signer, bool $push): array
+ {
+ $fields = [
+ 'activity-id' => $activity['id'],
+ 'object-id' => $activity['object_id'],
+ 'type' => $type,
+ 'object-type' => $activity['object_type'],
+ 'activity' => json_encode($activity, JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE | JSON_PRETTY_PRINT),
+ 'received' => DateTimeFormat::utcNow(),
+ 'push' => $push,
+ ];
+
+ if (!empty($activity['reply-to-id'])) {
+ $fields['in-reply-to-id'] = $activity['reply-to-id'];
+ }
+
+ if (!empty($activity['object_object_type'])) {
+ $fields['object-object-type'] = $activity['object_object_type'];
+ }
+
+ if (!empty($http_signer)) {
+ $fields['signer'] = $http_signer;
+ }
+
+ DBA::insert('inbox-entry', $fields, Database::INSERT_IGNORE);
+
+ $queue = DBA::selectFirst('inbox-entry', ['id'], ['activity-id' => $activity['id']]);
+ if (!empty($queue['id'])) {
+ $activity['entry-id'] = $queue['id'];
+ DBA::insert('inbox-entry-receiver', ['queue-id' => $queue['id'], 'uid' => $uid], Database::INSERT_IGNORE);
+ }
+ return $activity;
+ }
+
+ public static function remove(array $activity = [])
+ {
+ if (empty($activity['entry-id'])) {
+ return;
+ }
+ DBA::delete('inbox-entry', ['id' => $activity['entry-id']]);
+ //echo "Delete ".$activity['entry-id']."\n";
+
+ }
+
+ public static function process(int $id)
+ {
+ $entry = DBA::selectFirst('inbox-entry', [], ['id' => $id]);
+ if (empty($entry)) {
+ return;
+ }
+
+ Logger::debug('Processing queue entry', ['id' => $entry['id'], 'type' => $entry['type'], 'object-type' => $entry['object-type'], 'uri' => $entry['object-id'], 'in-reply-to' => $entry['in-reply-to-id']]);
+
+ $activity = json_decode($entry['activity'], true);
+ $type = $entry['type'];
+ $push = $entry['push'];
+
+ $activity['entry-id'] = $entry['id'];
+
+ if (!Receiver::routeActivities($activity, $type, $push)) {
+ self::remove($activity);
+ }
+ }
+
+ public static function processAll()
+ {
+ $entries = DBA::select('inbox-entry', ['id', 'type', 'object-type'], [], ['order' => ['id' => true]]);
+ while ($entry = DBA::fetch($entries)) {
+ echo $entry['id'] . "\t" . $entry['type'] . "\t" . $entry['object-type'] . "\n";
+ self::process($entry['id']);
+ }
+ }
+
+ public static function processReplyByUri(string $uri)
+ {
+ $entries = DBA::select('inbox-entry', ['id'], ['in-reply-to-id' => $uri], ['order' => ['id' => true]]);
+ while ($entry = DBA::fetch($entries)) {
+ self::process($entry['id']);
+ }
+ }
+}
use Friendica\Core\Logger;
use Friendica\Core\Protocol;
use Friendica\Core\System;
-use Friendica\Database\Database;
use Friendica\DI;
use Friendica\Model\Contact;
use Friendica\Model\APContact;
use Friendica\Model\User;
use Friendica\Protocol\Activity;
use Friendica\Protocol\ActivityPub;
-use Friendica\Util\DateTimeFormat;
use Friendica\Util\HTTPSignature;
use Friendica\Util\JsonLD;
use Friendica\Util\LDSignature;
$trust_source = false;
}
- $fetchQueue = new FetchQueue();
- self::processActivity($fetchQueue, $ldactivity, $body, $uid, $trust_source, true, $signer, $http_signer);
- $fetchQueue->process();
- }
-
- private static function enqueuePost(array $ldactivity = [], string $type, int $uid, string $http_signer): array
- {
- $fields = [
- 'activity-id' => $ldactivity['id'],
- 'object-id' => $ldactivity['object_id'],
- 'type' => $type,
- 'object-type' => $ldactivity['object_type'],
- 'activity' => json_encode($ldactivity, JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE | JSON_PRETTY_PRINT),
- 'received' => DateTimeFormat::utcNow(),
- ];
-
- if (!empty($ldactivity['object_object_type'])) {
- $fields['object-object-type'] = $ldactivity['object_object_type'];
- }
-
- if (!empty($http_signer)) {
- $fields['signer'] = $http_signer;
- }
-
- DBA::insert('inbox-entry', $fields, Database::INSERT_IGNORE);
-
- $queue = DBA::selectFirst('inbox-entry', ['id'], ['activity-id' => $ldactivity['id']]);
- if (!empty($queue['id'])) {
- $ldactivity['entry-id'] = $queue['id'];
- DBA::insert('inbox-entry-receiver', ['queue-id' => $queue['id'], 'uid' => $uid], Database::INSERT_IGNORE);
- }
- return $ldactivity;
- }
-
- public static function removeFromQueue(array $activity = [])
- {
- if (empty($activity['entry-id'])) {
- return;
- }
- DBA::delete('inbox-entry', ['id' => $activity['entry-id']]);
+ self::processActivity($ldactivity, $body, $uid, $trust_source, true, $signer, $http_signer);
}
/**
return;
}
- $fetchQueue = new FetchQueue();
-
- $id = Processor::fetchMissingActivity($fetchQueue, $object_id, [], $actor, self::COMPLETION_RELAY);
+ $id = Processor::fetchMissingActivity($object_id, [], $actor, self::COMPLETION_RELAY);
if (empty($id)) {
Logger::notice('Relayed message had not been fetched', ['id' => $object_id, 'actor' => $actor]);
return;
}
- $fetchQueue->process();
-
$item_id = Item::searchByLink($object_id);
if ($item_id) {
Logger::info('Relayed message had been fetched and stored', ['id' => $object_id, 'item' => $item_id, 'actor' => $actor]);
/**
* Processes the activity object
*
- * @param FetchQueue $fetchQueue
* @param array $activity Array with activity data
* @param string $body The unprocessed body
* @param int|null $uid User ID
* @throws \Friendica\Network\HTTPException\InternalServerErrorException
* @throws \ImagickException
*/
- public static function processActivity(FetchQueue $fetchQueue, array $activity, string $body = '', int $uid = null, bool $trust_source = false, bool $push = false, array $signer = [], string $http_signer = '')
+ public static function processActivity(array $activity, string $body = '', int $uid = null, bool $trust_source = false, bool $push = false, array $signer = [], string $http_signer = '')
{
$type = JsonLD::fetchElement($activity, '@type');
if (!$type) {
if (!empty($activity['thread-completion'])) {
$object_data['thread-completion'] = $activity['thread-completion'];
}
-
+
if (!empty($activity['completion-mode'])) {
$object_data['completion-mode'] = $activity['completion-mode'];
}
$object_data['thread-children-type'] = $activity['thread-children-type'];
}
+ if (!empty($activity['recursion-depth'])) {
+ $object_data['recursion-depth'] = $activity['recursion-depth'];
+ }
+
// Internal flag for posts that arrived via relay
if (!empty($activity['from-relay'])) {
$object_data['from-relay'] = $activity['from-relay'];
}
- $object_data = self::enqueuePost($object_data, $type, $uid, $http_signer);
+ if ($type == 'as:Announce') {
+ $object_data['object_activity'] = $activity;
+ }
+
+ $object_data = Queue::add($object_data, $type, $uid, $http_signer, $push);
if (in_array('as:Question', [$object_data['object_type'] ?? '', $object_data['object_object_type'] ?? ''])) {
self::storeUnhandledActivity(false, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer);
}
+ if (!self::routeActivities($object_data, $type, $push)) {
+ self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer);
+ //if (!DI::config()->get('debug', 'ap_log_unknown')) {
+ // Queue::remove($object_data);
+ //}
+ }
+ }
+
+ public static function routeActivities($object_data, $type, $push)
+ {
+ $activity = $object_data['object_activity'] ?? [];
+
switch ($type) {
case 'as:Create':
if (in_array($object_data['object_type'], self::CONTENT_TYPES)) {
- $item = ActivityPub\Processor::createItem($fetchQueue, $object_data);
+ $item = ActivityPub\Processor::createItem($object_data);
ActivityPub\Processor::postItem($object_data, $item);
} elseif (in_array($object_data['object_type'], ['pt:CacheFile'])) {
// Unhandled Peertube activity
- self::removeFromQueue($object_data);
+ Queue::remove($object_data);
} else {
- self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer);
+ return false;
}
break;
case 'as:Invite':
if (in_array($object_data['object_type'], ['as:Event'])) {
- $item = ActivityPub\Processor::createItem($fetchQueue, $object_data);
+ $item = ActivityPub\Processor::createItem($object_data);
ActivityPub\Processor::postItem($object_data, $item);
} else {
- self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer);
+ return false;
}
break;
ActivityPub\Processor::addToFeaturedCollection($object_data);
} elseif ($object_data['object_type'] == '') {
// The object type couldn't be determined. We don't have it and we can't fetch it. We ignore this activity.
+ Queue::remove($object_data);
} else {
- self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer);
+ return false;
}
break;
case 'as:Announce':
if (in_array($object_data['object_type'], self::CONTENT_TYPES)) {
+ $actor = JsonLD::fetchElement($activity, 'as:actor', '@id');
$object_data['thread-completion'] = Contact::getIdForURL($actor);
$object_data['completion-mode'] = self::COMPLETION_ANNOUCE;
- $item = ActivityPub\Processor::createItem($fetchQueue, $object_data);
+ $item = ActivityPub\Processor::createItem($object_data);
if (empty($item)) {
return;
}
$item['post-reason'] = Item::PR_ANNOUNCEMENT;
ActivityPub\Processor::postItem($object_data, $item);
- $announce_object_data = self::processObject($activity);
- $announce_object_data['name'] = $type;
- $announce_object_data['author'] = JsonLD::fetchElement($activity, 'as:actor', '@id');
- $announce_object_data['object_id'] = $object_data['object_id'];
- $announce_object_data['object_type'] = $object_data['object_type'];
- $announce_object_data['push'] = $push;
+ if (!empty($activity)) {
+ $announce_object_data = self::processObject($activity);
+ $announce_object_data['name'] = $type;
+ $announce_object_data['author'] = $actor;
+ $announce_object_data['object_id'] = $object_data['object_id'];
+ $announce_object_data['object_type'] = $object_data['object_type'];
+ $announce_object_data['push'] = $push;
- if (!empty($body)) {
- $announce_object_data['raw'] = $body;
- }
-
- ActivityPub\Processor::createActivity($fetchQueue, $announce_object_data, Activity::ANNOUNCE);
+ if (!empty($object_data['raw'])) {
+ $announce_object_data['raw'] = $object_data['raw'];
+ }
+ ActivityPub\Processor::createActivity($announce_object_data, Activity::ANNOUNCE);
+ } else echo "\n***************************\n";
} else {
- self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer);
+ return false;
}
break;
case 'as:Like':
if (in_array($object_data['object_type'], self::CONTENT_TYPES)) {
- ActivityPub\Processor::createActivity($fetchQueue, $object_data, Activity::LIKE);
+ ActivityPub\Processor::createActivity($object_data, Activity::LIKE);
} elseif ($object_data['object_type'] == '') {
// The object type couldn't be determined. We don't have it and we can't fetch it. We ignore this activity.
+ Queue::remove($object_data);
} else {
- self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer);
+ return false;
}
break;
case 'as:Dislike':
if (in_array($object_data['object_type'], self::CONTENT_TYPES)) {
- ActivityPub\Processor::createActivity($fetchQueue, $object_data, Activity::DISLIKE);
+ ActivityPub\Processor::createActivity($object_data, Activity::DISLIKE);
} elseif ($object_data['object_type'] == '') {
// The object type couldn't be determined. We don't have it and we can't fetch it. We ignore this activity.
+ Queue::remove($object_data);
} else {
- self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer);
+ return false;
}
break;
case 'as:TentativeAccept':
if (in_array($object_data['object_type'], self::CONTENT_TYPES)) {
- ActivityPub\Processor::createActivity($fetchQueue, $object_data, Activity::ATTENDMAYBE);
+ ActivityPub\Processor::createActivity($object_data, Activity::ATTENDMAYBE);
} else {
- self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer);
+ return false;
}
break;
case 'as:Update':
if (in_array($object_data['object_type'], self::CONTENT_TYPES)) {
- ActivityPub\Processor::updateItem($fetchQueue, $object_data);
+ ActivityPub\Processor::updateItem($object_data);
} elseif (in_array($object_data['object_type'], self::ACCOUNT_TYPES)) {
ActivityPub\Processor::updatePerson($object_data);
} elseif (in_array($object_data['object_type'], ['pt:CacheFile'])) {
// Unhandled Peertube activity
- self::removeFromQueue($object_data);
+ Queue::remove($object_data);
} else {
- self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer);
+ return false;
}
break;
ActivityPub\Processor::deletePerson($object_data);
} elseif ($object_data['object_type'] == '') {
// The object type couldn't be determined. Most likely we don't have it here. We ignore this activity.
+ Queue::remove($object_data);
} else {
- self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer);
+ return false;
}
break;
if (in_array($object_data['object_type'], self::ACCOUNT_TYPES)) {
ActivityPub\Processor::blockAccount($object_data);
} else {
- self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer);
+ return false;
}
break;
ActivityPub\Processor::removeFromFeaturedCollection($object_data);
} elseif ($object_data['object_type'] == '') {
// The object type couldn't be determined. We don't have it and we can't fetch it. We ignore this activity.
+ Queue::remove($object_data);
} else {
- self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer);
+ return false;
}
break;
ActivityPub\Processor::followUser($object_data);
} elseif (in_array($object_data['object_type'], self::CONTENT_TYPES)) {
$object_data['reply-to-id'] = $object_data['object_id'];
- ActivityPub\Processor::createActivity($fetchQueue, $object_data, Activity::FOLLOW);
+ ActivityPub\Processor::createActivity($object_data, Activity::FOLLOW);
} else {
- self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer);
+ return false;
}
break;
if ($object_data['object_type'] == 'as:Follow') {
ActivityPub\Processor::acceptFollowUser($object_data);
} elseif (in_array($object_data['object_type'], self::CONTENT_TYPES)) {
- ActivityPub\Processor::createActivity($fetchQueue, $object_data, Activity::ATTEND);
+ ActivityPub\Processor::createActivity($object_data, Activity::ATTEND);
} else {
- self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer);
+ return false;
}
break;
if ($object_data['object_type'] == 'as:Follow') {
ActivityPub\Processor::rejectFollowUser($object_data);
} elseif (in_array($object_data['object_type'], self::CONTENT_TYPES)) {
- ActivityPub\Processor::createActivity($fetchQueue, $object_data, Activity::ATTENDNO);
+ ActivityPub\Processor::createActivity($object_data, Activity::ATTENDNO);
} else {
- self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer);
+ return false;
}
break;
} elseif (in_array($object_data['object_type'], array_merge(self::ACTIVITY_TYPES, ['as:Announce', 'as:Create', ''])) &&
empty($object_data['object_object_type'])) {
// We cannot detect the target object. So we can ignore it.
- self::removeFromQueue($object_data);
} elseif (in_array($object_data['object_type'], ['as:Create']) &&
in_array($object_data['object_object_type'], ['pt:CacheFile'])) {
// Unhandled Peertube activity
- self::removeFromQueue($object_data);
+ Queue::remove($object_data);
} else {
- self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer);
+ return false;
}
break;
case 'as:View':
if (in_array($object_data['object_type'], self::CONTENT_TYPES)) {
- ActivityPub\Processor::createActivity($fetchQueue, $object_data, Activity::VIEW);
+ ActivityPub\Processor::createActivity($object_data, Activity::VIEW);
} elseif ($object_data['object_type'] == '') {
// The object type couldn't be determined. Most likely we don't have it here. We ignore this activity.
- self::removeFromQueue($object_data);
+ Queue::remove($object_data);
} else {
- self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer);
+ return false;
}
break;
case 'litepub:EmojiReact':
if (in_array($object_data['object_type'], self::CONTENT_TYPES)) {
- ActivityPub\Processor::createActivity($fetchQueue, $object_data, Activity::EMOJIREACT);
+ ActivityPub\Processor::createActivity($object_data, Activity::EMOJIREACT);
} elseif ($object_data['object_type'] == '') {
// The object type couldn't be determined. We don't have it and we can't fetch it. We ignore this activity.
+ Queue::remove($object_data);
} else {
- self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer);
+ return false;
}
break;
default:
Logger::info('Unknown activity: ' . $type . ' ' . $object_data['object_type']);
- self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer);
- break;
+ return false;
}
+ return true;
}
/**
*/
private static function storeUnhandledActivity(bool $unknown, string $type, array $object_data, array $activity, string $body = '', int $uid = null, bool $trust_source = false, bool $push = false, array $signer = [])
{
- if (!DI::config()->get('debug', 'ap_log_unknown')) {
- self::removeFromQueue($activity);
- return;
- }
-
$file = ($unknown ? 'unknown-' : 'unhandled-') . str_replace(':', '-', $type) . '-';
if (!empty($object_data['object_type'])) {
--- /dev/null
+<?php
+/**
+ * @copyright Copyright (C) 2010-2022, the Friendica project
+ *
+ * @license GNU AGPL version 3 or any later version
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+
+namespace Friendica\Worker;
+
+use Friendica\Core\Logger;
+use Friendica\Protocol\ActivityPub;
+use Friendica\Protocol\ActivityPub\Receiver;
+
+class FetchMissingActivity
+{
+ /**
+ * Fetch missing activities
+ * @param string $url Contact URL
+ */
+ public static function execute(string $url, array $child = [], string $relay_actor = '', int $completion = Receiver::COMPLETION_MANUAL)
+ {
+ Logger::info('Start fetching missing activity', ['url' => $url]);
+ $result = ActivityPub\Processor::fetchMissingActivity($url, $child, $relay_actor, $completion);
+ Logger::info('Finished fetching missing activity', ['url' => $url, 'result' => $result]);
+ }
+}
"id" => ["type" => "int unsigned", "not null" => "1", "extra" => "auto_increment", "primary" => "1", "comment" => "sequential ID"],
"activity-id" => ["type" => "varbinary(255)", "comment" => "id of the incoming activity"],
"object-id" => ["type" => "varbinary(255)", "comment" => ""],
+ "in-reply-to-id" => ["type" => "varbinary(255)", "comment" => ""],
"type" => ["type" => "varchar(64)", "comment" => "Type of the activity"],
"object-type" => ["type" => "varchar(64)", "comment" => "Type of the object activity"],
"object-object-type" => ["type" => "varchar(64)", "comment" => "Type of the object's object activity"],
"received" => ["type" => "datetime", "comment" => "Receiving date"],
"activity" => ["type" => "mediumtext", "comment" => "The JSON activity"],
"signer" => ["type" => "varchar(255)", "comment" => ""],
+ "push" => ["type" => "boolean", "not null" => "1", "default" => "0", "comment" => ""],
],
"indexes" => [
"PRIMARY" => ["id"],