]> git.mxchange.org Git - friendica.git/blobdiff - src/Core/Worker.php
Suppress notice message when guid isn't supplied in Module\Admin\Item\Source
[friendica.git] / src / Core / Worker.php
index 80ec16982c709e608eff186dfc54e1dfac04ea3c..4f455277641f9b6ca43f1547b02685d814b20e1d 100644 (file)
 namespace Friendica\Core;
 
 use Friendica\Core;
-use Friendica\Core\Process as ProcessAlias;
 use Friendica\Database\DBA;
 use Friendica\DI;
-use Friendica\Model\Process;
 use Friendica\Util\DateTimeFormat;
 
 /**
@@ -38,7 +36,7 @@ class Worker
        const STATE_REFETCH    = 3; // Worker had refetched jobs in the execution loop.
        const STATE_SHORT_LOOP = 4; // Worker is processing preassigned jobs, thus saving much time.
 
-       const FAST_COMMANDS = ['APDelivery', 'Delivery', 'CreateShadowEntry'];
+       const FAST_COMMANDS = ['APDelivery', 'Delivery'];
 
        const LOCK_PROCESS = 'worker_process';
        const LOCK_WORKER = 'worker';
@@ -68,7 +66,7 @@ class Worker
 
                // At first check the maximum load. We shouldn't continue with a high load
                if (DI::process()->isMaxLoadReached()) {
-                       Logger::info('Pre check: maximum load reached, quitting.');
+                       Logger::notice('Pre check: maximum load reached, quitting.');
                        return;
                }
 
@@ -134,7 +132,7 @@ class Worker
 
                                        // Check free memory
                                        if (DI::process()->isMinMemoryReached()) {
-                                               Logger::info('Memory limit reached, quitting.');
+                                               Logger::notice('Memory limit reached, quitting.');
                                                DI::lock()->release(self::LOCK_WORKER);
                                                return;
                                        }
@@ -176,19 +174,19 @@ class Worker
 
                // Do we have too few memory?
                if (DI::process()->isMinMemoryReached()) {
-                       Logger::info('Memory limit reached, quitting.');
+                       Logger::notice('Memory limit reached, quitting.');
                        return false;
                }
 
                // Possibly there are too much database connections
                if (self::maxConnectionsReached()) {
-                       Logger::info('Maximum connections reached, quitting.');
+                       Logger::notice('Maximum connections reached, quitting.');
                        return false;
                }
 
                // Possibly there are too much database processes that block the system
                if (DI::process()->isMaxProcessesReached()) {
-                       Logger::info('Maximum processes reached, quitting.');
+                       Logger::notice('Maximum processes reached, quitting.');
                        return false;
                }
                
@@ -286,25 +284,29 @@ class Worker
 
                // Quit when in maintenance
                if (DI::config()->get('system', 'maintenance', false, true)) {
-                       Logger::info("Maintenance mode - quit process", ['pid' => $mypid]);
+                       Logger::notice("Maintenance mode - quit process", ['pid' => $mypid]);
                        return false;
                }
 
                // Constantly check the number of parallel database processes
                if (DI::process()->isMaxProcessesReached()) {
-                       Logger::info("Max processes reached for process", ['pid' => $mypid]);
+                       Logger::notice("Max processes reached for process", ['pid' => $mypid]);
                        return false;
                }
 
                // Constantly check the number of available database connections to let the frontend be accessible at any time
                if (self::maxConnectionsReached()) {
-                       Logger::info("Max connection reached for process", ['pid' => $mypid]);
+                       Logger::notice("Max connection reached for process", ['pid' => $mypid]);
                        return false;
                }
 
-               $argv = json_decode($queue["parameter"], true);
+               $argv = json_decode($queue['parameter'], true);
+               if (!empty($queue['command'])) {
+                       array_unshift($argv, $queue['command']);
+               }
+
                if (empty($argv)) {
-                       Logger::error('Parameter is empty', ['queue' => $queue]);
+                       Logger::warning('Parameter is empty', ['queue' => $queue]);
                        return false;
                }
 
@@ -348,7 +350,7 @@ class Worker
                }
 
                if (!validate_include($include)) {
-                       Logger::log("Include file ".$argv[0]." is not valid!");
+                       Logger::warning("Include file is not valid", ['file' => $argv[0]]);
                        $stamp = (float)microtime(true);
                        DBA::delete('workerqueue', ['id' => $queue["id"]]);
                        self::$db_duration = (microtime(true) - $stamp);
@@ -385,7 +387,7 @@ class Worker
                        self::$db_duration = (microtime(true) - $stamp);
                        self::$db_duration_write += (microtime(true) - $stamp);
                } else {
-                       Logger::log("Function ".$funcname." does not exist");
+                       Logger::warning("Function does not exist", ['function' => $funcname]);
                        $stamp = (float)microtime(true);
                        DBA::delete('workerqueue', ['id' => $queue["id"]]);
                        self::$db_duration = (microtime(true) - $stamp);
@@ -533,7 +535,7 @@ class Worker
                        $level = ($used / $max) * 100;
 
                        if ($level >= $maxlevel) {
-                               Logger::log("Maximum level (".$maxlevel."%) of user connections reached: ".$used."/".$max);
+                               Logger::notice("Maximum level (".$maxlevel."%) of user connections reached: ".$used."/".$max);
                                return true;
                        }
                }
@@ -563,7 +565,7 @@ class Worker
                if ($level < $maxlevel) {
                        return false;
                }
-               Logger::log("Maximum level (".$level."%) of system connections reached: ".$used."/".$max);
+               Logger::notice("Maximum level (".$level."%) of system connections reached: ".$used."/".$max);
                return true;
        }
 
@@ -578,9 +580,9 @@ class Worker
                $stamp = (float)microtime(true);
                $entries = DBA::select(
                        'workerqueue',
-                       ['id', 'pid', 'executed', 'priority', 'parameter'],
+                       ['id', 'pid', 'executed', 'priority', 'command', 'parameter'],
                        ['NOT `done` AND `pid` != 0'],
-                       ['order' => ['priority', 'created']]
+                       ['order' => ['priority', 'retrial', 'created']]
                );
                self::$db_duration += (microtime(true) - $stamp);
 
@@ -605,17 +607,21 @@ class Worker
                                $max_duration_defaults = [PRIORITY_CRITICAL => 720, PRIORITY_HIGH => 10, PRIORITY_MEDIUM => 60, PRIORITY_LOW => 180, PRIORITY_NEGLIGIBLE => 720];
                                $max_duration = $max_duration_defaults[$entry["priority"]];
 
-                               $argv = json_decode($entry["parameter"], true);
-                               if (empty($argv)) {
+                               $argv = json_decode($entry['parameter'], true);
+                               if (!empty($entry['command'])) {
+                                       $command = $entry['command'];
+                               } elseif (!empty($argv)) {
+                                       $command = array_shift($argv);
+                               } else {
                                        return;
                                }
 
-                               $argv[0] = basename($argv[0]);
+                               $command = basename($command);
 
                                // How long is the process already running?
                                $duration = (time() - strtotime($entry["executed"])) / 60;
                                if ($duration > $max_duration) {
-                                       Logger::log("Worker process ".$entry["pid"]." (".substr(json_encode($argv), 0, 50).") took more than ".$max_duration." minutes. It will be killed now.");
+                                       Logger::notice('Worker process took too much time - killed', ['duration' => number_format($duration, 3), 'max' => $max_duration, 'id' => $entry["id"], 'pid' => $entry["pid"], 'command' => $command]);
                                        posix_kill($entry["pid"], SIGTERM);
 
                                        // We killed the stale process.
@@ -638,7 +644,7 @@ class Worker
                                        self::$db_duration += (microtime(true) - $stamp);
                                        self::$db_duration_write += (microtime(true) - $stamp);
                                } else {
-                                       Logger::info('Process runtime is okay', ['pid' => $entry["pid"], 'duration' => $duration, 'max' => $max_duration, 'command' => substr(json_encode($argv), 0, 50)]);
+                                       Logger::info('Process runtime is okay', ['duration' => number_format($duration, 3), 'max' => $max_duration, 'id' => $entry["id"], 'pid' => $entry["pid"], 'command' => $command]);
                                }
                        }
                }
@@ -850,12 +856,17 @@ class Worker
                $ids = [];
                $stamp = (float)microtime(true);
                $condition = ["`priority` = ? AND `pid` = 0 AND NOT `done` AND `next_try` < ?", $priority, DateTimeFormat::utcNow()];
-               $tasks = DBA::select('workerqueue', ['id', 'parameter'], $condition, ['limit' => $limit, 'order' => ['created']]);
+               $tasks = DBA::select('workerqueue', ['id', 'command', 'parameter'], $condition, ['limit' => $limit, 'order' => ['retrial', 'created']]);
                self::$db_duration += (microtime(true) - $stamp);
                while ($task = DBA::fetch($tasks)) {
                        $ids[] = $task['id'];
                        // Only continue that loop while we are storing commands that can be processed quickly
-                       $command = json_decode($task['parameter'])[0];
+                       if (!empty($task['command'])) {
+                               $command = $task['command'];
+                       } else {
+                               $command = json_decode($task['parameter'])[0];
+                       }
+
                        if (!in_array($command, self::FAST_COMMANDS)) {
                                break;
                        }
@@ -970,13 +981,17 @@ class Worker
                if ($limit > 0) {
                        $stamp = (float)microtime(true);
                        $condition = ["`pid` = 0 AND NOT `done` AND `next_try` < ?", DateTimeFormat::utcNow()];
-                       $tasks = DBA::select('workerqueue', ['id', 'parameter'], $condition, ['limit' => $limit, 'order' => ['priority', 'created']]);
+                       $tasks = DBA::select('workerqueue', ['id', 'command', 'parameter'], $condition, ['limit' => $limit, 'order' => ['priority', 'retrial', 'created']]);
                        self::$db_duration += (microtime(true) - $stamp);
 
                        while ($task = DBA::fetch($tasks)) {
                                $ids[] = $task['id'];
                                // Only continue that loop while we are storing commands that can be processed quickly
-                               $command = json_decode($task['parameter'])[0];
+                               if (!empty($task['command'])) {
+                                       $command = $task['command'];
+                               } else {
+                                       $command = json_decode($task['parameter'])[0];
+                               }
                                if (!in_array($command, self::FAST_COMMANDS)) {
                                        break;
                                }
@@ -1064,7 +1079,7 @@ class Worker
                }
 
                $url = DI::baseUrl() . '/worker';
-               DI::httpRequest()->fetch($url, false, 1);
+               DI::httpRequest()->fetch($url, 1);
        }
 
        /**
@@ -1075,6 +1090,8 @@ class Worker
         */
        public static function executeIfIdle()
        {
+               self::checkDaemonState();
+
                if (!DI::config()->get("system", "frontend_worker")) {
                        return;
                }
@@ -1172,7 +1189,7 @@ class Worker
                $args = ['no_cron' => !$do_cron];
 
                $a = DI::app();
-               $process = new Core\Process(DI::logger(), DI::mode(), DI::config(), $a->getBasePath());
+               $process = new Core\Process(DI::logger(), DI::mode(), DI::config(), DI::modelProcess(), $a->getBasePath(), getmypid());
                $process->run($command, $args);
 
                // after spawning we have to remove the flag.
@@ -1188,7 +1205,7 @@ class Worker
         *
         * next args are passed as $cmd command line
         * or: Worker::add(PRIORITY_HIGH, "Notifier", Delivery::DELETION, $drop_id);
-        * or: Worker::add(array('priority' => PRIORITY_HIGH, 'dont_fork' => true), "CreateShadowEntry", $post_id);
+        * or: Worker::add(array('priority' => PRIORITY_HIGH, 'dont_fork' => true), "Delivery", $post_id);
         *
         * @return boolean "false" if worker queue entry already existed or there had been an error
         * @throws \Friendica\Network\HTTPException\InternalServerErrorException
@@ -1217,6 +1234,7 @@ class Worker
                // Don't fork from frontend tasks by default
                $dont_fork = DI::config()->get("system", "worker_dont_fork", false) || !DI::mode()->isBackend();
                $created = DateTimeFormat::utcNow();
+               $delayed = DBA::NULL_DATETIME;
                $force_priority = false;
 
                $run_parameter = array_shift($args);
@@ -1224,6 +1242,9 @@ class Worker
                if (is_int($run_parameter)) {
                        $priority = $run_parameter;
                } elseif (is_array($run_parameter)) {
+                       if (isset($run_parameter['delayed'])) {
+                               $delayed = $run_parameter['delayed'];
+                       }
                        if (isset($run_parameter['priority'])) {
                                $priority = $run_parameter['priority'];
                        }
@@ -1238,8 +1259,9 @@ class Worker
                        }
                }
 
