]> git.mxchange.org Git - friendica.git/commitdiff
workerqueue now has a "command" field
authorMichael <heluecht@pirati.ca>
Thu, 3 Dec 2020 15:47:50 +0000 (15:47 +0000)
committerMichael <heluecht@pirati.ca>
Thu, 3 Dec 2020 15:47:50 +0000 (15:47 +0000)
src/Core/Worker.php
src/Model/GServer.php
src/Worker/Cron.php
src/Worker/UpdateGServer.php
src/Worker/UpdateGServers.php
src/Worker/UpdatePublicContacts.php
static/dbstructure.config.php

index 33e66ae1c3bb6f13d2015c830a5df3ad33509a50..4f455277641f9b6ca43f1547b02685d814b20e1d 100644 (file)
@@ -300,7 +300,11 @@ class Worker
                        return false;
                }
 
-               $argv = json_decode($queue["parameter"], true);
+               $argv = json_decode($queue['parameter'], true);
+               if (!empty($queue['command'])) {
+                       array_unshift($argv, $queue['command']);
+               }
+
                if (empty($argv)) {
                        Logger::warning('Parameter is empty', ['queue' => $queue]);
                        return false;
@@ -576,7 +580,7 @@ class Worker
                $stamp = (float)microtime(true);
                $entries = DBA::select(
                        'workerqueue',
-                       ['id', 'pid', 'executed', 'priority', 'parameter'],
+                       ['id', 'pid', 'executed', 'priority', 'command', 'parameter'],
                        ['NOT `done` AND `pid` != 0'],
                        ['order' => ['priority', 'retrial', 'created']]
                );
@@ -603,17 +607,21 @@ class Worker
                                $max_duration_defaults = [PRIORITY_CRITICAL => 720, PRIORITY_HIGH => 10, PRIORITY_MEDIUM => 60, PRIORITY_LOW => 180, PRIORITY_NEGLIGIBLE => 720];
                                $max_duration = $max_duration_defaults[$entry["priority"]];
 
-                               $argv = json_decode($entry["parameter"], true);
-                               if (empty($argv)) {
+                               $argv = json_decode($entry['parameter'], true);
+                               if (!empty($entry['command'])) {
+                                       $command = $entry['command'];
+                               } elseif (!empty($argv)) {
+                                       $command = array_shift($argv);
+                               } else {
                                        return;
                                }
 
-                               $argv[0] = basename($argv[0]);
+                               $command = basename($command);
 
                                // How long is the process already running?
                                $duration = (time() - strtotime($entry["executed"])) / 60;
                                if ($duration > $max_duration) {
-                                       Logger::notice("Worker process ".$entry["pid"]." (".substr(json_encode($argv), 0, 50).") took more than ".$max_duration." minutes. It will be killed now.");
+                                       Logger::notice('Worker process took too much time - killed', ['duration' => number_format($duration, 3), 'max' => $max_duration, 'id' => $entry["id"], 'pid' => $entry["pid"], 'command' => $command]);
                                        posix_kill($entry["pid"], SIGTERM);
 
                                        // We killed the stale process.
@@ -636,7 +644,7 @@ class Worker
                                        self::$db_duration += (microtime(true) - $stamp);
                                        self::$db_duration_write += (microtime(true) - $stamp);
                                } else {
-                                       Logger::info('Process runtime is okay', ['pid' => $entry["pid"], 'duration' => $duration, 'max' => $max_duration, 'command' => substr(json_encode($argv), 0, 50)]);
+                                       Logger::info('Process runtime is okay', ['duration' => number_format($duration, 3), 'max' => $max_duration, 'id' => $entry["id"], 'pid' => $entry["pid"], 'command' => $command]);
                                }
                        }
                }
@@ -848,12 +856,17 @@ class Worker
                $ids = [];
                $stamp = (float)microtime(true);
                $condition = ["`priority` = ? AND `pid` = 0 AND NOT `done` AND `next_try` < ?", $priority, DateTimeFormat::utcNow()];
