]> git.mxchange.org Git - friendica.git/blobdiff - src/Core/Worker.php
Suppress notice message when guid isn't supplied in Module\Admin\Item\Source
[friendica.git] / src / Core / Worker.php
index 2dc3c9137bc48572f1a866668bad429de94cddd0..4f455277641f9b6ca43f1547b02685d814b20e1d 100644 (file)
@@ -300,7 +300,11 @@ class Worker
                        return false;
                }
 
-               $argv = json_decode($queue["parameter"], true);
+               $argv = json_decode($queue['parameter'], true);
+               if (!empty($queue['command'])) {
+                       array_unshift($argv, $queue['command']);
+               }
+
                if (empty($argv)) {
                        Logger::warning('Parameter is empty', ['queue' => $queue]);
                        return false;
@@ -576,7 +580,7 @@ class Worker
                $stamp = (float)microtime(true);
                $entries = DBA::select(
                        'workerqueue',
-                       ['id', 'pid', 'executed', 'priority', 'parameter'],
+                       ['id', 'pid', 'executed', 'priority', 'command', 'parameter'],
                        ['NOT `done` AND `pid` != 0'],
                        ['order' => ['priority', 'retrial', 'created']]
                );
@@ -603,17 +607,21 @@ class Worker
                                $max_duration_defaults = [PRIORITY_CRITICAL => 720, PRIORITY_HIGH => 10, PRIORITY_MEDIUM => 60, PRIORITY_LOW => 180, PRIORITY_NEGLIGIBLE => 720];
                                $max_duration = $max_duration_defaults[$entry["priority"]];
 
-                               $argv = json_decode($entry["parameter"], true);
-                               if (empty($argv)) {
+                               $argv = json_decode($entry['parameter'], true);
+                               if (!empty($entry['command'])) {
+                                       $command = $entry['command'];
+                               } elseif (!empty($argv)) {
+                                       $command = array_shift($argv);
+                               } else {
                                        return;
                                }
 
-                               $argv[0] = basename($argv[0]);
+                               $command = basename($command);
 
                                // How long is the process already running?
                                $duration = (time() - strtotime($entry["executed"])) / 60;
                                if ($duration > $max_duration) {
-                                       Logger::notice("Worker process ".$entry["pid"]." (".substr(json_encode($argv), 0, 50).") took more than ".$max_duration." minutes. It will be killed now.");
+                                       Logger::notice('Worker process took too much time - killed', ['duration' => number_format($duration, 3), 'max' => $max_duration, 'id' => $entry["id"], 'pid' => $entry["pid"], 'command' => $command]);
                                        posix_kill($entry["pid"], SIGTERM);
 
                                        // We killed the stale process.
@@ -636,7 +644,7 @@ class Worker
                                        self::$db_duration += (microtime(true) - $stamp);
                                        self::$db_duration_write += (microtime(true) - $stamp);
                                } else {
-                                       Logger::info('Process runtime is okay', ['pid' => $entry["pid"], 'duration' => $duration, 'max' => $max_duration, 'command' => substr(json_encode($argv), 0, 50)]);
+                                       Logger::info('Process runtime is okay', ['duration' => number_format($duration, 3), 'max' => $max_duration, 'id' => $entry["id"], 'pid' => $entry["pid"], 'command' => $command]);
                                }
                        }
                }
@@ -848,12 +856,17 @@ class Worker
                $ids = [];
                $stamp = (float)microtime(true);
                $condition = ["`priority` = ? AND `pid` = 0 AND NOT `done` AND `next_try` < ?", $priority, DateTimeFormat::utcNow()];
-               $tasks = DBA::select('workerqueue', ['id', 'parameter'], $condition, ['limit' => $limit, 'order' => ['retrial', 'created']]);
+               $tasks = DBA::select('workerqueue', ['id', 'command', 'parameter'], $condition, ['limit' => $limit, 'order' => ['retrial', 'created']]);
                self::$db_duration += (microtime(true) - $stamp);
                while ($task = DBA::fetch($tasks)) {
                        $ids[] = $task['id'];
                        // Only continue that loop while we are storing commands that can be processed quickly
-                       $command = json_decode($task['parameter'])[0];
+                       if (!empty($task['command'])) {
+                               $command = $task['command'];
+                       } else {
+                               $command = json_decode($task['parameter'])[0];
+                       }
+
                        if (!in_array($command, self::FAST_COMMANDS)) {
                                break;
                        }
@@ -968,13 +981,17 @@ class Worker
                if ($limit > 0) {
                        $stamp = (float)microtime(true);
                        $condition = ["`pid` = 0 AND NOT `done` AND `next_try` < ?", DateTimeFormat::utcNow()];
-                       $tasks = DBA::select('workerqueue', ['id', 'parameter'], $condition, ['limit' => $limit, 'order' => ['priority', 'retrial', 'created']]);
+                       $tasks = DBA::select('workerqueue', ['id', 'command', 'parameter'], $condition, ['limit' => $limit, 'order' => ['priority', 'retrial', 'created']]);
                        self::$db_duration += (microtime(true) - $stamp);
 
                        while ($task = DBA::fetch($tasks)) {
                                $ids[] = $task['id'];
                                // Only continue that loop while we are storing commands that can be processed quickly
-                               $command = json_decode($task['parameter'])[0];
+                               if (!empty($task['command'])) {
+                                       $command = $task['command'];
+                               } else {
+                                       $command = json_decode($task['parameter'])[0];
+                               }
                                if (!in_array($command, self::FAST_COMMANDS)) {
                                        break;
                                }
@@ -1226,7 +1243,7 @@ class Worker
                        $priority = $run_parameter;
                } elseif (is_array($run_parameter)) {
                        if (isset($run_parameter['delayed'])) {
-                               $delayed = $run_parameter['execute'];
+                               $delayed = $run_parameter['delayed'];
                        }
                        if (isset($run_parameter['priority'])) {
                                $priority = $run_parameter['priority'];
@@ -1242,8 +1259,9 @@ class Worker
                        }
                }
 
+               $command = array_shift($args);
                $parameters = json_encode($args);
-               $found = DBA::exists('workerqueue', ['parameter' => $parameters, 'done' => false]);
+               $found = DBA::exists('workerqueue', ['command' => $command, 'parameter' => $parameters, 'done' => false]);
                $added = false;
 
                // Quit if there was a database error - a precaution for the update process to 3.5.3
@@ -1252,13 +1270,13 @@ class Worker
                }
 
                if (!$found) {
-                       $added = DBA::insert('workerqueue', ['parameter' => $parameters, 'created' => $created,
+                       $added = DBA::insert('workerqueue', ['command' => $command, 'parameter' => $parameters, 'created' => $created,
                                'priority' => $priority, 'next_try' => $delayed]);
                        if (!$added) {
                                return false;
                        }
                } elseif ($force_priority) {
-                       DBA::update('workerqueue', ['priority' => $priority], ['parameter' => $parameters, 'done' => false, 'pid' => 0]);
+                       DBA::update('workerqueue', ['priority' => $priority], ['command' => $command, 'parameter' => $parameters, 'done' => false, 'pid' => 0]);
                }
 
                // Set the IPC flag to ensure an immediate process execution via daemon
@@ -1297,6 +1315,11 @@ class Worker
                return $added;
        }
 
+       public static function countWorkersByCommand(string $command)
+       {
+               return DBA::count('workerqueue', ['done' => false, 'pid' => 0, 'command' => $command]);
+       }
+
        /**
         * Returns the next retrial level for worker jobs.
         * This function will skip levels when jobs are older.