+               $command = array_shift($args);
                $parameters = json_encode($args);
-               $found = DBA::exists('workerqueue', ['parameter' => $parameters, 'done' => false]);
+               $found = DBA::exists('workerqueue', ['command' => $command, 'parameter' => $parameters, 'done' => false]);
                $added = false;
 
                // Quit if there was a database error - a precaution for the update process to 3.5.3
@@ -1248,14 +1270,22 @@ class Worker
                }
 
                if (!$found) {
-                       $added = DBA::insert('workerqueue', ['parameter' => $parameters, 'created' => $created, 'priority' => $priority]);
+                       $added = DBA::insert('workerqueue', ['command' => $command, 'parameter' => $parameters, 'created' => $created,
+                               'priority' => $priority, 'next_try' => $delayed]);
                        if (!$added) {
                                return false;
                        }
                } elseif ($force_priority) {
-                       DBA::update('workerqueue', ['priority' => $priority], ['parameter' => $parameters, 'done' => false, 'pid' => 0]);
+                       DBA::update('workerqueue', ['priority' => $priority], ['command' => $command, 'parameter' => $parameters, 'done' => false, 'pid' => 0]);
+               }
+
+               // Set the IPC flag to ensure an immediate process execution via daemon
+               if (DI::config()->get('system', 'worker_daemon_mode', false)) {
+                       self::IPCSetJobState(true);
                }
 