-               $tasks = DBA::select('workerqueue', ['id', 'parameter'], $condition, ['limit' => $limit, 'order' => ['retrial', 'created']]);
+               $tasks = DBA::select('workerqueue', ['id', 'command', 'parameter'], $condition, ['limit' => $limit, 'order' => ['retrial', 'created']]);
                self::$db_duration += (microtime(true) - $stamp);
                while ($task = DBA::fetch($tasks)) {
                        $ids[] = $task['id'];
                        // Only continue that loop while we are storing commands that can be processed quickly
-                       $command = json_decode($task['parameter'])[0];
+                       if (!empty($task['command'])) {
+                               $command = $task['command'];
+                       } else {
+                               $command = json_decode($task['parameter'])[0];
+                       }
+
                        if (!in_array($command, self::FAST_COMMANDS)) {
                                break;
                        }
@@ -968,13 +981,17 @@ class Worker
                if ($limit > 0) {
                        $stamp = (float)microtime(true);
                        $condition = ["`pid` = 0 AND NOT `done` AND `next_try` < ?", DateTimeFormat::utcNow()];
-                       $tasks = DBA::select('workerqueue', ['id', 'parameter'], $condition, ['limit' => $limit, 'order' => ['priority', 'retrial', 'created']]);
+                       $tasks = DBA::select('workerqueue', ['id', 'command', 'parameter'], $condition, ['limit' => $limit, 'order' => ['priority', 'retrial', 'created']]);
                        self::$db_duration += (microtime(true) - $stamp);
 
                        while ($task = DBA::fetch($tasks)) {
                                $ids[] = $task['id'];
                                // Only continue that loop while we are storing commands that can be processed quickly
-                               $command = json_decode($task['parameter'])[0];
+                               if (!empty($task['command'])) {
+                                       $command = $task['command'];
+                               } else {
+                                       $command = json_decode($task['parameter'])[0];
+                               }
                                if (!in_array($command, self::FAST_COMMANDS)) {
                                        break;
                                }
@@ -1242,8 +1259,9 @@ class Worker
                        }
                }
 
+               $command = array_shift($args);
                $parameters = json_encode($args);
-               $found = DBA::exists('workerqueue', ['parameter' => $parameters, 'done' => false]);
+               $found = DBA::exists('workerqueue', ['command' => $command, 'parameter' => $parameters, 'done' => false]);
                $added = false;
 
                // Quit if there was a database error - a precaution for the update process to 3.5.3
@@ -1252,13 +1270,13 @@ class Worker
                }
 
                if (!$found) {
-                       $added = DBA::insert('workerqueue', ['parameter' => $parameters, 'created' => $created,
+                       $added = DBA::insert('workerqueue', ['command' => $command, 'parameter' => $parameters, 'created' => $created,
                                'priority' => $priority, 'next_try' => $delayed]);
                        if (!$added) {
                                return false;
                        }
                } elseif ($force_priority) {
-                       DBA::update('workerqueue', ['priority' => $priority], ['parameter' => $parameters, 'done' => false, 'pid' => 0]);
+                       DBA::update('workerqueue', ['priority' => $priority], ['command' => $command, 'parameter' => $parameters, 'done' => false, 'pid' => 0]);
                }
 
                // Set the IPC flag to ensure an immediate process execution via daemon
@@ -1297,6 +1315,11 @@ class Worker
                return $added;
        }
 
+       public static function countWorkersByCommand(string $command)
+       {
+               return DBA::count('workerqueue', ['done' => false, 'pid' => 0, 'command' => $command]);
+       }
+
        /**
         * Returns the next retrial level for worker jobs.
         * This function will skip levels when jobs are older.
index 713d6f114601d901b53036b7a46ade4b83805a0e..c035abc73fb711aaeb2f0c03c1f1bd5d4946d6f3 100644 (file)
@@ -124,6 +124,55 @@ class GServer
                return self::check($server, $network, $force);
        }
 
+       public static function getNextUpdateDate(bool $success, string $created = '', string $last_contact = '')
+       {
+               // On successful contact process check again next week
+               if ($success) {
+                       return DateTimeFormat::utc('now +7 day');
+               }
+
+               $now = strtotime(DateTimeFormat::utcNow());
+
+               if ($created > $last_contact) {
+                       $contact_time = strtotime($created);
+               } else {
+                       $contact_time = strtotime($last_contact);
+               }
+
+               // If the last contact was less than 6 hours before then try again in 6 hours
+               if (($now - $contact_time) < (60 * 60 * 6)) {
+                       return DateTimeFormat::utc('now +6 hour');
+               }
+
+               // If the last contact was less than 12 hours before then try again in 12 hours
+               if (($now - $contact_time) < (60 * 60 * 12)) {
+                       return DateTimeFormat::utc('now +12 hour');
+               }
+
+               // If the last contact was less than 24 hours before then try tomorrow again
+               if (($now - $contact_time) < (60 * 60 * 24)) {
+                       return DateTimeFormat::utc('now +1 day');
+               }
+               
+               // If the last contact was less than a week before then try again in a week
+               if (($now - $contact_time) < (60 * 60 * 24 * 7)) {
+                       return DateTimeFormat::utc('now +1 week');
+               }
+
+               // If the last contact was less than two weeks before then try again in two week
+               if (($now - $contact_time) < (60 * 60 * 24 * 14)) {
+                       return DateTimeFormat::utc('now +2 week');
+               }
+
+               // If the last contact was less than a month before then try again in a month
+               if (($now - $contact_time) < (60 * 60 * 24 * 30)) {
+                       return DateTimeFormat::utc('now +1 month');
+               }
+
+               // The system hadn't been successul contacted for more than a month, so try again in three months
+               return DateTimeFormat::utc('now +3 month');
+       }
+
        /**
         * Decides if a server needs to be updated, based upon several date fields
         *
@@ -235,10 +284,13 @@ class GServer
         *
         * @param string $url
         */
-       private static function setFailure(string $url)
+       public static function setFailure(string $url)
        {
-               if (DBA::exists('gserver', ['nurl' => Strings::normaliseLink($url)])) {
-                       DBA::update('gserver', ['failed' => true, 'last_failure' => DateTimeFormat::utcNow(), 'detection-method' => null],
+               $gserver = DBA::selectFirst('gserver', [], ['nurl' => Strings::normaliseLink($url)]);
+               if (DBA::isResult($gserver)) {
+                       $next_update = self::getNextUpdateDate(false, $gserver['created'], $gserver['last_contact']);
+                       DBA::update('gserver', ['failed' => true, 'last_failure' => DateTimeFormat::utcNow(),
+                       'next_contact' => $next_update, 'detection-method' => null],
                        ['nurl' => Strings::normaliseLink($url)]);
                        Logger::info('Set failed status for existing server', ['url' => $url]);
                        return;
@@ -306,6 +358,7 @@ class GServer
 
                // If the URL missmatches, then we mark the old entry as failure
                if ($url != $original_url) {
+                       /// @todo What to do with "next_contact" here?
                        DBA::update('gserver', ['failed' => true, 'last_failure' => DateTimeFormat::utcNow()],
                                ['nurl' => Strings::normaliseLink($original_url)]);
                }
@@ -452,6 +505,8 @@ class GServer
                        $serverdata = self::detectNetworkViaContacts($url, $serverdata);
                }
 
+               $serverdata['next_contact'] = self::getNextUpdateDate(true);
+
                $serverdata['last_contact'] = DateTimeFormat::utcNow();
                $serverdata['failed'] = false;
 
@@ -1593,13 +1648,6 @@ class GServer
                );
 
                while ($gserver = DBA::fetch($gservers)) {
-                       if (!GServer::check($gserver['url'], $gserver['network'])) {
-                               // The server is not reachable? Okay, then we will try it later
-                               $fields = ['last_poco_query' => DateTimeFormat::utcNow()];
-                               DBA::update('gserver', $fields, ['nurl' => $gserver['nurl']]);
-                               continue;
-                       }
-
                        Logger::info('Update peer list', ['server' => $gserver['url'], 'id' => $gserver['id']]);
                        Worker::add(PRIORITY_LOW, 'UpdateServerPeers', $gserver['url']);
 
index 270387d3551e8c983dca9b2eb07db477df4b7198..c47e4bd2020833727cd9c4296406f90fb025c997 100644 (file)
@@ -63,6 +63,9 @@ class Cron
                // Update contact information
                Worker::add(PRIORITY_LOW, 'UpdatePublicContacts');
 
+               // Update server information
+               Worker::add(PRIORITY_LOW, 'UpdateGServers');
+
                // run the process to update server directories in the background
                Worker::add(PRIORITY_LOW, 'UpdateServerDirectories');
 
@@ -103,8 +106,6 @@ class Cron
                        // update nodeinfo data
                        Worker::add(PRIORITY_LOW, 'NodeInfo');
 
-                       Worker::add(PRIORITY_LOW, 'UpdateGServers');
-
                        // Repair entries in the database
                        Worker::add(PRIORITY_LOW, 'RepairDatabase');
 
index 12f9572b92c95bbcb9da484fea258dc5c75a1132..696ec125d82516506b8df3c8b6fa35f06b70bfd8 100644 (file)
@@ -32,18 +32,19 @@ class UpdateGServer
         * @param string  $server_url    Server URL
         * @param boolean $only_nodeinfo Only use nodeinfo for server detection
         */
-       public static function execute(string $server_url, bool $only_nodeinfo = false)
+       public static function execute(string $server_url, bool $only_nodeinfo = false, bool $force = false)
        {
                if (empty($server_url)) {
                        return;
                }
 
-               $server_url = filter_var($server_url, FILTER_SANITIZE_URL);
-               if (substr(Strings::normaliseLink($server_url), 0, 7) != 'http://') {
+               $filtered = filter_var($server_url, FILTER_SANITIZE_URL);
+               if (substr(Strings::normaliseLink($filtered), 0, 7) != 'http://') {
+                       GServer::setFailure($filtered);
                        return;
                }
 
-               $ret = GServer::check($server_url, '', false, $only_nodeinfo);
-               Logger::info('Updated gserver', ['url' => $server_url, 'result' => $ret]);
+               $ret = GServer::check($filtered, '', $force, $only_nodeinfo);
+               Logger::info('Updated gserver', ['url' => $filtered, 'result' => $ret]);
        }
 }
index 5a45138462f797f9e7c258d6d8a4b81935159f40..12022a34682f47cc831f1bbdb68e4b04578a9b32 100644 (file)
@@ -24,34 +24,36 @@ namespace Friendica\Worker;
 use Friendica\Core\Logger;
 use Friendica\Core\Worker;
 use Friendica\Database\DBA;
-use Friendica\Model\GServer;
 
 class UpdateGServers
 {
        /**
-        * Updates the first 250 servers
+        * Updates up to 100 servers
         */
        public static function execute()
        {
-               $gservers = DBA::p("SELECT `url`, `created`, `last_failure`, `last_contact` FROM `gserver` ORDER BY rand()");
-               if (!DBA::isResult($gservers)) {
+               $updating = Worker::countWorkersByCommand('UpdateGServer');
+               $limit = 100 - $updating;
+               if ($limit <= 0) {
+                       Logger::info('The number of currently running jobs exceed the limit');
                        return;
                }
 
-               $updated = 0;
-
-               while ($gserver = DBA::fetch($gservers)) {
-                       if (!GServer::updateNeeded($gserver['created'], '', $gserver['last_failure'], $gserver['last_contact'])) {
-                               continue;
-                       }
-                       Logger::info('Update server status', ['server' => $gserver['url']]);
+               $outdated = DBA::count('gserver', ["`next_contact` < UTC_TIMESTAMP()"]);
+               $total = DBA::count('gserver');
+               Logger::info('Server status', ['total' => $total, 'outdated' => $outdated, 'updating' => $limit]);
 
-                       Worker::add(PRIORITY_LOW, 'UpdateGServer', $gserver['url']);
+               $gservers = DBA::select('gserver', ['url'], ["`next_contact` < UTC_TIMESTAMP()"], ['limit' => $limit]);
+               if (!DBA::isResult($gservers)) {
+                       return;
+               }
 
-                       if (++$updated > 250) {
-                               return;
-                       }
+               $count = 0;
+               while ($gserver = DBA::fetch($gservers)) {
+                       Worker::add(PRIORITY_LOW, 'UpdateGServer', $gserver['url'], false, true);
+                       $count++;
                }
                DBA::close($gservers);
+               Logger::info('Updated servers', ['count' => $count]);
        }
 }
index 2e8602b05487116bd4536506271043a0368361eb..939d9fa8d77eadeaa4e1cfaea1d3c2971acce9a9 100644 (file)
@@ -39,16 +39,27 @@ class UpdatePublicContacts
                $ids = [];
                $base_condition = ['network' => Protocol::FEDERATED, 'uid' => 0, 'self' => false];
 
+               $existing = Worker::countWorkersByCommand('UpdateContact');
+               Logger::info('Already existing jobs', ['existing' => $existing]);
+               if ($existing > 100) {
+                       return;
+               }
+
+               $limit = 100 - $existing;
+
                if (!DI::config()->get('system', 'update_active_contacts')) {
+                       $part = 3;
                        // Add every contact (mostly failed ones) that hadn't been updated for six months
                        $condition = DBA::mergeConditions($base_condition,
                                ["`last-update` < ?", DateTimeFormat::utc('now - 6 month')]);
-                       $ids = self::getContactsToUpdate($condition, $ids);
+                       $ids = self::getContactsToUpdate($condition, $ids, round($limit / $part));
 
                        // Add every non failed contact that hadn't been updated for a month
                        $condition = DBA::mergeConditions($base_condition,
                                ["NOT `failed` AND `last-update` < ?", DateTimeFormat::utc('now - 1 month')]);
-                       $ids = self::getContactsToUpdate($condition, $ids);
+                       $ids = self::getContactsToUpdate($condition, $ids, round($limit / $part));
+               } else {
+                       $part = 1;
                }
 
                // Add every contact our system interacted with and hadn't been updated for a week
@@ -56,7 +67,7 @@ class UpdatePublicContacts
                        `id` IN (SELECT `owner-id` FROM `item`) OR `id` IN (SELECT `causer-id` FROM `item`) OR
                        `id` IN (SELECT `cid` FROM `post-tag`) OR `id` IN (SELECT `cid` FROM `user-contact`)) AND
                        `last-update` < ?", DateTimeFormat::utc('now - 1 week')]);
-               $ids = self::getContactsToUpdate($condition, $ids);
+               $ids = self::getContactsToUpdate($condition, $ids, round($limit / $part));
 
                foreach ($ids as $id) {
                        Worker::add(PRIORITY_LOW, "UpdateContact", $id);
@@ -73,9 +84,9 @@ class UpdatePublicContacts
         * @param array $ids
         * @return array contact ids
         */
-       private static function getContactsToUpdate(array $condition, array $ids = [])
+       private static function getContactsToUpdate(array $condition, array $ids = [], int $limit)
        {
-               $contacts = DBA::select('contact', ['id'], $condition, ['limit' => 100, 'order' => ['last-update']]);
+               $contacts = DBA::select('contact', ['id'], $condition, ['limit' => $limit, 'order' => ['last-update']]);
                while ($contact = DBA::fetch($contacts)) {
                        $ids[] = $contact['id'];
                }
index 9a125cc15be11a52d83fabdda86f75f91e698cd1..c8ebd656dfd149d92512c8c0e8ffb5ea103f6d63 100644 (file)
@@ -81,13 +81,15 @@ return [
                        "detection-method" => ["type" => "tinyint unsigned", "comment" => "Method that had been used to detect that server"],
                        "created" => ["type" => "datetime", "not null" => "1", "default" => DBA::NULL_DATETIME, "comment" => ""],
                        "last_poco_query" => ["type" => "datetime", "default" => DBA::NULL_DATETIME, "comment" => ""],
-                       "last_contact" => ["type" => "datetime", "default" => DBA::NULL_DATETIME, "comment" => ""],
-                       "last_failure" => ["type" => "datetime", "default" => DBA::NULL_DATETIME, "comment" => ""],
+                       "last_contact" => ["type" => "datetime", "default" => DBA::NULL_DATETIME, "comment" => "Last successful connection request"],
+                       "last_failure" => ["type" => "datetime", "default" => DBA::NULL_DATETIME, "comment" => "Last failed connection request"],
                        "failed" => ["type" => "boolean", "comment" => "Connection failed"],
+                       "next_contact" => ["type" => "datetime", "default" => DBA::NULL_DATETIME, "comment" => "Next connection request"],
                ],
                "indexes" => [
                        "PRIMARY" => ["id"],
                        "nurl" => ["UNIQUE", "nurl(190)"],
+                       "next_contact" => ["next_contact"],
                ]
        ],
        "user" => [
@@ -1496,7 +1498,8 @@ return [
                "comment" => "Background tasks queue entries",
                "fields" => [
                        "id" => ["type" => "int unsigned", "not null" => "1", "extra" => "auto_increment", "primary" => "1", "comment" => "Auto incremented worker task id"],
-                       "parameter" => ["type" => "mediumtext", "comment" => "Task command"],
+                       "command" => ["type" => "varchar(100)", "comment" => "Task command"],
+                       "parameter" => ["type" => "mediumtext", "comment" => "Task parameter"],
                        "priority" => ["type" => "tinyint unsigned", "not null" => "1", "default" => "0", "comment" => "Task priority"],
                        "created" => ["type" => "datetime", "not null" => "1", "default" => DBA::NULL_DATETIME, "comment" => "Creation date"],
                        "pid" => ["type" => "int unsigned", "not null" => "1", "default" => "0", "comment" => "Process id of the worker"],
@@ -1507,7 +1510,8 @@ return [
                ],
                "indexes" => [
                        "PRIMARY" => ["id"],
-                       "done_parameter" => ["done", "parameter(64)"],
+                       "command" => ["command"],
+                       "done_command_parameter" => ["done", "command", "parameter(64)"],
                        "done_executed" => ["done", "executed"],
                        "done_priority_retrial_created" => ["done", "priority", "retrial", "created"],
                        "done_priority_next_try" => ["done", "priority", "next_try"],