]> git.mxchange.org Git - friendica.git/blobdiff - include/poller.php
It is better this way
[friendica.git] / include / poller.php
index 27f8c7831a12657032cff3dbf8e2c91492a9a964..ae249ffe46ddc41f8be766534fdfbaa7b4e1c696 100644 (file)
@@ -1,4 +1,8 @@
 <?php
+
+use Friendica\App;
+use Friendica\Core\Config;
+
 if (!file_exists("boot.php") AND (sizeof($_SERVER["argv"]) != 0)) {
        $directory = dirname($_SERVER["argv"][0]);
 
@@ -10,18 +14,16 @@ if (!file_exists("boot.php") AND (sizeof($_SERVER["argv"]) != 0)) {
        chdir($directory);
 }
 
-use \Friendica\Core\Config;
-
 require_once("boot.php");
 
 function poller_run($argv, $argc){
        global $a, $db;
 
        if (is_null($a)) {
-               $a = new App;
+               $a = new App(dirname(__DIR__));
        }
 
-       if (is_null($db)) {
+       if(is_null($db)) {
                @include(".htconfig.php");
                require_once("include/dba.php");
                $db = new dba($db_host, $db_user, $db_pass, $db_data);
@@ -41,6 +43,10 @@ function poller_run($argv, $argc){
 
        $a->start_process();
 
+       if ($a->min_memory_reached()) {
+               return;
+       }
+
        if (poller_max_connections_reached()) {
                return;
        }
@@ -49,7 +55,7 @@ function poller_run($argv, $argc){
                return;
        }
 
-       if (($argc <= 1) OR ($argv[1] != "no_cron")) {
+       if(($argc <= 1) OR ($argv[1] != "no_cron")) {
                poller_run_cron();
        }
 
@@ -67,6 +73,11 @@ function poller_run($argv, $argc){
 
        while ($r = poller_worker_process()) {
 
+               // Check free memory
+               if ($a->min_memory_reached()) {
+                       return;
+               }
+
                // Count active workers and compare them with a maximum value that depends on the load
                if (poller_too_much_workers()) {
                        return;
@@ -80,7 +91,6 @@ function poller_run($argv, $argc){
                if (time() > ($starttime + 3600))
                        return;
        }
-
 }
 
 /**
@@ -111,14 +121,10 @@ function poller_execute($queue) {
                return false;
        }
 
-       $upd = q("UPDATE `workerqueue` SET `executed` = '%s', `pid` = %d WHERE `id` = %d AND `pid` = 0",
-               dbesc(datetime_convert()),
-               intval($mypid),
-               intval($queue["id"]));
-
-       if (!$upd) {
+       if (!dba::update('workerqueue', array('executed' => datetime_convert(), 'pid' => $mypid),
+                       array('id' => $queue["id"], 'pid' => 0))) {
                logger("Couldn't update queue entry ".$queue["id"]." - skip this execution", LOGGER_DEBUG);
-               q("COMMIT");
+               dba::commit();
                return true;
        }
 
@@ -126,18 +132,18 @@ function poller_execute($queue) {
        $id = q("SELECT `pid`, `executed` FROM `workerqueue` WHERE `id` = %d", intval($queue["id"]));
        if (!$id) {
                logger("Queue item ".$queue["id"]." vanished - skip this execution", LOGGER_DEBUG);
-               q("COMMIT");
+               dba::commit();
                return true;
        } elseif ((strtotime($id[0]["executed"]) <= 0) OR ($id[0]["pid"] == 0)) {
                logger("Entry for queue item ".$queue["id"]." wasn't stored - skip this execution", LOGGER_DEBUG);
-               q("COMMIT");
+               dba::commit();
                return true;
        } elseif ($id[0]["pid"] != $mypid) {
                logger("Queue item ".$queue["id"]." is to be executed by process ".$id[0]["pid"]." and not by me (".$mypid.") - skip this execution", LOGGER_DEBUG);
-               q("COMMIT");
+               dba::commit();
                return true;
        }
-       q("COMMIT");
+       dba::commit();
 
        $argv = json_decode($queue["parameter"]);
 
@@ -146,7 +152,7 @@ function poller_execute($queue) {
 
        if (!validate_include($include)) {
                logger("Include file ".$argv[0]." is not valid!");
-               q("DELETE FROM `workerqueue` WHERE `id` = %d", intval($queue["id"]));
+               dba::delete('workerqueue', array('id' => $queue["id"]));
                return true;
        }
 
@@ -158,7 +164,7 @@ function poller_execute($queue) {
 
                poller_exec_function($queue, $funcname, $argv);
 
-               q("DELETE FROM `workerqueue` WHERE `id` = %d", intval($queue["id"]));
+               dba::delete('workerqueue', array('id' => $queue["id"]));
        } else {
                logger("Function ".$funcname." does not exist");
        }
@@ -364,17 +370,16 @@ function poller_kill_stale_workers() {
                return;
        }
 
-       foreach ($r AS $pid)
+       foreach ($r AS $pid) {
                if (!posix_kill($pid["pid"], 0)) {
-                       q("UPDATE `workerqueue` SET `executed` = '%s', `pid` = 0 WHERE `pid` = %d",
-                               dbesc(NULL_DATE), intval($pid["pid"]));
+                       dba::update('workerqueue', array('executed' => NULL_DATE, 'pid' => 0),
+                                       array('pid' => $pid["pid"]));
                } else {
                        // Kill long running processes
 
                        // Check if the priority is in a valid range
-                       if (!in_array($pid["priority"], array(PRIORITY_CRITICAL, PRIORITY_HIGH, PRIORITY_MEDIUM, PRIORITY_LOW, PRIORITY_NEGLIGIBLE))) {
+                       if (!in_array($pid["priority"], array(PRIORITY_CRITICAL, PRIORITY_HIGH, PRIORITY_MEDIUM, PRIORITY_LOW, PRIORITY_NEGLIGIBLE)))
                                $pid["priority"] = PRIORITY_MEDIUM;
-                       }
 
                        // Define the maximum durations
                        $max_duration_defaults = array(PRIORITY_CRITICAL => 360, PRIORITY_HIGH => 10, PRIORITY_MEDIUM => 60, PRIORITY_LOW => 180, PRIORITY_NEGLIGIBLE => 360);
@@ -392,12 +397,9 @@ function poller_kill_stale_workers() {
                                // We killed the stale process.
                                // To avoid a blocking situation we reschedule the process at the beginning of the queue.
                                // Additionally we are lowering the priority.
-                               q("UPDATE `workerqueue` SET `executed` = '%s', `created` = '%s',
-                                                       `priority` = %d, `pid` = 0 WHERE `pid` = %d",
-                                       dbesc(NULL_DATE),
-                                       dbesc(datetime_convert()),
-                                       intval(PRIORITY_NEGLIGIBLE),
-                                       intval($pid["pid"]));
+                               dba::update('workerqueue',
+                                               array('executed' => NULL_DATE, 'created' => datetime_convert(), 'priority' => PRIORITY_NEGLIGIBLE, 'pid' => 0),
+                                               array('pid' => $pid["pid"]));
                        } else {
                                logger("Worker process ".$pid["pid"]." (".implode(" ", $argv).") now runs for ".round($duration)." of ".$max_duration." allowed minutes. That's okay.", LOGGER_DEBUG);
                        }
@@ -419,7 +421,7 @@ function poller_too_much_workers() {
 
        // Decrease the number of workers at higher load
        $load = current_load();
-       if ($load) {
+       if($load) {
                $maxsysload = intval(Config::get("system", "maxloadavg", 50));
 
                $maxworkers = $queues;
@@ -549,7 +551,7 @@ function poller_passing_slow(&$highest_priority) {
  */
 function poller_worker_process() {
 
-       q("START TRANSACTION;");
+       dba::transaction();
 
        // Check if we should pass some low priority process
        $highest_priority = 0;
@@ -679,4 +681,3 @@ if (array_search(__file__,get_included_files())===0){
 
        killme();
 }
-?>