}
$timeout = ($seconds >= $wait_interval);
- } while (!$timeout && !Worker::IPCJobsExists());
+ } while (!$timeout && !Worker\IPC::JobsExists());
if ($timeout) {
$do_cron = true;
namespace Friendica\Core;
-use Friendica\App\Mode;
use Friendica\Core\Worker\Entity\Process;
use Friendica\Database\DBA;
use Friendica\DI;
private static $lock_duration = 0;
private static $last_update;
private static $state;
- private static $daemon_mode = null;
/** @var Process */
private static $process;
$last_cleanup = DI::config()->get('system', 'worker_last_cleaned', 0);
if (time() > ($last_cleanup + 300)) {
DI::config()->set('system', 'worker_last_cleaned', time());
- self::killStaleWorkers();
+ Worker\Cron::killStaleWorkers();
}
// Check if the system is ready
// Now we start additional cron processes if we should do so
if ($run_cron) {
- self::runCron();
+ Worker\Cron::run();
}
$last_check = $starttime = time();
// We fetch the next queue entry that is about to be executed
while ($r = self::workerProcess()) {
- if (self::IPCJobsExists(getmypid())) {
- self::IPCDeleteJobState(getmypid());
+ if (Worker\IPC::JobsExists(getmypid())) {
+ Worker\IPC::DeleteJobState(getmypid());
}
// Don't refetch when a worker fetches tasks for multiple workers
$refetched = DI::config()->get('system', 'worker_multiple_fetch');
foreach ($r as $entry) {
- $entry = self::checkPriority($entry);
-
// The work will be done
if (!self::execute($entry)) {
Logger::notice('Process execution failed, quitting.');
if (time() > ($starttime + (DI::config()->get('system', 'cron_interval') * 60))) {
Logger::info('Process lifetime reached, respawning.');
self::unclaimProcess($process);
- if (self::isDaemonMode()) {
- self::IPCSetJobState(true);
+ if (Worker\Daemon::isMode()) {
+ Worker\IPC::SetJobState(true);
} else {
self::spawnWorker();
}
}
// Cleaning up. Possibly not needed, but it doesn't harm anything.
- if (self::isDaemonMode()) {
- self::IPCSetJobState(false);
+ if (Worker\Daemon::isMode()) {
+ Worker\IPC::SetJobState(false);
}
Logger::info("Couldn't select a workerqueue entry, quitting process", ['pid' => getmypid()]);
}
- /**
- * Check and fix the priority of a worker task
- * @param array $entry
- * @return array
- */
- private static function checkPriority(array $entry)
- {
- $entry['priority'] = (int)$entry['priority'];
-
- if (!in_array($entry['priority'], PRIORITIES)) {
- Logger::warning('Invalid priority', ['entry' => $entry, 'callstack' => System::callstack(20)]);
- DBA::update('workerqueue', ['priority' => PRIORITY_MEDIUM], ['id' => $entry['id']]);
- $entry['priority'] = PRIORITY_MEDIUM;
- }
-
- return $entry;
- }
-
/**
* Checks if the system is ready.
*
return true;
}
- /**
- * fix the queue entry if the worker process died
- *
- * @return void
- * @throws \Exception
- */
- private static function killStaleWorkers()
- {
- $stamp = (float)microtime(true);
- $entries = DBA::select(
- 'workerqueue',
- ['id', 'pid', 'executed', 'priority', 'command', 'parameter'],
- ['NOT `done` AND `pid` != 0'],
- ['order' => ['priority', 'retrial', 'created']]
- );
- self::$db_duration += (microtime(true) - $stamp);
-
- while ($entry = DBA::fetch($entries)) {
- $entry = self::checkPriority($entry);
-
- if (!posix_kill($entry["pid"], 0)) {
- $stamp = (float)microtime(true);
- DBA::update(
- 'workerqueue',
- ['executed' => DBA::NULL_DATETIME, 'pid' => 0],
- ['id' => $entry["id"]]
- );
- self::$db_duration += (microtime(true) - $stamp);
- self::$db_duration_write += (microtime(true) - $stamp);
- } else {
- // Kill long running processes
-
- // Define the maximum durations
- $max_duration_defaults = [PRIORITY_CRITICAL => 720, PRIORITY_HIGH => 10, PRIORITY_MEDIUM => 60, PRIORITY_LOW => 180, PRIORITY_NEGLIGIBLE => 720];
- $max_duration = $max_duration_defaults[$entry['priority']];
-
- $argv = json_decode($entry['parameter'], true);
- if (!empty($entry['command'])) {
- $command = $entry['command'];
- } elseif (!empty($argv)) {
- $command = array_shift($argv);
- } else {
- return;
- }
-
- $command = basename($command);
-
- // How long is the process already running?
- $duration = (time() - strtotime($entry["executed"])) / 60;
- if ($duration > $max_duration) {
- Logger::notice('Worker process took too much time - killed', ['duration' => number_format($duration, 3), 'max' => $max_duration, 'id' => $entry["id"], 'pid' => $entry["pid"], 'command' => $command]);
- posix_kill($entry["pid"], SIGTERM);
-
- // We killed the stale process.
- // To avoid a blocking situation we reschedule the process at the beginning of the queue.
- // Additionally we are lowering the priority. (But not PRIORITY_CRITICAL)
- $new_priority = $entry['priority'];
- if ($entry['priority'] == PRIORITY_HIGH) {
- $new_priority = PRIORITY_MEDIUM;
- } elseif ($entry['priority'] == PRIORITY_MEDIUM) {
- $new_priority = PRIORITY_LOW;
- } elseif ($entry['priority'] != PRIORITY_CRITICAL) {
- $new_priority = PRIORITY_NEGLIGIBLE;
- }
- $stamp = (float)microtime(true);
- DBA::update(
- 'workerqueue',
- ['executed' => DBA::NULL_DATETIME, 'created' => DateTimeFormat::utcNow(), 'priority' => $new_priority, 'pid' => 0],
- ['id' => $entry["id"]]
- );
- self::$db_duration += (microtime(true) - $stamp);
- self::$db_duration_write += (microtime(true) - $stamp);
- } else {
- Logger::info('Process runtime is okay', ['duration' => number_format($duration, 3), 'max' => $max_duration, 'id' => $entry["id"], 'pid' => $entry["pid"], 'command' => $command]);
- }
- }
- }
- DBA::close($entries);
- }
/**
* Checks if the number of active workers exceeds the given limits
// Are there fewer workers running as possible? Then fork a new one.
if (!DI::config()->get("system", "worker_dont_fork", false) && ($queues > ($active + 1)) && self::entriesExists()) {
Logger::info("There are fewer workers as possible, fork a new worker.", ['active' => $active, 'queues' => $queues]);
- if (self::isDaemonMode()) {
- self::IPCSetJobState(true);
+ if (Worker\Daemon::isMode()) {
+ Worker\IPC::SetJobState(true);
} else {
self::spawnWorker();
}
}
// if there are too much worker, we don't spawn a new one.
- if (self::isDaemonMode() && ($active > $queues)) {
- self::IPCSetJobState(false);
+ if (Worker\Daemon::isMode() && ($active > $queues)) {
+ Worker\IPC::SetJobState(false);
}
return $active > $queues;
self::$db_duration_write += (microtime(true) - $stamp);
}
- /**
- * Runs the cron processes
- *
- * @return void
- * @throws \Friendica\Network\HTTPException\InternalServerErrorException
- */
- private static function runCron()
- {
- Logger::info('Add cron entries');
-
- // Check for spooled items
- self::add(['priority' => PRIORITY_HIGH, 'force_priority' => true], 'SpoolPost');
-
- // Run the cron job that calls all other jobs
- self::add(['priority' => PRIORITY_MEDIUM, 'force_priority' => true], 'Cron');
-
- // Cleaning dead processes
- self::killStaleWorkers();
-
- // Remove old entries from the workerqueue
- self::cleanWorkerQueue();
- }
-
- /**
- * Remove old entries from the workerqueue
- *
- * @return void
- */
- private static function cleanWorkerQueue()
- {
- DBA::delete('workerqueue', ["`done` AND `executed` < ?", DateTimeFormat::utc('now - 1 hour')]);
-
- // Optimizing this table only last seconds
- if (DI::config()->get('system', 'optimize_tables')) {
- // We are acquiring the two locks from the worker to avoid locking problems
- if (DI::lock()->acquire(Worker::LOCK_PROCESS, 10)) {
- if (DI::lock()->acquire(Worker::LOCK_WORKER, 10)) {
- DBA::e("OPTIMIZE TABLE `workerqueue`");
- DBA::e("OPTIMIZE TABLE `process`");
- DI::lock()->release(Worker::LOCK_WORKER);
- }
- DI::lock()->release(Worker::LOCK_PROCESS);
- }
- }
- }
-
/**
* Fork a child process
*
// The parent process continues here
DBA::connect();
- self::IPCSetJobState(true, $pid);
+ Worker\IPC::SetJobState(true, $pid);
Logger::info('Spawned new worker', ['pid' => $pid]);
$cycles = 0;
- while (self::IPCJobsExists($pid) && (++$cycles < 100)) {
+ while (Worker\IPC::JobsExists($pid) && (++$cycles < 100)) {
usleep(10000);
}
$process = DI::process()->create(getmypid(), basename(__FILE__));
$cycles = 0;
- while (!self::IPCJobsExists($process->pid) && (++$cycles < 100)) {
+ while (!Worker\IPC::JobsExists($process->pid) && (++$cycles < 100)) {
usleep(10000);
}
self::unclaimProcess($process);
- self::IPCSetJobState(false, $process->pid);
+ Worker\IPC::SetJobState(false, $process->pid);
DI::process()->delete($process);
Logger::info('Worker ended', ['pid' => $process->pid]);
exit();
*/
public static function spawnWorker($do_cron = false)
{
- if (self::isDaemonMode() && DI::config()->get('system', 'worker_fork')) {
+ if (Worker\Daemon::isMode() && DI::config()->get('system', 'worker_fork')) {
self::forkProcess($do_cron);
} else {
DI::system()->run('bin/worker.php', ['no_cron' => !$do_cron]);
}
- if (self::isDaemonMode()) {
- self::IPCSetJobState(false);
+ if (Worker\Daemon::isMode()) {
+ Worker\IPC::SetJobState(false);
}
}
}
// Set the IPC flag to ensure an immediate process execution via daemon
- if (self::isDaemonMode()) {
- self::IPCSetJobState(true);
+ if (Worker\Daemon::isMode()) {
+ Worker\IPC::SetJobState(true);
}
- self::checkDaemonState();
+ Worker\Daemon::checkState();
// Should we quit and wait for the worker to be called as a cronjob?
if ($dont_fork) {
}
// Quit on daemon mode
- if (self::isDaemonMode()) {
+ if (Worker\Daemon::isMode()) {
return $added;
}
return false;
}
- $queue = self::checkPriority($queue);
-
$id = $queue['id'];
$priority = $queue['priority'];
return true;
}
- /**
- * Set the flag if some job is waiting
- *
- * @param boolean $jobs Is there a waiting job?
- * @param int $key Key number
- * @throws \Exception
- */
- public static function IPCSetJobState(bool $jobs, int $key = 0)
- {
- $stamp = (float)microtime(true);
- DBA::replace('worker-ipc', ['jobs' => $jobs, 'key' => $key]);
- self::$db_duration += (microtime(true) - $stamp);
- self::$db_duration_write += (microtime(true) - $stamp);
- }
-
- /**
- * Delete a key entry
- *
- * @param int $key Key number
- * @throws \Exception
- */
- public static function IPCDeleteJobState(int $key)
- {
- $stamp = (float)microtime(true);
- DBA::delete('worker-ipc', ['key' => $key]);
- self::$db_duration += (microtime(true) - $stamp);
- self::$db_duration_write += (microtime(true) - $stamp);
- }
-
- /**
- * Checks if some worker job waits to be executed
- *
- * @param int $key Key number
- * @return bool
- * @throws \Exception
- */
- public static function IPCJobsExists(int $key = 0)
- {
- $stamp = (float)microtime(true);
- $row = DBA::selectFirst('worker-ipc', ['jobs'], ['key' => $key]);
- self::$db_duration += (microtime(true) - $stamp);
-
- // When we don't have a row, no job is running
- if (!DBA::isResult($row)) {
- return false;
- }
-
- return (bool)$row['jobs'];
- }
-
- /**
- * Checks if the worker is running in the daemon mode.
- *
- * @return boolean
- */
- public static function isDaemonMode()
- {
- if (!is_null(self::$daemon_mode)) {
- return self::$daemon_mode;
- }
-
- if (DI::mode()->getExecutor() == Mode::DAEMON) {
- return true;
- }
-
- $daemon_mode = DI::config()->get('system', 'worker_daemon_mode', false, true);
- if ($daemon_mode) {
- return $daemon_mode;
- }
-
- if (!function_exists('pcntl_fork')) {
- self::$daemon_mode = false;
- return false;
- }
-
- $pidfile = DI::config()->get('system', 'pidfile');
- if (empty($pidfile)) {
- // No pid file, no daemon
- self::$daemon_mode = false;
- return false;
- }
-
- if (!is_readable($pidfile)) {
- // No pid file. We assume that the daemon had been intentionally stopped.
- self::$daemon_mode = false;
- return false;
- }
-
- $pid = intval(file_get_contents($pidfile));
- $running = posix_kill($pid, 0);
-
- self::$daemon_mode = $running;
- return $running;
- }
-
- /**
- * Test if the daemon is running. If not, it will be started
- *
- * @return void
- */
- private static function checkDaemonState()
- {
- if (!DI::config()->get('system', 'daemon_watchdog', false)) {
- return;
- }
-
- if (!DI::mode()->isNormal()) {
- return;
- }
-
- // Check every minute if the daemon is running
- if (DI::config()->get('system', 'last_daemon_check', 0) + 60 > time()) {
- return;
- }
-
- DI::config()->set('system', 'last_daemon_check', time());
-
- $pidfile = DI::config()->get('system', 'pidfile');
- if (empty($pidfile)) {
- // No pid file, no daemon
- return;
- }
-
- if (!is_readable($pidfile)) {
- // No pid file. We assume that the daemon had been intentionally stopped.
- return;
- }
-
- $pid = intval(file_get_contents($pidfile));
- if (posix_kill($pid, 0)) {
- Logger::info('Daemon process is running', ['pid' => $pid]);
- return;
- }
-
- Logger::warning('Daemon process is not running', ['pid' => $pid]);
-
- self::spawnDaemon();
- }
-
- /**
- * Spawn a new daemon process
- *
- * @return void
- */
- private static function spawnDaemon()
- {
- Logger::notice('Starting new daemon process');
- $command = 'bin/daemon.php';
- $a = DI::app();
- DI::system()->run($command, ['start']);
- Logger::notice('New daemon process started');
- }
-
/**
* Check if the system is inside the defined maintenance window
*
--- /dev/null
+<?php
+/**
+ * @copyright Copyright (C) 2010-2022, the Friendica project
+ *
+ * @license GNU AGPL version 3 or any later version
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+
+namespace Friendica\Core\Worker;
+
+use Friendica\Core\Logger;
+use Friendica\Core\Worker;
+use Friendica\Database\DBA;
+use Friendica\DI;
+use Friendica\Model\Post;
+use Friendica\Protocol\ActivityPub;
+use Friendica\Util\DateTimeFormat;
+
+/**
+ * Contains the class for jobs that are executed in an interval
+ */
+class Cron
+{
+ /**
+ * Runs the cron processes
+ *
+ * @return void
+ * @throws \Friendica\Network\HTTPException\InternalServerErrorException
+ */
+ public static function run()
+ {
+ Logger::info('Add cron entries');
+
+ // Check for spooled items
+ Worker::add(['priority' => PRIORITY_HIGH, 'force_priority' => true], 'SpoolPost');
+
+ // Run the cron job that calls all other jobs
+ Worker::add(['priority' => PRIORITY_MEDIUM, 'force_priority' => true], 'Cron');
+
+ // Cleaning dead processes
+ self::killStaleWorkers();
+
+ // Remove old entries from the workerqueue
+ self::cleanWorkerQueue();
+
+ // Directly deliver or requeue posts
+ self::deliverPosts();
+ }
+
+ /**
+ * fix the queue entry if the worker process died
+ *
+ * @return void
+ * @throws \Exception
+ */
+ public static function killStaleWorkers()
+ {
+ $stamp = (float)microtime(true);
+ $entries = DBA::select(
+ 'workerqueue',
+ ['id', 'pid', 'executed', 'priority', 'command', 'parameter'],
+ ['NOT `done` AND `pid` != 0'],
+ ['order' => ['priority', 'retrial', 'created']]
+ );
+
+ while ($entry = DBA::fetch($entries)) {
+ if (!posix_kill($entry["pid"], 0)) {
+ $stamp = (float)microtime(true);
+ DBA::update(
+ 'workerqueue',
+ ['executed' => DBA::NULL_DATETIME, 'pid' => 0],
+ ['id' => $entry["id"]]
+ );
+ } else {
+ // Kill long running processes
+
+ // Define the maximum durations
+ $max_duration_defaults = [PRIORITY_CRITICAL => 720, PRIORITY_HIGH => 10, PRIORITY_MEDIUM => 60, PRIORITY_LOW => 180, PRIORITY_NEGLIGIBLE => 720];
+ $max_duration = $max_duration_defaults[$entry['priority']];
+
+ $argv = json_decode($entry['parameter'], true);
+ if (!empty($entry['command'])) {
+ $command = $entry['command'];
+ } elseif (!empty($argv)) {
+ $command = array_shift($argv);
+ } else {
+ return;
+ }
+
+ $command = basename($command);
+
+ // How long is the process already running?
+ $duration = (time() - strtotime($entry["executed"])) / 60;
+ if ($duration > $max_duration) {
+ Logger::notice('Worker process took too much time - killed', ['duration' => number_format($duration, 3), 'max' => $max_duration, 'id' => $entry["id"], 'pid' => $entry["pid"], 'command' => $command]);
+ posix_kill($entry["pid"], SIGTERM);
+
+ // We killed the stale process.
+ // To avoid a blocking situation we reschedule the process at the beginning of the queue.
+ // Additionally we are lowering the priority. (But not PRIORITY_CRITICAL)
+ $new_priority = $entry['priority'];
+ if ($entry['priority'] == PRIORITY_HIGH) {
+ $new_priority = PRIORITY_MEDIUM;
+ } elseif ($entry['priority'] == PRIORITY_MEDIUM) {
+ $new_priority = PRIORITY_LOW;
+ } elseif ($entry['priority'] != PRIORITY_CRITICAL) {
+ $new_priority = PRIORITY_NEGLIGIBLE;
+ }
+ $stamp = (float)microtime(true);
+ DBA::update(
+ 'workerqueue',
+ ['executed' => DBA::NULL_DATETIME, 'created' => DateTimeFormat::utcNow(), 'priority' => $new_priority, 'pid' => 0],
+ ['id' => $entry["id"]]
+ );
+ } else {
+ Logger::info('Process runtime is okay', ['duration' => number_format($duration, 3), 'max' => $max_duration, 'id' => $entry["id"], 'pid' => $entry["pid"], 'command' => $command]);
+ }
+ }
+ }
+ DBA::close($entries);
+ }
+
+ /**
+ * Remove old entries from the workerqueue
+ *
+ * @return void
+ */
+ private static function cleanWorkerQueue()
+ {
+ DBA::delete('workerqueue', ["`done` AND `executed` < ?", DateTimeFormat::utc('now - 1 hour')]);
+
+ // Optimizing this table only last seconds
+ if (DI::config()->get('system', 'optimize_tables')) {
+ // We are acquiring the two locks from the worker to avoid locking problems
+ if (DI::lock()->acquire(Worker::LOCK_PROCESS, 10)) {
+ if (DI::lock()->acquire(Worker::LOCK_WORKER, 10)) {
+ DBA::e("OPTIMIZE TABLE `workerqueue`");
+ DBA::e("OPTIMIZE TABLE `process`");
+ DI::lock()->release(Worker::LOCK_WORKER);
+ }
+ DI::lock()->release(Worker::LOCK_PROCESS);
+ }
+ }
+ }
+
+ /**
+ * Directly deliver AP messages or requeue them.
+ *
+ * This function is placed here as a safeguard. Even when the worker queue is completely blocked, messages will be delivered.
+ */
+ private static function deliverPosts()
+ {
+ $deliveries = DBA::p("SELECT `item-uri`.`uri` AS `inbox`, MAX(`failed`) AS `failed` FROM `post-delivery` INNER JOIN `item-uri` ON `item-uri`.`id` = `post-delivery`.`inbox-id` GROUP BY `inbox`");
+ while ($delivery = DBA::fetch($deliveries)) {
+ if ($delivery['failed'] == 0) {
+ $result = ActivityPub\Delivery::deliver($delivery['inbox']);
+ Logger::info('Drectly deliver inbox', ['inbox' => $delivery['inbox'], 'result' => $result['success']]);
+ continue;
+ } elseif ($delivery['failed'] < 3) {
+ $priority = PRIORITY_HIGH;
+ } elseif ($delivery['failed'] < 6) {
+ $priority = PRIORITY_MEDIUM;
+ } elseif ($delivery['failed'] < 8) {
+ $priority = PRIORITY_LOW;
+ } {
+ $priority = PRIORITY_NEGLIGIBLE;
+ }
+
+ if ($delivery['failed'] >= DI::config()->get('system', 'worker_defer_limit')) {
+ Logger::info('Removing failed deliveries', ['inbox' => $delivery['inbox'], 'failed' => $delivery['failed']]);
+ Post\Delivery::removeFailed($delivery['inbox']);
+ }
+
+ if (Worker::add($priority, 'APDelivery', '', 0, $delivery['inbox'], 0)) {
+ Logger::info('Missing APDelivery worker added for inbox', ['inbox' => $delivery['inbox'], 'failed' => $delivery['failed'], 'priority' => $priority]);
+ }
+ }
+ }
+}
--- /dev/null
+<?php
+/**
+ * @copyright Copyright (C) 2010-2022, the Friendica project
+ *
+ * @license GNU AGPL version 3 or any later version
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+
+namespace Friendica\Core\Worker;
+
+use Friendica\App\Mode;
+use Friendica\Core\Logger;
+use Friendica\DI;
+
+/**
+ * Contains the class for the worker background job processing
+ */
+class Daemon
+{
+ private static $daemon_mode = null;
+
+ /**
+ * Checks if the worker is running in the daemon mode.
+ *
+ * @return boolean
+ */
+ public static function isMode()
+ {
+ if (!is_null(self::$daemon_mode)) {
+ return self::$daemon_mode;
+ }
+
+ if (DI::mode()->getExecutor() == Mode::DAEMON) {
+ return true;
+ }
+
+ $daemon_mode = DI::config()->get('system', 'worker_daemon_mode', false, true);
+ if ($daemon_mode) {
+ return $daemon_mode;
+ }
+
+ if (!function_exists('pcntl_fork')) {
+ self::$daemon_mode = false;
+ return false;
+ }
+
+ $pidfile = DI::config()->get('system', 'pidfile');
+ if (empty($pidfile)) {
+ // No pid file, no daemon
+ self::$daemon_mode = false;
+ return false;
+ }
+
+ if (!is_readable($pidfile)) {
+ // No pid file. We assume that the daemon had been intentionally stopped.
+ self::$daemon_mode = false;
+ return false;
+ }
+
+ $pid = intval(file_get_contents($pidfile));
+ $running = posix_kill($pid, 0);
+
+ self::$daemon_mode = $running;
+ return $running;
+ }
+
+ /**
+ * Test if the daemon is running. If not, it will be started
+ *
+ * @return void
+ */
+ public static function checkState()
+ {
+ if (!DI::config()->get('system', 'daemon_watchdog', false)) {
+ return;
+ }
+
+ if (!DI::mode()->isNormal()) {
+ return;
+ }
+
+ // Check every minute if the daemon is running
+ if (DI::config()->get('system', 'last_daemon_check', 0) + 60 > time()) {
+ return;
+ }
+
+ DI::config()->set('system', 'last_daemon_check', time());
+
+ $pidfile = DI::config()->get('system', 'pidfile');
+ if (empty($pidfile)) {
+ // No pid file, no daemon
+ return;
+ }
+
+ if (!is_readable($pidfile)) {
+ // No pid file. We assume that the daemon had been intentionally stopped.
+ return;
+ }
+
+ $pid = intval(file_get_contents($pidfile));
+ if (posix_kill($pid, 0)) {
+ Logger::info('Daemon process is running', ['pid' => $pid]);
+ return;
+ }
+
+ Logger::warning('Daemon process is not running', ['pid' => $pid]);
+
+ self::spawn();
+ }
+
+ /**
+ * Spawn a new daemon process
+ *
+ * @return void
+ */
+ private static function spawn()
+ {
+ Logger::notice('Starting new daemon process');
+ $command = 'bin/daemon.php';
+ $a = DI::app();
+ DI::system()->run($command, ['start']);
+ Logger::notice('New daemon process started');
+ }
+}
--- /dev/null
+<?php
+/**
+ * @copyright Copyright (C) 2010-2022, the Friendica project
+ *
+ * @license GNU AGPL version 3 or any later version
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+
+namespace Friendica\Core\Worker;
+
+use Friendica\Database\DBA;
+
+/**
+ * Contains the class for the inter process communication
+ */
+class IPC
+{
+ /**
+ * Set the flag if some job is waiting
+ *
+ * @param boolean $jobs Is there a waiting job?
+ * @param int $key Key number
+ * @throws \Exception
+ */
+ public static function SetJobState(bool $jobs, int $key = 0)
+ {
+ $stamp = (float)microtime(true);
+ DBA::replace('worker-ipc', ['jobs' => $jobs, 'key' => $key]);
+ }
+
+ /**
+ * Delete a key entry
+ *
+ * @param int $key Key number
+ * @throws \Exception
+ */
+ public static function DeleteJobState(int $key)
+ {
+ $stamp = (float)microtime(true);
+ DBA::delete('worker-ipc', ['key' => $key]);
+ }
+
+ /**
+ * Checks if some worker job waits to be executed
+ *
+ * @param int $key Key number
+ * @return bool
+ * @throws \Exception
+ */
+ public static function JobsExists(int $key = 0)
+ {
+ $stamp = (float)microtime(true);
+ $row = DBA::selectFirst('worker-ipc', ['jobs'], ['key' => $key]);
+
+ // When we don't have a row, no job is running
+ if (!DBA::isResult($row)) {
+ return false;
+ }
+
+ return (bool)$row['jobs'];
+ }
+}
--- /dev/null
+<?php
+/**
+ * @copyright Copyright (C) 2010-2022, the Friendica project
+ *
+ * @license GNU AGPL version 3 or any later version
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+
+namespace Friendica\Protocol\ActivityPub;
+
+use Friendica\Core\Logger;
+use Friendica\DI;
+use Friendica\Model\Contact;
+use Friendica\Model\GServer;
+use Friendica\Model\Post;
+use Friendica\Protocol\ActivityPub;
+use Friendica\Util\HTTPSignature;
+use Friendica\Worker\Delivery as WorkerDelivery;
+
+class Delivery
+{
+ public static function deliver(string $inbox):array
+ {
+ $uri_ids = [];
+ $posts = Post\Delivery::selectForInbox($inbox);
+ $serverfail = false;
+
+ foreach ($posts as $post) {
+ if (!$serverfail) {
+ $result = self::deliverToInbox($post['command'], 0, $inbox, $post['uid'], $post['receivers'], $post['uri-id']);
+
+ if ($result['serverfailure']) {
+ // In a timeout situation we assume that every delivery to that inbox will time out.
+ // So we set the flag and try all deliveries at a later time.
+ Logger::info('Inbox delivery has a server failure', ['inbox' => $inbox]);
+ $serverfail = true;
+ }
+ }
+
+ if ($serverfail || !$result['success']) {
+ $uri_ids[] = $post['uri-id'];
+ }
+ }
+
+ Logger::debug('Inbox delivery done', ['inbox' => $inbox, 'posts' => count($posts), 'failed' => count($uri_ids), 'serverfailure' => $serverfail]);
+ return ['success' => empty($uri_ids), 'uri_ids' => $uri_ids];
+ }
+
+ public static function deliverToInbox(string $cmd, int $item_id, string $inbox, int $uid, array $receivers, int $uri_id): array
+ {
+ if (empty($item_id) && !empty($uri_id) && !empty($uid)) {
+ $item = Post::selectFirst(['id', 'parent', 'origin'], ['uri-id' => $uri_id, 'uid' => [$uid, 0]], ['order' => ['uid' => true]]);
+ if (empty($item['id'])) {
+ Logger::notice('Item not found, removing delivery', ['uri-id' => $uri_id, 'uid' => $uid, 'cmd' => $cmd, 'inbox' => $inbox]);
+ Post\Delivery::remove($uri_id, $inbox);
+ return true;
+ } else {
+ $item_id = $item['id'];
+ }
+ }
+
+ $success = true;
+ $serverfail = false;
+
+ if ($cmd == WorkerDelivery::MAIL) {
+ $data = ActivityPub\Transmitter::createActivityFromMail($item_id);
+ if (!empty($data)) {
+ $success = HTTPSignature::transmit($data, $inbox, $uid);
+ }
+ } elseif ($cmd == WorkerDelivery::SUGGESTION) {
+ $success = ActivityPub\Transmitter::sendContactSuggestion($uid, $inbox, $item_id);
+ } elseif ($cmd == WorkerDelivery::RELOCATION) {
+ // @todo Implementation pending
+ } elseif ($cmd == WorkerDelivery::POKE) {
+ // Implementation not planned
+ } elseif ($cmd == WorkerDelivery::REMOVAL) {
+ $success = ActivityPub\Transmitter::sendProfileDeletion($uid, $inbox);
+ } elseif ($cmd == WorkerDelivery::PROFILEUPDATE) {
+ $success = ActivityPub\Transmitter::sendProfileUpdate($uid, $inbox);
+ } else {
+ $data = ActivityPub\Transmitter::createCachedActivityFromItem($item_id);
+ if (!empty($data)) {
+ $timestamp = microtime(true);
+ $response = HTTPSignature::post($data, $inbox, $uid);
+ $runtime = microtime(true) - $timestamp;
+ $success = $response->isSuccess();
+ $serverfail = $response->isTimeout();
+ if (!$success) {
+ if (!$serverfail && ($response->getReturnCode() >= 500) && ($response->getReturnCode() <= 599)) {
+ $serverfail = true;
+ }
+
+ $xrd_timeout = DI::config()->get('system', 'xrd_timeout');
+ if (!$serverfail && $xrd_timeout && ($runtime > $xrd_timeout)) {
+ $serverfail = true;
+ }
+ $curl_timeout = DI::config()->get('system', 'curl_timeout');
+ if (!$serverfail && $curl_timeout && ($runtime > $curl_timeout)) {
+ $serverfail = true;
+ }
+
+ Logger::info('Delivery failed', ['retcode' => $response->getReturnCode(), 'serverfailure' => $serverfail, 'runtime' => round($runtime, 3), 'uri-id' => $uri_id, 'uid' => $uid, 'item_id' => $item_id, 'cmd' => $cmd, 'inbox' => $inbox]);
+ }
+ if ($uri_id) {
+ if ($success) {
+ Post\Delivery::remove($uri_id, $inbox);
+ } else {
+ Post\Delivery::incrementFailed($uri_id, $inbox);
+ }
+ }
+ }
+ }
+
+ self::setSuccess($receivers, $success);
+
+ Logger::debug('Delivered', ['uri-id' => $uri_id, 'uid' => $uid, 'item_id' => $item_id, 'cmd' => $cmd, 'inbox' => $inbox, 'success' => $success]);
+
+ if ($success && in_array($cmd, [WorkerDelivery::POST])) {
+ Post\DeliveryData::incrementQueueDone($uri_id, Post\DeliveryData::ACTIVITYPUB);
+ }
+
+ return ['success' => $success, 'serverfailure' => $serverfail];
+ }
+
+ private static function setSuccess(array $receivers, bool $success)
+ {
+ $gsid = null;
+
+ foreach ($receivers as $receiver) {
+ $contact = Contact::getById($receiver);
+ if (empty($contact)) {
+ continue;
+ }
+
+ $gsid = $gsid ?: $contact['gsid'];
+
+ if ($success) {
+ Contact::unmarkForArchival($contact);
+ } else {
+ Contact::markForArchival($contact);
+ }
+ }
+
+ if (!empty($gsid)) {
+ GServer::setProtocol($gsid, Post\DeliveryData::ACTIVITYPUB);
+ }
+ }
+}
use Friendica\Core\Logger;
use Friendica\Core\Worker;
-use Friendica\DI;
-use Friendica\Model\Contact;
-use Friendica\Model\GServer;
use Friendica\Model\Post;
use Friendica\Protocol\ActivityPub;
-use Friendica\Util\HTTPSignature;
-
class APDelivery
{
/**
Logger::debug('Invoked', ['cmd' => $cmd, 'inbox' => $inbox, 'id' => $item_id, 'uri-id' => $uri_id, 'uid' => $uid]);
if (empty($uri_id)) {
- $result = self::deliver($inbox);
+ $result = ActivityPub\Delivery::deliver($inbox);
$success = $result['success'];
$uri_ids = $result['uri_ids'];
} else {
- $result = self::deliverToInbox($cmd, $item_id, $inbox, $uid, $receivers, $uri_id);
+ $result = ActivityPub\Delivery::deliverToInbox($cmd, $item_id, $inbox, $uid, $receivers, $uri_id);
$success = $result['success'];
$uri_ids = [$uri_id];
}
}
}
}
-
- private static function deliver(string $inbox):array
- {
- $uri_ids = [];
- $posts = Post\Delivery::selectForInbox($inbox);
- $serverfail = false;
-
- foreach ($posts as $post) {
- if (!$serverfail) {
- $result = self::deliverToInbox($post['command'], 0, $inbox, $post['uid'], $post['receivers'], $post['uri-id']);
-
- if ($result['serverfailure']) {
- // In a timeout situation we assume that every delivery to that inbox will time out.
- // So we set the flag and try all deliveries at a later time.
- Logger::info('Inbox delivery has a server failure', ['inbox' => $inbox]);
- $serverfail = true;
- }
- }
-
- if ($serverfail || !$result['success']) {
- $uri_ids[] = $post['uri-id'];
- }
- }
-
- Logger::debug('Inbox delivery done', ['inbox' => $inbox, 'posts' => count($posts), 'failed' => count($uri_ids), 'serverfailure' => $serverfail]);
- return ['success' => empty($uri_ids), 'uri_ids' => $uri_ids];
- }
-
- private static function deliverToInbox(string $cmd, int $item_id, string $inbox, int $uid, array $receivers, int $uri_id): array
- {
- if (empty($item_id) && !empty($uri_id) && !empty($uid)) {
- $item = Post::selectFirst(['id', 'parent', 'origin'], ['uri-id' => $uri_id, 'uid' => [$uid, 0]], ['order' => ['uid' => true]]);
- if (empty($item['id'])) {
- Logger::notice('Item not found, removing delivery', ['uri-id' => $uri_id, 'uid' => $uid, 'cmd' => $cmd, 'inbox' => $inbox]);
- Post\Delivery::remove($uri_id, $inbox);
- return true;
- } else {
- $item_id = $item['id'];
- }
- }
-
- $success = true;
- $serverfail = false;
-
- if ($cmd == Delivery::MAIL) {
- $data = ActivityPub\Transmitter::createActivityFromMail($item_id);
- if (!empty($data)) {
- $success = HTTPSignature::transmit($data, $inbox, $uid);
- }
- } elseif ($cmd == Delivery::SUGGESTION) {
- $success = ActivityPub\Transmitter::sendContactSuggestion($uid, $inbox, $item_id);
- } elseif ($cmd == Delivery::RELOCATION) {
- // @todo Implementation pending
- } elseif ($cmd == Delivery::POKE) {
- // Implementation not planned
- } elseif ($cmd == Delivery::REMOVAL) {
- $success = ActivityPub\Transmitter::sendProfileDeletion($uid, $inbox);
- } elseif ($cmd == Delivery::PROFILEUPDATE) {
- $success = ActivityPub\Transmitter::sendProfileUpdate($uid, $inbox);
- } else {
- $data = ActivityPub\Transmitter::createCachedActivityFromItem($item_id);
- if (!empty($data)) {
- $timestamp = microtime(true);
- $response = HTTPSignature::post($data, $inbox, $uid);
- $runtime = microtime(true) - $timestamp;
- $success = $response->isSuccess();
- $serverfail = $response->isTimeout();
- if (!$success) {
- if (!$serverfail && ($response->getReturnCode() >= 500) && ($response->getReturnCode() <= 599)) {
- $serverfail = true;
- }
-
- $xrd_timeout = DI::config()->get('system', 'xrd_timeout');
- if (!$serverfail && $xrd_timeout && ($runtime > $xrd_timeout)) {
- $serverfail = true;
- }
- $curl_timeout = DI::config()->get('system', 'curl_timeout');
- if (!$serverfail && $curl_timeout && ($runtime > $curl_timeout)) {
- $serverfail = true;
- }
-
- Logger::info('Delivery failed', ['retcode' => $response->getReturnCode(), 'serverfailure' => $serverfail, 'runtime' => round($runtime, 3), 'uri-id' => $uri_id, 'uid' => $uid, 'item_id' => $item_id, 'cmd' => $cmd, 'inbox' => $inbox]);
- }
- if ($uri_id) {
- if ($success) {
- Post\Delivery::remove($uri_id, $inbox);
- } else {
- Post\Delivery::incrementFailed($uri_id, $inbox);
- }
- }
- }
- }
-
- self::setSuccess($receivers, $success);
-
- Logger::debug('Delivered', ['uri-id' => $uri_id, 'uid' => $uid, 'item_id' => $item_id, 'cmd' => $cmd, 'inbox' => $inbox, 'success' => $success]);
-
- if ($success && in_array($cmd, [Delivery::POST])) {
- Post\DeliveryData::incrementQueueDone($uri_id, Post\DeliveryData::ACTIVITYPUB);
- }
-
- return ['success' => $success, 'serverfailure' => $serverfail];
- }
-
- private static function setSuccess(array $receivers, bool $success)
- {
- $gsid = null;
-
- foreach ($receivers as $receiver) {
- $contact = Contact::getById($receiver);
- if (empty($contact)) {
- continue;
- }
-
- $gsid = $gsid ?: $contact['gsid'];
-
- if ($success) {
- Contact::unmarkForArchival($contact);
- } else {
- Contact::markForArchival($contact);
- }
- }
-
- if (!empty($gsid)) {
- GServer::setProtocol($gsid, Post\DeliveryData::ACTIVITYPUB);
- }
- }
}
// Clear cache entries
Worker::add(PRIORITY_LOW, 'ClearCache');
- // Requeue posts from the post delivery entries
- Worker::add(PRIORITY_MEDIUM, 'RequeuePosts');
-
DI::config()->set('system', 'last_cron_hourly', time());
}
+++ /dev/null
-<?php
-/**
- * @copyright Copyright (C) 2010-2022, the Friendica project
- *
- * @license GNU AGPL version 3 or any later version
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as
- * published by the Free Software Foundation, either version 3 of the
- * License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <https://www.gnu.org/licenses/>.
- *
- */
-
-namespace Friendica\Worker;
-
-use Friendica\Core\Logger;
-use Friendica\Core\Worker;
-use Friendica\Database\DBA;
-use Friendica\Model\Post;
-
-/**
- * Requeue posts that are stuck in the post-delivery table without a matching delivery job.
- * This should not happen in regular situations, this is a precaution.
- */
-class RequeuePosts
-{
- public static function execute()
- {
- $deliveries = DBA::p("SELECT `item-uri`.`uri` AS `inbox` FROM `post-delivery` INNER JOIN `item-uri` ON `item-uri`.`id` = `post-delivery`.`inbox-id` GROUP BY `inbox`");
- while ($delivery = DBA::fetch($deliveries)) {
- Post\Delivery::removeFailed($delivery['inbox']);
-
- if (Worker::add(PRIORITY_HIGH, 'APDelivery', '', 0, $delivery['inbox'], 0)) {
- Logger::info('Missing APDelivery worker added for inbox', ['inbox' => $delivery['inbox']]);
- }
- }
- DBA::close($deliveries);
- }
-}