'photo' => $item['owner-avatar'], 'network' => $item['network']];
$item['owner-id'] = ($item['owner-id'] ?? 0) ?: Contact::getIdForURL($item['owner-link'], 0, null, $default);
- $actor = ($item['gravity'] == GRAVITY_PARENT) ? $item['owner-id'] : $item['author-id'];
- if (!$item['origin'] && ($item['uid'] != 0) && Contact::isSharing($actor, $item['uid'])) {
- $item['post-reason'] = self::PR_FOLLOWER;
- }
-
- if ($item['origin'] && empty($item['post-reason'])) {
- $item['post-reason'] = self::PR_LOCAL;
- }
+ $item['post-reason'] = self::getPostReason($item);
// Ensure that there is an avatar cache
Contact::checkAvatarCache($item['author-id']);
return $post_user_id;
}
+ /**
+ * Fetch the post reason for a given item array
+ *
+ * @param array $item
+ *
+ * @return integer
+ */
+ public static function getPostReason(array $item): int
+ {
+ $actor = ($item['gravity'] == GRAVITY_PARENT) ? $item['owner-id'] : $item['author-id'];
+ if (empty($item['origin']) && ($item['uid'] != 0) && Contact::isSharing($actor, $item['uid'])) {
+ return self::PR_FOLLOWER;
+ }
+
+ if (!empty($item['origin']) && empty($item['post-reason'])) {
+ return self::PR_LOCAL;
+ }
+
+ return $item['post-reason'] ?? self::PR_NONE;
+ }
+
/**
* Update the display cache
*
$item = array_merge($item, $fields);
+ $item['post-reason'] = self::getPostReason($item);
+
$is_reshare = ($item['gravity'] == GRAVITY_ACTIVITY) && ($item['verb'] == Activity::ANNOUNCE);
if ((($item['gravity'] == GRAVITY_PARENT) || $is_reshare) &&
DI::pConfig()->get($uid, 'system', 'accept_only_sharer') == self::COMPLETION_NONE &&
- !Contact::isSharingByURL($item['author-link'], $uid) &&
- !Contact::isSharingByURL($item['owner-link'], $uid)) {
+ !in_array($item['post-reason'], [self::PR_FOLLOWER, self::PR_TAG, self::PR_TO, self::PR_CC])) {
Logger::info('Contact is not a follower, thread will not be stored', ['author' => $item['author-link'], 'uid' => $uid]);
return 0;
}
Logger::warning('No existing item, item will be created', ['uri' => $activity['id']]);
$item = self::createItem($activity, false);
if (empty($item)) {
+ Queue::remove($activity);
return;
}
$item = self::processContent($activity, $item);
if (empty($item)) {
+ Queue::remove($activity);
return;
}
*/
public static function createItem(array $activity, bool $fetch_parents): array
{
- if (self::isProcessed($activity['id'])) {
+ if (self::isProcessed($activity['id']) && !Post::exists(['uri' => $activity['id']])) {
Logger::info('Id is already processed', ['id' => $activity['id']]);
return [];
}
Logger::debug('Create Item', ['id' => $activity['id'], 'conversation' => $item['conversation'] ?? '']);
if (empty($activity['author']) && empty($activity['actor'])) {
Logger::notice('Missing author and actor. We quit here.', ['activity' => $activity]);
+ Queue::remove($activity);
return [];
}
if (empty($conversation) && empty($activity['directmessage']) && ($item['gravity'] != GRAVITY_PARENT) && !Post::exists(['uri' => $item['thr-parent']])) {
Logger::info('Parent not found, message will be discarded.', ['thr-parent' => $item['thr-parent']]);
+ if (!$fetch_parents) {
+ Queue::remove($activity);
+ }
return [];
}
$item = self::processContent($activity, $item);
if (empty($item)) {
Logger::info('Message was not processed');
+ Queue::remove($activity);
return [];
}
}
// @todo To ensure that the remote system is working correctly, we can check if the "Content-Type" contains JSON
- if (in_array($curlResult->getReturnCode(), [404])) {
+ if (in_array($curlResult->getReturnCode(), [401, 404])) {
return true;
}
- $object = json_decode($curlResult->getBody(), true);
- if (!empty($object)) {
- $activity = JsonLD::compact($object);
- if (JsonLD::fetchElement($activity, '@type') == 'as:Tombstone') {
+ if ($curlResult->isSuccess()) {
+ $object = json_decode($curlResult->getBody(), true);
+ if (!empty($object)) {
+ $activity = JsonLD::compact($object);
+ if (JsonLD::fetchElement($activity, '@type') == 'as:Tombstone') {
+ return true;
+ }
+ }
+ } elseif ($curlResult->getReturnCode() == 0) {
+ $host = parse_url($url, PHP_URL_HOST);
+ if (!(filter_var($host, FILTER_VALIDATE_IP) || @dns_get_record($host . '.', DNS_A + DNS_AAAA))) {
return true;
- }
+ }
}
return false;
Logger::warning('Unknown parent item.', ['uri' => $parent_uri]);
return false;
}
- if (($item['private'] == Item::PRIVATE) && ($parent['private'] != Item::PRIVATE)) {
+ if (!empty($activity['type']) && in_array($activity['type'], Receiver::CONTENT_TYPES) && ($item['private'] == Item::PRIVATE) && ($parent['private'] != Item::PRIVATE)) {
Logger::warning('Item is private but the parent is not. Dropping.', ['item-uri' => $item['uri'], 'thr-parent' => $item['thr-parent']]);
return false;
}
if (!self::isSolicitedMessage($activity, $item)) {
DBA::delete('item-uri', ['id' => $item['uri-id']]);
+ if (!empty($activity['entry-id'])) {
+ Queue::deleteById($activity['entry-id']);
+ }
return;
}
$item['post-reason'] = Item::PR_NONE;
}
- if (!empty($activity['from-relay'])) {
- $item['post-reason'] = Item::PR_RELAY;
- } elseif (!empty($activity['thread-completion'])) {
- $item['post-reason'] = Item::PR_FETCHED;
- } elseif (in_array($item['post-reason'], [Item::PR_GLOBAL, Item::PR_NONE]) && !empty($activity['push'])) {
- $item['post-reason'] = Item::PR_PUSHED;
+ $item['post-reason'] = Item::getPostReason($item);
+
+ if (in_array($item['post-reason'], [Item::PR_GLOBAL, Item::PR_NONE])) {
+ if (!empty($activity['from-relay'])) {
+ $item['post-reason'] = Item::PR_RELAY;
+ } elseif (!empty($activity['thread-completion'])) {
+ $item['post-reason'] = Item::PR_FETCHED;
+ } elseif (!empty($activity['push'])) {
+ $item['post-reason'] = Item::PR_PUSHED;
+ }
}
if ($item['isForum'] ?? false) {
continue;
}
- if (!($item['isForum'] ?? false) && ($receiver != 0) && ($item['gravity'] == GRAVITY_PARENT) && !Contact::isSharingByURL($activity['author'], $receiver)) {
- if ($item['post-reason'] == Item::PR_BCC) {
- Logger::info('Top level post via BCC from a non sharer, ignoring', ['uid' => $receiver, 'contact' => $item['contact-id']]);
- continue;
- }
+ if (($receiver != 0) && ($item['gravity'] == GRAVITY_PARENT) && !in_array($item['post-reason'], [Item::PR_FOLLOWER, Item::PR_TAG, item::PR_TO, Item::PR_CC])) {
+ if (!($item['isForum'] ?? false)) {
+ if ($item['post-reason'] == Item::PR_BCC) {
+ Logger::info('Top level post via BCC from a non sharer, ignoring', ['uid' => $receiver, 'contact' => $item['contact-id'], 'url' => $item['uri']]);
+ continue;
+ }
- if (
- !empty($activity['thread-children-type'])
- && in_array($activity['thread-children-type'], Receiver::ACTIVITY_TYPES)
- && DI::pConfig()->get($receiver, 'system', 'accept_only_sharer') != Item::COMPLETION_LIKE
- ) {
- Logger::info('Top level post from thread completion from a non sharer had been initiated via an activity, ignoring',
- ['type' => $activity['thread-children-type'], 'user' => $item['uid'], 'causer' => $item['causer-link'], 'author' => $activity['author'], 'url' => $item['uri']]);
- continue;
+ if ((DI::pConfig()->get($receiver, 'system', 'accept_only_sharer') != Item::COMPLETION_LIKE)
+ && in_array($activity['thread-children-type'] ?? '', Receiver::ACTIVITY_TYPES)) {
+ Logger::info('Top level post from thread completion from a non sharer had been initiated via an activity, ignoring',
+ ['type' => $activity['thread-children-type'], 'user' => $item['uid'], 'causer' => $item['causer-link'], 'author' => $activity['author'], 'url' => $item['uri']]);
+ continue;
+ }
}
- }
-
- $is_forum = false;
- if ($receiver != 0) {
+ $is_forum = false;
$user = User::getById($receiver, ['account-type']);
if (!empty($user['account-type'])) {
$is_forum = ($user['account-type'] == User::ACCOUNT_TYPE_COMMUNITY);
}
- }
-
- if (!$is_forum && DI::pConfig()->get($receiver, 'system', 'accept_only_sharer') == Item::COMPLETION_NONE && ($receiver != 0) && ($item['gravity'] == GRAVITY_PARENT)) {
- $skip = !Contact::isSharingByURL($activity['author'], $receiver);
-
- if ($skip && (($activity['type'] == 'as:Announce') || ($item['isForum'] ?? false))) {
- $skip = !Contact::isSharingByURL($activity['actor'], $receiver);
- }
- if ($skip) {
- Logger::info('Skipping post', ['uid' => $receiver, 'url' => $item['uri']]);
+ if ((DI::pConfig()->get($receiver, 'system', 'accept_only_sharer') == Item::COMPLETION_NONE)
+ && ((!$is_forum && !($item['isForum'] ?? false) && ($activity['type'] != 'as:Announce'))
+ || !Contact::isSharingByURL($activity['actor'], $receiver))) {
+ Logger::info('Actor is a non sharer, is no forum or it is no announce', ['uid' => $receiver, 'actor' => $activity['actor'], 'url' => $item['uri'], 'type' => $activity['type']]);
continue;
}
*/
public static function fetchMissingActivity(string $url, array $child = [], string $relay_actor = '', int $completion = Receiver::COMPLETION_MANUAL): string
{
- if (!empty($child['receiver'])) {
- $uid = ActivityPub\Receiver::getFirstUserFromReceivers($child['receiver']);
- } else {
- $uid = 0;
- }
-
- $object = self::fetchCachedActivity($url, $uid);
+ $object = self::fetchCachedActivity($url, 0);
if (empty($object)) {
return '';
}
$compacted = JsonLD::compact($object);
$attributed_to = JsonLD::fetchElement($compacted, 'as:attributedTo', '@id');
}
- $signer[] = $attributed_to;
+ $signer[] = $attributed_to;
}
if (!empty($object['actor'])) {
$ldactivity['completion-mode'] = $completion;
}
- if (!empty($child['type'])) {
+ if (!empty($child['thread-children-type'])) {
+ $ldactivity['thread-children-type'] = $child['thread-children-type'];
+ } elseif (!empty($child['type'])) {
$ldactivity['thread-children-type'] = $child['type'];
+ } else {
+ $ldactivity['thread-children-type'] = 'as:Create';
}
if (!empty($relay_actor) && !self::acceptIncomingMessage($ldactivity, $object['id'])) {
if (($completion == Receiver::COMPLETION_RELAY) && Queue::exists($url, 'as:Create')) {
Logger::notice('Activity has already been queued.', ['url' => $url, 'object' => $activity['id']]);
- } elseif (ActivityPub\Receiver::processActivity($ldactivity, json_encode($activity), $uid, true, false, $signer, '', $completion)) {
+ } elseif (ActivityPub\Receiver::processActivity($ldactivity, json_encode($activity), 0, true, false, $signer, '', $completion)) {
Logger::notice('Activity had been fetched and processed.', ['url' => $url, 'entry' => $child['entry-id'] ?? 0, 'completion' => $completion, 'object' => $activity['id']]);
} else {
Logger::notice('Activity had been fetched and will be processed later.', ['url' => $url, 'entry' => $child['entry-id'] ?? 0, 'completion' => $completion, 'object' => $activity['id']]);
use Friendica\Database\Database;
use Friendica\Database\DBA;
use Friendica\DI;
+use Friendica\Model\ItemURI;
use Friendica\Model\Post;
use Friendica\Util\DateTimeFormat;
use Friendica\Util\JsonLD;
return;
}
+ Logger::debug('Delete inbox-entry', ['id' => $entry['id']]);
+
DBA::delete('inbox-entry', ['id' => $entry['id']]);
+
+ $children = DBA::select('inbox-entry', ['id'], ['in-reply-to-id' => $entry['object-id']]);
+ while ($child = DBA::fetch($children)) {
+ self::deleteById($child['id']);
+ }
+ DBA::close($children);
}
/**
return false;
}
+ if (!self::isProcessable($id)) {
+ Logger::debug('Other queue entries need to be processed first.', ['id' => $id]);
+ return false;
+ }
+
if (!empty($entry['wid'])) {
$worker = DI::app()->getQueue();
$wid = $worker['id'] ?? 0;
$activity['worker-id'] = $entry['wid'];
$activity['recursion-depth'] = 0;
+ if (empty($activity['thread-children-type'])) {
+ $activity['thread-children-type'] = $type;
+ }
+
$receivers = DBA::select('inbox-entry-receiver', ['uid'], ["`queue-id` = ? AND `uid` != ?", $entry['id'], 0]);
while ($receiver = DBA::fetch($receivers)) {
if (!in_array($receiver['uid'], $activity['receiver'])) {
DBA::close($entries);
}
+ public static function isProcessable(int $id): bool
+ {
+ $entry = DBA::selectFirst('inbox-entry', [], ['id' => $id]);
+ if (empty($entry)) {
+ return false;
+ }
+
+ if (!empty($entry['object-id']) && Post::exists(['uri' => $entry['object-id']])) {
+ // The object already exists, so processing can be done
+ return true;
+ }
+
+ if (!empty($entry['conversation'])) {
+ $conv_id = ItemURI::getIdByURI($entry['conversation'], false);
+ if (DBA::exists('post-thread', ['conversation-id' => $conv_id])) {
+ // We have got the conversation in the system, so the post can be processed
+ return true;
+ }
+ }
+
+ if (!empty($entry['object-id']) && !empty($entry['in-reply-to-id']) && ($entry['object-id'] != $entry['in-reply-to-id']) && DBA::exists('inbox-entry', ['object-id' => $entry['in-reply-to-id']])) {
+ // This entry belongs to some other entry that should be processed first
+ return false;
+ }
+
+ return true;
+ }
+
/**
* Clear old activities
*
{
// We delete all entries that aren't associated with a worker entry after seven days.
// The other entries are deleted when the worker deferred for too long.
- DBA::delete('inbox-entry', ["`wid` IS NULL AND `received` < ?", DateTimeFormat::utc('now - 7 days')]);
+ $entries = DBA::select('inbox-entry', ['id'], ["`wid` IS NULL AND `received` < ?", DateTimeFormat::utc('now - 7 days')]);
+ while ($entry = DBA::fetch($entries)) {
+ self::deleteById($entry['id']);
+ }
+ DBA::close($entries);
// Optimizing this table only last seconds
if (DI::config()->get('system', 'optimize_tables')) {
return true;
}
}
-
+
if (DI::config()->get('system', 'decoupled_receiver') && ($trust_source || DI::config()->get('debug', 'ap_inbox_store_untrusted'))) {
$object_data = Queue::add($object_data, $type, $uid, $http_signer, $push, $trust_source);
}
}
if (!empty($object_data['entry-id']) && DI::config()->get('system', 'decoupled_receiver') && ($push || ($completion == self::COMPLETION_RELAY))) {
- // We delay by 5 seconds to allow to accumulate all receivers
- $delayed = date(DateTimeFormat::MYSQL, time() + 5);
- Logger::debug('Initiate processing', ['id' => $object_data['entry-id'], 'uri' => $object_data['object_id']]);
- $wid = Worker::add(['priority' => PRIORITY_HIGH, 'delayed' => $delayed], 'ProcessQueue', $object_data['entry-id']);
- Queue::setWorkerId($object_data['entry-id'], $wid);
+ if (Queue::isProcessable($object_data['entry-id'])) {
+ // We delay by 5 seconds to allow to accumulate all receivers
+ $delayed = date(DateTimeFormat::MYSQL, time() + 5);
+ Logger::debug('Initiate processing', ['id' => $object_data['entry-id'], 'uri' => $object_data['object_id']]);
+ $wid = Worker::add(['priority' => PRIORITY_HIGH, 'delayed' => $delayed], 'ProcessQueue', $object_data['entry-id']);
+ Queue::setWorkerId($object_data['entry-id'], $wid);
+ } else {
+ Logger::debug('Other queue entries need to be processed first.', ['id' => $object_data['entry-id']]);
+ }
return false;
}
$object_data['thread-completion'] = Contact::getIdForURL($actor);
$object_data['completion-mode'] = self::COMPLETION_ANNOUCE;
- $item = ActivityPub\Processor::createItem($object_data, $fetch_parents);
- if (empty($item)) {
- return false;
- }
+ if (!Post::exists(['uri' => $object_data['id'], 'uid' => 0])) {
+ $item = ActivityPub\Processor::createItem($object_data, $fetch_parents);
+ if (empty($item)) {
+ return false;
+ }
- $item['post-reason'] = Item::PR_ANNOUNCEMENT;
- ActivityPub\Processor::postItem($object_data, $item);
+ $item['post-reason'] = Item::PR_ANNOUNCEMENT;
+ ActivityPub\Processor::postItem($object_data, $item);
+ } else {
+ Logger::info('Announced id already exists', ['id' => $object_data['id']]);
+ Queue::remove($object_data);
+ }
if (!empty($activity)) {
$announce_object_data = self::processObject($activity);