]> git.mxchange.org Git - friendica.git/blobdiff - include/poller.php
Issue 3857: There is the possibility of a bad handling of dislikes
[friendica.git] / include / poller.php
index 4c0f66596388b50ba93d3c29b264156e781e8bfa..b8f0d7189fdf76f761966c255e79d6e3ef800956 100644 (file)
@@ -1,6 +1,7 @@
 <?php
 
 use Friendica\App;
+use Friendica\Core\System;
 use Friendica\Core\Config;
 use Friendica\Util\Lock;
 
@@ -18,19 +19,24 @@ if (!file_exists("boot.php") && (sizeof($_SERVER["argv"]) != 0)) {
 require_once("boot.php");
 
 function poller_run($argv, $argc){
-       global $a, $db, $poller_up_start, $poller_db_duration;
+       global $a, $poller_up_start, $poller_db_duration;
 
        $poller_up_start = microtime(true);
 
-       $a = new App(dirname(__DIR__));
+       if (empty($a)) {
+               $a = new App(dirname(__DIR__));
+       }
 
-       @include(".htconfig.php");
-       require_once("include/dba.php");
-       $db = new dba($db_host, $db_user, $db_pass, $db_data);
+       require_once ".htconfig.php";
+       require_once "include/dba.php";
+       dba::connect($db_host, $db_user, $db_pass, $db_data);
        unset($db_host, $db_user, $db_pass, $db_data);
 
        Config::load();
 
+       // Check the database structure and possibly fixes it
+       check_db(true);
+
        // Quit when in maintenance
        if (Config::get('system', 'maintenance', true)) {
                return;
@@ -88,9 +94,11 @@ function poller_run($argv, $argc){
        $starttime = time();
 
        // We fetch the next queue entry that is about to be executed
-       while ($r = poller_worker_process()) {
+       while ($r = poller_worker_process($passing_slow)) {
 
-               $refetched = false;
+               // When we are processing jobs with a lower priority, we don't refetch new jobs
+               // Otherwise fast jobs could wait behind slow ones and could be blocked.
+               $refetched = $passing_slow;
 
                foreach ($r AS $entry) {
                        // Assure that the priority is an integer value
@@ -105,7 +113,7 @@ function poller_run($argv, $argc){
                        // If possible we will fetch new jobs for this worker
                        if (!$refetched && Lock::set('poller_worker_process', 0)) {
                                $stamp = (float)microtime(true);
-                               $refetched = find_worker_processes();
+                               $refetched = find_worker_processes($passing_slow);
                                $poller_db_duration += (microtime(true) - $stamp);
                                Lock::remove('poller_worker_process');
                        }
@@ -245,7 +253,9 @@ function poller_execute($queue) {
                poller_exec_function($queue, $funcname, $argv);
 
                $stamp = (float)microtime(true);
-               dba::update('workerqueue', array('done' => true), array('id' => $queue["id"]));
+               if (dba::update('workerqueue', array('done' => true), array('id' => $queue["id"]))) {
+                       Config::set('system', 'last_poller_execution', datetime_convert());
+               }
                $poller_db_duration = (microtime(true) - $stamp);
        } else {
                logger("Function ".$funcname." does not exist");
@@ -271,7 +281,9 @@ function poller_exec_function($queue, $funcname, $argv) {
 
        $argc = count($argv);
 
-       logger("Process ".$mypid." - Prio ".$queue["priority"]." - ID ".$queue["id"].": ".$funcname." ".$queue["parameter"]);
+       $new_process_id = uniqid("wrk", true);
+
+       logger("Process ".$mypid." - Prio ".$queue["priority"]." - ID ".$queue["id"].": ".$funcname." ".$queue["parameter"]." - Process PID: ".$new_process_id);
 
        $stamp = (float)microtime(true);
 
@@ -293,11 +305,14 @@ function poller_exec_function($queue, $funcname, $argv) {
        // For better logging create a new process id for every worker call
        // But preserve the old one for the worker
        $old_process_id = $a->process_id;
-       $a->process_id = uniqid("wrk", true);
+       $a->process_id = $new_process_id;
        $a->queue = $queue;
 
        $up_duration = number_format(microtime(true) - $poller_up_start, 3);
 
+       // Reset global data to avoid interferences
+       unset($_SESSION);
+
        $funcname($argv, $argc);
 
        $a->process_id = $old_process_id;
@@ -328,7 +343,7 @@ function poller_exec_function($queue, $funcname, $argv) {
                logger("Prio ".$queue["priority"].": ".$queue["parameter"]." - longer than 2 minutes (".round($duration/60, 3).")", LOGGER_DEBUG);
        }
 
-       logger("Process ".$mypid." - Prio ".$queue["priority"]." - ID ".$queue["id"].": ".$funcname." - done in ".$duration." seconds.");
+       logger("Process ".$mypid." - Prio ".$queue["priority"]." - ID ".$queue["id"].": ".$funcname." - done in ".$duration." seconds. Process PID: ".$new_process_id);
 
        // Write down the performance values into the log
        if (Config::get("system", "profiler")) {
@@ -470,8 +485,9 @@ function poller_max_connections_reached() {
  *
  */
 function poller_kill_stale_workers() {
-       $entries = dba::p("SELECT `id`, `pid`, `executed`, `priority`, `parameter` FROM `workerqueue` WHERE `executed` > ? AND NOT `done` AND `pid` != 0 ORDER BY `priority`, `created`", NULL_DATE);
-
+       $entries = dba::select('workerqueue', array('id', 'pid', 'executed', 'priority', 'parameter'),
+                               array('`executed` > ? AND NOT `done` AND `pid` != 0', NULL_DATE),
+                               array('order' => array('priority', 'created')));
        while ($entry = dba::fetch($entries)) {
                if (!posix_kill($entry["pid"], 0)) {
                        dba::update('workerqueue', array('executed' => NULL_DATE, 'pid' => 0),
@@ -666,26 +682,36 @@ function poller_passing_slow(&$highest_priority) {
 /**
  * @brief Find and claim the next worker process for us
  *
+ * @param boolean $passing_slow Returns if we had passed low priority processes
  * @return boolean Have we found something?
  */
-function find_worker_processes() {
+function find_worker_processes(&$passing_slow) {
 
        $mypid = getmypid();
 
        // Check if we should pass some low priority process
        $highest_priority = 0;
        $found = false;
+       $passing_slow = false;
 
        // The higher the number of parallel workers, the more we prefetch to prevent concurring access
-       $limit = Config::get("system", "worker_queues", 4);
-       $limit = Config::get('system', 'worker_fetch_limit', $limit);
+       // We decrease the limit with the number of entries left in the queue
+       $worker_queues = Config::get("system", "worker_queues", 4);
+       $queue_length = Config::get('system', 'worker_fetch_limit', 1);
+       $lower_job_limit = $worker_queues * $queue_length * 2;
+       $jobs = poller_total_entries();
+
+       // Now do some magic
+       $exponent = 2;
+       $slope = $queue_length / pow($lower_job_limit, $exponent);
+       $limit = min($queue_length, ceil($slope * pow($jobs, $exponent)));
+
+       logger('Total: '.$jobs.' - Maximum: '.$queue_length.' - jobs per queue: '.$limit, LOGGER_DEBUG);
 
        if (poller_passing_slow($highest_priority)) {
                // Are there waiting processes with a higher priority than the currently highest?
-               $result = dba::p("SELECT `id` FROM `workerqueue`
-                                       WHERE `executed` <= ? AND `priority` < ? AND NOT `done`
-                                       ORDER BY `priority`, `created` LIMIT ".intval($limit),
-                               NULL_DATE, $highest_priority);
+               $result = dba::select('workerqueue', array('id'), array("`executed` <= ? AND `priority` < ? AND NOT `done`", NULL_DATE, $highest_priority),
+                               array('limit' => $limit, 'order' => array('priority', 'created'), 'only_query' => true));
 
                while ($id = dba::fetch($result)) {
                        $ids[] = $id["id"];
@@ -696,10 +722,8 @@ function find_worker_processes() {
 
                if (!$found) {
                        // Give slower processes some processing time
-                       $result = dba::p("SELECT `id` FROM `workerqueue`
-                                               WHERE `executed` <= ? AND `priority` > ? AND NOT `done`
-                                               ORDER BY `priority`, `created` LIMIT ".intval($limit),
-                                       NULL_DATE, $highest_priority);
+                       $result = dba::select('workerqueue', array('id'), array("`executed` <= ? AND `priority` > ? AND NOT `done`", NULL_DATE, $highest_priority),
+                                       array('limit' => $limit, 'order' => array('priority', 'created'), 'only_query' => true));
 
                        while ($id = dba::fetch($result)) {
                                $ids[] = $id["id"];
@@ -707,12 +731,14 @@ function find_worker_processes() {
                        dba::close($result);
 
                        $found = (count($ids) > 0);
+                       $passing_slow = $found;
                }
        }
 
        // If there is no result (or we shouldn't pass lower processes) we check without priority limit
        if (!$found) {
-               $result = dba::p("SELECT `id` FROM `workerqueue` WHERE `executed` <= ? AND NOT `done` ORDER BY `priority`, `created` LIMIT ".intval($limit), NULL_DATE);
+               $result = dba::select('workerqueue', array('id'), array("`executed` <= ? AND NOT `done`", NULL_DATE),
+                               array('limit' => $limit, 'order' => array('priority', 'created'), 'only_query' => true));
 
                while ($id = dba::fetch($result)) {
                        $ids[] = $id["id"];
@@ -723,9 +749,9 @@ function find_worker_processes() {
        }
 
        if ($found) {
-               $sql = "UPDATE `workerqueue` SET `executed` = ?, `pid` = ? WHERE `id` IN (".substr(str_repeat("?, ", count($ids)), 0, -2).") AND `pid` = 0 AND NOT `done`;";
-               array_unshift($ids, datetime_convert(), $mypid);
-               dba::e($sql, $ids);
+               $condition = "`id` IN (".substr(str_repeat("?, ", count($ids)), 0, -2).") AND `pid` = 0 AND NOT `done`";
+               array_unshift($ids, $condition);
+               dba::update('workerqueue', array('executed' => datetime_convert(), 'pid' => $mypid), $ids);
        }
 
        return $found;
@@ -734,9 +760,10 @@ function find_worker_processes() {
 /**
  * @brief Returns the next worker process
  *
+ * @param boolean $passing_slow Returns if we had passed low priority processes
  * @return string SQL statement
  */
-function poller_worker_process() {
+function poller_worker_process(&$passing_slow) {
        global $poller_db_duration, $poller_lock_duration;
 
        $stamp = (float)microtime(true);
@@ -755,7 +782,7 @@ function poller_worker_process() {
        $poller_lock_duration = (microtime(true) - $stamp);
 
        $stamp = (float)microtime(true);
-       $found = find_worker_processes();
+       $found = find_worker_processes($passing_slow);
        $poller_db_duration += (microtime(true) - $stamp);
 
        Lock::remove('poller_worker_process');
@@ -783,7 +810,7 @@ function call_worker() {
                return;
        }
 
-       $url = App::get_baseurl()."/worker";
+       $url = System::baseUrl()."/worker";
        fetch_url($url, false, $redirects, 1);
 }
 
@@ -867,7 +894,7 @@ function poller_run_cron() {
        poller_kill_stale_workers();
 }
 
-if (array_search(__file__,get_included_files())===0){
+if (array_search(__file__,get_included_files())===0) {
        poller_run($_SERVER["argv"],$_SERVER["argc"]);
 
        poller_unclaim_process();