+               self::checkDaemonState();
+
                // Should we quit and wait for the worker to be called as a cronjob?
                if ($dont_fork) {
                        return $added;
@@ -1274,9 +1304,8 @@ class Worker
                        return $added;
                }
 
-               // We tell the daemon that a new job entry exists
+               // Quit on daemon mode
                if (DI::config()->get('system', 'worker_daemon_mode', false)) {
-                       // We don't have to set the IPC flag - this is done in "tooMuchWorkers"
                        return $added;
                }
 
@@ -1286,6 +1315,11 @@ class Worker
                return $added;
        }
 
+       public static function countWorkersByCommand(string $command)
+       {
+               return DBA::count('workerqueue', ['done' => false, 'pid' => 0, 'command' => $command]);
+       }
+
        /**
         * Returns the next retrial level for worker jobs.
         * This function will skip levels when jobs are older.
@@ -1394,4 +1428,107 @@ class Worker
 
                return (bool)$row['jobs'];
        }
+
+       /**
+        * Test if the daemon is running. If not, it will be started
+        *
+        * @return void
+        */
+       private static function checkDaemonState()
+       {
+               if (!DI::config()->get('system', 'daemon_watchdog', false)) {
+                       return;
+               }
+
+               if (!DI::mode()->isNormal()) {
+                       return;
+               }
+
+               // Check every minute if the daemon is running
+               if (DI::config()->get('system', 'last_daemon_check', 0) + 60 > time()) {
+                       return;
+               }
+
+               DI::config()->set('system', 'last_daemon_check', time());
+
+               $pidfile = DI::config()->get('system', 'pidfile');
+               if (empty($pidfile)) {
+                       // No pid file, no daemon
+                       return;
+               }
+
+               if (!is_readable($pidfile)) {
+                       // No pid file. We assume that the daemon had been intentionally stopped.
+                       return;
+               }
+
+               $pid = intval(file_get_contents($pidfile));
+               if (posix_kill($pid, 0)) {
+                       Logger::info('Daemon process is running', ['pid' => $pid]);
+                       return;
+               }
+
+               Logger::warning('Daemon process is not running', ['pid' => $pid]);
+
+               self::spawnDaemon();
+       }
+
+       /**
+        * Spawn a new daemon process
+        *
+        * @return void
+        */
+       private static function spawnDaemon()
+       {
+               Logger::info('Starting new daemon process');
+               $command = 'bin/daemon.php';
+               $a = DI::app();
+               $process = new Core\Process(DI::logger(), DI::mode(), DI::config(), DI::modelProcess(), $a->getBasePath(), getmypid());
+               $process->run($command, ['start']);
+               Logger::info('New daemon process started');
+       }
+
+       /**
+        * Check if the system is inside the defined maintenance window
+        *
+        * @return boolean
+        */
+       public static function isInMaintenanceWindow(bool $check_last_execution = false)
+       {
+               // Calculate the seconds of the start end end of the maintenance window
+               $start = strtotime(DI::config()->get('system', 'maintenance_start')) % 86400;
+               $end = strtotime(DI::config()->get('system', 'maintenance_end')) % 86400;
+
+               Logger::info('Maintenance window', ['start' => date('H:i:s', $start), 'end' => date('H:i:s', $end)]);
+
+               if ($check_last_execution) {
+                       // Calculate the window duration
+                       $duration = max($start, $end) - min($start, $end);
+
+                       // Quit when the last cron execution had been after the previous window
+                       $last_cron = DI::config()->get('system', 'last_cron_daily');
+                       if ($last_cron + $duration > time()) {
+                               Logger::info('The Daily cron had been executed recently', ['last' => date(DateTimeFormat::MYSQL, $last_cron), 'start' => date('H:i:s', $start), 'end' => date('H:i:s', $end)]);
+                               return false;
+                       }
+               }
+
+               $current = time() % 86400;
+
+               if ($start < $end) {
+                       // Execute if we are inside the window
+                       $execute = ($current >= $start) && ($current <= $end);
+               } else {
+                       // Don't execute if we are outside the window
+                       $execute = !(($current > $end) && ($current < $start));
+               }
+
+               if ($execute) {
+                       Logger::info('We are inside the maintenance window', ['current' => date('H:i:s', $current), 'start' => date('H:i:s', $start), 'end' => date('H:i:s', $end)]);
+               } else {
+                       Logger::info('We are outside the maintenance window', ['current' => date('H:i:s', $current), 'start' => date('H:i:s', $start), 'end' => date('H:i:s', $end)]);
+               }
+               
+               return $execute;
+       }
 }