]> git.mxchange.org Git - friendica.git/blobdiff - include/poller.php
Merge remote-tracking branch 'upstream/develop' into 1706-worker-even-faster
[friendica.git] / include / poller.php
index cc8edce656969ba4e8743adac09d479724d2edee..8784931d3c6106652d3d542490769702f7a3e470 100644 (file)
@@ -86,13 +86,21 @@ function poller_run($argv, $argc){
 
                // If we got that queue entry we claim it for us
                if (!poller_claim_process($r[0])) {
+                       dba::unlock();
                        continue;
+               } else {
+                       // Fetch all workerqueue data while the table is still locked
+                       // This is redundant, but this speeds up the processing
+                       $entries = poller_total_entries();
+                       $top_priority = poller_highest_priority();
+                       $high_running = poller_process_with_priority_active($top_priority);
+                       dba::unlock();
                }
 
                // To avoid the quitting of multiple pollers only one poller at a time will execute the check
                if (Lock::set('poller_worker', 0)) {
                        // Count active workers and compare them with a maximum value that depends on the load
-                       if (poller_too_much_workers()) {
+                       if (poller_too_much_workers($entries, $top_priority, $high_running)) {
                                logger('Active worker limit reached, quitting.', LOGGER_DEBUG);
                                return;
                        }
@@ -120,6 +128,39 @@ function poller_run($argv, $argc){
        logger("Couldn't select a workerqueue entry, quitting.", LOGGER_DEBUG);
 }
 
+/**
+ * @brief Returns the number of non executed entries in the worker queue
+ *
+ * @return integer Number of non executed entries in the worker queue
+ */
+function poller_total_entries() {
+       $s = q("SELECT COUNT(*) AS `total` FROM `workerqueue` WHERE `executed` <= '%s'", dbesc(NULL_DATE));
+       return $s[0]["total"];
+}
+
+/**
+ * @brief Returns the highest priority in the worker queue that isn't executed
+ *
+ * @return integer Number of active poller processes
+ */
+function poller_highest_priority() {
+       $s = q("SELECT `priority` FROM `workerqueue` WHERE `executed` <= '%s' ORDER BY `priority` LIMIT 1", dbesc(NULL_DATE));
+       return $s[0]["priority"];
+}
+
+/**
+ * @brief Returns if a process with the given priority is running
+ *
+ * @param integer $priority The priority that should be checked
+ *
+ * @return integer Is there a process running with that priority?
+ */
+function poller_process_with_priority_active($priority) {
+       $s = q("SELECT `id` FROM `workerqueue` WHERE `priority` <= %d AND `executed` > '%s' LIMIT 1",
+                       intval($priority), dbesc(NULL_DATE));
+       return dbm::is_result($s);
+}
+
 /**
  * @brief Execute a worker entry
  *
@@ -421,9 +462,13 @@ function poller_kill_stale_workers() {
 /**
  * @brief Checks if the number of active workers exceeds the given limits
  *
+ * @param integer $entries The number of not executed entries in the worker queue
+ * @param integer $top_priority The highest not executed priority in the worker queue
+ * @param boolean $high_running Is a process with priority "$top_priority" running?
+ *
  * @return bool Are there too much workers running?
  */
-function poller_too_much_workers() {
+function poller_too_much_workers($entries = NULL, $top_priority = NULL, $high_running = NULL) {
        $queues = Config::get("system", "worker_queues", 4);
 
        $maxqueues = $queues;
@@ -442,39 +487,40 @@ function poller_too_much_workers() {
                $slope = $maxworkers / pow($maxsysload, $exponent);
                $queues = ceil($slope * pow(max(0, $maxsysload - $load), $exponent));
 
-               // Create a list of queue entries grouped by their priority
-               $listitem = array();
+               if (Config::get('system', 'worker_debug')) {
+                       // Create a list of queue entries grouped by their priority
+                       $listitem = array();
 
-               // Adding all processes with no workerqueue entry
-               $processes = dba::p("SELECT COUNT(*) AS `running` FROM `process` WHERE NOT EXISTS (SELECT id FROM `workerqueue` WHERE `workerqueue`.`pid` = `process`.`pid`)");
-               if ($process = dba::fetch($processes)) {
-                       $listitem[0] = "0:".$process["running"];
-               }
-               dba::close($processes);
-
-               // Now adding all processes with workerqueue entries
-               $entries = dba::p("SELECT COUNT(*) AS `entries`, `priority` FROM `workerqueue` GROUP BY `priority`");
-               while ($entry = dba::fetch($entries)) {
-                       $processes = dba::p("SELECT COUNT(*) AS `running` FROM `process` INNER JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid` WHERE `priority` = ?", $entry["priority"]);
+                       // Adding all processes with no workerqueue entry
+                       $processes = dba::p("SELECT COUNT(*) AS `running` FROM `process` WHERE NOT EXISTS (SELECT id FROM `workerqueue` WHERE `workerqueue`.`pid` = `process`.`pid`)");
                        if ($process = dba::fetch($processes)) {
-                               $listitem[$entry["priority"]] = $entry["priority"].":".$process["running"]."/".$entry["entries"];
+                               $listitem[0] = "0:".$process["running"];
                        }
                        dba::close($processes);
-               }
-               dba::close($entries);
-               $processlist = ' ('.implode(', ', $listitem).')';
 
-               $s = q("SELECT COUNT(*) AS `total` FROM `workerqueue` WHERE `executed` <= '%s'", dbesc(NULL_DATE));
-               $entries = $s[0]["total"];
+                       // Now adding all processes with workerqueue entries
+                       $entries = dba::p("SELECT COUNT(*) AS `entries`, `priority` FROM `workerqueue` GROUP BY `priority`");
+                       while ($entry = dba::fetch($entries)) {
+                               $processes = dba::p("SELECT COUNT(*) AS `running` FROM `process` INNER JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid` WHERE `priority` = ?", $entry["priority"]);
+                               if ($process = dba::fetch($processes)) {
+                                       $listitem[$entry["priority"]] = $entry["priority"].":".$process["running"]."/".$entry["entries"];
+                               }
+                               dba::close($processes);
+                       }
+                       dba::close($entries);
+                       $processlist = ' ('.implode(', ', $listitem).')';
+               }
 
+               if (is_null($entries)) {
+                       $entries = poller_total_entries();
+               }
                if (Config::get("system", "worker_fastlane", false) && ($queues > 0) && ($entries > 0) && ($active >= $queues)) {
-                       $s = q("SELECT `priority` FROM `workerqueue` WHERE `executed` <= '%s' ORDER BY `priority` LIMIT 1", dbesc(NULL_DATE));
-                       $top_priority = $s[0]["priority"];
-
-                       $s = q("SELECT `id` FROM `workerqueue` WHERE `priority` <= %d AND `executed` > '%s' LIMIT 1",
-                               intval($top_priority), dbesc(NULL_DATE));
-                       $high_running = dbm::is_result($s);
-
+                       if (is_null($top_priority)) {
+                               $top_priority = poller_highest_priority();
+                       }
+                       if (is_null($high_running)) {
+                               $high_running = poller_process_with_priority_active($top_priority);
+                       }
                        if (!$high_running && ($top_priority > PRIORITY_UNDEFINED) && ($top_priority < PRIORITY_NEGLIGIBLE)) {
                                logger("There are jobs with priority ".$top_priority." waiting but none is executed. Open a fastlane.", LOGGER_DEBUG);
                                $queues = $active + 1;
@@ -620,7 +666,6 @@ function poller_claim_process($queue) {
 
        $success = dba::update('workerqueue', array('executed' => datetime_convert(), 'pid' => $mypid),
                        array('id' => $queue["id"], 'pid' => 0));
-       dba::unlock();
 
        if (!$success) {
                logger("Couldn't update queue entry ".$queue["id"]." - skip this execution", LOGGER_DEBUG);