X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=src%2FWorker%2FNotifier.php;h=18ac579ccdeb84520d405682f60290847db398cf;hb=9115ec5f0deb71111d53a1eda0c1de7c4d8c6c53;hp=80628d7dba08d08fe5e55a3dabc84886e2a27038;hpb=67eb9ae360d60f719ea6b5a4c9df9c57d90fca01;p=friendica.git diff --git a/src/Worker/Notifier.php b/src/Worker/Notifier.php index 80628d7dba..18ac579ccd 100644 --- a/src/Worker/Notifier.php +++ b/src/Worker/Notifier.php @@ -30,6 +30,7 @@ use Friendica\DI; use Friendica\Model\Contact; use Friendica\Model\Conversation; use Friendica\Model\Group; +use Friendica\Model\GServer; use Friendica\Model\Item; use Friendica\Model\Post; use Friendica\Model\PushSubscriber; @@ -38,6 +39,7 @@ use Friendica\Model\User; use Friendica\Protocol\Activity; use Friendica\Protocol\ActivityPub; use Friendica\Protocol\Diaspora; +use Friendica\Protocol\Delivery; use Friendica\Protocol\OStatus; use Friendica\Protocol\Salmon; use Friendica\Util\Network; @@ -81,12 +83,12 @@ class Notifier $uid = $message['uid']; $recipients[] = $message['contact-id']; - $mail = ActivityPub\Transmitter::ItemArrayFromMail($target_id); + $mail = ActivityPub\Transmitter::getItemArrayFromMail($target_id); $inboxes = ActivityPub\Transmitter::fetchTargetInboxes($mail, $uid, true); foreach ($inboxes as $inbox => $receivers) { $ap_contacts = array_merge($ap_contacts, $receivers); Logger::info('Delivery via ActivityPub', ['cmd' => $cmd, 'target' => $target_id, 'inbox' => $inbox]); - Worker::add(['priority' => PRIORITY_HIGH, 'created' => $a->getQueueValue('created'), 'dont_fork' => true], + Worker::add(['priority' => Worker::PRIORITY_HIGH, 'created' => $a->getQueueValue('created'), 'dont_fork' => true], 'APDelivery', $cmd, $target_id, $inbox, $uid, $receivers, $post_uriid); } } elseif ($cmd == Delivery::SUGGESTION) { @@ -99,7 +101,7 @@ class Notifier $uid = $target_id; $condition = ['uid' => $target_id, 'self' => false, 'network' => [Protocol::DFRN, Protocol::DIASPORA]]; - $delivery_contacts_stmt = DBA::select('contact', ['id', 'url', 'addr', 'network', 'protocol', 'batch'], $condition); + $delivery_contacts_stmt = DBA::select('contact', ['id', 'url', 'addr', 'network', 'protocol', 'baseurl', 'gsid', 'batch'], $condition); } else { $post = Post::selectFirst(['id'], ['uri-id' => $post_uriid, 'uid' => $sender_uid]); if (!DBA::isResult($post)) { @@ -143,7 +145,7 @@ class Notifier } } - $top_level = $target_item['gravity'] == GRAVITY_PARENT; + $top_level = $target_item['gravity'] == Item::GRAVITY_PARENT; } $owner = User::getOwnerDataById($uid); @@ -173,7 +175,7 @@ class Notifier $parent = $items[0]; $fields = ['network', 'author-id', 'author-link', 'author-network', 'owner-id']; - $condition = ['uri' => $target_item["thr-parent"], 'uid' => $target_item["uid"]]; + $condition = ['uri' => $target_item['thr-parent'], 'uid' => $target_item['uid']]; $thr_parent = Post::selectFirst($fields, $condition); if (empty($thr_parent)) { $thr_parent = $parent; @@ -191,6 +193,10 @@ class Notifier // when the original comment author does support the Diaspora protocol. if ($thr_parent['author-link'] && $target_item['parent-uri'] != $target_item['thr-parent']) { $diaspora_delivery = Diaspora::isSupportedByContactUrl($thr_parent['author-link']); + if ($diaspora_delivery && empty($target_item['signed_text'])) { + Logger::debug('Post has got no Diaspora signature, so there will be no Diaspora delivery', ['guid' => $target_item['guid'], 'uri-id' => $target_item['uri-id']]); + $diaspora_delivery = false; + } Logger::info('Threaded comment', ['diaspora_delivery' => (int)$diaspora_delivery]); } @@ -332,7 +338,7 @@ class Notifier foreach ($items as $item) { $recipients[] = $item['contact-id']; // pull out additional tagged people to notify (if public message) - if ($public_message && strlen($item['inform'])) { + if ($public_message && $item['inform']) { $people = explode(',',$item['inform']); foreach ($people as $person) { if (substr($person,0,4) === 'cid:') { @@ -372,27 +378,27 @@ class Notifier if (($thr_parent && ($thr_parent['network'] == Protocol::OSTATUS)) || ($parent['network'] == Protocol::OSTATUS)) { $diaspora_delivery = false; - Logger::info('Some parent is OStatus for '.$target_item["guid"]." - Author: ".$thr_parent['author-id']." - Owner: ".$thr_parent['owner-id']); + Logger::info('Some parent is OStatus for ' . $target_item['guid'] . ' - Author: ' . $thr_parent['author-id'] . ' - Owner: ' . $thr_parent['owner-id']); // Send a salmon to the parent author $probed_contact = DBA::selectFirst('contact', ['url', 'notify'], ['id' => $thr_parent['author-id']]); - if (DBA::isResult($probed_contact) && !empty($probed_contact["notify"])) { - Logger::notice('Notify parent author', ['url' => $probed_contact["url"], 'notify' => $probed_contact["notify"]]); - $url_recipients[$probed_contact["notify"]] = $probed_contact["notify"]; + if (DBA::isResult($probed_contact) && !empty($probed_contact['notify'])) { + Logger::notice('Notify parent author', ['url' => $probed_contact['url'], 'notify' => $probed_contact['notify']]); + $url_recipients[$probed_contact['notify']] = $probed_contact['notify']; } // Send a salmon to the parent owner $probed_contact = DBA::selectFirst('contact', ['url', 'notify'], ['id' => $thr_parent['owner-id']]); - if (DBA::isResult($probed_contact) && !empty($probed_contact["notify"])) { - Logger::notice('Notify parent owner', ['url' => $probed_contact["url"], 'notify' => $probed_contact["notify"]]); - $url_recipients[$probed_contact["notify"]] = $probed_contact["notify"]; + if (DBA::isResult($probed_contact) && !empty($probed_contact['notify'])) { + Logger::notice('Notify parent owner', ['url' => $probed_contact['url'], 'notify' => $probed_contact['notify']]); + $url_recipients[$probed_contact['notify']] = $probed_contact['notify']; } // Send a salmon notification to every person we mentioned in the post foreach (Tag::getByURIId($target_item['uri-id'], [Tag::MENTION, Tag::EXCLUSIVE_MENTION, Tag::IMPLICIT_MENTION]) as $tag) { $probed_contact = Contact::getByURL($tag['url']); if (!empty($probed_contact['notify'])) { - Logger::notice('Notify mentioned user', ['url' => $probed_contact["url"], 'notify' => $probed_contact["notify"]]); + Logger::notice('Notify mentioned user', ['url' => $probed_contact['url'], 'notify' => $probed_contact['notify']]); $url_recipients[$probed_contact['notify']] = $probed_contact['notify']; } } @@ -421,7 +427,7 @@ class Notifier if (!empty($networks)) { $condition['network'] = $networks; } - $delivery_contacts_stmt = DBA::select('contact', ['id', 'addr', 'url', 'network', 'protocol', 'batch'], $condition); + $delivery_contacts_stmt = DBA::select('contact', ['id', 'addr', 'url', 'network', 'protocol', 'baseurl', 'gsid', 'batch'], $condition); } $conversants = []; @@ -433,7 +439,7 @@ class Notifier if ($diaspora_delivery && !$unlisted) { $batch_delivery = true; - $participants = DBA::selectToArray('contact', ['batch', 'network', 'protocol', 'id', 'url', 'name'], + $participants = DBA::selectToArray('contact', ['batch', 'network', 'protocol', 'baseurl', 'gsid', 'id', 'url', 'name'], ["`network` = ? AND `batch` != '' AND `uid` = ? AND `rel` != ? AND NOT `blocked` AND NOT `pending` AND NOT `archive`", Protocol::DIASPORA, $owner['uid'], Contact::SHARING], ['group_by' => ['batch', 'network', 'protocol']]); @@ -445,7 +451,7 @@ class Notifier $condition = ['network' => Protocol::DFRN, 'uid' => $owner['uid'], 'blocked' => false, 'pending' => false, 'archive' => false, 'rel' => [Contact::FOLLOWER, Contact::FRIEND]]; - $contacts = DBA::selectToArray('contact', ['id', 'url', 'addr', 'name', 'network', 'protocol'], $condition); + $contacts = DBA::selectToArray('contact', ['id', 'url', 'addr', 'name', 'network', 'protocol', 'baseurl', 'gsid'], $condition); $conversants = array_merge($contacts, $participants); @@ -467,7 +473,7 @@ class Notifier Hook::callAll('notifier_end', $target_item); // Workaround for pure connector posts - if (in_array($cmd, [Delivery::POST, Delivery::POKE])) { + if ($cmd == Delivery::POST) { if ($delivery_queue_count == 0) { Post\DeliveryData::incrementQueueDone($target_item['uri-id']); $delivery_queue_count = 1; @@ -493,21 +499,35 @@ class Notifier * @param array $contacts * @param array $ap_contacts * @param array $conversants - * @return int + * + * @return int Count of delivery queue * @throws InternalServerErrorException * @throws Exception */ - private static function delivery(string $cmd, int $post_uriid, int $sender_uid, array $target_item, array $thr_parent, array $owner, bool $batch_delivery, bool $in_batch, array $contacts, array $ap_contacts, array $conversants = []) + private static function delivery(string $cmd, int $post_uriid, int $sender_uid, array $target_item, array $thr_parent, array $owner, bool $batch_delivery, bool $in_batch, array $contacts, array $ap_contacts, array $conversants = []): int { $a = DI::app(); $delivery_queue_count = 0; + if (!empty($target_item['verb']) && ($target_item['verb'] == Activity::ANNOUNCE)) { + Logger::notice('Announces are only delivery via ActivityPub', ['cmd' => $cmd, 'id' => $target_item['id'], 'guid' => $target_item['guid'], 'uri-id' => $target_item['uri-id'], 'uri' => $target_item['uri']]); + return 0; + } + foreach ($contacts as $contact) { // Direct delivery of local contacts if (!in_array($cmd, [Delivery::RELOCATION, Delivery::SUGGESTION, Delivery::DELETION, Delivery::MAIL]) && $target_uid = User::getIdForURL($contact['url'])) { - Logger::info('Direct delivery', ['uri-id' => $target_item['uri-id'], 'target' => $target_uid]); - $fields = ['protocol' => Conversation::PARCEL_LOCAL_DFRN, 'direction' => Conversation::PUSH]; - Item::storeForUserByUriId($target_item['uri-id'], $target_uid, $fields, $target_item['uid']); + if ($target_item['origin'] || ($target_item['network'] != Protocol::ACTIVITYPUB)) { + if ($target_uid != $target_item['uid']) { + $fields = ['protocol' => Conversation::PARCEL_LOCAL_DFRN, 'direction' => Conversation::PUSH, 'post-reason' => Item::PR_DIRECT]; + Item::storeForUserByUriId($target_item['uri-id'], $target_uid, $fields, $target_item['uid']); + Logger::info('Delivered locally', ['cmd' => $cmd, 'id' => $target_item['id'], 'target' => $target_uid]); + } else { + Logger::info('No need to deliver to myself', ['uid' => $target_uid, 'guid' => $target_item['guid'], 'uri-id' => $target_item['uri-id'], 'uri' => $target_item['uri']]); + } + } else { + Logger::info('Remote item does not need to be delivered locally', ['guid' => $target_item['guid'], 'uri-id' => $target_item['uri-id'], 'uri' => $target_item['uri']]); + } continue; } @@ -545,13 +565,18 @@ class Notifier continue; } + if (!GServer::reachable($contact)) { + Logger::info('Server is not reachable', ['id' => $post_uriid, 'uid' => $sender_uid, 'contact' => $contact]); + continue; + } + Logger::info('Delivery', ['batch' => $in_batch, 'target' => $post_uriid, 'uid' => $sender_uid, 'guid' => $target_item['guid'] ?? '', 'to' => $contact]); // Ensure that posts with our own protocol arrives before Diaspora posts arrive. // Situation is that sometimes Friendica servers receive Friendica posts over the Diaspora protocol first. // The conversion in Markdown reduces the formatting, so these posts should arrive after the Friendica posts. // This is only important for high and medium priority tasks and not for Low priority jobs like deletions. - if (($contact['network'] == Protocol::DIASPORA) && in_array($a->getQueueValue('priority'), [PRIORITY_HIGH, PRIORITY_MEDIUM])) { + if (($contact['network'] == Protocol::DIASPORA) && in_array($a->getQueueValue('priority'), [Worker::PRIORITY_HIGH, Worker::PRIORITY_MEDIUM])) { $deliver_options = ['priority' => $a->getQueueValue('priority'), 'dont_fork' => true]; } else { $deliver_options = ['priority' => $a->getQueueValue('priority'), 'created' => $a->getQueueValue('created'), 'dont_fork' => true]; @@ -560,6 +585,7 @@ class Notifier if (Worker::add($deliver_options, 'Delivery', $cmd, $post_uriid, (int)$contact['id'], $sender_uid)) { $delivery_queue_count++; } + Worker::coolDown(); } return $delivery_queue_count; } @@ -573,11 +599,12 @@ class Notifier * @param array $url_recipients * @param bool $public_message * @param bool $push_notify - * @return int + * + * @return int Count of sent Salmon notifications * @throws InternalServerErrorException * @throws Exception */ - private static function deliverOStatus(int $target_id, array $target_item, array $owner, array $url_recipients, bool $public_message, bool $push_notify) + private static function deliverOStatus(int $target_id, array $target_item, array $owner, array $url_recipients, bool $public_message, bool $push_notify): int { $a = DI::app(); $delivery_queue_count = 0; @@ -598,7 +625,7 @@ class Notifier // Notify PuSH subscribers (Used for OStatus distribution of regular posts) if ($push_notify) { - Logger::info('Activating internal PuSH', ['item' => $target_id]); + Logger::info('Activating internal PuSH', ['uid' => $owner['uid']]); // Handling the pubsubhubbub requests PushSubscriber::publishFeed($owner['uid'], $a->getQueueValue('priority')); @@ -613,9 +640,10 @@ class Notifier * @param array $contact Receiver of the post * @param array $item The post * @param array $thr_parent The thread parent + * * @return bool */ - private static function skipActivityPubForDiaspora(array $contact, array $item, array $thr_parent) + private static function skipActivityPubForDiaspora(array $contact, array $item, array $thr_parent): bool { // No skipping needs to be done when delivery isn't done to Diaspora if ($contact['network'] != Protocol::DIASPORA) { @@ -643,11 +671,12 @@ class Notifier * @param string $cmd Notifier command * @param array $owner Sender of the post * @param string $network Receiver network + * * @return bool * @throws \Friendica\Network\HTTPException\InternalServerErrorException * @throws \ImagickException */ - private static function isRemovalActivity($cmd, $owner, $network) + private static function isRemovalActivity(string $cmd, array $owner, string $network): bool { return ($cmd == Delivery::DELETION) && $owner['account_removed'] && in_array($network, [Protocol::ACTIVITYPUB, Protocol::DIASPORA]); } @@ -656,11 +685,12 @@ class Notifier * @param int $self_user_id * @param int $priority The priority the Notifier queue item was created with * @param string $created The date the Notifier queue item was created on + * * @return bool * @throws \Friendica\Network\HTTPException\InternalServerErrorException * @throws \ImagickException */ - private static function notifySelfRemoval($self_user_id, $priority, $created) + private static function notifySelfRemoval(int $self_user_id, int $priority, string $created): bool { $owner = User::getOwnerDataById($self_user_id); if (empty($self_user_id) || empty($owner)) { @@ -677,11 +707,12 @@ class Notifier } DBA::close($contacts_stmt); - $inboxes = ActivityPub\Transmitter::fetchTargetInboxesforUser(0); + $inboxes = ActivityPub\Transmitter::fetchTargetInboxesforUser($self_user_id); foreach ($inboxes as $inbox => $receivers) { Logger::info('Account removal via ActivityPub', ['uid' => $self_user_id, 'inbox' => $inbox]); - Worker::add(['priority' => PRIORITY_NEGLIGIBLE, 'created' => $created, 'dont_fork' => true], + Worker::add(['priority' => Worker::PRIORITY_NEGLIGIBLE, 'created' => $created, 'dont_fork' => true], 'APDelivery', Delivery::REMOVAL, 0, $inbox, $self_user_id, $receivers); + Worker::coolDown(); } return true; @@ -694,11 +725,13 @@ class Notifier * @param array $thr_parent * @param int $priority The priority the Notifier queue item was created with * @param string $created The date the Notifier queue item was created on + * * @return array 'count' => The number of delivery tasks created, 'contacts' => their contact ids * @throws \Friendica\Network\HTTPException\InternalServerErrorException * @throws \ImagickException + * @todo Unused parameter $owner */ - private static function activityPubDelivery($cmd, array $target_item, array $parent, array $thr_parent, $priority, $created, $owner) + private static function activityPubDelivery($cmd, array $target_item, array $parent, array $thr_parent, int $priority, string $created, $owner): array { // Don't deliver via AP when the starting post isn't from a federated network if (!in_array($parent['network'], Protocol::FEDERATED)) { @@ -745,21 +778,21 @@ class Notifier $relay_inboxes = ActivityPub\Transmitter::addRelayServerInboxes(); } - Logger::info('Origin item ' . $target_item['id'] . ' with URL ' . $target_item['uri'] . ' will be distributed.'); - } elseif (!DBA::exists('conversation', ['item-uri' => $target_item['uri'], 'protocol' => Conversation::PARCEL_ACTIVITYPUB])) { - Logger::info('Remote item ' . $target_item['id'] . ' with URL ' . $target_item['uri'] . ' is no AP post. It will not be distributed.'); + Logger::info('Origin item will be distributed', ['id' => $target_item['id'], 'url' => $target_item['uri'], 'verb' => $target_item['verb']]); + } elseif (!Post\Activity::exists($target_item['uri-id'])) { + Logger::info('Remote item is no AP post. It will not be distributed.', ['id' => $target_item['id'], 'url' => $target_item['uri'], 'verb' => $target_item['verb']]); return ['count' => 0, 'contacts' => []]; - } elseif ($parent['origin']) { - // Remote items are transmitted via the personal inboxes. - // Doing so ensures that the dedicated receiver will get the message. - $inboxes = ActivityPub\Transmitter::fetchTargetInboxes($parent, $uid, true, $target_item['id']); + } elseif ($parent['origin'] && (($target_item['gravity'] != Item::GRAVITY_ACTIVITY) || DI::config()->get('system', 'redistribute_activities'))) { + $inboxes = ActivityPub\Transmitter::fetchTargetInboxes($parent, $uid, false, $target_item['id']); if (in_array($target_item['private'], [Item::PUBLIC])) { $inboxes = ActivityPub\Transmitter::addRelayServerInboxesForItem($parent['id'], $inboxes); - $relay_inboxes = ActivityPub\Transmitter::addRelayServerInboxes([]); } - Logger::info('Remote item ' . $target_item['id'] . ' with URL ' . $target_item['uri'] . ' will be distributed.'); + Logger::info('Remote item will be distributed', ['id' => $target_item['id'], 'url' => $target_item['uri'], 'verb' => $target_item['verb']]); + } else { + Logger::info('Remote activity will not be distributed', ['id' => $target_item['id'], 'url' => $target_item['uri'], 'verb' => $target_item['verb']]); + return ['count' => 0, 'contacts' => []]; } if (empty($inboxes) && empty($relay_inboxes)) { @@ -778,29 +811,53 @@ class Notifier if ((count($receivers) == 1) && Network::isLocalLink($inbox)) { $contact = Contact::getById($receivers[0], ['url']); - if ($target_uid = User::getIdForURL($contact['url'])) { - $fields = ['protocol' => Conversation::PARCEL_LOCAL_DFRN, 'direction' => Conversation::PUSH, 'post-reason' => Item::PR_BCC]; - Item::storeForUserByUriId($target_item['uri-id'], $target_uid, $fields, $target_item['uid']); - Logger::info('Delivered locally', ['cmd' => $cmd, 'id' => $target_item['id'], 'inbox' => $inbox]); + if (!in_array($cmd, [Delivery::RELOCATION, Delivery::SUGGESTION, Delivery::DELETION, Delivery::MAIL]) && ($target_uid = User::getIdForURL($contact['url']))) { + if ($target_item['origin'] || ($target_item['network'] != Protocol::ACTIVITYPUB)) { + if ($target_uid != $target_item['uid']) { + $fields = ['protocol' => Conversation::PARCEL_LOCAL_DFRN, 'direction' => Conversation::PUSH, 'post-reason' => Item::PR_BCC]; + Item::storeForUserByUriId($target_item['uri-id'], $target_uid, $fields, $target_item['uid']); + Logger::info('Delivered locally', ['cmd' => $cmd, 'id' => $target_item['id'], 'inbox' => $inbox]); + } else { + Logger::info('No need to deliver to myself', ['uid' => $target_uid, 'guid' => $target_item['guid'], 'uri-id' => $target_item['uri-id'], 'uri' => $target_item['uri']]); + } + } else { + Logger::info('Remote item does not need to be delivered locally', ['guid' => $target_item['guid'], 'uri-id' => $target_item['uri-id'], 'uri' => $target_item['uri']]); + } continue; } + } elseif ((count($receivers) >= 1) && Network::isLocalLink($inbox)) { + Logger::info('Is this a thing?', ['guid' => $target_item['guid'], 'uri-id' => $target_item['uri-id'], 'uri' => $target_item['uri']]); } Logger::info('Delivery via ActivityPub', ['cmd' => $cmd, 'id' => $target_item['id'], 'inbox' => $inbox]); - if (Worker::add(['priority' => $priority, 'created' => $created, 'dont_fork' => true], - 'APDelivery', $cmd, $target_item['id'], $inbox, $uid, $receivers, $target_item['uri-id'])) { + if (DI::config()->get('system', 'bulk_delivery')) { $delivery_queue_count++; + Post\Delivery::add($target_item['uri-id'], $uid, $inbox, $target_item['created'], $cmd, $receivers); + Worker::add(Worker::PRIORITY_HIGH, 'APDelivery', '', 0, $inbox, 0); + } else { + if (Worker::add(['priority' => $priority, 'created' => $created, 'dont_fork' => true], + 'APDelivery', $cmd, $target_item['id'], $inbox, $uid, $receivers, $target_item['uri-id'])) { + $delivery_queue_count++; + } } + Worker::coolDown(); } // We deliver posts to relay servers slightly delayed to priorize the direct delivery foreach ($relay_inboxes as $inbox) { Logger::info('Delivery to relay servers via ActivityPub', ['cmd' => $cmd, 'id' => $target_item['id'], 'inbox' => $inbox]); - if (Worker::add(['priority' => $priority, 'dont_fork' => true], 'APDelivery', $cmd, $target_item['id'], $inbox, $uid, [], $target_item['uri-id'])) { + if (DI::config()->get('system', 'bulk_delivery')) { $delivery_queue_count++; + Post\Delivery::add($target_item['uri-id'], $uid, $inbox, $target_item['created'], $cmd, []); + Worker::add(Worker::PRIORITY_MEDIUM, 'APDelivery', '', 0, $inbox, 0); + } else { + if (Worker::add(['priority' => $priority, 'dont_fork' => true], 'APDelivery', $cmd, $target_item['id'], $inbox, $uid, [], $target_item['uri-id'])) { + $delivery_queue_count++; + } } + Worker::coolDown(); } return ['count' => $delivery_queue_count, 'contacts' => $contacts];