namespace Friendica\Core;
use Friendica\Database\DBA;
-use Friendica\Database\DBM;
use Friendica\Model\Process;
use Friendica\Util\DateTimeFormat;
use Friendica\Util\Network;
+use Friendica\BaseObject;
require_once 'include/dba.php';
}
// Do we have too few memory?
- if ($a->min_memory_reached()) {
+ if ($a->isMinMemoryReached()) {
logger('Pre check: Memory limit reached, quitting.', LOGGER_DEBUG);
return;
}
// Count active workers and compare them with a maximum value that depends on the load
if (self::tooMuchWorkers()) {
logger('Active worker limit reached, quitting.', LOGGER_DEBUG);
+ Lock::release('worker');
return;
}
// Check free memory
- if ($a->min_memory_reached()) {
+ if ($a->isMinMemoryReached()) {
logger('Memory limit reached, quitting.', LOGGER_DEBUG);
+ Lock::release('worker');
return;
}
Lock::release('worker');
*/
private static function totalEntries()
{
- return DBA::count('workerqueue', ["`executed` <= ? AND NOT `done`", NULL_DATE]);
+ return DBA::count('workerqueue', ["`executed` <= ? AND NOT `done` AND `next_try` < ?",
+ NULL_DATE, DateTimeFormat::utcNow()]);
}
/**
*/
private static function highestPriority()
{
- $condition = ["`executed` <= ? AND NOT `done`", NULL_DATE];
+ $condition = ["`executed` <= ? AND NOT `done` AND `next_try` < ?", NULL_DATE, DateTimeFormat::utcNow()];
$workerqueue = DBA::selectFirst('workerqueue', ['priority'], $condition, ['order' => ['priority']]);
- if (DBM::is_result($workerqueue)) {
+ if (DBA::isResult($workerqueue)) {
return $workerqueue["priority"];
} else {
return 0;
*/
private static function processWithPriorityActive($priority)
{
- $condition = ["`priority` <= ? AND `executed` > ? AND NOT `done`", $priority, NULL_DATE];
+ $condition = ["`priority` <= ? AND `executed` > ? AND NOT `done` AND `next_try` < ?",
+ $priority, NULL_DATE, DateTimeFormat::utcNow()];
return DBA::exists('workerqueue', $condition);
}
self::execFunction($queue, $include, $argv, true);
$stamp = (float)microtime(true);
- if (DBA::update('workerqueue', ['done' => true], ['id' => $queue["id"]])) {
+ if (DBA::update('workerqueue', ['done' => true], ['id' => $queue['id']])) {
Config::set('system', 'last_worker_execution', DateTimeFormat::utcNow());
}
self::$db_duration = (microtime(true) - $stamp);
if ($max == 0) {
// the maximum number of possible user connections can be a system variable
- $r = DBA::fetch_first("SHOW VARIABLES WHERE `variable_name` = 'max_user_connections'");
- if (DBM::is_result($r)) {
+ $r = DBA::fetchFirst("SHOW VARIABLES WHERE `variable_name` = 'max_user_connections'");
+ if (DBA::isResult($r)) {
$max = $r["Value"];
}
// Or it can be granted. This overrides the system variable
// The processlist only shows entries of the current user
if ($max != 0) {
$r = DBA::p('SHOW PROCESSLIST');
- $used = DBA::num_rows($r);
+ $used = DBA::numRows($r);
DBA::close($r);
logger("Connection usage (user values): ".$used."/".$max, LOGGER_DEBUG);
// We will now check for the system values.
// This limit could be reached although the user limits are fine.
- $r = DBA::fetch_first("SHOW VARIABLES WHERE `variable_name` = 'max_connections'");
- if (!DBM::is_result($r)) {
+ $r = DBA::fetchFirst("SHOW VARIABLES WHERE `variable_name` = 'max_connections'");
+ if (!DBA::isResult($r)) {
return false;
}
$max = intval($r["Value"]);
if ($max == 0) {
return false;
}
- $r = DBA::fetch_first("SHOW STATUS WHERE `variable_name` = 'Threads_connected'");
- if (!DBM::is_result($r)) {
+ $r = DBA::fetchFirst("SHOW STATUS WHERE `variable_name` = 'Threads_connected'");
+ if (!DBA::isResult($r)) {
return false;
}
$used = intval($r["Value"]);
// How long is the process already running?
$duration = (time() - strtotime($entry["executed"])) / 60;
if ($duration > $max_duration) {
- logger("Worker process ".$entry["pid"]." (".implode(" ", $argv).") took more than ".$max_duration." minutes. It will be killed now.");
+ logger("Worker process ".$entry["pid"]." (".substr(json_encode($argv), 0, 50).") took more than ".$max_duration." minutes. It will be killed now.");
posix_kill($entry["pid"], SIGTERM);
// We killed the stale process.
$active = self::activeWorkers();
// Decrease the number of workers at higher load
- $load = current_load();
+ $load = System::currentLoad();
if ($load) {
$maxsysload = intval(Config::get("system", "maxloadavg", 50));
);
// No active processes at all? Fine
- if (!DBM::is_result($r)) {
+ if (!DBA::isResult($r)) {
return false;
}
$priorities = [];
$result = DBA::select(
'workerqueue',
['id'],
- ["`executed` <= ? AND `priority` < ? AND NOT `done`", NULL_DATE, $highest_priority],
+ ["`executed` <= ? AND `priority` < ? AND NOT `done` AND `next_try` < ?",
+ NULL_DATE, $highest_priority, DateTimeFormat::utcNow()],
['limit' => $limit, 'order' => ['priority', 'created']]
);
$result = DBA::select(
'workerqueue',
['id'],
- ["`executed` <= ? AND `priority` > ? AND NOT `done`", NULL_DATE, $highest_priority],
+ ["`executed` <= ? AND `priority` > ? AND NOT `done` AND `next_try` < ?",
+ NULL_DATE, $highest_priority, DateTimeFormat::utcNow()],
['limit' => $limit, 'order' => ['priority', 'created']]
);
$result = DBA::select(
'workerqueue',
['id'],
- ["`executed` <= ? AND NOT `done`", NULL_DATE],
+ ["`executed` <= ? AND NOT `done` AND `next_try` < ?",
+ NULL_DATE, DateTimeFormat::utcNow()],
['limit' => $limit, 'order' => ['priority', 'created']]
);
// There can already be jobs for us in the queue.
$r = DBA::select('workerqueue', [], ['pid' => getmypid(), 'done' => false]);
- if (DBM::is_result($r)) {
+ if (DBA::isResult($r)) {
self::$db_duration += (microtime(true) - $stamp);
- return DBA::inArray($r);
+ return DBA::toArray($r);
}
DBA::close($r);
if ($found) {
$r = DBA::select('workerqueue', [], ['pid' => getmypid(), 'done' => false]);
- return DBA::inArray($r);
+ return DBA::toArray($r);
}
return false;
}
*/
public static function spawnWorker($do_cron = false)
{
- $args = ["bin/worker.php"];
+ $command = 'bin/worker.php';
- if (!$do_cron) {
- $args[] = "no_cron";
- }
+ $args = ['no_cron' => !$do_cron];
- get_app()->proc_run($args);
+ get_app()->proc_run($command, $args);
// after spawning we have to remove the flag.
if (Config::get('system', 'worker_daemon_mode', false)) {
return true;
}
+ /**
+ * Defers the current worker entry
+ */
+ public static function defer()
+ {
+ if (empty(BaseObject::getApp()->queue)) {
+ return;
+ }
+
+ $queue = BaseObject::getApp()->queue;
+
+ $retrial = $queue['retrial'];
+ $id = $queue['id'];
+
+ if ($retrial > 14) {
+ logger('Id ' . $id . ' had been tried 14 times, it will be deleted now.', LOGGER_DEBUG);
+ DBA::delete('workerqueue', ['id' => $id]);
+ }
+
+ // Calculate the delay until the next trial
+ $delay = (($retrial + 3) ** 4) + (rand(1, 30) * ($retrial + 1));
+ $next = DateTimeFormat::utc('now + ' . $delay . ' seconds');
+
+ logger('Defer execution ' . $retrial . ' of id ' . $id . ' to ' . $next, LOGGER_DEBUG);
+
+ $fields = ['retrial' => $retrial + 1, 'next_try' => $next, 'executed' => NULL_DATE, 'pid' => 0];
+ DBA::update('workerqueue', $fields, ['id' => $id]);
+ }
+
/**
* Log active processes into the "process" table
*
$row = DBA::selectFirst('worker-ipc', ['jobs'], ['key' => 1]);
// When we don't have a row, no job is running
- if (!DBM::is_result($row)) {
+ if (!DBA::isResult($row)) {
return false;
}