const COMPLETION_RELAY = 2;
const COMPLETION_MANUAL = 3;
const COMPLETION_AUTO = 4;
+ const COMPLETION_ASYNC = 5;
/**
* Checks incoming message from the inbox
if (empty($apcontact)) {
Logger::notice('Unable to retrieve AP contact for actor - message is discarded', ['actor' => $actor]);
return;
- } elseif (APContact::isRelay($apcontact)) {
+ } elseif (APContact::isRelay($apcontact) && self::isRelayPost($ldactivity)) {
self::processRelayPost($ldactivity, $actor);
return;
} else {
}
$sig_contact = HTTPSignature::getKeyIdContact($header);
- if (APContact::isRelay($sig_contact)) {
+ if (APContact::isRelay($sig_contact) && self::isRelayPost($ldactivity)) {
Logger::info('Message from a relay', ['url' => $sig_contact['url']]);
self::processRelayPost($ldactivity, $sig_contact['url']);
return;
}
/**
- * Process incoming posts from relays
+ * Check if the activity is a post rhat can be send via a relay
*
- * @param array $activity
- * @param string $actor
- * @return void
+ * @param array $activity
+ * @return boolean
*/
- private static function processRelayPost(array $activity, string $actor)
+ private static function isRelayPost(array $activity): bool
{
$type = JsonLD::fetchElement($activity, '@type');
if (!$type) {
- Logger::notice('Empty type', ['activity' => $activity, 'actor' => $actor]);
- return;
+ return false;
}
$object_type = JsonLD::fetchElement($activity, 'as:object', '@type') ?? '';
$object_id = JsonLD::fetchElement($activity, 'as:object', '@id');
if (empty($object_id)) {
- Logger::notice('No object id found', ['type' => $type, 'object_type' => $object_type, 'actor' => $actor, 'activity' => $activity]);
- return;
+ return false;
}
$handle = ($type == 'as:Announce');
if (!$handle && in_array($type, ['as:Create', 'as:Update'])) {
$handle = in_array($object_type, self::CONTENT_TYPES);
}
+ return $handle;
+ }
- if (!$handle) {
- $trust_source = false;
- $object_data = self::prepareObjectData($activity, 0, false, $trust_source);
-
- if (!$trust_source) {
- Logger::notice('Activity trust could not be achieved.', ['type' => $type, 'object_type' => $object_type, 'object_id' => $object_id, 'actor' => $actor, 'activity' => $activity]);
- return;
- }
+ /**
+ * Process incoming posts from relays
+ *
+ * @param array $activity
+ * @param string $actor
+ * @return void
+ */
+ private static function processRelayPost(array $activity, string $actor)
+ {
+ $type = JsonLD::fetchElement($activity, '@type');
+ if (!$type) {
+ Logger::notice('Empty type', ['activity' => $activity, 'actor' => $actor]);
+ return;
+ }
- if (empty($object_data)) {
- Logger::notice('No object data found', ['type' => $type, 'object_type' => $object_type, 'object_id' => $object_id, 'actor' => $actor, 'activity' => $activity]);
- return;
- }
+ $object_type = JsonLD::fetchElement($activity, 'as:object', '@type') ?? '';
- if (self::routeActivities($object_data, $type, true)) {
- Logger::debug('Handled activity', ['type' => $type, 'object_type' => $object_type, 'object_id' => $object_id, 'actor' => $actor]);
- } else {
- Logger::info('Unhandled activity', ['type' => $type, 'object_type' => $object_type, 'object_id' => $object_id, 'actor' => $actor, 'activity' => $activity]);
- }
+ $object_id = JsonLD::fetchElement($activity, 'as:object', '@id');
+ if (empty($object_id)) {
+ Logger::notice('No object id found', ['type' => $type, 'object_type' => $object_type, 'actor' => $actor, 'activity' => $activity]);
return;
}
return;
}
- Logger::debug('Got relayed message id', ['id' => $object_id, 'actor' => $actor]);
+ Logger::debug('Process post from relay server', ['type' => $type, 'object_type' => $object_type, 'object_id' => $object_id, 'actor' => $actor]);
$item_id = Item::searchByLink($object_id);
if ($item_id) {
return;
}
- $id = Processor::fetchMissingActivity($object_id, [], $actor, self::COMPLETION_RELAY);
- if (empty($id)) {
- Logger::notice('Relayed message had not been fetched', ['id' => $object_id, 'actor' => $actor]);
- return;
+ if (!DI::config()->get('system', 'decoupled_receiver')) {
+ $id = Processor::fetchMissingActivity($object_id, [], $actor, self::COMPLETION_RELAY);
+ if (!empty($id)) {
+ Logger::notice('Relayed message is fetched', ['result' => $id, 'id' => $object_id, 'actor' => $actor]);
+ } else {
+ Logger::notice('Relayed message had not been fetched', ['id' => $object_id, 'actor' => $actor, 'activity' => $activity]);
+ }
+ } elseif (!Fetch::hasWorker($object_id)) {
+ Logger::notice('Fetching is done by worker.', ['id' => $object_id]);
+ Fetch::add($object_id);
+ $activity['recursion-depth'] = 0;
+ $wid = Worker::add(Worker::PRIORITY_HIGH, 'FetchMissingActivity', $object_id, [], $actor, self::COMPLETION_RELAY);
+ Fetch::setWorkerId($object_id, $wid);
+ } else {
+ Logger::debug('Activity will already be fetched via a worker.', ['url' => $object_id]);
}
}
return true;
}
- if (!empty($object_data['entry-id']) && $decouple && ($push || ($completion == self::COMPLETION_RELAY))) {
+ if (!empty($object_data['entry-id']) && $decouple && ($push || in_array($completion, [self::COMPLETION_RELAY, self::COMPLETION_ASYNC]))) {
if (Queue::isProcessable($object_data['entry-id'])) {
// We delay by 5 seconds to allow to accumulate all receivers
$delayed = date(DateTimeFormat::MYSQL, time() + 5);