]> git.mxchange.org Git - friendica.git/commitdiff
Merge remote-tracking branch 'upstream/develop' into 1706-worker-even-faster
authorMichael <heluecht@pirati.ca>
Sun, 11 Jun 2017 07:49:34 +0000 (07:49 +0000)
committerMichael <heluecht@pirati.ca>
Sun, 11 Jun 2017 07:49:34 +0000 (07:49 +0000)
Conflicts:
include/poller.php

1  2 
boot.php
doc/htconfig.md
include/notifier.php
include/poller.php

diff --combined boot.php
index 2ad057d4cab100f5eb9b7c7d1cf840c482d8b784,5417e0fa23646708d77ac1a252f2470fc1593bb6..fe1ee63e06b37504138be60067e4b532f4b6d560
+++ b/boot.php
@@@ -993,7 -993,7 +993,7 @@@ function notice($s) 
  function info($s) {
        $a = get_app();
  
-       if (local_user() AND get_pconfig(local_user(), 'system', 'ignore_info')) {
+       if (local_user() && get_pconfig(local_user(), 'system', 'ignore_info')) {
                return;
        }
  
@@@ -1063,13 -1063,12 +1063,13 @@@ function proc_run($cmd) 
        $arr = array('args' => $args, 'run_cmd' => true);
  
        call_hooks("proc_run", $arr);
-       if (!$arr['run_cmd'] OR ! count($args)) {
+       if (!$arr['run_cmd'] || ! count($args)) {
                return;
        }
  
        $priority = PRIORITY_MEDIUM;
        $dont_fork = get_config("system", "worker_dont_fork");
 +      $created = datetime_convert();
  
        if (is_int($run_parameter)) {
                $priority = $run_parameter;
                if (isset($run_parameter['priority'])) {
                        $priority = $run_parameter['priority'];
                }
 +              if (isset($run_parameter['created'])) {
 +                      $created = $run_parameter['created'];
 +              }
                if (isset($run_parameter['dont_fork'])) {
                        $dont_fork = $run_parameter['dont_fork'];
                }
        $found = dba::select('workerqueue', array('id'), array('parameter' => $parameters), array('limit' => 1));
  
        if (!dbm::is_result($found)) {
 -              dba::insert('workerqueue', array('parameter' => $parameters, 'created' => datetime_convert(), 'priority' => $priority));
 +              dba::insert('workerqueue', array('parameter' => $parameters, 'created' => $created, 'priority' => $priority));
        }
  
        // Should we quit and wait for the poller to be called as a cronjob?
@@@ -1411,7 -1407,7 +1411,7 @@@ function clear_cache($basepath = "", $p
                $path = $basepath;
        }
  
-       if (($path == "") OR (!is_dir($path))) {
+       if (($path == "") || (!is_dir($path))) {
                return;
        }
  
                if ($dh = opendir($path)) {
                        while (($file = readdir($dh)) !== false) {
                                $fullpath = $path . "/" . $file;
-                               if ((filetype($fullpath) == "dir") and ($file != ".") and ($file != "..")) {
+                               if ((filetype($fullpath) == "dir") && ($file != ".") && ($file != "..")) {
                                        clear_cache($basepath, $fullpath);
                                }
-                               if ((filetype($fullpath) == "file") and (filectime($fullpath) < (time() - $cachetime))) {
+                               if ((filetype($fullpath) == "file") && (filectime($fullpath) < (time() - $cachetime))) {
                                        unlink($fullpath);
                                }
                        }
@@@ -1448,7 -1444,7 +1448,7 @@@ function get_itemcachepath() 
        }
  
        $itemcache = get_config('system', 'itemcache');
-       if (($itemcache != "") AND App::directory_usable($itemcache)) {
+       if (($itemcache != "") && App::directory_usable($itemcache)) {
                return $itemcache;
        }
  
   */
  function get_spoolpath() {
        $spoolpath = get_config('system', 'spoolpath');
-       if (($spoolpath != "") AND App::directory_usable($spoolpath)) {
+       if (($spoolpath != "") && App::directory_usable($spoolpath)) {
                // We have a spool path and it is usable
                return $spoolpath;
        }
@@@ -1510,7 -1506,7 +1510,7 @@@ function get_temppath() 
  
        $temppath = get_config("system", "temppath");
  
-       if (($temppath != "") AND App::directory_usable($temppath)) {
+       if (($temppath != "") && App::directory_usable($temppath)) {
                // We have a temp path and it is usable
                return $temppath;
        }
        $temppath = sys_get_temp_dir();
  
        // Check if it is usable
-       if (($temppath != "") AND App::directory_usable($temppath)) {
+       if (($temppath != "") && App::directory_usable($temppath)) {
                // To avoid any interferences with other systems we create our own directory
                $new_temppath = $temppath . "/" . $a->get_hostname();
                if (!is_dir($new_temppath)) {
@@@ -1642,7 -1638,7 +1642,7 @@@ function argv($x) 
  function infinite_scroll_data($module) {
  
        if (get_pconfig(local_user(), 'system', 'infinite_scroll')
-               AND ($module == "network") AND ($_GET["mode"] != "minimal")) {
+               && ($module == "network") && ($_GET["mode"] != "minimal")) {
  
                // get the page number
                if (is_string($_GET["page"])) {
  
                // try to get the uri from which we load the content
                foreach ($_GET AS $param => $value) {
-                       if (($param != "page") AND ($param != "q")) {
+                       if (($param != "page") && ($param != "q")) {
                                $reload_uri .= "&" . $param . "=" . urlencode($value);
                        }
                }
  
-               if (($a->page_offset != "") AND ! strstr($reload_uri, "&offset=")) {
+               if (($a->page_offset != "") && ! strstr($reload_uri, "&offset=")) {
                        $reload_uri .= "&offset=" . urlencode($a->page_offset);
                }
  
diff --combined doc/htconfig.md
index 5fbd6e0cb8807d4486c63aa44995bd457485645e,9c556bb7cd49a647a588f173a508d86c1c3d046f..ec961c200f680ae63c8d1f4887a70e7000e0c181
@@@ -68,7 -68,6 +68,7 @@@ Example: To set the directory value ple
  * **ostatus_poll_timeframe** - Defines how old an item can be to try to complete the conversation with it.
  * **paranoia** (Boolean) - Log out users if their IP address changed.
  * **permit_crawling** (Boolean) - Restricts the search for not logged in users to one search per minute.
 +* **worker_debug** (Boolean) - If activated, it prints out the number of running processes split by priority.
  * **profiler** (Boolean) - Enable internal timings to help optimize code. Needed for "rendertime" addon. Default is false.
  * **free_crawls** - Number of "free" searches when "permit_crawling" is activated (Default value is 10)
  * **crawl_permit_period** - Period in seconds between allowed searches when the number of free searches is reached and "permit_crawling" is activated (Default value is 60)
@@@ -118,3 -117,10 +118,10 @@@ If more then one account should be abl
  If you want to have a more personalized closing line for the notification emails you can set a variable for the admin_name.
  
      $a->config['admin_name'] = "Marvin";
+ ## Database Settings
+ The configuration variables db_host, db_user, db_pass and db_data are holding your credentials for the database connection.
+ If you need to specify a port to access the database, you can do so by appending ":portnumber" to the db_host variable.
+     $db_host = 'your.mysqlhost.com:123456';
diff --combined include/notifier.php
index a5f378a55c21d1e6900bf7c8d4ed1306be92884a,7f6318182f63f673fd6606e22a8bfe5fd6aaa776..d7e8a9ccec5a6760cf380de608ad0d8929f39c15
@@@ -56,15 -56,13 +56,15 @@@ function notifier_run(&$argv, &$argc)
        }
  
        // Inherit the priority
 -      $queue = dba::select('workerqueue', array('priority'), array('pid' => getmypid()), array('limit' => 1));
 +      $queue = dba::select('workerqueue', array('priority', 'created'), array('pid' => getmypid()), array('limit' => 1));
        if (dbm::is_result($queue)) {
                $priority = (int)$queue['priority'];
 +              $process_created = $queue['created'];
                logger('inherited priority: '.$priority);
        } else {
                // Normally this shouldn't happen.
                $priority = PRIORITY_HIGH;
 +              $process_created = datetime_convert();
                logger('no inherited priority! Something is wrong.');
        }
  
                $recipients_relocate = q("SELECT * FROM contact WHERE uid = %d  AND self = 0 AND network = '%s'" , intval($uid), NETWORK_DFRN);
        } else {
                // find ancestors
-               $r = q("SELECT * FROM `item` WHERE `id` = %d and visible = 1 and moderated = 0 LIMIT 1",
+               $r = q("SELECT * FROM `item` WHERE `id` = %d AND visible = 1 AND moderated = 0 LIMIT 1",
                        intval($item_id)
                );
  
                $updated = $r[0]['edited'];
  
                $items = q("SELECT `item`.*, `sign`.`signed_text`,`sign`.`signature`,`sign`.`signer`
-                       FROM `item` LEFT JOIN `sign` ON `sign`.`iid` = `item`.`id` WHERE `parent` = %d and visible = 1 and moderated = 0 ORDER BY `id` ASC",
+                       FROM `item` LEFT JOIN `sign` ON `sign`.`iid` = `item`.`id` WHERE `parent` = %d AND visible = 1 AND moderated = 0 ORDER BY `id` ASC",
                        intval($parent_id)
                );
  
                        $recipients = array($parent['contact-id']);
                        $recipients_followup  = array($parent['contact-id']);
  
-                       //if (!$target_item['private'] AND $target_item['wall'] AND
-                       if (!$target_item['private'] AND
+                       //if (!$target_item['private'] && $target_item['wall'] &&
+                       if (!$target_item['private'] &&
                                (strlen($target_item['allow_cid'].$target_item['allow_gid'].
                                        $target_item['deny_cid'].$target_item['deny_gid']) == 0))
                                $push_notify = true;
  
-                       if (($thr_parent AND ($thr_parent[0]['network'] == NETWORK_OSTATUS)) OR ($parent['network'] == NETWORK_OSTATUS)) {
+                       if (($thr_parent && ($thr_parent[0]['network'] == NETWORK_OSTATUS)) || ($parent['network'] == NETWORK_OSTATUS)) {
  
                                $push_notify = true;
  
  
                // If the thread parent is OStatus then do some magic to distribute the messages.
                // We have not only to look at the parent, since it could be a Friendica thread.
-               if (($thr_parent AND ($thr_parent[0]['network'] == NETWORK_OSTATUS)) OR ($parent['network'] == NETWORK_OSTATUS)) {
+               if (($thr_parent && ($thr_parent[0]['network'] == NETWORK_OSTATUS)) || ($parent['network'] == NETWORK_OSTATUS)) {
  
                        $diaspora_delivery = false;
  
                        }
                        logger("Deliver ".$target_item["guid"]." to ".$contact['url']." via network ".$contact['network'], LOGGER_DEBUG);
  
 -                      proc_run(array('priority' => $priority, 'dont_fork' => true), 'include/delivery.php', $cmd, $item_id, $contact['id']);
 +                      proc_run(array('priority' => $priority, 'created' => $process_created, 'dont_fork' => true),
 +                                      'include/delivery.php', $cmd, $item_id, $contact['id']);
                }
        }
  
  
                                if ((! $mail) && (! $fsuggest) && (! $followup)) {
                                        logger('notifier: delivery agent: '.$rr['name'].' '.$rr['id'].' '.$rr['network'].' '.$target_item["guid"]);
 -                                      proc_run(array('priority' => $priority, 'dont_fork' => true), 'include/delivery.php', $cmd, $item_id, $rr['id']);
 +                                      proc_run(array('priority' => $priority, 'created' => $process_created, 'dont_fork' => true),
 +                                                      'include/delivery.php', $cmd, $item_id, $rr['id']);
                                }
                        }
                }
        }
  
        // Notify PuSH subscribers (Used for OStatus distribution of regular posts)
-       if ($push_notify AND strlen($hub)) {
+       if ($push_notify && strlen($hub)) {
                $hubs = explode(',', $hub);
                if (count($hubs)) {
                        foreach ($hubs as $h) {
                }
  
                // Handling the pubsubhubbub requests
 -              proc_run(array('priority' => PRIORITY_HIGH, 'dont_fork' => true), 'include/pubsubpublish.php');
 +              proc_run(array('priority' => PRIORITY_HIGH, 'created' => $process_created, 'dont_fork' => true),
 +                              'include/pubsubpublish.php');
        }
  
        logger('notifier: calling hooks', LOGGER_DEBUG);
diff --combined include/poller.php
index a1266d62c91bce67e755fd6bee9ddc00923eded6,cc8edce656969ba4e8743adac09d479724d2edee..8784931d3c6106652d3d542490769702f7a3e470
@@@ -4,7 -4,7 +4,7 @@@ use Friendica\App
  use Friendica\Core\Config;
  use Friendica\Util\Lock;
  
- if (!file_exists("boot.php") AND (sizeof($_SERVER["argv"]) != 0)) {
+ if (!file_exists("boot.php") && (sizeof($_SERVER["argv"]) != 0)) {
        $directory = dirname($_SERVER["argv"][0]);
  
        if (substr($directory, 0, 1) != "/") {
@@@ -75,7 -75,7 +75,7 @@@ function poller_run($argv, $argc)
        }
  
        // Now we start additional cron processes if we should do so
-       if (($argc <= 1) OR ($argv[1] != "no_cron")) {
+       if (($argc <= 1) || ($argv[1] != "no_cron")) {
                poller_run_cron();
        }
  
  
                // If we got that queue entry we claim it for us
                if (!poller_claim_process($r[0])) {
 +                      dba::unlock();
                        continue;
 +              } else {
 +                      // Fetch all workerqueue data while the table is still locked
 +                      // This is redundant, but this speeds up the processing
 +                      $entries = poller_total_entries();
 +                      $top_priority = poller_highest_priority();
 +                      $high_running = poller_process_with_priority_active($top_priority);
 +                      dba::unlock();
                }
  
                // To avoid the quitting of multiple pollers only one poller at a time will execute the check
                if (Lock::set('poller_worker', 0)) {
                        // Count active workers and compare them with a maximum value that depends on the load
 -                      if (poller_too_much_workers()) {
 +                      if (poller_too_much_workers($entries, $top_priority, $high_running)) {
                                logger('Active worker limit reached, quitting.', LOGGER_DEBUG);
                                return;
                        }
        logger("Couldn't select a workerqueue entry, quitting.", LOGGER_DEBUG);
  }
  
 +/**
 + * @brief Returns the number of non executed entries in the worker queue
 + *
 + * @return integer Number of non executed entries in the worker queue
 + */
 +function poller_total_entries() {
 +      $s = q("SELECT COUNT(*) AS `total` FROM `workerqueue` WHERE `executed` <= '%s'", dbesc(NULL_DATE));
 +      return $s[0]["total"];
 +}
 +
 +/**
 + * @brief Returns the highest priority in the worker queue that isn't executed
 + *
 + * @return integer Number of active poller processes
 + */
 +function poller_highest_priority() {
 +      $s = q("SELECT `priority` FROM `workerqueue` WHERE `executed` <= '%s' ORDER BY `priority` LIMIT 1", dbesc(NULL_DATE));
 +      return $s[0]["priority"];
 +}
 +
 +/**
 + * @brief Returns if a process with the given priority is running
 + *
 + * @param integer $priority The priority that should be checked
 + *
 + * @return integer Is there a process running with that priority?
 + */
 +function poller_process_with_priority_active($priority) {
 +      $s = q("SELECT `id` FROM `workerqueue` WHERE `priority` <= %d AND `executed` > '%s' LIMIT 1",
 +                      intval($priority), dbesc(NULL_DATE));
 +      return dbm::is_result($s);
 +}
 +
  /**
   * @brief Execute a worker entry
   *
@@@ -462,13 -421,9 +462,13 @@@ function poller_kill_stale_workers() 
  /**
   * @brief Checks if the number of active workers exceeds the given limits
   *
 + * @param integer $entries The number of not executed entries in the worker queue
 + * @param integer $top_priority The highest not executed priority in the worker queue
 + * @param boolean $high_running Is a process with priority "$top_priority" running?
 + *
   * @return bool Are there too much workers running?
   */
 -function poller_too_much_workers() {
 +function poller_too_much_workers($entries = NULL, $top_priority = NULL, $high_running = NULL) {
        $queues = Config::get("system", "worker_queues", 4);
  
        $maxqueues = $queues;
                $slope = $maxworkers / pow($maxsysload, $exponent);
                $queues = ceil($slope * pow(max(0, $maxsysload - $load), $exponent));
  
 -              // Create a list of queue entries grouped by their priority
 -              $listitem = array();
 +              if (Config::get('system', 'worker_debug')) {
 +                      // Create a list of queue entries grouped by their priority
 +                      $listitem = array();
  
 -              // Adding all processes with no workerqueue entry
 -              $processes = dba::p("SELECT COUNT(*) AS `running` FROM `process` WHERE NOT EXISTS (SELECT id FROM `workerqueue` WHERE `workerqueue`.`pid` = `process`.`pid`)");
 -              if ($process = dba::fetch($processes)) {
 -                      $listitem[0] = "0:".$process["running"];
 -              }
 -              dba::close($processes);
 -
 -              // Now adding all processes with workerqueue entries
 -              $entries = dba::p("SELECT COUNT(*) AS `entries`, `priority` FROM `workerqueue` GROUP BY `priority`");
 -              while ($entry = dba::fetch($entries)) {
 -                      $processes = dba::p("SELECT COUNT(*) AS `running` FROM `process` INNER JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid` WHERE `priority` = ?", $entry["priority"]);
 +                      // Adding all processes with no workerqueue entry
 +                      $processes = dba::p("SELECT COUNT(*) AS `running` FROM `process` WHERE NOT EXISTS (SELECT id FROM `workerqueue` WHERE `workerqueue`.`pid` = `process`.`pid`)");
                        if ($process = dba::fetch($processes)) {
 -                              $listitem[$entry["priority"]] = $entry["priority"].":".$process["running"]."/".$entry["entries"];
 +                              $listitem[0] = "0:".$process["running"];
                        }
                        dba::close($processes);
 -              }
 -              dba::close($entries);
 -              $processlist = ' ('.implode(', ', $listitem).')';
  
 -              $s = q("SELECT COUNT(*) AS `total` FROM `workerqueue` WHERE `executed` <= '%s'", dbesc(NULL_DATE));
 -              $entries = $s[0]["total"];
 +                      // Now adding all processes with workerqueue entries
 +                      $entries = dba::p("SELECT COUNT(*) AS `entries`, `priority` FROM `workerqueue` GROUP BY `priority`");
 +                      while ($entry = dba::fetch($entries)) {
 +                              $processes = dba::p("SELECT COUNT(*) AS `running` FROM `process` INNER JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid` WHERE `priority` = ?", $entry["priority"]);
 +                              if ($process = dba::fetch($processes)) {
 +                                      $listitem[$entry["priority"]] = $entry["priority"].":".$process["running"]."/".$entry["entries"];
 +                              }
 +                              dba::close($processes);
 +                      }
 +                      dba::close($entries);
 +                      $processlist = ' ('.implode(', ', $listitem).')';
 +              }
  
-               if (Config::get("system", "worker_fastlane", false) AND ($queues > 0) AND ($entries > 0) AND ($active >= $queues)) {
 +              if (is_null($entries)) {
 +                      $entries = poller_total_entries();
 +              }
 -                      $s = q("SELECT `priority` FROM `workerqueue` WHERE `executed` <= '%s' ORDER BY `priority` LIMIT 1", dbesc(NULL_DATE));
 -                      $top_priority = $s[0]["priority"];
 -
 -                      $s = q("SELECT `id` FROM `workerqueue` WHERE `priority` <= %d AND `executed` > '%s' LIMIT 1",
 -                              intval($top_priority), dbesc(NULL_DATE));
 -                      $high_running = dbm::is_result($s);
 -
+               if (Config::get("system", "worker_fastlane", false) && ($queues > 0) && ($entries > 0) && ($active >= $queues)) {
-                       if (!$high_running AND ($top_priority > PRIORITY_UNDEFINED) AND ($top_priority < PRIORITY_NEGLIGIBLE)) {
 +                      if (is_null($top_priority)) {
 +                              $top_priority = poller_highest_priority();
 +                      }
 +                      if (is_null($high_running)) {
 +                              $high_running = poller_process_with_priority_active($top_priority);
 +                      }
+                       if (!$high_running && ($top_priority > PRIORITY_UNDEFINED) && ($top_priority < PRIORITY_NEGLIGIBLE)) {
                                logger("There are jobs with priority ".$top_priority." waiting but none is executed. Open a fastlane.", LOGGER_DEBUG);
                                $queues = $active + 1;
                        }
                logger("Load: ".$load."/".$maxsysload." - processes: ".$active."/".$entries.$processlist." - maximum: ".$queues."/".$maxqueues, LOGGER_DEBUG);
  
                // Are there fewer workers running as possible? Then fork a new one.
-               if (!Config::get("system", "worker_dont_fork") AND ($queues > ($active + 1)) AND ($entries > 1)) {
+               if (!Config::get("system", "worker_dont_fork") && ($queues > ($active + 1)) && ($entries > 1)) {
                        logger("Active workers: ".$active."/".$queues." Fork a new worker.", LOGGER_DEBUG);
                        $args = array("include/poller.php", "no_cron");
                        $a = get_app();
@@@ -666,6 -620,7 +666,6 @@@ function poller_claim_process($queue) 
  
        $success = dba::update('workerqueue', array('executed' => datetime_convert(), 'pid' => $mypid),
                        array('id' => $queue["id"], 'pid' => 0));
 -      dba::unlock();
  
        if (!$success) {
                logger("Couldn't update queue entry ".$queue["id"]." - skip this execution", LOGGER_DEBUG);
        if (!$id) {
                logger("Queue item ".$queue["id"]." vanished - skip this execution", LOGGER_DEBUG);
                return false;
-       } elseif ((strtotime($id[0]["executed"]) <= 0) OR ($id[0]["pid"] == 0)) {
+       } elseif ((strtotime($id[0]["executed"]) <= 0) || ($id[0]["pid"] == 0)) {
                logger("Entry for queue item ".$queue["id"]." wasn't stored - skip this execution", LOGGER_DEBUG);
                return false;
        } elseif ($id[0]["pid"] != $mypid) {