]> git.mxchange.org Git - friendica.git/commitdiff
The worker is split into several classes
authorMichael <heluecht@pirati.ca>
Thu, 19 May 2022 19:24:21 +0000 (19:24 +0000)
committerMichael <heluecht@pirati.ca>
Thu, 19 May 2022 19:24:21 +0000 (19:24 +0000)
bin/daemon.php
src/Core/Worker.php
src/Core/Worker/Cron.php [new file with mode: 0644]
src/Core/Worker/Daemon.php [new file with mode: 0644]
src/Core/Worker/IPC.php [new file with mode: 0644]
src/Protocol/ActivityPub/Delivery.php [new file with mode: 0644]
src/Worker/APDelivery.php
src/Worker/Cron.php
src/Worker/RequeuePosts.php [deleted file]

index c7a16a9e8d5b0952f732160ec1e63ad977ba291a..2173be1862f47735146484b3009030f06a022988 100755 (executable)
@@ -230,7 +230,7 @@ while (true) {
                }
 
                $timeout = ($seconds >= $wait_interval);
-       } while (!$timeout && !Worker::IPCJobsExists());
+       } while (!$timeout && !Worker\IPC::JobsExists());
 
        if ($timeout) {
                $do_cron = true;
index e29cf765a4fa38c5885798ef396cc0c6a9b8f89a..bc52843e693fee2699458b146e116c484a99b38d 100644 (file)
@@ -21,7 +21,6 @@
 
 namespace Friendica\Core;
 
-use Friendica\App\Mode;
 use Friendica\Core\Worker\Entity\Process;
 use Friendica\Database\DBA;
 use Friendica\DI;
@@ -50,7 +49,6 @@ class Worker
        private static $lock_duration = 0;
        private static $last_update;
        private static $state;
-       private static $daemon_mode = null;
        /** @var Process */
        private static $process;
 
@@ -79,7 +77,7 @@ class Worker
                $last_cleanup = DI::config()->get('system', 'worker_last_cleaned', 0);
                if (time() > ($last_cleanup + 300)) {
                        DI::config()->set('system', 'worker_last_cleaned', time());
-                       self::killStaleWorkers();
+                       Worker\Cron::killStaleWorkers();
                }
 
                // Check if the system is ready
@@ -89,7 +87,7 @@ class Worker
 
                // Now we start additional cron processes if we should do so
                if ($run_cron) {
-                       self::runCron();
+                       Worker\Cron::run();
                }
 
                $last_check = $starttime = time();
@@ -97,15 +95,13 @@ class Worker
 
                // We fetch the next queue entry that is about to be executed
                while ($r = self::workerProcess()) {
-                       if (self::IPCJobsExists(getmypid())) {
-                               self::IPCDeleteJobState(getmypid());
+                       if (Worker\IPC::JobsExists(getmypid())) {
+                               Worker\IPC::DeleteJobState(getmypid());
                        }
 
                        // Don't refetch when a worker fetches tasks for multiple workers
                        $refetched = DI::config()->get('system', 'worker_multiple_fetch');
                        foreach ($r as $entry) {
-                               $entry = self::checkPriority($entry);
-
                                // The work will be done
                                if (!self::execute($entry)) {
                                        Logger::notice('Process execution failed, quitting.');
@@ -150,8 +146,8 @@ class Worker
                        if (time() > ($starttime + (DI::config()->get('system', 'cron_interval') * 60))) {
                                Logger::info('Process lifetime reached, respawning.');
                                self::unclaimProcess($process);
-                               if (self::isDaemonMode()) {
-                                       self::IPCSetJobState(true);
+                               if (Worker\Daemon::isMode()) {
+                                       Worker\IPC::SetJobState(true);
                                } else {
                                        self::spawnWorker();
                                }
@@ -160,30 +156,12 @@ class Worker
                }
 
                // Cleaning up. Possibly not needed, but it doesn't harm anything.
-               if (self::isDaemonMode()) {
-                       self::IPCSetJobState(false);
+               if (Worker\Daemon::isMode()) {
+                       Worker\IPC::SetJobState(false);
                }
                Logger::info("Couldn't select a workerqueue entry, quitting process", ['pid' => getmypid()]);
        }
 
-       /**
-        * Check and fix the priority of a worker task
-        * @param array $entry
-        * @return array
-        */
-       private static function checkPriority(array $entry)
-       {
-               $entry['priority'] = (int)$entry['priority'];
-
-               if (!in_array($entry['priority'], PRIORITIES)) {
-                       Logger::warning('Invalid priority', ['entry' => $entry, 'callstack' => System::callstack(20)]);
-                       DBA::update('workerqueue', ['priority' => PRIORITY_MEDIUM], ['id' => $entry['id']]);
-                       $entry['priority'] = PRIORITY_MEDIUM;
-               }
-
-               return $entry;
-       }
-
        /**
         * Checks if the system is ready.
         *
@@ -642,85 +620,6 @@ class Worker
                return true;
        }
 
-       /**
-        * fix the queue entry if the worker process died
-        *
-        * @return void
-        * @throws \Exception
-        */
-       private static function killStaleWorkers()
-       {
-               $stamp = (float)microtime(true);
-               $entries = DBA::select(
-                       'workerqueue',
-                       ['id', 'pid', 'executed', 'priority', 'command', 'parameter'],
-                       ['NOT `done` AND `pid` != 0'],
-                       ['order' => ['priority', 'retrial', 'created']]
-               );
-               self::$db_duration += (microtime(true) - $stamp);
-
-               while ($entry = DBA::fetch($entries)) {
-                       $entry = self::checkPriority($entry);
-
-                       if (!posix_kill($entry["pid"], 0)) {
-                               $stamp = (float)microtime(true);
-                               DBA::update(
-                                       'workerqueue',
-                                       ['executed' => DBA::NULL_DATETIME, 'pid' => 0],
-                                       ['id' => $entry["id"]]
-                               );
-                               self::$db_duration += (microtime(true) - $stamp);
-                               self::$db_duration_write += (microtime(true) - $stamp);
-                       } else {
-                               // Kill long running processes
-
-                               // Define the maximum durations
-                               $max_duration_defaults = [PRIORITY_CRITICAL => 720, PRIORITY_HIGH => 10, PRIORITY_MEDIUM => 60, PRIORITY_LOW => 180, PRIORITY_NEGLIGIBLE => 720];
-                               $max_duration = $max_duration_defaults[$entry['priority']];
-
-                               $argv = json_decode($entry['parameter'], true);
-                               if (!empty($entry['command'])) {
-                                       $command = $entry['command'];
-                               } elseif (!empty($argv)) {
-                                       $command = array_shift($argv);
-                               } else {
-                                       return;
-                               }
-
-                               $command = basename($command);
-
-                               // How long is the process already running?
-                               $duration = (time() - strtotime($entry["executed"])) / 60;
-                               if ($duration > $max_duration) {
-                                       Logger::notice('Worker process took too much time - killed', ['duration' => number_format($duration, 3), 'max' => $max_duration, 'id' => $entry["id"], 'pid' => $entry["pid"], 'command' => $command]);
-                                       posix_kill($entry["pid"], SIGTERM);
-
-                                       // We killed the stale process.
-                                       // To avoid a blocking situation we reschedule the process at the beginning of the queue.
-                                       // Additionally we are lowering the priority. (But not PRIORITY_CRITICAL)
-                                       $new_priority = $entry['priority'];
-                                       if ($entry['priority'] == PRIORITY_HIGH) {
-                                               $new_priority = PRIORITY_MEDIUM;
-                                       } elseif ($entry['priority'] == PRIORITY_MEDIUM) {
-                                               $new_priority = PRIORITY_LOW;
-                                       } elseif ($entry['priority'] != PRIORITY_CRITICAL) {
-                                               $new_priority = PRIORITY_NEGLIGIBLE;
-                                       }
-                                       $stamp = (float)microtime(true);
-                                       DBA::update(
-                                               'workerqueue',
-                                               ['executed' => DBA::NULL_DATETIME, 'created' => DateTimeFormat::utcNow(), 'priority' => $new_priority, 'pid' => 0],
-                                               ['id' => $entry["id"]]
-                                       );
-                                       self::$db_duration += (microtime(true) - $stamp);
-                                       self::$db_duration_write += (microtime(true) - $stamp);
-                               } else {
-                                       Logger::info('Process runtime is okay', ['duration' => number_format($duration, 3), 'max' => $max_duration, 'id' => $entry["id"], 'pid' => $entry["pid"], 'command' => $command]);
-                               }
-                       }
-               }
-               DBA::close($entries);
-       }
 
        /**
         * Checks if the number of active workers exceeds the given limits
@@ -830,8 +729,8 @@ class Worker
                        // Are there fewer workers running as possible? Then fork a new one.
                        if (!DI::config()->get("system", "worker_dont_fork", false) && ($queues > ($active + 1)) && self::entriesExists()) {
                                Logger::info("There are fewer workers as possible, fork a new worker.", ['active' => $active, 'queues' => $queues]);
-                               if (self::isDaemonMode()) {
-                                       self::IPCSetJobState(true);
+                               if (Worker\Daemon::isMode()) {
+                                       Worker\IPC::SetJobState(true);
                                } else {
                                        self::spawnWorker();
                                }
@@ -839,8 +738,8 @@ class Worker
                }
 
                // if there are too much worker, we don't spawn a new one.
-               if (self::isDaemonMode() && ($active > $queues)) {
-                       self::IPCSetJobState(false);
+               if (Worker\Daemon::isMode() && ($active > $queues)) {
+                       Worker\IPC::SetJobState(false);
                }
 
                return $active > $queues;
@@ -1131,52 +1030,6 @@ class Worker
                self::$db_duration_write += (microtime(true) - $stamp);
        }
 
-       /**
-        * Runs the cron processes
-        *
-        * @return void
-        * @throws \Friendica\Network\HTTPException\InternalServerErrorException
-        */
-       private static function runCron()
-       {
-               Logger::info('Add cron entries');
-
-               // Check for spooled items
-               self::add(['priority' => PRIORITY_HIGH, 'force_priority' => true], 'SpoolPost');
-
-               // Run the cron job that calls all other jobs
-               self::add(['priority' => PRIORITY_MEDIUM, 'force_priority' => true], 'Cron');
-
-               // Cleaning dead processes
-               self::killStaleWorkers();
-
-               // Remove old entries from the workerqueue
-               self::cleanWorkerQueue();
-       }
-
-       /**
-        * Remove old entries from the workerqueue
-        *
-        * @return void
-        */
-       private static function cleanWorkerQueue()
-       {
-               DBA::delete('workerqueue', ["`done` AND `executed` < ?", DateTimeFormat::utc('now - 1 hour')]);
-
-               // Optimizing this table only last seconds
-               if (DI::config()->get('system', 'optimize_tables')) {
-                       // We are acquiring the two locks from the worker to avoid locking problems
-                       if (DI::lock()->acquire(Worker::LOCK_PROCESS, 10)) {
-                               if (DI::lock()->acquire(Worker::LOCK_WORKER, 10)) {
-                                       DBA::e("OPTIMIZE TABLE `workerqueue`");
-                                       DBA::e("OPTIMIZE TABLE `process`");
-                                       DI::lock()->release(Worker::LOCK_WORKER);
-                               }
-                               DI::lock()->release(Worker::LOCK_PROCESS);
-                       }
-               }
-       }
-
        /**
         * Fork a child process
         *
@@ -1202,11 +1055,11 @@ class Worker
                        // The parent process continues here
                        DBA::connect();
 
-                       self::IPCSetJobState(true, $pid);
+                       Worker\IPC::SetJobState(true, $pid);
                        Logger::info('Spawned new worker', ['pid' => $pid]);
 
                        $cycles = 0;
-                       while (self::IPCJobsExists($pid) && (++$cycles < 100)) {
+                       while (Worker\IPC::JobsExists($pid) && (++$cycles < 100)) {
                                usleep(10000);
                        }
 
@@ -1221,7 +1074,7 @@ class Worker
                $process = DI::process()->create(getmypid(), basename(__FILE__));
 
                $cycles = 0;
-               while (!self::IPCJobsExists($process->pid) && (++$cycles < 100)) {
+               while (!Worker\IPC::JobsExists($process->pid) && (++$cycles < 100)) {
                        usleep(10000);
                }
 
@@ -1231,7 +1084,7 @@ class Worker
 
                self::unclaimProcess($process);
 
-               self::IPCSetJobState(false, $process->pid);
+               Worker\IPC::SetJobState(false, $process->pid);
                DI::process()->delete($process);
                Logger::info('Worker ended', ['pid' => $process->pid]);
                exit();
@@ -1246,13 +1099,13 @@ class Worker
         */
        public static function spawnWorker($do_cron = false)
        {
-               if (self::isDaemonMode() && DI::config()->get('system', 'worker_fork')) {
+               if (Worker\Daemon::isMode() && DI::config()->get('system', 'worker_fork')) {
                        self::forkProcess($do_cron);
                } else {
                        DI::system()->run('bin/worker.php', ['no_cron' => !$do_cron]);
                }
-               if (self::isDaemonMode()) {
-                       self::IPCSetJobState(false);
+               if (Worker\Daemon::isMode()) {
+                       Worker\IPC::SetJobState(false);
                }
        }
 
@@ -1343,11 +1196,11 @@ class Worker
                }
 
                // Set the IPC flag to ensure an immediate process execution via daemon
-               if (self::isDaemonMode()) {
-                       self::IPCSetJobState(true);
+               if (Worker\Daemon::isMode()) {
+                       Worker\IPC::SetJobState(true);
                }
 
-               self::checkDaemonState();
+               Worker\Daemon::checkState();
 
                // Should we quit and wait for the worker to be called as a cronjob?
                if ($dont_fork) {
@@ -1368,7 +1221,7 @@ class Worker
                }
 
                // Quit on daemon mode
-               if (self::isDaemonMode()) {
+               if (Worker\Daemon::isMode()) {
                        return $added;
                }
 
@@ -1423,8 +1276,6 @@ class Worker
                        return false;
                }
 
-               $queue = self::checkPriority($queue);
-
                $id = $queue['id'];
                $priority = $queue['priority'];
 
@@ -1460,159 +1311,6 @@ class Worker
                return true;
        }
 
-       /**
-        * Set the flag if some job is waiting
-        *
-        * @param boolean $jobs Is there a waiting job?
-        * @param int $key Key number
-        * @throws \Exception
-        */
-       public static function IPCSetJobState(bool $jobs, int $key = 0)
-       {
-               $stamp = (float)microtime(true);
-               DBA::replace('worker-ipc', ['jobs' => $jobs, 'key' => $key]);
-               self::$db_duration += (microtime(true) - $stamp);
-               self::$db_duration_write += (microtime(true) - $stamp);
-       }
-
-       /**
-        * Delete a key entry
-        *
-        * @param int $key Key number
-        * @throws \Exception
-        */
-       public static function IPCDeleteJobState(int $key)
-       {
-               $stamp = (float)microtime(true);
-               DBA::delete('worker-ipc', ['key' => $key]);
-               self::$db_duration += (microtime(true) - $stamp);
-               self::$db_duration_write += (microtime(true) - $stamp);
-       }
-
-       /**
-        * Checks if some worker job waits to be executed
-        *
-        * @param int $key Key number
-        * @return bool
-        * @throws \Exception
-        */
-       public static function IPCJobsExists(int $key = 0)
-       {
-               $stamp = (float)microtime(true);
-               $row = DBA::selectFirst('worker-ipc', ['jobs'], ['key' => $key]);
-               self::$db_duration += (microtime(true) - $stamp);
-
-               // When we don't have a row, no job is running
-               if (!DBA::isResult($row)) {
-                       return false;
-               }
-
-               return (bool)$row['jobs'];
-       }
-
-       /**
-        * Checks if the worker is running in the daemon mode.
-        *
-        * @return boolean
-        */
-       public static function isDaemonMode()
-       {
-               if (!is_null(self::$daemon_mode)) {
-                       return self::$daemon_mode;
-               }
-
-               if (DI::mode()->getExecutor() == Mode::DAEMON) {
-                       return true;
-               }
-
-               $daemon_mode = DI::config()->get('system', 'worker_daemon_mode', false, true);
-               if ($daemon_mode) {
-                       return $daemon_mode;
-               }
-
-               if (!function_exists('pcntl_fork')) {
-                       self::$daemon_mode = false;
-                       return false;
-               }
-
-               $pidfile = DI::config()->get('system', 'pidfile');
-               if (empty($pidfile)) {
-                       // No pid file, no daemon
-                       self::$daemon_mode = false;
-                       return false;
-               }
-
-               if (!is_readable($pidfile)) {
-                       // No pid file. We assume that the daemon had been intentionally stopped.
-                       self::$daemon_mode = false;
-                       return false;
-               }
-
-               $pid = intval(file_get_contents($pidfile));
-               $running = posix_kill($pid, 0);
-
-               self::$daemon_mode = $running;
-               return $running;
-       }
-
-       /**
-        * Test if the daemon is running. If not, it will be started
-        *
-        * @return void
-        */
-       private static function checkDaemonState()
-       {
-               if (!DI::config()->get('system', 'daemon_watchdog', false)) {
-                       return;
-               }
-
-               if (!DI::mode()->isNormal()) {
-                       return;
-               }
-
-               // Check every minute if the daemon is running
-               if (DI::config()->get('system', 'last_daemon_check', 0) + 60 > time()) {
-                       return;
-               }
-
-               DI::config()->set('system', 'last_daemon_check', time());
-
-               $pidfile = DI::config()->get('system', 'pidfile');
-               if (empty($pidfile)) {
-                       // No pid file, no daemon
-                       return;
-               }
-
-               if (!is_readable($pidfile)) {
-                       // No pid file. We assume that the daemon had been intentionally stopped.
-                       return;
-               }
-
-               $pid = intval(file_get_contents($pidfile));
-               if (posix_kill($pid, 0)) {
-                       Logger::info('Daemon process is running', ['pid' => $pid]);
-                       return;
-               }
-
-               Logger::warning('Daemon process is not running', ['pid' => $pid]);
-
-               self::spawnDaemon();
-       }
-
-       /**
-        * Spawn a new daemon process
-        *
-        * @return void
-        */
-       private static function spawnDaemon()
-       {
-               Logger::notice('Starting new daemon process');
-               $command = 'bin/daemon.php';
-               $a = DI::app();
-               DI::system()->run($command, ['start']);
-               Logger::notice('New daemon process started');
-       }
-
        /**
         * Check if the system is inside the defined maintenance window
         *
diff --git a/src/Core/Worker/Cron.php b/src/Core/Worker/Cron.php
new file mode 100644 (file)
index 0000000..b07c472
--- /dev/null
@@ -0,0 +1,192 @@
+<?php
+/**
+ * @copyright Copyright (C) 2010-2022, the Friendica project
+ *
+ * @license GNU AGPL version 3 or any later version
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program.  If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+
+namespace Friendica\Core\Worker;
+
+use Friendica\Core\Logger;
+use Friendica\Core\Worker;
+use Friendica\Database\DBA;
+use Friendica\DI;
+use Friendica\Model\Post;
+use Friendica\Protocol\ActivityPub;
+use Friendica\Util\DateTimeFormat;
+
+/**
+ * Contains the class for jobs that are executed in an interval
+ */
+class Cron
+{
+       /**
+        * Runs the cron processes
+        *
+        * @return void
+        * @throws \Friendica\Network\HTTPException\InternalServerErrorException
+        */
+       public static function run()
+       {
+               Logger::info('Add cron entries');
+
+               // Check for spooled items
+               Worker::add(['priority' => PRIORITY_HIGH, 'force_priority' => true], 'SpoolPost');
+
+               // Run the cron job that calls all other jobs
+               Worker::add(['priority' => PRIORITY_MEDIUM, 'force_priority' => true], 'Cron');
+
+               // Cleaning dead processes
+               self::killStaleWorkers();
+
+               // Remove old entries from the workerqueue
+               self::cleanWorkerQueue();
+
+               // Directly deliver or requeue posts
+               self::deliverPosts();
+       }
+
+       /**
+        * fix the queue entry if the worker process died
+        *
+        * @return void
+        * @throws \Exception
+        */
+       public static function killStaleWorkers()
+       {
+               $stamp = (float)microtime(true);
+               $entries = DBA::select(
+                       'workerqueue',
+                       ['id', 'pid', 'executed', 'priority', 'command', 'parameter'],
+                       ['NOT `done` AND `pid` != 0'],
+                       ['order' => ['priority', 'retrial', 'created']]
+               );
+
+               while ($entry = DBA::fetch($entries)) {
+                       if (!posix_kill($entry["pid"], 0)) {
+                               $stamp = (float)microtime(true);
+                               DBA::update(
+                                       'workerqueue',
+                                       ['executed' => DBA::NULL_DATETIME, 'pid' => 0],
+                                       ['id' => $entry["id"]]
+                               );
+                       } else {
+                               // Kill long running processes
+
+                               // Define the maximum durations
+                               $max_duration_defaults = [PRIORITY_CRITICAL => 720, PRIORITY_HIGH => 10, PRIORITY_MEDIUM => 60, PRIORITY_LOW => 180, PRIORITY_NEGLIGIBLE => 720];
+                               $max_duration = $max_duration_defaults[$entry['priority']];
+
+                               $argv = json_decode($entry['parameter'], true);
+                               if (!empty($entry['command'])) {
+                                       $command = $entry['command'];
+                               } elseif (!empty($argv)) {
+                                       $command = array_shift($argv);
+                               } else {
+                                       return;
+                               }
+
+                               $command = basename($command);
+
+                               // How long is the process already running?
+                               $duration = (time() - strtotime($entry["executed"])) / 60;
+                               if ($duration > $max_duration) {
+                                       Logger::notice('Worker process took too much time - killed', ['duration' => number_format($duration, 3), 'max' => $max_duration, 'id' => $entry["id"], 'pid' => $entry["pid"], 'command' => $command]);
+                                       posix_kill($entry["pid"], SIGTERM);
+
+                                       // We killed the stale process.
+                                       // To avoid a blocking situation we reschedule the process at the beginning of the queue.
+                                       // Additionally we are lowering the priority. (But not PRIORITY_CRITICAL)
+                                       $new_priority = $entry['priority'];
+                                       if ($entry['priority'] == PRIORITY_HIGH) {
+                                               $new_priority = PRIORITY_MEDIUM;
+                                       } elseif ($entry['priority'] == PRIORITY_MEDIUM) {
+                                               $new_priority = PRIORITY_LOW;
+                                       } elseif ($entry['priority'] != PRIORITY_CRITICAL) {
+                                               $new_priority = PRIORITY_NEGLIGIBLE;
+                                       }
+                                       $stamp = (float)microtime(true);
+                                       DBA::update(
+                                               'workerqueue',
+                                               ['executed' => DBA::NULL_DATETIME, 'created' => DateTimeFormat::utcNow(), 'priority' => $new_priority, 'pid' => 0],
+                                               ['id' => $entry["id"]]
+                                       );
+                               } else {
+                                       Logger::info('Process runtime is okay', ['duration' => number_format($duration, 3), 'max' => $max_duration, 'id' => $entry["id"], 'pid' => $entry["pid"], 'command' => $command]);
+                               }
+                       }
+               }
+               DBA::close($entries);
+       }
+
+       /**
+        * Remove old entries from the workerqueue
+        *
+        * @return void
+        */
+       private static function cleanWorkerQueue()
+       {
+               DBA::delete('workerqueue', ["`done` AND `executed` < ?", DateTimeFormat::utc('now - 1 hour')]);
+
+               // Optimizing this table only last seconds
+               if (DI::config()->get('system', 'optimize_tables')) {
+                       // We are acquiring the two locks from the worker to avoid locking problems
+                       if (DI::lock()->acquire(Worker::LOCK_PROCESS, 10)) {
+                               if (DI::lock()->acquire(Worker::LOCK_WORKER, 10)) {
+                                       DBA::e("OPTIMIZE TABLE `workerqueue`");
+                                       DBA::e("OPTIMIZE TABLE `process`");
+                                       DI::lock()->release(Worker::LOCK_WORKER);
+                               }
+                               DI::lock()->release(Worker::LOCK_PROCESS);
+                       }
+               }
+       }       
+
+       /**
+        * Directly deliver AP messages or requeue them.
+        * 
+        * This function is placed here as a safeguard. Even when the worker queue is completely blocked, messages will be delivered.
+        */
+       private static function deliverPosts()
+       {
+               $deliveries = DBA::p("SELECT `item-uri`.`uri` AS `inbox`, MAX(`failed`) AS `failed` FROM `post-delivery` INNER JOIN `item-uri` ON `item-uri`.`id` = `post-delivery`.`inbox-id` GROUP BY `inbox`");
+               while ($delivery = DBA::fetch($deliveries)) {
+                       if ($delivery['failed'] == 0) {
+                               $result = ActivityPub\Delivery::deliver($delivery['inbox']);
+                               Logger::info('Drectly deliver inbox', ['inbox' => $delivery['inbox'], 'result' => $result['success']]);
+                               continue;
+                       } elseif ($delivery['failed'] < 3) {
+                               $priority = PRIORITY_HIGH;
+                       } elseif ($delivery['failed'] < 6) {
+                               $priority = PRIORITY_MEDIUM;
+                       } elseif ($delivery['failed'] < 8) {
+                               $priority = PRIORITY_LOW;
+                       } {
+                               $priority = PRIORITY_NEGLIGIBLE;
+                       }
+
+                       if ($delivery['failed'] >= DI::config()->get('system', 'worker_defer_limit')) {
+                               Logger::info('Removing failed deliveries', ['inbox' => $delivery['inbox'], 'failed' => $delivery['failed']]);
+                               Post\Delivery::removeFailed($delivery['inbox']);
+                       }
+
+                       if (Worker::add($priority, 'APDelivery', '', 0, $delivery['inbox'], 0)) {
+                               Logger::info('Missing APDelivery worker added for inbox', ['inbox' => $delivery['inbox'], 'failed' => $delivery['failed'], 'priority' => $priority]);
+                       }
+               }
+       }
+}
diff --git a/src/Core/Worker/Daemon.php b/src/Core/Worker/Daemon.php
new file mode 100644 (file)
index 0000000..afc6cda
--- /dev/null
@@ -0,0 +1,137 @@
+<?php
+/**
+ * @copyright Copyright (C) 2010-2022, the Friendica project
+ *
+ * @license GNU AGPL version 3 or any later version
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program.  If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+
+namespace Friendica\Core\Worker;
+
+use Friendica\App\Mode;
+use Friendica\Core\Logger;
+use Friendica\DI;
+
+/**
+ * Contains the class for the worker background job processing
+ */
+class Daemon
+{
+       private static $daemon_mode = null;
+
+       /**
+        * Checks if the worker is running in the daemon mode.
+        *
+        * @return boolean
+        */
+       public static function isMode()
+       {
+               if (!is_null(self::$daemon_mode)) {
+                       return self::$daemon_mode;
+               }
+
+               if (DI::mode()->getExecutor() == Mode::DAEMON) {
+                       return true;
+               }
+
+               $daemon_mode = DI::config()->get('system', 'worker_daemon_mode', false, true);
+               if ($daemon_mode) {
+                       return $daemon_mode;
+               }
+
+               if (!function_exists('pcntl_fork')) {
+                       self::$daemon_mode = false;
+                       return false;
+               }
+
+               $pidfile = DI::config()->get('system', 'pidfile');
+               if (empty($pidfile)) {
+                       // No pid file, no daemon
+                       self::$daemon_mode = false;
+                       return false;
+               }
+
+               if (!is_readable($pidfile)) {
+                       // No pid file. We assume that the daemon had been intentionally stopped.
+                       self::$daemon_mode = false;
+                       return false;
+               }
+
+               $pid = intval(file_get_contents($pidfile));
+               $running = posix_kill($pid, 0);
+
+               self::$daemon_mode = $running;
+               return $running;
+       }
+
+       /**
+        * Test if the daemon is running. If not, it will be started
+        *
+        * @return void
+        */
+       public static function checkState()
+       {
+               if (!DI::config()->get('system', 'daemon_watchdog', false)) {
+                       return;
+               }
+
+               if (!DI::mode()->isNormal()) {
+                       return;
+               }
+
+               // Check every minute if the daemon is running
+               if (DI::config()->get('system', 'last_daemon_check', 0) + 60 > time()) {
+                       return;
+               }
+
+               DI::config()->set('system', 'last_daemon_check', time());
+
+               $pidfile = DI::config()->get('system', 'pidfile');
+               if (empty($pidfile)) {
+                       // No pid file, no daemon
+                       return;
+               }
+
+               if (!is_readable($pidfile)) {
+                       // No pid file. We assume that the daemon had been intentionally stopped.
+                       return;
+               }
+
+               $pid = intval(file_get_contents($pidfile));
+               if (posix_kill($pid, 0)) {
+                       Logger::info('Daemon process is running', ['pid' => $pid]);
+                       return;
+               }
+
+               Logger::warning('Daemon process is not running', ['pid' => $pid]);
+
+               self::spawn();
+       }
+
+       /**
+        * Spawn a new daemon process
+        *
+        * @return void
+        */
+       private static function spawn()
+       {
+               Logger::notice('Starting new daemon process');
+               $command = 'bin/daemon.php';
+               $a = DI::app();
+               DI::system()->run($command, ['start']);
+               Logger::notice('New daemon process started');
+       }
+}
diff --git a/src/Core/Worker/IPC.php b/src/Core/Worker/IPC.php
new file mode 100644 (file)
index 0000000..a10cf1c
--- /dev/null
@@ -0,0 +1,75 @@
+<?php
+/**
+ * @copyright Copyright (C) 2010-2022, the Friendica project
+ *
+ * @license GNU AGPL version 3 or any later version
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program.  If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+
+namespace Friendica\Core\Worker;
+
+use Friendica\Database\DBA;
+
+/**
+ * Contains the class for the inter process communication
+ */
+class IPC
+{
+       /**
+        * Set the flag if some job is waiting
+        *
+        * @param boolean $jobs Is there a waiting job?
+        * @param int $key Key number
+        * @throws \Exception
+        */
+       public static function SetJobState(bool $jobs, int $key = 0)
+       {
+               $stamp = (float)microtime(true);
+               DBA::replace('worker-ipc', ['jobs' => $jobs, 'key' => $key]);
+       }
+
+       /**
+        * Delete a key entry
+        *
+        * @param int $key Key number
+        * @throws \Exception
+        */
+       public static function DeleteJobState(int $key)
+       {
+               $stamp = (float)microtime(true);
+               DBA::delete('worker-ipc', ['key' => $key]);
+       }
+
+       /**
+        * Checks if some worker job waits to be executed
+        *
+        * @param int $key Key number
+        * @return bool
+        * @throws \Exception
+        */
+       public static function JobsExists(int $key = 0)
+       {
+               $stamp = (float)microtime(true);
+               $row = DBA::selectFirst('worker-ipc', ['jobs'], ['key' => $key]);
+
+               // When we don't have a row, no job is running
+               if (!DBA::isResult($row)) {
+                       return false;
+               }
+
+               return (bool)$row['jobs'];
+       }
+}
diff --git a/src/Protocol/ActivityPub/Delivery.php b/src/Protocol/ActivityPub/Delivery.php
new file mode 100644 (file)
index 0000000..29fe21d
--- /dev/null
@@ -0,0 +1,161 @@
+<?php
+/**
+ * @copyright Copyright (C) 2010-2022, the Friendica project
+ *
+ * @license GNU AGPL version 3 or any later version
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program.  If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+
+namespace Friendica\Protocol\ActivityPub;
+
+use Friendica\Core\Logger;
+use Friendica\DI;
+use Friendica\Model\Contact;
+use Friendica\Model\GServer;
+use Friendica\Model\Post;
+use Friendica\Protocol\ActivityPub;
+use Friendica\Util\HTTPSignature;
+use Friendica\Worker\Delivery as WorkerDelivery;
+
+class Delivery
+{
+       public static function deliver(string $inbox):array
+       {
+               $uri_ids    = [];
+               $posts      = Post\Delivery::selectForInbox($inbox);
+               $serverfail = false;
+
+               foreach ($posts as $post) {
+                       if (!$serverfail) {
+                               $result = self::deliverToInbox($post['command'], 0, $inbox, $post['uid'], $post['receivers'], $post['uri-id']);
+
+                               if ($result['serverfailure']) {
+                                       // In a timeout situation we assume that every delivery to that inbox will time out.
+                                       // So we set the flag and try all deliveries at a later time.
+                                       Logger::info('Inbox delivery has a server failure', ['inbox' => $inbox]);
+                                       $serverfail = true;
+                               }
+                       }
+
+                       if ($serverfail || !$result['success']) {
+                               $uri_ids[] = $post['uri-id'];
+                       }
+               }
+
+               Logger::debug('Inbox delivery done', ['inbox' => $inbox, 'posts' => count($posts), 'failed' => count($uri_ids), 'serverfailure' => $serverfail]);
+               return ['success' => empty($uri_ids), 'uri_ids' => $uri_ids];
+       }
+
+       public static function deliverToInbox(string $cmd, int $item_id, string $inbox, int $uid, array $receivers, int $uri_id): array
+       {
+               if (empty($item_id) && !empty($uri_id) && !empty($uid)) {
+                       $item = Post::selectFirst(['id', 'parent', 'origin'], ['uri-id' => $uri_id, 'uid' => [$uid, 0]], ['order' => ['uid' => true]]);
+                       if (empty($item['id'])) {
+                               Logger::notice('Item not found, removing delivery', ['uri-id' => $uri_id, 'uid' => $uid, 'cmd' => $cmd, 'inbox' => $inbox]);
+                               Post\Delivery::remove($uri_id, $inbox);
+                               return true;
+                       } else {
+                               $item_id = $item['id'];
+                       }
+               }
+
+               $success    = true;
+               $serverfail = false;
+
+               if ($cmd == WorkerDelivery::MAIL) {
+                       $data = ActivityPub\Transmitter::createActivityFromMail($item_id);
+                       if (!empty($data)) {
+                               $success = HTTPSignature::transmit($data, $inbox, $uid);
+                       }
+               } elseif ($cmd == WorkerDelivery::SUGGESTION) {
+                       $success = ActivityPub\Transmitter::sendContactSuggestion($uid, $inbox, $item_id);
+               } elseif ($cmd == WorkerDelivery::RELOCATION) {
+                       // @todo Implementation pending
+               } elseif ($cmd == WorkerDelivery::POKE) {
+                       // Implementation not planned
+               } elseif ($cmd == WorkerDelivery::REMOVAL) {
+                       $success = ActivityPub\Transmitter::sendProfileDeletion($uid, $inbox);
+               } elseif ($cmd == WorkerDelivery::PROFILEUPDATE) {
+                       $success = ActivityPub\Transmitter::sendProfileUpdate($uid, $inbox);
+               } else {
+                       $data = ActivityPub\Transmitter::createCachedActivityFromItem($item_id);
+                       if (!empty($data)) {
+                               $timestamp  = microtime(true);
+                               $response   = HTTPSignature::post($data, $inbox, $uid);
+                               $runtime    = microtime(true) - $timestamp;
+                               $success    = $response->isSuccess();
+                               $serverfail = $response->isTimeout();
+                               if (!$success) {
+                                       if (!$serverfail && ($response->getReturnCode() >= 500) && ($response->getReturnCode() <= 599)) {
+                                               $serverfail = true;
+                                       }
+
+                                       $xrd_timeout = DI::config()->get('system', 'xrd_timeout');
+                                       if (!$serverfail && $xrd_timeout && ($runtime > $xrd_timeout)) {
+                                               $serverfail = true;
+                                       }
+                                       $curl_timeout = DI::config()->get('system', 'curl_timeout');
+                                       if (!$serverfail && $curl_timeout && ($runtime > $curl_timeout)) {
+                                               $serverfail = true;
+                                       }
+
+                                       Logger::info('Delivery failed', ['retcode' => $response->getReturnCode(), 'serverfailure' => $serverfail, 'runtime' => round($runtime, 3), 'uri-id' => $uri_id, 'uid' => $uid, 'item_id' => $item_id, 'cmd' => $cmd, 'inbox' => $inbox]);
+                               }
+                               if ($uri_id) {
+                                       if ($success) {
+                                               Post\Delivery::remove($uri_id, $inbox);
+                                       } else {
+                                               Post\Delivery::incrementFailed($uri_id, $inbox);
+                                       }
+                               }
+                       }
+               }
+
+               self::setSuccess($receivers, $success);
+
+               Logger::debug('Delivered', ['uri-id' => $uri_id, 'uid' => $uid, 'item_id' => $item_id, 'cmd' => $cmd, 'inbox' => $inbox, 'success' => $success]);
+
+               if ($success && in_array($cmd, [WorkerDelivery::POST])) {
+                       Post\DeliveryData::incrementQueueDone($uri_id, Post\DeliveryData::ACTIVITYPUB);
+               }
+
+               return ['success' => $success, 'serverfailure' => $serverfail];
+       }
+
+       private static function setSuccess(array $receivers, bool $success)
+       {
+               $gsid = null;
+
+               foreach ($receivers as $receiver) {
+                       $contact = Contact::getById($receiver);
+                       if (empty($contact)) {
+                               continue;
+                       }
+
+                       $gsid = $gsid ?: $contact['gsid'];
+
+                       if ($success) {
+                               Contact::unmarkForArchival($contact);
+                       } else {
+                               Contact::markForArchival($contact);
+                       }
+               }
+
+               if (!empty($gsid)) {
+                       GServer::setProtocol($gsid, Post\DeliveryData::ACTIVITYPUB);
+               }
+       }
+}
index 051bde0a0ce9bdea0c26f30d794cc4e4ef64bc4f..4703b8ca165f32b391959d27dd8de1500533b4f8 100644 (file)
@@ -23,13 +23,8 @@ namespace Friendica\Worker;
 
 use Friendica\Core\Logger;
 use Friendica\Core\Worker;
-use Friendica\DI;
-use Friendica\Model\Contact;
-use Friendica\Model\GServer;
 use Friendica\Model\Post;
 use Friendica\Protocol\ActivityPub;
-use Friendica\Util\HTTPSignature;
-
 class APDelivery
 {
        /**
@@ -69,11 +64,11 @@ class APDelivery
                Logger::debug('Invoked', ['cmd' => $cmd, 'inbox' => $inbox, 'id' => $item_id, 'uri-id' => $uri_id, 'uid' => $uid]);
 
                if (empty($uri_id)) {
-                       $result  = self::deliver($inbox);
+                       $result  = ActivityPub\Delivery::deliver($inbox);
                        $success = $result['success'];
                        $uri_ids = $result['uri_ids'];
                } else {
-                       $result  = self::deliverToInbox($cmd, $item_id, $inbox, $uid, $receivers, $uri_id);
+                       $result  = ActivityPub\Delivery::deliverToInbox($cmd, $item_id, $inbox, $uid, $receivers, $uri_id);
                        $success = $result['success'];
                        $uri_ids = [$uri_id];
                }
@@ -85,131 +80,4 @@ class APDelivery
                        }
                }
        }
-
-       private static function deliver(string $inbox):array
-       {
-               $uri_ids    = [];
-               $posts      = Post\Delivery::selectForInbox($inbox);
-               $serverfail = false;
-
-               foreach ($posts as $post) {
-                       if (!$serverfail) {
-                               $result = self::deliverToInbox($post['command'], 0, $inbox, $post['uid'], $post['receivers'], $post['uri-id']);
-
-                               if ($result['serverfailure']) {
-                                       // In a timeout situation we assume that every delivery to that inbox will time out.
-                                       // So we set the flag and try all deliveries at a later time.
-                                       Logger::info('Inbox delivery has a server failure', ['inbox' => $inbox]);
-                                       $serverfail = true;
-                               }
-                       }
-
-                       if ($serverfail || !$result['success']) {
-                               $uri_ids[] = $post['uri-id'];
-                       }
-               }
-
-               Logger::debug('Inbox delivery done', ['inbox' => $inbox, 'posts' => count($posts), 'failed' => count($uri_ids), 'serverfailure' => $serverfail]);
-               return ['success' => empty($uri_ids), 'uri_ids' => $uri_ids];
-       }
-
-       private static function deliverToInbox(string $cmd, int $item_id, string $inbox, int $uid, array $receivers, int $uri_id): array
-       {
-               if (empty($item_id) && !empty($uri_id) && !empty($uid)) {
-                       $item = Post::selectFirst(['id', 'parent', 'origin'], ['uri-id' => $uri_id, 'uid' => [$uid, 0]], ['order' => ['uid' => true]]);
-                       if (empty($item['id'])) {
-                               Logger::notice('Item not found, removing delivery', ['uri-id' => $uri_id, 'uid' => $uid, 'cmd' => $cmd, 'inbox' => $inbox]);
-                               Post\Delivery::remove($uri_id, $inbox);
-                               return true;
-                       } else {
-                               $item_id = $item['id'];
-                       }
-               }
-
-               $success    = true;
-               $serverfail = false;
-
-               if ($cmd == Delivery::MAIL) {
-                       $data = ActivityPub\Transmitter::createActivityFromMail($item_id);
-                       if (!empty($data)) {
-                               $success = HTTPSignature::transmit($data, $inbox, $uid);
-                       }
-               } elseif ($cmd == Delivery::SUGGESTION) {
-                       $success = ActivityPub\Transmitter::sendContactSuggestion($uid, $inbox, $item_id);
-               } elseif ($cmd == Delivery::RELOCATION) {
-                       // @todo Implementation pending
-               } elseif ($cmd == Delivery::POKE) {
-                       // Implementation not planned
-               } elseif ($cmd == Delivery::REMOVAL) {
-                       $success = ActivityPub\Transmitter::sendProfileDeletion($uid, $inbox);
-               } elseif ($cmd == Delivery::PROFILEUPDATE) {
-                       $success = ActivityPub\Transmitter::sendProfileUpdate($uid, $inbox);
-               } else {
-                       $data = ActivityPub\Transmitter::createCachedActivityFromItem($item_id);
-                       if (!empty($data)) {
-                               $timestamp  = microtime(true);
-                               $response   = HTTPSignature::post($data, $inbox, $uid);
-                               $runtime    = microtime(true) - $timestamp;
-                               $success    = $response->isSuccess();
-                               $serverfail = $response->isTimeout();
-                               if (!$success) {
-                                       if (!$serverfail && ($response->getReturnCode() >= 500) && ($response->getReturnCode() <= 599)) {
-                                               $serverfail = true;
-                                       }
-
-                                       $xrd_timeout = DI::config()->get('system', 'xrd_timeout');
-                                       if (!$serverfail && $xrd_timeout && ($runtime > $xrd_timeout)) {
-                                               $serverfail = true;
-                                       }
-                                       $curl_timeout = DI::config()->get('system', 'curl_timeout');
-                                       if (!$serverfail && $curl_timeout && ($runtime > $curl_timeout)) {
-                                               $serverfail = true;
-                                       }
-
-                                       Logger::info('Delivery failed', ['retcode' => $response->getReturnCode(), 'serverfailure' => $serverfail, 'runtime' => round($runtime, 3), 'uri-id' => $uri_id, 'uid' => $uid, 'item_id' => $item_id, 'cmd' => $cmd, 'inbox' => $inbox]);
-                               }
-                               if ($uri_id) {
-                                       if ($success) {
-                                               Post\Delivery::remove($uri_id, $inbox);
-                                       } else {
-                                               Post\Delivery::incrementFailed($uri_id, $inbox);
-                                       }
-                               }
-                       }
-               }
-
-               self::setSuccess($receivers, $success);
-
-               Logger::debug('Delivered', ['uri-id' => $uri_id, 'uid' => $uid, 'item_id' => $item_id, 'cmd' => $cmd, 'inbox' => $inbox, 'success' => $success]);
-
-               if ($success && in_array($cmd, [Delivery::POST])) {
-                       Post\DeliveryData::incrementQueueDone($uri_id, Post\DeliveryData::ACTIVITYPUB);
-               }
-
-               return ['success' => $success, 'serverfailure' => $serverfail];
-       }
-
-       private static function setSuccess(array $receivers, bool $success)
-       {
-               $gsid = null;
-
-               foreach ($receivers as $receiver) {
-                       $contact = Contact::getById($receiver);
-                       if (empty($contact)) {
-                               continue;
-                       }
-
-                       $gsid = $gsid ?: $contact['gsid'];
-
-                       if ($success) {
-                               Contact::unmarkForArchival($contact);
-                       } else {
-                               Contact::markForArchival($contact);
-                       }
-               }
-
-               if (!empty($gsid)) {
-                       GServer::setProtocol($gsid, Post\DeliveryData::ACTIVITYPUB);
-               }
-       }
 }
index e39ba24aae1cc71ca8754f2c9747ca9d5f9fc1f0..01c54fed1845576124498faf43d02f712bb52eb9 100644 (file)
@@ -95,9 +95,6 @@ class Cron
                        // Clear cache entries
                        Worker::add(PRIORITY_LOW, 'ClearCache');
 
-                       // Requeue posts from the post delivery entries
-                       Worker::add(PRIORITY_MEDIUM, 'RequeuePosts');
-
                        DI::config()->set('system', 'last_cron_hourly', time());
                }
 
diff --git a/src/Worker/RequeuePosts.php b/src/Worker/RequeuePosts.php
deleted file mode 100644 (file)
index 568595a..0000000
+++ /dev/null
@@ -1,47 +0,0 @@
-<?php
-/**
- * @copyright Copyright (C) 2010-2022, the Friendica project
- *
- * @license GNU AGPL version 3 or any later version
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as
- * published by the Free Software Foundation, either version 3 of the
- * License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program.  If not, see <https://www.gnu.org/licenses/>.
- *
- */
-
-namespace Friendica\Worker;
-
-use Friendica\Core\Logger;
-use Friendica\Core\Worker;
-use Friendica\Database\DBA;
-use Friendica\Model\Post;
-
-/**
- * Requeue posts that are stuck in the post-delivery table without a matching delivery job.
- * This should not happen in regular situations, this is a precaution.
- */
-class RequeuePosts
-{
-       public static function execute()
-       {
-               $deliveries = DBA::p("SELECT `item-uri`.`uri` AS `inbox` FROM `post-delivery` INNER JOIN `item-uri` ON `item-uri`.`id` = `post-delivery`.`inbox-id` GROUP BY `inbox`");
-               while ($delivery = DBA::fetch($deliveries)) {
-                       Post\Delivery::removeFailed($delivery['inbox']);
-
-                       if (Worker::add(PRIORITY_HIGH, 'APDelivery', '', 0, $delivery['inbox'], 0)) {
-                               Logger::info('Missing APDelivery worker added for inbox', ['inbox' => $delivery['inbox']]);
-                       }
-               }
-               DBA::close($deliveries);
-       }
-}