*
* This script was taken from http://php.net/manual/en/function.pcntl-fork.php
*/
-function shutdown() {
- posix_kill(posix_getpid(), SIGHUP);
+
+use Friendica\App;
+use Friendica\BaseObject;
+use Friendica\Core\Config;
+use Friendica\Core\Worker;
+
+// Ensure that daemon.php is executed from the base path of the installation
+if (!file_exists("boot.php") && (sizeof($_SERVER["argv"]) != 0)) {
+ $directory = dirname($_SERVER["argv"][0]);
+
+ if (substr($directory, 0, 1) != "/") {
+ $directory = $_SERVER["PWD"]."/".$directory;
+ }
+ $directory = realpath($directory."/..");
+
+ chdir($directory);
+}
+
+require_once "boot.php";
+require_once "include/dba.php";
+
+$a = new App(dirname(__DIR__));
+BaseObject::setApp($a);
+
+require_once ".htconfig.php";
+dba::connect($db_host, $db_user, $db_pass, $db_data);
+
+Config::load();
+
+if (!isset($pidfile)) {
+ die('Please specify a pid file in the variable $pidfile in the .htconfig.php. For example:'."\n".
+ '$pidfile = "/path/to/daemon.pid";'."\n");
}
if (in_array("start", $_SERVER["argv"])) {
die("Unexpected script behaviour. This message should never occur.\n");
}
-// Fetch the base directory
-$directory = dirname($_SERVER["argv"][0]);
+$pid = @file_get_contents($pidfile);
-if (substr($directory, 0, 1) != "/") {
- $directory = $_SERVER["PWD"]."/".$directory;
-}
-$directory = realpath($directory."/..");
-
-include $directory."/.htconfig.php";
-
-if (!isset($pidfile)) {
- die('Please specify a pid file in the variable $pidfile in the .htconfig.php. For example:'."\n".
- '$pidfile = "/path/to/daemon.pid";'."\n");
-}
-
-if (in_array($mode, array("stop", "status"))) {
- $pid = @file_get_contents($pidfile);
-
- if (!$pid) {
- die("Pidfile wasn't found. Is the daemon running?\n");
- }
+if (empty($pid) && in_array($mode, ["stop", "status"])) {
+ Config::set('system', 'worker_daemon_mode', false);
+ die("Pidfile wasn't found. Is the daemon running?\n");
}
if ($mode == "status") {
unlink($pidfile);
+ Config::set('system', 'worker_daemon_mode', false);
die("Daemon process $pid isn't running.\n");
}
unlink($pidfile);
+ logger("Worker daemon process $pid was killed.", LOGGER_DEBUG);
+
+ Config::set('system', 'worker_daemon_mode', false);
die("Worker daemon process $pid was killed.\n");
}
-echo "Starting worker daemon.\n";
-
-if (isset($a->config['php_path'])) {
- $php = $a->config['php_path'];
-} else {
- $php = "php";
+if (!empty($pid) && posix_kill($pid, 0)) {
+ die("Daemon process $pid is already running.\n");
}
+logger('Starting worker daemon.', LOGGER_DEBUG);
+echo "Starting worker daemon.\n";
+
// Switch over to daemon mode.
if ($pid = pcntl_fork())
return; // Parent
if ($pid = pcntl_fork())
return; // Parent
-$pid = getmypid();
-file_put_contents($pidfile, $pid);
-
-// Now running as a daemon.
-while (true) {
- // Just to be sure that this script really runs endlessly
- set_time_limit(0);
+// We lose the database connection upon forking
+dba::connect($db_host, $db_user, $db_pass, $db_data);
+unset($db_host, $db_user, $db_pass, $db_data);
- // Call the worker
- $cmdline = $php.' bin/worker.php';
+Config::set('system', 'worker_daemon_mode', true);
- $executed = false;
+// Just to be sure that this script really runs endlessly
+set_time_limit(0);
- if (function_exists('proc_open')) {
- $resource = proc_open($cmdline . ' &', array(), $foo, $directory);
+$pid = getmypid();
+file_put_contents($pidfile, $pid);
- if (is_resource($resource)) {
- $executed = true;
- proc_close($resource);
- }
- }
+$wait_interval = intval(Config::get('system', 'cron_interval', 5)) * 60;
- if (!$executed) {
- exec($cmdline.' spawn');
- }
+// Now running as a daemon.
+while (true) {
+ logger('Call the worker', LOGGER_DEBUG);
+ Worker::spawnWorker();
+
+ logger("Sleep for $wait_interval seconds - or when a worker needs to be called", LOGGER_DEBUG);
+ $i = 0;
+ do {
+ sleep(1);
+ } while (($i++ < $wait_interval) && !Worker::IPCJobsExists());
+}
- // Now sleep for 5 minutes
- sleep(300);
+function shutdown() {
+ posix_kill(posix_getpid(), SIGHUP);
}
// We now start the process. This is done after the load check since this could increase the load.
self::startProcess();
+ // The daemon doesn't need to fork new workers anymore, since we started a process
+ if (Config::get('system', 'worker_daemon_mode', false)) {
+ self::IPCSetJobState(false);
+ }
+
// Kill stale processes every 5 minutes
- $last_cleanup = Config::get('system', 'poller_last_cleaned', 0);
+ $last_cleanup = Config::get('system', 'worker_last_cleaned', 0);
if (time() > ($last_cleanup + 300)) {
- Config::set('system', 'poller_last_cleaned', time());
+ Config::set('system', 'worker_last_cleaned', time());
self::killStaleWorkers();
}
}
// If possible we will fetch new jobs for this worker
- if (!$refetched && Lock::set('poller_worker_process', 0)) {
+ if (!$refetched && Lock::set('worker_process', 0)) {
$stamp = (float)microtime(true);
$refetched = self::findWorkerProcesses($passing_slow);
self::$db_duration += (microtime(true) - $stamp);
- Lock::remove('poller_worker_process');
+ Lock::remove('worker_process');
}
}
// To avoid the quitting of multiple workers only one worker at a time will execute the check
- if (Lock::set('poller_worker', 0)) {
+ if (Lock::set('worker', 0)) {
$stamp = (float)microtime(true);
// Count active workers and compare them with a maximum value that depends on the load
if (self::tooMuchWorkers()) {
logger('Memory limit reached, quitting.', LOGGER_DEBUG);
return;
}
- Lock::remove('poller_worker');
+ Lock::remove('worker');
self::$db_duration += (microtime(true) - $stamp);
}
return;
}
}
+ if (Config::get('system', 'worker_daemon_mode', false)) {
+ self::IPCSetJobState(false);
+ }
logger("Couldn't select a workerqueue entry, quitting.", LOGGER_DEBUG);
}
$stamp = (float)microtime(true);
if (dba::update('workerqueue', ['done' => true], ['id' => $queue["id"]])) {
- Config::set('system', 'last_poller_execution', DateTimeFormat::utcNow());
+ Config::set('system', 'last_worker_execution', DateTimeFormat::utcNow());
}
self::$db_duration = (microtime(true) - $stamp);
$stamp = (float)microtime(true);
if (dba::update('workerqueue', ['done' => true], ['id' => $queue["id"]])) {
- Config::set('system', 'last_poller_execution', DateTimeFormat::utcNow());
+ Config::set('system', 'last_worker_execution', DateTimeFormat::utcNow());
}
self::$db_duration = (microtime(true) - $stamp);
} else {
dba::update('workerqueue', ['executed' => DateTimeFormat::utcNow(), 'pid' => $mypid], $ids);
}
+ // The daemon doesn't need to fork new workers anymore, since we are inside the worker
+ if (Config::get('system', 'worker_daemon_mode', false)) {
+ self::IPCSetJobState(false);
+ }
+
return $found;
}
dba::close($r);
$stamp = (float)microtime(true);
- if (!Lock::set('poller_worker_process')) {
+ if (!Lock::set('worker_process')) {
return false;
}
self::$lock_duration = (microtime(true) - $stamp);
$found = self::findWorkerProcesses($passing_slow);
self::$db_duration += (microtime(true) - $stamp);
- Lock::remove('poller_worker_process');
+ Lock::remove('worker_process');
if ($found) {
$r = dba::select('workerqueue', [], ['pid' => getmypid(), 'done' => false]);
dba::insert('workerqueue', ['parameter' => $parameters, 'created' => $created, 'priority' => $priority]);
}
+ // We tell the daemon that a new job entry exists
+ if (Config::get('system', 'worker_daemon_mode', false)) {
+ self::IPCSetJobState(true);
+ return true;
+ }
+
// Should we quit and wait for the worker to be called as a cronjob?
if ($dont_fork) {
return true;
}
// If there is a lock then we don't have to check for too much worker
- if (!Lock::set('poller_worker', 0)) {
+ if (!Lock::set('worker', 0)) {
return true;
}
// If there are already enough workers running, don't fork another one
$quit = self::tooMuchWorkers();
- Lock::remove('poller_worker');
+ Lock::remove('worker');
if ($quit) {
return true;
{
return Process::deleteByPid();
}
+
+ private static function checkIPC()
+ {
+ dba::e("CREATE TABLE IF NOT EXISTS `worker-ipc` (`key` integer, `jobs` boolean) ENGINE = MEMORY;");
+ }
+
+ public static function IPCSetJobState($jobs)
+ {
+ self::checkIPC();
+
+ dba::update('worker-ipc', ['jobs' => $jobs], ['key' => 1], true);
+ }
+
+ public static function IPCJobsExists()
+ {
+ self::checkIPC();
+
+ $row = dba::selectFirst('worker-ipc', ['jobs'], ['key' => 1]);
+
+ // When we don't have a row, no job is running
+ if (!DBM::is_result($row)) {
+ return false;
+ }
+
+ return (bool)$row['jobs'];
+ }
}