]> git.mxchange.org Git - friendica.git/commitdiff
Semaphore based locking and hopefully the fix for the workerqueue
authorMichael <heluecht@pirati.ca>
Wed, 28 Jun 2017 04:53:11 +0000 (04:53 +0000)
committerMichael <heluecht@pirati.ca>
Wed, 28 Jun 2017 04:53:11 +0000 (04:53 +0000)
boot.php
database.sql
include/cron.php
include/dbstructure.php
include/poller.php
src/Util/Lock.php
update.php

index b4f36eb84ebacc52ddcd1c1fd10b6c3bd6710c05..7d7d66ab3e2027b8c6db2c5ef13aa496eb1862b5 100644 (file)
--- a/boot.php
+++ b/boot.php
@@ -42,7 +42,7 @@ define ( 'FRIENDICA_PLATFORM',     'Friendica');
 define ( 'FRIENDICA_CODENAME',     'Asparagus');
 define ( 'FRIENDICA_VERSION',      '3.5.3-dev' );
 define ( 'DFRN_PROTOCOL_VERSION',  '2.23'    );
-define ( 'DB_UPDATE_VERSION',      1230      );
+define ( 'DB_UPDATE_VERSION',      1231      );
 
 /**
  * @brief Constant with a HTML line break.
index 7f7e975e72818ef4ce84f8dd2a070e90ef9bfd07..a3f937587cb52d55bb501a9adbdc395b8bd66531 100644 (file)
@@ -1,6 +1,6 @@
 -- ------------------------------------------
--- Friendica 3.5.3dev (Asparagus)
--- DB_UPDATE_VERSION 1228
+-- Friendica 3.5.3-dev (Asparagus)
+-- DB_UPDATE_VERSION 1231
 -- ------------------------------------------
 
 
@@ -1114,9 +1114,11 @@ CREATE TABLE IF NOT EXISTS `workerqueue` (
        `created` datetime NOT NULL DEFAULT '0001-01-01 00:00:00',
        `pid` int(11) NOT NULL DEFAULT 0,
        `executed` datetime NOT NULL DEFAULT '0001-01-01 00:00:00',
+       `done` tinyint(1) NOT NULL DEFAULT 0,
         PRIMARY KEY(`id`),
         INDEX `pid` (`pid`),
-        INDEX `parameter` (`parameter`(192)),
-        INDEX `priority_created` (`priority`,`created`)
+        INDEX `parameter` (`parameter`(64)),
+        INDEX `priority_created` (`priority`,`created`),
+        INDEX `executed` (`executed`)
 ) DEFAULT COLLATE utf8mb4_general_ci;
 
index 9c2e6aeaa30d5f68f5b3f972befdb0e4845896c3..0e587784404bb515b71f3960a00b5c9c2b6925be 100644 (file)
@@ -84,7 +84,7 @@ function cron_run(&$argv, &$argc){
                proc_run(PRIORITY_LOW, "include/cronjobs.php", "update_photo_albums");
 
                // Delete all done workerqueue entries
-               dba::delete('workerqueue', array('done' => true));
+               dba::e('DELETE FROM `workerqueue` WHERE `done` AND `executed` < UTC_TIMESTAMP() - INTERVAL 12 HOUR');
        }
 
        // Poll contacts
index 268bbbb66944060cb1f9b7d16b9ca464ad3f6716..32fd304f4ba8658177260db1a7fb9cb9c580a219 100644 (file)
@@ -1747,6 +1747,7 @@ function db_definition() {
                                        "pid" => array("pid"),
                                        "parameter" => array("parameter(64)"),
                                        "priority_created" => array("priority", "created"),
+                                       "executed" => array("executed"),
                                        )
                        );
 
index 379aadcd6719f4fe847c5a758b73f16bdaafb3d7..d9394b080bb3a1d3eabbbf650849faa5d671062e 100644 (file)
@@ -85,6 +85,8 @@ function poller_run($argv, $argc){
                poller_run_cron();
        }
 
+       $refetched = false;
+
        $starttime = time();
 
        // We fetch the next queue entry that is about to be executed
@@ -121,6 +123,13 @@ function poller_run($argv, $argc){
                        logger('Process lifetime reached, quitting.', LOGGER_DEBUG);
                        return;
                }
+
+               // If possible we will fetch new jobs for this worker
+               if (!$refetched && Lock::set('poller_worker_process', 0)) {
+                       $refetched = find_worker_processes();
+                       Lock::remove('poller_worker_process');
+               }
+
        }
        logger("Couldn't select a workerqueue entry, quitting.", LOGGER_DEBUG);
 }
@@ -235,7 +244,7 @@ function poller_execute($queue) {
  * @param array $argv Array of values to be passed to the function
  */
 function poller_exec_function($queue, $funcname, $argv) {
-       global $poller_up_start, $poller_db_duration;
+       global $poller_up_start, $poller_db_duration, $poller_lock_duration;
 
        $a = get_app();
 
@@ -284,7 +293,11 @@ function poller_exec_function($queue, $funcname, $argv) {
         * The execution time is the productive time.
         * By changing parameters like the maximum number of workers we can check the effectivness.
        */
-       logger('DB: '.number_format($poller_db_duration, 2).' - Rest: '.number_format($up_duration - $poller_db_duration, 2).' - Execution: '.number_format($duration, 2), LOGGER_DEBUG);
+       logger('DB: '.number_format($poller_db_duration, 2).
+               ' - Lock: '.number_format($poller_lock_duration, 2).
+               ' - Rest: '.number_format($up_duration - $poller_db_duration - $poller_lock_duration, 2).
+               ' - Execution: '.number_format($duration, 2), LOGGER_DEBUG);
+       $poller_lock_duration = 0;
 
        if ($duration > 3600) {
                logger("Prio ".$queue["priority"].": ".$queue["parameter"]." - longer than 1 hour (".round($duration/60, 3).")", LOGGER_DEBUG);
@@ -448,7 +461,7 @@ function poller_kill_stale_workers() {
        foreach ($r AS $pid) {
                if (!posix_kill($pid["pid"], 0)) {
                        dba::update('workerqueue', array('executed' => NULL_DATE, 'pid' => 0),
-                                       array('pid' => $pid["pid"]));
+                                       array('pid' => $pid["pid"], 'done' => false));
                } else {
                        // Kill long running processes
 
@@ -475,7 +488,7 @@ function poller_kill_stale_workers() {
                                // Additionally we are lowering the priority.
                                dba::update('workerqueue',
                                                array('executed' => NULL_DATE, 'created' => datetime_convert(), 'priority' => PRIORITY_NEGLIGIBLE, 'pid' => 0),
-                                               array('pid' => $pid["pid"]));
+                                               array('pid' => $pid["pid"], 'done' => false));
                        } else {
                                logger("Worker process ".$pid["pid"]." (".implode(" ", $argv).") now runs for ".round($duration)." of ".$max_duration." allowed minutes. That's okay.", LOGGER_DEBUG);
                        }
@@ -512,7 +525,9 @@ function poller_too_much_workers() {
                        $listitem = array();
 
                        // Adding all processes with no workerqueue entry
-                       $processes = dba::p("SELECT COUNT(*) AS `running` FROM `process` WHERE NOT EXISTS (SELECT id FROM `workerqueue` WHERE `workerqueue`.`pid` = `process`.`pid` AND NOT `done`)");
+                       $processes = dba::p("SELECT COUNT(*) AS `running` FROM `process` WHERE NOT EXISTS
+                                               (SELECT id FROM `workerqueue`
+                                               WHERE `workerqueue`.`pid` = `process`.`pid` AND NOT `done` AND `pid` != ?)", getmypid());
                        if ($process = dba::fetch($processes)) {
                                $listitem[0] = "0:".$process["running"];
                        }
@@ -528,7 +543,16 @@ function poller_too_much_workers() {
                                dba::close($processes);
                        }
                        dba::close($entries);
-                       $processlist = ' ('.implode(', ', $listitem).')';
+
+                       $jobs_per_minute = 0;
+
+                       $jobs = dba::p("SELECT COUNT(*) AS `jobs` FROM `workerqueue` WHERE `done` AND `executed` > UTC_TIMESTAMP() - INTERVAL 10 MINUTE");
+                       if ($job = dba::fetch($jobs)) {
+                               $jobs_per_minute = number_format($job['jobs'] / 10, 0);
+                       }
+                       dba::close($jobs);
+
+                       $processlist = ' - jpm: '.$jobs_per_minute.' ('.implode(', ', $listitem).')';
                }
 
                $entries = poller_total_entries();
@@ -628,7 +652,10 @@ function find_worker_processes() {
        // Check if we should pass some low priority process
        $highest_priority = 0;
        $found = false;
-       $limit = Config::get('system', 'worker_fetch_limit', 5);
+
+       // The higher the number of parallel workers, the more we prefetch to prevent concurring access
+       $limit = Config::get("system", "worker_queues", 4) * 2;
+       $limit = Config::get('system', 'worker_fetch_limit', $limit);
 
        if (poller_passing_slow($highest_priority)) {
                // Are there waiting processes with a higher priority than the currently highest?
@@ -644,7 +671,7 @@ function find_worker_processes() {
                        // Give slower processes some processing time
                        $result = dba::e("UPDATE `workerqueue` SET `executed` = ?, `pid` = ?
                                                WHERE `executed` <= ? AND `priority` > ? AND NOT `done`
-                                               ORDER BY `priority`, `created` LIMIT 1",
+                                               ORDER BY `priority`, `created` LIMIT ".intval($limit),
                                        datetime_convert(), getmypid(), NULL_DATE, $highest_priority);
                        if ($result) {
                                $found = (dba::affected_rows() > 0);
@@ -669,14 +696,29 @@ function find_worker_processes() {
  * @return string SQL statement
  */
 function poller_worker_process() {
-       global $poller_db_duration;
+       global $poller_db_duration, $poller_lock_duration;
 
        $stamp = (float)microtime(true);
 
-       $found = find_worker_processes();
+       // There can already be jobs for us in the queue.
+       $r = q("SELECT * FROM `workerqueue` WHERE `pid` = %d AND NOT `done`", intval(getmypid()));
+       if (dbm::is_result($r)) {
+               $poller_db_duration += (microtime(true) - $stamp);
+               return $r;
+       }
 
+       $stamp = (float)microtime(true);
+       if (!Lock::set('poller_worker_process')) {
+               return false;
+       }
+       $poller_lock_duration = (microtime(true) - $stamp);
+
+       $stamp = (float)microtime(true);
+       $found = find_worker_processes();
        $poller_db_duration += (microtime(true) - $stamp);
 
+       Lock::remove('poller_worker_process');
+
        if ($found) {
                $r = q("SELECT * FROM `workerqueue` WHERE `pid` = %d AND NOT `done`", intval(getmypid()));
        }
@@ -689,7 +731,7 @@ function poller_worker_process() {
 function poller_unclaim_process() {
        $mypid = getmypid();
 
-       dba::update('workerqueue', array('executed' => NULL_DATE, 'pid' => 0), array('pid' => $mypid));
+       dba::update('workerqueue', array('executed' => NULL_DATE, 'pid' => 0), array('pid' => $mypid, 'done' => false));
 }
 
 /**
@@ -793,6 +835,7 @@ if (array_search(__file__,get_included_files())===0){
        get_app()->end_process();
 
        Lock::remove('poller_worker');
+       Lock::remove('poller_worker_process');
 
        killme();
 }
index 36f408cf324b4e1603c82e9c6d963579e0ab85cc..a50faf2d90d8c637cd5252d4974758b80b54f2ec 100644 (file)
@@ -17,6 +17,8 @@ use dbm;
  * @brief This class contain Functions for preventing parallel execution of functions
  */
 class Lock {
+       private static $semaphore = array();
+
        /**
         * @brief Check for memcache and open a connection if configured
         *
@@ -43,6 +45,25 @@ class Lock {
                return $memcache;
        }
 
+       /**
+        * @brief Creates a semaphore key
+        *
+        * @param string $fn_name Name of the lock
+        *
+        * @return ressource the semaphore key
+        */
+       private static function semaphore_key($fn_name) {
+               $temp = get_temppath();
+
+               $file = $temp.'/'.$fn_name.'.sem';
+
+               if (!file_exists($file)) {
+                       file_put_contents($file, $function);
+               }
+
+               return ftok($file, 'f');
+       }
+
        /**
         * @brief Sets a lock for a given name
         *
@@ -55,6 +76,13 @@ class Lock {
                $got_lock = false;
                $start = time();
 
+               if (function_exists('sem_get')) {
+                       self::$semaphore[$fn_name] = sem_get(self::semaphore_key($fn_name));
+                       if (self::$semaphore[$fn_name]) {
+                               return sem_acquire(self::$semaphore[$fn_name], ($timeout == 0));
+                       }
+               }
+
                $memcache = self::connectMemcache();
                if (is_object($memcache)) {
                        $cachekey = get_app()->get_hostname().";lock:".$fn_name;
@@ -128,6 +156,11 @@ class Lock {
         * @param string $fn_name Name of the lock
         */
        public static function remove($fn_name) {
+               if (function_exists('sem_get') && self::$semaphore[$fn_name]) {
+                       sem_release(self::$semaphore[$fn_name]);
+                       return;
+               }
+
                $memcache = self::connectMemcache();
                if (is_object($memcache)) {
                        $cachekey = get_app()->get_hostname().";lock:".$fn_name;
index 09f11918c5ba026831c8c4928b5d653932a7407e..7cfdb231e0b29e4229db2361e21ee4f9017d8893 100644 (file)
@@ -1,6 +1,6 @@
 <?php
 
-define('UPDATE_VERSION' , 1230);
+define('UPDATE_VERSION' , 1231);
 
 /**
  *
@@ -1729,3 +1729,8 @@ function update_1202() {
        $r = q("UPDATE `user` SET `account-type` = %d WHERE `page-flags` IN (%d, %d)",
                dbesc(ACCOUNT_TYPE_COMMUNITY), dbesc(PAGE_COMMUNITY), dbesc(PAGE_PRVGROUP));
 }
+
+function update_1230() {
+       // For this special case we have to use the old update routine
+       $r = q("ALTER TABLE `workerqueue` ADD `done2` tinyint(1) NOT NULL DEFAULT 0");
+}