$mode = "status";
}
+$foreground = in_array("--foreground", $_SERVER["argv"]);
+
if (!isset($mode)) {
die("Please use either 'start', 'stop' or 'status'.\n");
}
}
logger('Starting worker daemon.', LOGGER_DEBUG);
-echo "Starting worker daemon.\n";
-// Switch over to daemon mode.
-if ($pid = pcntl_fork()) {
- return; // Parent
-}
+if (!$foreground) {
+ echo "Starting worker daemon.\n";
-fclose(STDIN); // Close all of the standard
-fclose(STDOUT); // file descriptors as we
-fclose(STDERR); // are running as a daemon.
+ // Switch over to daemon mode.
+ if ($pid = pcntl_fork()) {
+ return; // Parent
+ }
-dba::disconnect();
+ fclose(STDIN); // Close all of the standard
+ fclose(STDOUT); // file descriptors as we
+ fclose(STDERR); // are running as a daemon.
-register_shutdown_function('shutdown');
+ dba::disconnect();
-if (posix_setsid() < 0) {
- return;
-}
+ register_shutdown_function('shutdown');
-if ($pid = pcntl_fork()) {
- return; // Parent
+ if (posix_setsid() < 0) {
+ return;
+ }
+
+ if ($pid = pcntl_fork()) {
+ return; // Parent
+ }
+
+ $pid = getmypid();
+ file_put_contents($pidfile, $pid);
+
+ // We lose the database connection upon forking
+ dba::connect($db_host, $db_user, $db_pass, $db_data);
}
-// We lose the database connection upon forking
-dba::connect($db_host, $db_user, $db_pass, $db_data);
unset($db_host, $db_user, $db_pass, $db_data);
Config::set('system', 'worker_daemon_mode', true);
// Just to be sure that this script really runs endlessly
set_time_limit(0);
-$pid = getmypid();
-file_put_contents($pidfile, $pid);
-
$wait_interval = intval(Config::get('system', 'cron_interval', 5)) * 60;
$do_cron = true;
}
logger("Sleeping", LOGGER_DEBUG);
- $i = 0;
+ $start = time();
do {
- sleep(1);
- } while (($i++ < $wait_interval) && !Worker::IPCJobsExists());
+ $seconds = (time() - $start);
+
+ // logarithmic wait time calculation.
+ // Background: After jobs had been started, they often fork many workers.
+ // To not waste too much time, the sleep period increases.
+ $arg = (($seconds + 1) / ($wait_interval / 9)) + 1;
+ $sleep = round(log10($arg) * 1000000, 0);
+ usleep($sleep);
+
+ $timeout = ($seconds >= $wait_interval);
+ } while (!$timeout && !Worker::IPCJobsExists());
- if ($i >= $wait_interval) {
+ if ($timeout) {
$do_cron = true;
logger("Woke up after $wait_interval seconds.", LOGGER_DEBUG);
} else {
// We now start the process. This is done after the load check since this could increase the load.
self::startProcess();
- // The daemon doesn't need to fork new workers anymore, since we started a process
- if (Config::get('system', 'worker_daemon_mode', false)) {
- self::IPCSetJobState(false);
- }
-
// Kill stale processes every 5 minutes
$last_cleanup = Config::get('system', 'worker_last_cleaned', 0);
if (time() > ($last_cleanup + 300)) {
return;
}
}
+
+ // Cleaning up. Possibly not needed, but it doesn't harm anything.
if (Config::get('system', 'worker_daemon_mode', false)) {
self::IPCSetJobState(false);
}
// Are there fewer workers running as possible? Then fork a new one.
if (!Config::get("system", "worker_dont_fork", false) && ($queues > ($active + 1)) && ($entries > 1)) {
logger("Active workers: ".$active."/".$queues." Fork a new worker.", LOGGER_DEBUG);
- self::spawnWorker();
+ if (Config::get('system', 'worker_daemon_mode', false)) {
+ self::IPCSetJobState(true);
+ } else {
+ self::spawnWorker();
+ }
}
}
+ // if there are too much worker, we down't spawn a new one.
+ if (Config::get('system', 'worker_daemon_mode', false) && ($active >= $queues)) {
+ self::IPCSetJobState(false);
+ }
+
return $active >= $queues;
}
dba::update('workerqueue', ['executed' => DateTimeFormat::utcNow(), 'pid' => $mypid], $ids);
}
- // The daemon doesn't need to fork new workers anymore, since we are inside the worker
- if (Config::get('system', 'worker_daemon_mode', false)) {
- self::IPCSetJobState(false);
- }
-
return $found;
}
}
get_app()->proc_run($args);
+
+ // after spawning we have to remove the flag.
+ if (Config::get('system', 'worker_daemon_mode', false)) {
+ self::IPCSetJobState(false);
+ }
}
/**
return true;
}
- // We tell the daemon that a new job entry exists
- if (Config::get('system', 'worker_daemon_mode', false)) {
- self::IPCSetJobState(true);
- return true;
- }
-
// If there is a lock then we don't have to check for too much worker
if (!Lock::set('worker', 0)) {
return true;
return true;
}
+ // We tell the daemon that a new job entry exists
+ if (Config::get('system', 'worker_daemon_mode', false)) {
+ // We don't have to set the IPC flag - this is done in "tooMuchWorkers"
+ return true;
+ }
+
// Now call the worker to execute the jobs that we just added to the queue
self::spawnWorker();