<?php
+
+use Friendica\App;
+use Friendica\Core\Config;
+
if (!file_exists("boot.php") AND (sizeof($_SERVER["argv"]) != 0)) {
$directory = dirname($_SERVER["argv"][0]);
chdir($directory);
}
-use \Friendica\Core\Config;
-
require_once("boot.php");
function poller_run($argv, $argc){
global $a, $db;
if (is_null($a)) {
- $a = new App;
+ $a = new App(dirname(__DIR__));
}
- if (is_null($db)) {
+ if(is_null($db)) {
@include(".htconfig.php");
require_once("include/dba.php");
$db = new dba($db_host, $db_user, $db_pass, $db_data);
$a->start_process();
+ if ($a->min_memory_reached()) {
+ return;
+ }
+
if (poller_max_connections_reached()) {
return;
}
return;
}
- if (($argc <= 1) OR ($argv[1] != "no_cron")) {
+ if(($argc <= 1) OR ($argv[1] != "no_cron")) {
poller_run_cron();
}
while ($r = poller_worker_process()) {
+ // Check free memory
+ if ($a->min_memory_reached()) {
+ return;
+ }
+
// Count active workers and compare them with a maximum value that depends on the load
if (poller_too_much_workers()) {
return;
if (time() > ($starttime + 3600))
return;
}
-
}
/**
return false;
}
- $upd = q("UPDATE `workerqueue` SET `executed` = '%s', `pid` = %d WHERE `id` = %d AND `pid` = 0",
- dbesc(datetime_convert()),
- intval($mypid),
- intval($queue["id"]));
-
- if (!$upd) {
+ if (!dba::update('workerqueue', array('executed' => datetime_convert(), 'pid' => $mypid),
+ array('id' => $queue["id"], 'pid' => 0))) {
logger("Couldn't update queue entry ".$queue["id"]." - skip this execution", LOGGER_DEBUG);
- q("COMMIT");
+ dba::commit();
return true;
}
$id = q("SELECT `pid`, `executed` FROM `workerqueue` WHERE `id` = %d", intval($queue["id"]));
if (!$id) {
logger("Queue item ".$queue["id"]." vanished - skip this execution", LOGGER_DEBUG);
- q("COMMIT");
+ dba::commit();
return true;
} elseif ((strtotime($id[0]["executed"]) <= 0) OR ($id[0]["pid"] == 0)) {
logger("Entry for queue item ".$queue["id"]." wasn't stored - skip this execution", LOGGER_DEBUG);
- q("COMMIT");
+ dba::commit();
return true;
} elseif ($id[0]["pid"] != $mypid) {
logger("Queue item ".$queue["id"]." is to be executed by process ".$id[0]["pid"]." and not by me (".$mypid.") - skip this execution", LOGGER_DEBUG);
- q("COMMIT");
+ dba::commit();
return true;
}
- q("COMMIT");
+ dba::commit();
$argv = json_decode($queue["parameter"]);
if (!validate_include($include)) {
logger("Include file ".$argv[0]." is not valid!");
- q("DELETE FROM `workerqueue` WHERE `id` = %d", intval($queue["id"]));
+ dba::delete('workerqueue', array('id' => $queue["id"]));
return true;
}
poller_exec_function($queue, $funcname, $argv);
- q("DELETE FROM `workerqueue` WHERE `id` = %d", intval($queue["id"]));
+ dba::delete('workerqueue', array('id' => $queue["id"]));
} else {
logger("Function ".$funcname." does not exist");
}
return;
}
- foreach ($r AS $pid)
+ foreach ($r AS $pid) {
if (!posix_kill($pid["pid"], 0)) {
- q("UPDATE `workerqueue` SET `executed` = '%s', `pid` = 0 WHERE `pid` = %d",
- dbesc(NULL_DATE), intval($pid["pid"]));
+ dba::update('workerqueue', array('executed' => NULL_DATE, 'pid' => 0),
+ array('pid' => $pid["pid"]));
} else {
// Kill long running processes
// Check if the priority is in a valid range
- if (!in_array($pid["priority"], array(PRIORITY_CRITICAL, PRIORITY_HIGH, PRIORITY_MEDIUM, PRIORITY_LOW, PRIORITY_NEGLIGIBLE))) {
+ if (!in_array($pid["priority"], array(PRIORITY_CRITICAL, PRIORITY_HIGH, PRIORITY_MEDIUM, PRIORITY_LOW, PRIORITY_NEGLIGIBLE)))
$pid["priority"] = PRIORITY_MEDIUM;
- }
// Define the maximum durations
$max_duration_defaults = array(PRIORITY_CRITICAL => 360, PRIORITY_HIGH => 10, PRIORITY_MEDIUM => 60, PRIORITY_LOW => 180, PRIORITY_NEGLIGIBLE => 360);
// We killed the stale process.
// To avoid a blocking situation we reschedule the process at the beginning of the queue.
// Additionally we are lowering the priority.
- q("UPDATE `workerqueue` SET `executed` = '%s', `created` = '%s',
- `priority` = %d, `pid` = 0 WHERE `pid` = %d",
- dbesc(NULL_DATE),
- dbesc(datetime_convert()),
- intval(PRIORITY_NEGLIGIBLE),
- intval($pid["pid"]));
+ dba::update('workerqueue',
+ array('executed' => NULL_DATE, 'created' => datetime_convert(), 'priority' => PRIORITY_NEGLIGIBLE, 'pid' => 0),
+ array('pid' => $pid["pid"]));
} else {
logger("Worker process ".$pid["pid"]." (".implode(" ", $argv).") now runs for ".round($duration)." of ".$max_duration." allowed minutes. That's okay.", LOGGER_DEBUG);
}
// Decrease the number of workers at higher load
$load = current_load();
- if ($load) {
+ if($load) {
$maxsysload = intval(Config::get("system", "maxloadavg", 50));
$maxworkers = $queues;
*/
function poller_worker_process() {
- q("START TRANSACTION;");
+ dba::transaction();
// Check if we should pass some low priority process
$highest_priority = 0;
killme();
}
-?>