X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=include%2Fpoller.php;h=ae249ffe46ddc41f8be766534fdfbaa7b4e1c696;hb=cb3077b7a9023a65b6dc13df065bfbbe7f7633fb;hp=27f8c7831a12657032cff3dbf8e2c91492a9a964;hpb=09851331a9dc8601919cd0c9200686b92843d235;p=friendica.git diff --git a/include/poller.php b/include/poller.php index 27f8c7831a..ae249ffe46 100644 --- a/include/poller.php +++ b/include/poller.php @@ -1,4 +1,8 @@ start_process(); + if ($a->min_memory_reached()) { + return; + } + if (poller_max_connections_reached()) { return; } @@ -49,7 +55,7 @@ function poller_run($argv, $argc){ return; } - if (($argc <= 1) OR ($argv[1] != "no_cron")) { + if(($argc <= 1) OR ($argv[1] != "no_cron")) { poller_run_cron(); } @@ -67,6 +73,11 @@ function poller_run($argv, $argc){ while ($r = poller_worker_process()) { + // Check free memory + if ($a->min_memory_reached()) { + return; + } + // Count active workers and compare them with a maximum value that depends on the load if (poller_too_much_workers()) { return; @@ -80,7 +91,6 @@ function poller_run($argv, $argc){ if (time() > ($starttime + 3600)) return; } - } /** @@ -111,14 +121,10 @@ function poller_execute($queue) { return false; } - $upd = q("UPDATE `workerqueue` SET `executed` = '%s', `pid` = %d WHERE `id` = %d AND `pid` = 0", - dbesc(datetime_convert()), - intval($mypid), - intval($queue["id"])); - - if (!$upd) { + if (!dba::update('workerqueue', array('executed' => datetime_convert(), 'pid' => $mypid), + array('id' => $queue["id"], 'pid' => 0))) { logger("Couldn't update queue entry ".$queue["id"]." - skip this execution", LOGGER_DEBUG); - q("COMMIT"); + dba::commit(); return true; } @@ -126,18 +132,18 @@ function poller_execute($queue) { $id = q("SELECT `pid`, `executed` FROM `workerqueue` WHERE `id` = %d", intval($queue["id"])); if (!$id) { logger("Queue item ".$queue["id"]." vanished - skip this execution", LOGGER_DEBUG); - q("COMMIT"); + dba::commit(); return true; } elseif ((strtotime($id[0]["executed"]) <= 0) OR ($id[0]["pid"] == 0)) { logger("Entry for queue item ".$queue["id"]." wasn't stored - skip this execution", LOGGER_DEBUG); - q("COMMIT"); + dba::commit(); return true; } elseif ($id[0]["pid"] != $mypid) { logger("Queue item ".$queue["id"]." is to be executed by process ".$id[0]["pid"]." and not by me (".$mypid.") - skip this execution", LOGGER_DEBUG); - q("COMMIT"); + dba::commit(); return true; } - q("COMMIT"); + dba::commit(); $argv = json_decode($queue["parameter"]); @@ -146,7 +152,7 @@ function poller_execute($queue) { if (!validate_include($include)) { logger("Include file ".$argv[0]." is not valid!"); - q("DELETE FROM `workerqueue` WHERE `id` = %d", intval($queue["id"])); + dba::delete('workerqueue', array('id' => $queue["id"])); return true; } @@ -158,7 +164,7 @@ function poller_execute($queue) { poller_exec_function($queue, $funcname, $argv); - q("DELETE FROM `workerqueue` WHERE `id` = %d", intval($queue["id"])); + dba::delete('workerqueue', array('id' => $queue["id"])); } else { logger("Function ".$funcname." does not exist"); } @@ -364,17 +370,16 @@ function poller_kill_stale_workers() { return; } - foreach ($r AS $pid) + foreach ($r AS $pid) { if (!posix_kill($pid["pid"], 0)) { - q("UPDATE `workerqueue` SET `executed` = '%s', `pid` = 0 WHERE `pid` = %d", - dbesc(NULL_DATE), intval($pid["pid"])); + dba::update('workerqueue', array('executed' => NULL_DATE, 'pid' => 0), + array('pid' => $pid["pid"])); } else { // Kill long running processes // Check if the priority is in a valid range - if (!in_array($pid["priority"], array(PRIORITY_CRITICAL, PRIORITY_HIGH, PRIORITY_MEDIUM, PRIORITY_LOW, PRIORITY_NEGLIGIBLE))) { + if (!in_array($pid["priority"], array(PRIORITY_CRITICAL, PRIORITY_HIGH, PRIORITY_MEDIUM, PRIORITY_LOW, PRIORITY_NEGLIGIBLE))) $pid["priority"] = PRIORITY_MEDIUM; - } // Define the maximum durations $max_duration_defaults = array(PRIORITY_CRITICAL => 360, PRIORITY_HIGH => 10, PRIORITY_MEDIUM => 60, PRIORITY_LOW => 180, PRIORITY_NEGLIGIBLE => 360); @@ -392,12 +397,9 @@ function poller_kill_stale_workers() { // We killed the stale process. // To avoid a blocking situation we reschedule the process at the beginning of the queue. // Additionally we are lowering the priority. - q("UPDATE `workerqueue` SET `executed` = '%s', `created` = '%s', - `priority` = %d, `pid` = 0 WHERE `pid` = %d", - dbesc(NULL_DATE), - dbesc(datetime_convert()), - intval(PRIORITY_NEGLIGIBLE), - intval($pid["pid"])); + dba::update('workerqueue', + array('executed' => NULL_DATE, 'created' => datetime_convert(), 'priority' => PRIORITY_NEGLIGIBLE, 'pid' => 0), + array('pid' => $pid["pid"])); } else { logger("Worker process ".$pid["pid"]." (".implode(" ", $argv).") now runs for ".round($duration)." of ".$max_duration." allowed minutes. That's okay.", LOGGER_DEBUG); } @@ -419,7 +421,7 @@ function poller_too_much_workers() { // Decrease the number of workers at higher load $load = current_load(); - if ($load) { + if($load) { $maxsysload = intval(Config::get("system", "maxloadavg", 50)); $maxworkers = $queues; @@ -549,7 +551,7 @@ function poller_passing_slow(&$highest_priority) { */ function poller_worker_process() { - q("START TRANSACTION;"); + dba::transaction(); // Check if we should pass some low priority process $highest_priority = 0; @@ -679,4 +681,3 @@ if (array_search(__file__,get_included_files())===0){ killme(); } -?>