<?php
/**
- * @copyright Copyright (C) 2010-2022, the Friendica project
+ * @copyright Copyright (C) 2010-2023, the Friendica project
*
* @license GNU AGPL version 3 or any later version
*
use Friendica\Database\DBA;
use Friendica\DI;
use Friendica\Model\Contact;
+use Friendica\Model\GServer;
use Friendica\Model\Post;
use Friendica\Protocol\ActivityPub;
+use Friendica\Protocol\Delivery;
use Friendica\Util\DateTimeFormat;
use Friendica\Util\Strings;
// Remove old entries from the workerqueue
self::cleanWorkerQueue();
- // Directly deliver or requeue posts
+ // Directly deliver or requeue posts to ActivityPub systems
+ self::deliverAPPosts();
+
+ // Directly deliver or requeue posts to other systems
self::deliverPosts();
}
*
* This function is placed here as a safeguard. Even when the worker queue is completely blocked, messages will be delivered.
*/
- private static function deliverPosts()
+ private static function deliverAPPosts()
{
- $deliveries = DBA::p("SELECT `item-uri`.`uri` AS `inbox`, MAX(`failed`) AS `failed` FROM `post-delivery` INNER JOIN `item-uri` ON `item-uri`.`id` = `post-delivery`.`inbox-id` GROUP BY `inbox` ORDER BY RAND()");
+ $deliveries = DBA::p("SELECT `item-uri`.`uri` AS `inbox`, MAX(`gsid`) AS `gsid`, MAX(`shared`) AS `shared`, MAX(`failed`) AS `failed` FROM `post-delivery` INNER JOIN `item-uri` ON `item-uri`.`id` = `post-delivery`.`inbox-id` LEFT JOIN `inbox-status` ON `inbox-status`.`url` = `item-uri`.`uri` GROUP BY `inbox` ORDER BY RAND()");
while ($delivery = DBA::fetch($deliveries)) {
if ($delivery['failed'] > 0) {
Logger::info('Removing failed deliveries', ['inbox' => $delivery['inbox'], 'failed' => $delivery['failed']]);
Post\Delivery::removeFailed($delivery['inbox']);
}
-
- if ($delivery['failed'] == 0) {
+ if (($delivery['failed'] == 0) && $delivery['shared'] && !empty($delivery['gsid']) && GServer::isReachableById($delivery['gsid'])) {
$result = ActivityPub\Delivery::deliver($delivery['inbox']);
Logger::info('Directly deliver inbox', ['inbox' => $delivery['inbox'], 'result' => $result['success']]);
continue;
}
if (Worker::add(['priority' => $priority, 'force_priority' => true], 'APDelivery', '', 0, $delivery['inbox'], 0)) {
- Logger::info('Missing APDelivery worker added for inbox', ['inbox' => $delivery['inbox'], 'failed' => $delivery['failed'], 'priority' => $priority]);
+ Logger::info('Priority for APDelivery worker adjusted', ['inbox' => $delivery['inbox'], 'failed' => $delivery['failed'], 'priority' => $priority]);
}
}
+ DBA::close($deliveries);
+
// Optimizing this table only last seconds
if (DI::config()->get('system', 'optimize_tables')) {
Logger::info('Optimize start');
}
}
+ /**
+ * Directly deliver messages or requeue them.
+ */
+ private static function deliverPosts()
+ {
+ $deliveries = DBA::p("SELECT `gsid`, MAX(`failed`) AS `failed` FROM `delivery-queue` GROUP BY `gsid` ORDER BY RAND()");
+ while ($delivery = DBA::fetch($deliveries)) {
+ if ($delivery['failed'] > 0) {
+ Logger::info('Removing failed deliveries', ['gsid' => $delivery['gsid'], 'failed' => $delivery['failed']]);
+ Delivery::removeFailedQueue($delivery['gsid']);
+ }
+
+ if (($delivery['failed'] < 3) || GServer::isReachableById($delivery['gsid'])) {
+ $priority = Worker::PRIORITY_HIGH;
+ } elseif ($delivery['failed'] < 6) {
+ $priority = Worker::PRIORITY_MEDIUM;
+ } elseif ($delivery['failed'] < 8) {
+ $priority = Worker::PRIORITY_LOW;
+ } else {
+ $priority = Worker::PRIORITY_NEGLIGIBLE;
+ }
+
+ if (Worker::add(['priority' => $priority, 'force_priority' => true], 'BulkDelivery', $delivery['gsid'])) {
+ Logger::info('Priority for BulkDelivery worker adjusted', ['gsid' => $delivery['gsid'], 'failed' => $delivery['failed'], 'priority' => $priority]);
+ }
+ }
+
+ // Optimizing this table only last seconds
+ if (DI::config()->get('system', 'optimize_tables')) {
+ Logger::info('Optimize start');
+ DBA::e("OPTIMIZE TABLE `delivery-queue`");
+ Logger::info('Optimize end');
+ }
+ }
+
/**
* Add missing "intro" records.
*