]> git.mxchange.org Git - friendica.git/blobdiff - include/poller.php
It is better this way
[friendica.git] / include / poller.php
index 82befae51842dc4bd1dd49c2f5f3dabeaa144216..ae249ffe46ddc41f8be766534fdfbaa7b4e1c696 100644 (file)
@@ -1,4 +1,8 @@
 <?php
+
+use Friendica\App;
+use Friendica\Core\Config;
+
 if (!file_exists("boot.php") AND (sizeof($_SERVER["argv"]) != 0)) {
        $directory = dirname($_SERVER["argv"][0]);
 
@@ -10,15 +14,13 @@ if (!file_exists("boot.php") AND (sizeof($_SERVER["argv"]) != 0)) {
        chdir($directory);
 }
 
-use \Friendica\Core\Config;
-
 require_once("boot.php");
 
 function poller_run($argv, $argc){
        global $a, $db;
 
-       if(is_null($a)) {
-               $a = new App;
+       if (is_null($a)) {
+               $a = new App(dirname(__DIR__));
        }
 
        if(is_null($db)) {
@@ -35,8 +37,16 @@ function poller_run($argv, $argc){
                return;
        }
 
+       $a->set_baseurl(Config::get('system', 'url'));
+
+       load_hooks();
+
        $a->start_process();
 
+       if ($a->min_memory_reached()) {
+               return;
+       }
+
        if (poller_max_connections_reached()) {
                return;
        }
@@ -63,6 +73,11 @@ function poller_run($argv, $argc){
 
        while ($r = poller_worker_process()) {
 
+               // Check free memory
+               if ($a->min_memory_reached()) {
+                       return;
+               }
+
                // Count active workers and compare them with a maximum value that depends on the load
                if (poller_too_much_workers()) {
                        return;
@@ -76,7 +91,6 @@ function poller_run($argv, $argc){
                if (time() > ($starttime + 3600))
                        return;
        }
-
 }
 
 /**
@@ -107,14 +121,10 @@ function poller_execute($queue) {
                return false;
        }
 
-       $upd = q("UPDATE `workerqueue` SET `executed` = '%s', `pid` = %d WHERE `id` = %d AND `pid` = 0",
-               dbesc(datetime_convert()),
-               intval($mypid),
-               intval($queue["id"]));
-
-       if (!$upd) {
+       if (!dba::update('workerqueue', array('executed' => datetime_convert(), 'pid' => $mypid),
+                       array('id' => $queue["id"], 'pid' => 0))) {
                logger("Couldn't update queue entry ".$queue["id"]." - skip this execution", LOGGER_DEBUG);
-               q("COMMIT");
+               dba::commit();
                return true;
        }
 
@@ -122,18 +132,18 @@ function poller_execute($queue) {
        $id = q("SELECT `pid`, `executed` FROM `workerqueue` WHERE `id` = %d", intval($queue["id"]));
        if (!$id) {
                logger("Queue item ".$queue["id"]." vanished - skip this execution", LOGGER_DEBUG);
-               q("COMMIT");
+               dba::commit();
                return true;
        } elseif ((strtotime($id[0]["executed"]) <= 0) OR ($id[0]["pid"] == 0)) {
                logger("Entry for queue item ".$queue["id"]." wasn't stored - skip this execution", LOGGER_DEBUG);
-               q("COMMIT");
+               dba::commit();
                return true;
        } elseif ($id[0]["pid"] != $mypid) {
                logger("Queue item ".$queue["id"]." is to be executed by process ".$id[0]["pid"]." and not by me (".$mypid.") - skip this execution", LOGGER_DEBUG);
-               q("COMMIT");
+               dba::commit();
                return true;
        }
-       q("COMMIT");
+       dba::commit();
 
        $argv = json_decode($queue["parameter"]);
 
@@ -142,7 +152,7 @@ function poller_execute($queue) {
 
        if (!validate_include($include)) {
                logger("Include file ".$argv[0]." is not valid!");
-               q("DELETE FROM `workerqueue` WHERE `id` = %d", intval($queue["id"]));
+               dba::delete('workerqueue', array('id' => $queue["id"]));
                return true;
        }
 
@@ -154,7 +164,7 @@ function poller_execute($queue) {
 
                poller_exec_function($queue, $funcname, $argv);
 
-               q("DELETE FROM `workerqueue` WHERE `id` = %d", intval($queue["id"]));
+               dba::delete('workerqueue', array('id' => $queue["id"]));
        } else {
                logger("Function ".$funcname." does not exist");
        }
@@ -353,18 +363,18 @@ function poller_max_connections_reached() {
  *
  */
 function poller_kill_stale_workers() {
-       $r = q("SELECT `pid`, `executed`, `priority`, `parameter` FROM `workerqueue` WHERE `executed` != '0000-00-00 00:00:00'");
+       $r = q("SELECT `pid`, `executed`, `priority`, `parameter` FROM `workerqueue` WHERE `executed` > '%s'", dbesc(NULL_DATE));
 
        if (!dbm::is_result($r)) {
                // No processing here needed
                return;
        }
 
-       foreach($r AS $pid)
-               if (!posix_kill($pid["pid"], 0))
-                       q("UPDATE `workerqueue` SET `executed` = '0000-00-00 00:00:00', `pid` = 0 WHERE `pid` = %d",
-                               intval($pid["pid"]));
-               else {
+       foreach ($r AS $pid) {
+               if (!posix_kill($pid["pid"], 0)) {
+                       dba::update('workerqueue', array('executed' => NULL_DATE, 'pid' => 0),
+                                       array('pid' => $pid["pid"]));
+               else {
                        // Kill long running processes
 
                        // Check if the priority is in a valid range
@@ -387,14 +397,14 @@ function poller_kill_stale_workers() {
                                // We killed the stale process.
                                // To avoid a blocking situation we reschedule the process at the beginning of the queue.
                                // Additionally we are lowering the priority.
-                               q("UPDATE `workerqueue` SET `executed` = '0000-00-00 00:00:00', `created` = '%s',
-                                                       `priority` = %d, `pid` = 0 WHERE `pid` = %d",
-                                       dbesc(datetime_convert()),
-                                       intval(PRIORITY_NEGLIGIBLE),
-                                       intval($pid["pid"]));
-                       } else
+                               dba::update('workerqueue',
+                                               array('executed' => NULL_DATE, 'created' => datetime_convert(), 'priority' => PRIORITY_NEGLIGIBLE, 'pid' => 0),
+                                               array('pid' => $pid["pid"]));
+                       } else {
                                logger("Worker process ".$pid["pid"]." (".implode(" ", $argv).") now runs for ".round($duration)." of ".$max_duration." allowed minutes. That's okay.", LOGGER_DEBUG);
+                       }
                }
+       }
 }
 
 /**
@@ -421,15 +431,15 @@ function poller_too_much_workers() {
                $slope = $maxworkers / pow($maxsysload, $exponent);
                $queues = ceil($slope * pow(max(0, $maxsysload - $load), $exponent));
 
-               $s = q("SELECT COUNT(*) AS `total` FROM `workerqueue` WHERE `executed` = '0000-00-00 00:00:00'");
+               $s = q("SELECT COUNT(*) AS `total` FROM `workerqueue` WHERE `executed` <= '%s'", dbesc(NULL_DATE));
                $entries = $s[0]["total"];
 
                if (Config::get("system", "worker_fastlane", false) AND ($queues > 0) AND ($entries > 0) AND ($active >= $queues)) {
-                       $s = q("SELECT `priority` FROM `workerqueue` WHERE `executed` = '0000-00-00 00:00:00' ORDER BY `priority` LIMIT 1");
+                       $s = q("SELECT `priority` FROM `workerqueue` WHERE `executed` <= '%s' ORDER BY `priority` LIMIT 1", dbesc(NULL_DATE));
                        $top_priority = $s[0]["priority"];
 
-                       $s = q("SELECT `id` FROM `workerqueue` WHERE `priority` <= %d AND `executed` != '0000-00-00 00:00:00' LIMIT 1",
-                               intval($top_priority));
+                       $s = q("SELECT `id` FROM `workerqueue` WHERE `priority` <= %d AND `executed` > '%s' LIMIT 1",
+                               intval($top_priority), dbesc(NULL_DATE));
                        $high_running = dbm::is_result($s);
 
                        if (!$high_running AND ($top_priority > PRIORITY_UNDEFINED) AND ($top_priority < PRIORITY_NEGLIGIBLE)) {
@@ -464,7 +474,7 @@ function poller_too_much_workers() {
                // Are there fewer workers running as possible? Then fork a new one.
                if (!Config::get("system", "worker_dont_fork") AND ($queues > ($active + 1)) AND ($entries > 1)) {
                        logger("Active workers: ".$active."/".$queues." Fork a new worker.", LOGGER_DEBUG);
-                       $args = array("php", "include/poller.php", "no_cron");
+                       $args = array("include/poller.php", "no_cron");
                        $a = get_app();
                        $a->proc_run($args);
                }
@@ -541,7 +551,7 @@ function poller_passing_slow(&$highest_priority) {
  */
 function poller_worker_process() {
 
-       q("START TRANSACTION;");
+       dba::transaction();
 
        // Check if we should pass some low priority process
        $highest_priority = 0;
@@ -549,21 +559,25 @@ function poller_worker_process() {
        if (poller_passing_slow($highest_priority)) {
                // Are there waiting processes with a higher priority than the currently highest?
                $r = q("SELECT * FROM `workerqueue`
-                               WHERE `executed` = '0000-00-00 00:00:00' AND `priority` < %d
-                               ORDER BY `priority`, `created` LIMIT 1", dbesc($highest_priority));
-               if (dbm::is_result($r))
+                               WHERE `executed` <= '%s' AND `priority` < %d
+                               ORDER BY `priority`, `created` LIMIT 1",
+                               dbesc(NULL_DATE),
+                               intval($highest_priority));
+               if (dbm::is_result($r)) {
                        return $r;
-
+               }
                // Give slower processes some processing time
                $r = q("SELECT * FROM `workerqueue`
-                               WHERE `executed` = '0000-00-00 00:00:00' AND `priority` > %d
-                               ORDER BY `priority`, `created` LIMIT 1", dbesc($highest_priority));
+                               WHERE `executed` <= '%s' AND `priority` > %d
+                               ORDER BY `priority`, `created` LIMIT 1",
+                               dbesc(NULL_DATE),
+                               intval($highest_priority));
        }
 
        // If there is no result (or we shouldn't pass lower processes) we check without priority limit
-       if (($highest_priority == 0) OR !dbm::is_result($r))
-               $r = q("SELECT * FROM `workerqueue` WHERE `executed` = '0000-00-00 00:00:00' ORDER BY `priority`, `created` LIMIT 1");
-
+       if (($highest_priority == 0) OR !dbm::is_result($r)) {
+               $r = q("SELECT * FROM `workerqueue` WHERE `executed` <= '%s' ORDER BY `priority`, `created` LIMIT 1", dbesc(NULL_DATE));
+       }
        return $r;
 }
 
@@ -571,7 +585,7 @@ function poller_worker_process() {
  * @brief Call the front end worker
  */
 function call_worker() {
-       if (!Config::get("system", "frontend_worker") OR !Config::get("system", "worker")) {
+       if (!Config::get("system", "frontend_worker")) {
                return;
        }
 
@@ -583,7 +597,7 @@ function call_worker() {
  * @brief Call the front end worker if there aren't any active
  */
 function call_worker_if_idle() {
-       if (!Config::get("system", "frontend_worker") OR !Config::get("system", "worker")) {
+       if (!Config::get("system", "frontend_worker")) {
                return;
        }
 
@@ -610,7 +624,7 @@ function call_worker_if_idle() {
 
                logger('Call poller', LOGGER_DEBUG);
 
-               $args = array("php", "include/poller.php", "no_cron");
+               $args = array("include/poller.php", "no_cron");
                $a = get_app();
                $a->proc_run($args);
                return;
@@ -667,4 +681,3 @@ if (array_search(__file__,get_included_files())===0){
 
        killme();
 }
-?>