]> git.mxchange.org Git - friendica.git/commitdiff
Merge remote-tracking branch 'upstream/develop' into 1706-worker-even-faster
authorMichael <heluecht@pirati.ca>
Sun, 11 Jun 2017 07:49:34 +0000 (07:49 +0000)
committerMichael <heluecht@pirati.ca>
Sun, 11 Jun 2017 07:49:34 +0000 (07:49 +0000)
Conflicts:
include/poller.php

boot.php
doc/htconfig.md
include/notifier.php
include/poller.php
include/pubsubpublish.php

index 5417e0fa23646708d77ac1a252f2470fc1593bb6..fe1ee63e06b37504138be60067e4b532f4b6d560 100644 (file)
--- a/boot.php
+++ b/boot.php
@@ -1069,6 +1069,7 @@ function proc_run($cmd) {
 
        $priority = PRIORITY_MEDIUM;
        $dont_fork = get_config("system", "worker_dont_fork");
+       $created = datetime_convert();
 
        if (is_int($run_parameter)) {
                $priority = $run_parameter;
@@ -1076,6 +1077,9 @@ function proc_run($cmd) {
                if (isset($run_parameter['priority'])) {
                        $priority = $run_parameter['priority'];
                }
+               if (isset($run_parameter['created'])) {
+                       $created = $run_parameter['created'];
+               }
                if (isset($run_parameter['dont_fork'])) {
                        $dont_fork = $run_parameter['dont_fork'];
                }
@@ -1088,7 +1092,7 @@ function proc_run($cmd) {
        $found = dba::select('workerqueue', array('id'), array('parameter' => $parameters), array('limit' => 1));
 
        if (!dbm::is_result($found)) {
-               dba::insert('workerqueue', array('parameter' => $parameters, 'created' => datetime_convert(), 'priority' => $priority));
+               dba::insert('workerqueue', array('parameter' => $parameters, 'created' => $created, 'priority' => $priority));
        }
 
        // Should we quit and wait for the poller to be called as a cronjob?
index 9c556bb7cd49a647a588f173a508d86c1c3d046f..ec961c200f680ae63c8d1f4887a70e7000e0c181 100644 (file)
@@ -68,6 +68,7 @@ Example: To set the directory value please add this line to your .htconfig.php:
 * **ostatus_poll_timeframe** - Defines how old an item can be to try to complete the conversation with it.
 * **paranoia** (Boolean) - Log out users if their IP address changed.
 * **permit_crawling** (Boolean) - Restricts the search for not logged in users to one search per minute.
+* **worker_debug** (Boolean) - If activated, it prints out the number of running processes split by priority.
 * **profiler** (Boolean) - Enable internal timings to help optimize code. Needed for "rendertime" addon. Default is false.
 * **free_crawls** - Number of "free" searches when "permit_crawling" is activated (Default value is 10)
 * **crawl_permit_period** - Period in seconds between allowed searches when the number of free searches is reached and "permit_crawling" is activated (Default value is 60)
index 7f6318182f63f673fd6606e22a8bfe5fd6aaa776..d7e8a9ccec5a6760cf380de608ad0d8929f39c15 100644 (file)
@@ -56,13 +56,15 @@ function notifier_run(&$argv, &$argc){
        }
 
        // Inherit the priority
-       $queue = dba::select('workerqueue', array('priority'), array('pid' => getmypid()), array('limit' => 1));
+       $queue = dba::select('workerqueue', array('priority', 'created'), array('pid' => getmypid()), array('limit' => 1));
        if (dbm::is_result($queue)) {
                $priority = (int)$queue['priority'];
+               $process_created = $queue['created'];
                logger('inherited priority: '.$priority);
        } else {
                // Normally this shouldn't happen.
                $priority = PRIORITY_HIGH;
+               $process_created = datetime_convert();
                logger('no inherited priority! Something is wrong.');
        }
 
@@ -498,7 +500,8 @@ function notifier_run(&$argv, &$argc){
                        }
                        logger("Deliver ".$target_item["guid"]." to ".$contact['url']." via network ".$contact['network'], LOGGER_DEBUG);
 
-                       proc_run(array('priority' => $priority, 'dont_fork' => true), 'include/delivery.php', $cmd, $item_id, $contact['id']);
+                       proc_run(array('priority' => $priority, 'created' => $process_created, 'dont_fork' => true),
+                                       'include/delivery.php', $cmd, $item_id, $contact['id']);
                }
        }
 
@@ -563,7 +566,8 @@ function notifier_run(&$argv, &$argc){
 
                                if ((! $mail) && (! $fsuggest) && (! $followup)) {
                                        logger('notifier: delivery agent: '.$rr['name'].' '.$rr['id'].' '.$rr['network'].' '.$target_item["guid"]);
-                                       proc_run(array('priority' => $priority, 'dont_fork' => true), 'include/delivery.php', $cmd, $item_id, $rr['id']);
+                                       proc_run(array('priority' => $priority, 'created' => $process_created, 'dont_fork' => true),
+                                                       'include/delivery.php', $cmd, $item_id, $rr['id']);
                                }
                        }
                }
@@ -603,7 +607,8 @@ function notifier_run(&$argv, &$argc){
                }
 
                // Handling the pubsubhubbub requests
-               proc_run(array('priority' => PRIORITY_HIGH, 'dont_fork' => true), 'include/pubsubpublish.php');
+               proc_run(array('priority' => PRIORITY_HIGH, 'created' => $process_created, 'dont_fork' => true),
+                               'include/pubsubpublish.php');
        }
 
        logger('notifier: calling hooks', LOGGER_DEBUG);
index cc8edce656969ba4e8743adac09d479724d2edee..8784931d3c6106652d3d542490769702f7a3e470 100644 (file)
@@ -86,13 +86,21 @@ function poller_run($argv, $argc){
 
                // If we got that queue entry we claim it for us
                if (!poller_claim_process($r[0])) {
+                       dba::unlock();
                        continue;
+               } else {
+                       // Fetch all workerqueue data while the table is still locked
+                       // This is redundant, but this speeds up the processing
+                       $entries = poller_total_entries();
+                       $top_priority = poller_highest_priority();
+                       $high_running = poller_process_with_priority_active($top_priority);
+                       dba::unlock();
                }
 
                // To avoid the quitting of multiple pollers only one poller at a time will execute the check
                if (Lock::set('poller_worker', 0)) {
                        // Count active workers and compare them with a maximum value that depends on the load
-                       if (poller_too_much_workers()) {
+                       if (poller_too_much_workers($entries, $top_priority, $high_running)) {
                                logger('Active worker limit reached, quitting.', LOGGER_DEBUG);
                                return;
                        }
@@ -120,6 +128,39 @@ function poller_run($argv, $argc){
        logger("Couldn't select a workerqueue entry, quitting.", LOGGER_DEBUG);
 }
 
+/**
+ * @brief Returns the number of non executed entries in the worker queue
+ *
+ * @return integer Number of non executed entries in the worker queue
+ */
+function poller_total_entries() {
+       $s = q("SELECT COUNT(*) AS `total` FROM `workerqueue` WHERE `executed` <= '%s'", dbesc(NULL_DATE));
+       return $s[0]["total"];
+}
+
+/**
+ * @brief Returns the highest priority in the worker queue that isn't executed
+ *
+ * @return integer Number of active poller processes
+ */
+function poller_highest_priority() {
+       $s = q("SELECT `priority` FROM `workerqueue` WHERE `executed` <= '%s' ORDER BY `priority` LIMIT 1", dbesc(NULL_DATE));
+       return $s[0]["priority"];
+}
+
+/**
+ * @brief Returns if a process with the given priority is running
+ *
+ * @param integer $priority The priority that should be checked
+ *
+ * @return integer Is there a process running with that priority?
+ */
+function poller_process_with_priority_active($priority) {
+       $s = q("SELECT `id` FROM `workerqueue` WHERE `priority` <= %d AND `executed` > '%s' LIMIT 1",
+                       intval($priority), dbesc(NULL_DATE));
+       return dbm::is_result($s);
+}
+
 /**
  * @brief Execute a worker entry
  *
@@ -421,9 +462,13 @@ function poller_kill_stale_workers() {
 /**
  * @brief Checks if the number of active workers exceeds the given limits
  *
+ * @param integer $entries The number of not executed entries in the worker queue
+ * @param integer $top_priority The highest not executed priority in the worker queue
+ * @param boolean $high_running Is a process with priority "$top_priority" running?
+ *
  * @return bool Are there too much workers running?
  */
-function poller_too_much_workers() {
+function poller_too_much_workers($entries = NULL, $top_priority = NULL, $high_running = NULL) {
        $queues = Config::get("system", "worker_queues", 4);
 
        $maxqueues = $queues;
@@ -442,39 +487,40 @@ function poller_too_much_workers() {
                $slope = $maxworkers / pow($maxsysload, $exponent);
                $queues = ceil($slope * pow(max(0, $maxsysload - $load), $exponent));
 
-               // Create a list of queue entries grouped by their priority
-               $listitem = array();
+               if (Config::get('system', 'worker_debug')) {
+                       // Create a list of queue entries grouped by their priority
+                       $listitem = array();
 
-               // Adding all processes with no workerqueue entry
-               $processes = dba::p("SELECT COUNT(*) AS `running` FROM `process` WHERE NOT EXISTS (SELECT id FROM `workerqueue` WHERE `workerqueue`.`pid` = `process`.`pid`)");
-               if ($process = dba::fetch($processes)) {
-                       $listitem[0] = "0:".$process["running"];
-               }
-               dba::close($processes);
-
-               // Now adding all processes with workerqueue entries
-               $entries = dba::p("SELECT COUNT(*) AS `entries`, `priority` FROM `workerqueue` GROUP BY `priority`");
-               while ($entry = dba::fetch($entries)) {
-                       $processes = dba::p("SELECT COUNT(*) AS `running` FROM `process` INNER JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid` WHERE `priority` = ?", $entry["priority"]);
+                       // Adding all processes with no workerqueue entry
+                       $processes = dba::p("SELECT COUNT(*) AS `running` FROM `process` WHERE NOT EXISTS (SELECT id FROM `workerqueue` WHERE `workerqueue`.`pid` = `process`.`pid`)");
                        if ($process = dba::fetch($processes)) {
-                               $listitem[$entry["priority"]] = $entry["priority"].":".$process["running"]."/".$entry["entries"];
+                               $listitem[0] = "0:".$process["running"];
                        }
                        dba::close($processes);
-               }
-               dba::close($entries);
-               $processlist = ' ('.implode(', ', $listitem).')';
 
-               $s = q("SELECT COUNT(*) AS `total` FROM `workerqueue` WHERE `executed` <= '%s'", dbesc(NULL_DATE));
-               $entries = $s[0]["total"];
+                       // Now adding all processes with workerqueue entries
+                       $entries = dba::p("SELECT COUNT(*) AS `entries`, `priority` FROM `workerqueue` GROUP BY `priority`");
+                       while ($entry = dba::fetch($entries)) {
+                               $processes = dba::p("SELECT COUNT(*) AS `running` FROM `process` INNER JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid` WHERE `priority` = ?", $entry["priority"]);
+                               if ($process = dba::fetch($processes)) {
+                                       $listitem[$entry["priority"]] = $entry["priority"].":".$process["running"]."/".$entry["entries"];
+                               }
+                               dba::close($processes);
+                       }
+                       dba::close($entries);
+                       $processlist = ' ('.implode(', ', $listitem).')';
+               }
 
+               if (is_null($entries)) {
+                       $entries = poller_total_entries();
+               }
                if (Config::get("system", "worker_fastlane", false) && ($queues > 0) && ($entries > 0) && ($active >= $queues)) {
-                       $s = q("SELECT `priority` FROM `workerqueue` WHERE `executed` <= '%s' ORDER BY `priority` LIMIT 1", dbesc(NULL_DATE));
-                       $top_priority = $s[0]["priority"];
-
-                       $s = q("SELECT `id` FROM `workerqueue` WHERE `priority` <= %d AND `executed` > '%s' LIMIT 1",
-                               intval($top_priority), dbesc(NULL_DATE));
-                       $high_running = dbm::is_result($s);
-
+                       if (is_null($top_priority)) {
+                               $top_priority = poller_highest_priority();
+                       }
+                       if (is_null($high_running)) {
+                               $high_running = poller_process_with_priority_active($top_priority);
+                       }
                        if (!$high_running && ($top_priority > PRIORITY_UNDEFINED) && ($top_priority < PRIORITY_NEGLIGIBLE)) {
                                logger("There are jobs with priority ".$top_priority." waiting but none is executed. Open a fastlane.", LOGGER_DEBUG);
                                $queues = $active + 1;
@@ -620,7 +666,6 @@ function poller_claim_process($queue) {
 
        $success = dba::update('workerqueue', array('executed' => datetime_convert(), 'pid' => $mypid),
                        array('id' => $queue["id"], 'pid' => 0));
-       dba::unlock();
 
        if (!$success) {
                logger("Couldn't update queue entry ".$queue["id"]." - skip this execution", LOGGER_DEBUG);
index 1112969f2705ee3a9f773aceded4c0dfef8558e1..7c70059f90fd1a151f06b03d0ffe5e8cdcdce468 100644 (file)
@@ -11,13 +11,24 @@ function pubsubpublish_run(&$argv, &$argc){
        if ($argc > 1) {
                $pubsubpublish_id = intval($argv[1]);
        } else {
+               // Inherit the creation time
+               $queue = dba::select('workerqueue', array('created'), array('pid' => getmypid()), array('limit' => 1));
+               if (dbm::is_result($queue)) {
+                       $process_created = $queue['created'];
+               } else {
+                       // Normally this shouldn't happen.
+                       $process_created = datetime_convert();
+                       logger('no inherited priority! Something is wrong.');
+               }
+
                // We'll push to each subscriber that has push > 0,
                // i.e. there has been an update (set in notifier.php).
                $r = q("SELECT `id`, `callback_url` FROM `push_subscriber` WHERE `push` > 0");
 
                foreach ($r as $rr) {
                        logger("Publish feed to ".$rr["callback_url"], LOGGER_DEBUG);
-                       proc_run(array('priority' => PRIORITY_HIGH, 'dont_fork' => true), 'include/pubsubpublish.php', $rr["id"]);
+                       proc_run(array('priority' => PRIORITY_HIGH, 'created' => $process_created, 'dont_fork' => true),
+                                       'include/pubsubpublish.php', $rr["id"]);
                }
        }