]> git.mxchange.org Git - friendica.git/commitdiff
Merge pull request #3522 from annando/1706-lock
authorTobias Diekershoff <tobias.diekershoff@gmx.net>
Wed, 7 Jun 2017 07:29:54 +0000 (09:29 +0200)
committerGitHub <noreply@github.com>
Wed, 7 Jun 2017 07:29:54 +0000 (09:29 +0200)
Some more poller speed and lock functionality

15 files changed:
boot.php
database.sql
doc/database/db_locks.md
include/cron.php
include/dba.php
include/dbstructure.php
include/lock.php [deleted file]
include/notifier.php
include/poller.php
include/pubsubpublish.php
include/queue.php
include/socgraph.php
src/App.php
src/Util/Lock.php [new file with mode: 0644]
update.php

index 9428cf5a389d0757757aaa5139a5caac07bccc34..50b8f03f6322afc99db00d9b86774cab582306c5 100644 (file)
--- a/boot.php
+++ b/boot.php
@@ -35,12 +35,13 @@ require_once 'include/features.php';
 require_once 'include/identity.php';
 require_once 'update.php';
 require_once 'include/dbstructure.php';
+require_once 'include/poller.php';
 
 define ( 'FRIENDICA_PLATFORM',     'Friendica');
 define ( 'FRIENDICA_CODENAME',     'Asparagus');
 define ( 'FRIENDICA_VERSION',      '3.5.3-dev' );
 define ( 'DFRN_PROTOCOL_VERSION',  '2.23'    );
-define ( 'DB_UPDATE_VERSION',      1227      );
+define ( 'DB_UPDATE_VERSION',      1228      );
 
 /**
  * @brief Constant with a HTML line break.
@@ -1095,18 +1096,8 @@ function proc_run($cmd) {
                return;
        }
 
-       // Checking number of workers
-       $workers = q("SELECT COUNT(*) AS `workers` FROM `workerqueue` WHERE `executed` > '%s'", dbesc(NULL_DATE));
-
-       // Get number of allowed number of worker threads
-       $queues = intval(get_config("system", "worker_queues"));
-
-       if ($queues == 0) {
-               $queues = 4;
-       }
-
        // If there are already enough workers running, don't fork another one
-       if ($workers[0]["workers"] >= $queues) {
+       if (poller_too_much_workers()) {
                return;
        }
 
index 4a5946ef38dc747b8c210e7f4f5b84ea60e046a4..7f7e975e72818ef4ce84f8dd2a070e90ef9bfd07 100644 (file)
@@ -1,6 +1,6 @@
 -- ------------------------------------------
--- Friendica 3.5.2-rc (Asparagus)
--- DB_UPDATE_VERSION 1227
+-- Friendica 3.5.3dev (Asparagus)
+-- DB_UPDATE_VERSION 1228
 -- ------------------------------------------
 
 
@@ -580,7 +580,7 @@ CREATE TABLE IF NOT EXISTS `locks` (
        `id` int(11) NOT NULL auto_increment,
        `name` varchar(128) NOT NULL DEFAULT '',
        `locked` tinyint(1) NOT NULL DEFAULT 0,
-       `created` datetime DEFAULT '0001-01-01 00:00:00',
+       `pid` int(10) unsigned NOT NULL DEFAULT 0,
         PRIMARY KEY(`id`)
 ) DEFAULT COLLATE utf8mb4_general_ci;
 
@@ -1116,6 +1116,7 @@ CREATE TABLE IF NOT EXISTS `workerqueue` (
        `executed` datetime NOT NULL DEFAULT '0001-01-01 00:00:00',
         PRIMARY KEY(`id`),
         INDEX `pid` (`pid`),
+        INDEX `parameter` (`parameter`(192)),
         INDEX `priority_created` (`priority`,`created`)
 ) DEFAULT COLLATE utf8mb4_general_ci;
 
index 00556dd9538f484b0cdc417785d8eb1c3ba3bc52..4de6fbf961c179fbd47ceb686658065f501f685b 100644 (file)
@@ -1,11 +1,11 @@
 Table locks
 ===========
 
-| Field   | Description      | Type         | Null | Key | Default             | Extra          |
-|---------|------------------|--------------|------|-----|---------------------|----------------|
-| id      | sequential ID    | int(11)      | NO   | PRI | NULL                | auto_increment |
-| name    |                  | varchar(128) | NO   |     |                     |                |
-| locked  |                  | tinyint(1)   | NO   |     | 0                   |                |
-| created |                  | datetime     | YES  |     | 0001-01-01 00:00:00 |                |
+| Field   | Description      | Type             | Null | Key | Default             | Extra          |
+|---------|------------------|------------------|------|-----|---------------------|----------------|
+| id      | sequential ID    | int(11)          | NO   | PRI | NULL                | auto_increment |
+| name    |                  | varchar(128)     | NO   |     |                     |                |
+| locked  |                  | tinyint(1)       | NO   |     | 0                   |                |
+| pid     | Process ID       | int(10) unsigned | NO   |     | 0                   |                |
 
 Return to [database documentation](help/database)
index 3702bf8b36707dfae5f3d1113602c0318d2634b2..70b87e969618cac8dbbbc9b87a727e8e657e81cc 100644 (file)
@@ -246,10 +246,11 @@ function cron_poll_contacts($argc, $argv) {
                        logger("Polling " . $contact["network"] . " " . $contact["id"] . " " . $contact["nick"] . " " . $contact["name"]);
 
                        if (($contact['network'] == NETWORK_FEED) AND ($contact['priority'] <= 3)) {
-                               proc_run(PRIORITY_MEDIUM, 'include/onepoll.php', intval($contact['id']));
+                               $priority = PRIORITY_MEDIUM;
                        } else {
-                               proc_run(PRIORITY_LOW, 'include/onepoll.php', intval($contact['id']));
+                               $priority = PRIORITY_LOW;
                        }
+                       proc_run(array('priority' => $priority, 'dont_fork' => true), 'include/onepoll.php', intval($contact['id']));
                }
        }
 }
index b9e6c32d56acff766aeec3ffc91af5b09f5a023c..1f428bf4651b2b43ed0c47dc785761b3b0d1d443 100644 (file)
@@ -806,6 +806,41 @@ class dba {
                return self::e($sql, $param);
        }
 
+       /**
+        * @brief Locks a table for exclusive write access
+        *
+        * This function can be extended in the future to accept a table array as well.
+        *
+        * @param string $table Table name
+        *
+        * @return boolean was the lock successful?
+        */
+       static public function lock($table) {
+               // See here: https://dev.mysql.com/doc/refman/5.7/en/lock-tables-and-transactions.html
+               self::e("SET autocommit=0");
+               $success = self::e("LOCK TABLES `".self::$dbo->escape($table)."` WRITE");
+               if (!$success) {
+                       self::e("SET autocommit=1");
+               } else {
+                       self::$in_transaction = true;
+               }
+               return $success;
+       }
+
+       /**
+        * @brief Unlocks all locked tables
+        *
+        * @return boolean was the unlock successful?
+        */
+       static public function unlock() {
+               // See here: https://dev.mysql.com/doc/refman/5.7/en/lock-tables-and-transactions.html
+               self::e("COMMIT");
+               $success = self::e("UNLOCK TABLES");
+               self::e("SET autocommit=1");
+               self::$in_transaction = false;
+               return $success;
+       }
+
        /**
         * @brief Starts a transaction
         *
index 156453d935dcde5e2d3f65dd8f113dc6f3ac1dde..161df46f698e26c00f805e4f0a6e03c9d7bad098 100644 (file)
@@ -1205,7 +1205,7 @@ function db_definition() {
                                        "id" => array("type" => "int(11)", "not null" => "1", "extra" => "auto_increment", "primary" => "1"),
                                        "name" => array("type" => "varchar(128)", "not null" => "1", "default" => ""),
                                        "locked" => array("type" => "tinyint(1)", "not null" => "1", "default" => "0"),
-                                       "created" => array("type" => "datetime", "default" => NULL_DATE),
+                                       "pid" => array("type" => "int(10) unsigned", "not null" => "1", "default" => "0"),
                                        ),
                        "indexes" => array(
                                        "PRIMARY" => array("id"),
@@ -1743,6 +1743,7 @@ function db_definition() {
                        "indexes" => array(
                                        "PRIMARY" => array("id"),
                                        "pid" => array("pid"),
+                                       "parameter" => array("parameter(192)"),
                                        "priority_created" => array("priority", "created"),
                                        )
                        );
diff --git a/include/lock.php b/include/lock.php
deleted file mode 100644 (file)
index 64f6319..0000000
+++ /dev/null
@@ -1,80 +0,0 @@
-<?php
-
-// Provide some ability to lock a PHP function so that multiple processes
-// can't run the function concurrently
-if (! function_exists('lock_function')) {
-function lock_function($fn_name, $block = true, $wait_sec = 2, $timeout = 30) {
-       if ( $wait_sec == 0 )
-               $wait_sec = 2;  // don't let the user pick a value that's likely to crash the system
-
-       $got_lock = false;
-       $start = time();
-
-       do {
-               q("LOCK TABLE `locks` WRITE");
-               $r = q("SELECT `locked`, `created` FROM `locks` WHERE `name` = '%s' LIMIT 1",
-                       dbesc($fn_name)
-               );
-
-               if ((dbm::is_result($r)) AND (!$r[0]['locked'] OR (strtotime($r[0]['created']) < time() - 3600))) {
-                       q("UPDATE `locks` SET `locked` = 1, `created` = '%s' WHERE `name` = '%s'",
-                               dbesc(datetime_convert()),
-                               dbesc($fn_name)
-                       );
-                       $got_lock = true;
-               }
-               elseif (! dbm::is_result($r)) {
-                       /// @TODO the Boolean value for count($r) should be equivalent to the Boolean value of $r
-                       q("INSERT INTO `locks` (`name`, `created`, `locked`) VALUES ('%s', '%s', 1)",
-                               dbesc($fn_name),
-                               dbesc(datetime_convert())
-                       );
-                       $got_lock = true;
-               }
-
-               q("UNLOCK TABLES");
-
-               if (($block) && (! $got_lock))
-                       sleep($wait_sec);
-
-       } while (($block) && (! $got_lock) && ((time() - $start) < $timeout));
-
-       logger('lock_function: function ' . $fn_name . ' with blocking = ' . $block . ' got_lock = ' . $got_lock . ' time = ' . (time() - $start), LOGGER_DEBUG);
-
-       return $got_lock;
-}}
-
-
-if (! function_exists('block_on_function_lock')) {
-function block_on_function_lock($fn_name, $wait_sec = 2, $timeout = 30) {
-       if ( $wait_sec == 0 )
-               $wait_sec = 2;  // don't let the user pick a value that's likely to crash the system
-
-       $start = time();
-
-       do {
-               $r = q("SELECT locked FROM locks WHERE name = '%s' LIMIT 1",
-                       dbesc($fn_name)
-               );
-
-               if (dbm::is_result($r) && $r[0]['locked']) {
-                       sleep($wait_sec);
-               }
-
-       } while (dbm::is_result($r) && $r[0]['locked'] && ((time() - $start) < $timeout));
-
-       return;
-}}
-
-
-if (! function_exists('unlock_function')) {
-function unlock_function($fn_name) {
-       $r = q("UPDATE `locks` SET `locked` = 0, `created` = '%s' WHERE `name` = '%s'",
-                       dbesc(NULL_DATE),
-                       dbesc($fn_name)
-            );
-
-       logger('unlock_function: released lock for function ' . $fn_name, LOGGER_DEBUG);
-
-       return;
-}}
index 74cfabb6cd48c1be400025c67a0eac6f40ca160c..a08057f6772fa49611bb79f9fb29cec0f9f35ffb 100644 (file)
@@ -498,7 +498,7 @@ function notifier_run(&$argv, &$argc){
                        }
                        logger("Deliver ".$target_item["guid"]." to ".$contact['url']." via network ".$contact['network'], LOGGER_DEBUG);
 
-                       proc_run($priority, 'include/delivery.php', $cmd, $item_id, $contact['id']);
+                       proc_run(array('priority' => $priority, 'dont_fork' => true), 'include/delivery.php', $cmd, $item_id, $contact['id']);
                }
        }
 
@@ -563,7 +563,7 @@ function notifier_run(&$argv, &$argc){
 
                                if ((! $mail) && (! $fsuggest) && (! $followup)) {
                                        logger('notifier: delivery agent: '.$rr['name'].' '.$rr['id'].' '.$rr['network'].' '.$target_item["guid"]);
-                                       proc_run($priority, 'include/delivery.php', $cmd, $item_id, $rr['id']);
+                                       proc_run(array('priority' => $priority, 'dont_fork' => true), 'include/delivery.php', $cmd, $item_id, $rr['id']);
                                }
                        }
                }
@@ -603,7 +603,7 @@ function notifier_run(&$argv, &$argc){
                }
 
                // Handling the pubsubhubbub requests
-               proc_run(PRIORITY_HIGH, 'include/pubsubpublish.php');
+               proc_run(array('priority' => PRIORITY_HIGH, 'dont_fork' => true), 'include/pubsubpublish.php');
        }
 
        logger('notifier: calling hooks', LOGGER_DEBUG);
index 0011559c6929a05417152f63b6522ef62c411391..fcabe5d8ee9bf14ee388bdbc6b40c7a1cf8d71b3 100644 (file)
@@ -2,13 +2,14 @@
 
 use Friendica\App;
 use Friendica\Core\Config;
+use Friendica\Util\Lock;
 
 if (!file_exists("boot.php") AND (sizeof($_SERVER["argv"]) != 0)) {
        $directory = dirname($_SERVER["argv"][0]);
 
-       if (substr($directory, 0, 1) != "/")
+       if (substr($directory, 0, 1) != "/") {
                $directory = $_SERVER["PWD"]."/".$directory;
-
+       }
        $directory = realpath($directory."/..");
 
        chdir($directory);
@@ -19,16 +20,12 @@ require_once("boot.php");
 function poller_run($argv, $argc){
        global $a, $db;
 
-       if (is_null($a)) {
-               $a = new App(dirname(__DIR__));
-       }
+       $a = new App(dirname(__DIR__));
 
-       if (is_null($db)) {
-               @include(".htconfig.php");
-               require_once("include/dba.php");
-               $db = new dba($db_host, $db_user, $db_pass, $db_data);
-               unset($db_host, $db_user, $db_pass, $db_data);
-       };
+       @include(".htconfig.php");
+       require_once("include/dba.php");
+       $db = new dba($db_host, $db_user, $db_pass, $db_data);
+       unset($db_host, $db_user, $db_pass, $db_data);
 
        Config::load();
 
@@ -41,54 +38,74 @@ function poller_run($argv, $argc){
 
        load_hooks();
 
-       $a->start_process();
-
-       if ($a->min_memory_reached()) {
+       // At first check the maximum load. We shouldn't continue with a high load
+       if ($a->maxload_reached()) {
+               logger('Pre check: maximum load reached, quitting.', LOGGER_DEBUG);
                return;
        }
 
-       if (poller_max_connections_reached()) {
+       // We now start the process. This is done after the load check since this could increase the load.
+       $a->start_process();
+
+       // At first we check the number of workers and quit if there are too much of them
+       // This is done at the top to avoid that too much code is executed without a need to do so,
+       // since the poller mostly quits here.
+       if (poller_too_much_workers()) {
+               poller_kill_stale_workers();
+               logger('Pre check: Active worker limit reached, quitting.', LOGGER_DEBUG);
                return;
        }
 
-       if ($a->maxload_reached()) {
+       // Do we have too few memory?
+       if ($a->min_memory_reached()) {
+               logger('Pre check: Memory limit reached, quitting.', LOGGER_DEBUG);
                return;
        }
 
-       if (($argc <= 1) OR ($argv[1] != "no_cron")) {
-               poller_run_cron();
+       // Possibly there are too much database connections
+       if (poller_max_connections_reached()) {
+               logger('Pre check: maximum connections reached, quitting.', LOGGER_DEBUG);
+               return;
        }
 
+       // Possibly there are too much database processes that block the system
        if ($a->max_processes_reached()) {
+               logger('Pre check: maximum processes reached, quitting.', LOGGER_DEBUG);
                return;
        }
 
-       // Checking the number of workers
-       if (poller_too_much_workers()) {
-               poller_kill_stale_workers();
-               return;
+       // Now we start additional cron processes if we should do so
+       if (($argc <= 1) OR ($argv[1] != "no_cron")) {
+               poller_run_cron();
        }
 
        $starttime = time();
 
+       // We fetch the next queue entry that is about to be executed
        while ($r = poller_worker_process()) {
 
+               // If we got that queue entry we claim it for us
                if (!poller_claim_process($r[0])) {
                        continue;
                }
 
-               // Check free memory
-               if ($a->min_memory_reached()) {
-                       logger('Memory limit reached, quitting.', LOGGER_DEBUG);
-                       return;
-               }
+               // To avoid the quitting of multiple pollers only one poller at a time will execute the check
+               if (Lock::set('poller_worker', 0)) {
+                       // Count active workers and compare them with a maximum value that depends on the load
+                       if (poller_too_much_workers()) {
+                               logger('Active worker limit reached, quitting.', LOGGER_DEBUG);
+                               return;
+                       }
 
-               // Count active workers and compare them with a maximum value that depends on the load
-               if (poller_too_much_workers()) {
-                       logger('Active worker limit reached, quitting.', LOGGER_DEBUG);
-                       return;
+                       // Check free memory
+                       if ($a->min_memory_reached()) {
+                               logger('Memory limit reached, quitting.', LOGGER_DEBUG);
+                               return;
+                       }
+                       Lock::remove('poller_worker');
                }
 
+               // finally the work will be done
                if (!poller_execute($r[0])) {
                        logger('Process execution failed, quitting.', LOGGER_DEBUG);
                        return;
@@ -96,7 +113,7 @@ function poller_run($argv, $argc){
 
                // Quit the poller once every hour
                if (time() > ($starttime + 3600)) {
-                       logger('Process lifetime reachted, quitting.', LOGGER_DEBUG);
+                       logger('Process lifetime reached, quitting.', LOGGER_DEBUG);
                        return;
                }
        }
@@ -150,7 +167,6 @@ function poller_execute($queue) {
        $funcname = str_replace(".php", "", basename($argv[0]))."_run";
 
        if (function_exists($funcname)) {
-
                poller_exec_function($queue, $funcname, $argv);
                dba::delete('workerqueue', array('id' => $queue["id"]));
        } else {
@@ -226,24 +242,27 @@ function poller_exec_function($queue, $funcname, $argv) {
                                $o = "\nDatabase Read:\n";
                                foreach ($a->callstack["database"] AS $func => $time) {
                                        $time = round($time, 3);
-                                       if ($time > 0)
+                                       if ($time > 0) {
                                                $o .= $func.": ".$time."\n";
+                                       }
                                }
                        }
                        if (isset($a->callstack["database_write"])) {
                                $o .= "\nDatabase Write:\n";
                                foreach ($a->callstack["database_write"] AS $func => $time) {
                                        $time = round($time, 3);
-                                       if ($time > 0)
+                                       if ($time > 0) {
                                                $o .= $func.": ".$time."\n";
+                                       }
                                }
                        }
                        if (isset($a->callstack["network"])) {
                                $o .= "\nNetwork:\n";
                                foreach ($a->callstack["network"] AS $func => $time) {
                                        $time = round($time, 3);
-                                       if ($time > 0)
+                                       if ($time > 0) {
                                                $o .= $func.": ".$time."\n";
+                                       }
                                }
                        }
                } else {
@@ -284,27 +303,30 @@ function poller_max_connections_reached() {
        if ($max == 0) {
                // the maximum number of possible user connections can be a system variable
                $r = q("SHOW VARIABLES WHERE `variable_name` = 'max_user_connections'");
-               if ($r)
+               if (dbm::is_result($r)) {
                        $max = $r[0]["Value"];
-
+               }
                // Or it can be granted. This overrides the system variable
                $r = q("SHOW GRANTS");
-               if ($r)
+               if (dbm::is_result($r)) {
                        foreach ($r AS $grants) {
                                $grant = array_pop($grants);
-                               if (stristr($grant, "GRANT USAGE ON"))
-                                       if (preg_match("/WITH MAX_USER_CONNECTIONS (\d*)/", $grant, $match))
+                               if (stristr($grant, "GRANT USAGE ON")) {
+                                       if (preg_match("/WITH MAX_USER_CONNECTIONS (\d*)/", $grant, $match)) {
                                                $max = $match[1];
+                                       }
+                               }
                        }
+               }
        }
 
        // If $max is set we will use the processlist to determine the current number of connections
        // The processlist only shows entries of the current user
        if ($max != 0) {
                $r = q("SHOW PROCESSLIST");
-               if (!dbm::is_result($r))
+               if (!dbm::is_result($r)) {
                        return false;
-
+               }
                $used = count($r);
 
                logger("Connection usage (user values): ".$used."/".$max, LOGGER_DEBUG);
@@ -320,28 +342,28 @@ function poller_max_connections_reached() {
        // We will now check for the system values.
        // This limit could be reached although the user limits are fine.
        $r = q("SHOW VARIABLES WHERE `variable_name` = 'max_connections'");
-       if (!$r)
+       if (!dbm::is_result($r)) {
                return false;
-
+       }
        $max = intval($r[0]["Value"]);
-       if ($max == 0)
+       if ($max == 0) {
                return false;
-
+       }
        $r = q("SHOW STATUS WHERE `variable_name` = 'Threads_connected'");
-       if (!$r)
+       if (!dbm::is_result($r)) {
                return false;
-
+       }
        $used = intval($r[0]["Value"]);
-       if ($used == 0)
+       if ($used == 0) {
                return false;
-
+       }
        logger("Connection usage (system values): ".$used."/".$max, LOGGER_DEBUG);
 
        $level = $used / $max * 100;
 
-       if ($level < $maxlevel)
+       if ($level < $maxlevel) {
                return false;
-
+       }
        logger("Maximum level (".$level."%) of system connections reached: ".$used."/".$max);
        return true;
 }
@@ -440,8 +462,7 @@ function poller_too_much_workers() {
                        dba::close($processes);
                }
                dba::close($entries);
-
-               $processlist = implode(', ', $listitem);
+               $processlist = ' ('.implode(', ', $listitem).')';
 
                $s = q("SELECT COUNT(*) AS `total` FROM `workerqueue` WHERE `executed` <= '%s'", dbesc(NULL_DATE));
                $entries = $s[0]["total"];
@@ -460,7 +481,7 @@ function poller_too_much_workers() {
                        }
                }
 
-               logger("Load: ".$load."/".$maxsysload." - processes: ".$active."/".$entries." (".$processlist.") - maximum: ".$queues."/".$maxqueues, LOGGER_DEBUG);
+               logger("Load: ".$load."/".$maxsysload." - processes: ".$active."/".$entries.$processlist." - maximum: ".$queues."/".$maxqueues, LOGGER_DEBUG);
 
                // Are there fewer workers running as possible? Then fork a new one.
                if (!Config::get("system", "worker_dont_fork") AND ($queues > ($active + 1)) AND ($entries > 1)) {
@@ -471,7 +492,7 @@ function poller_too_much_workers() {
                }
        }
 
-       return($active >= $queues);
+       return $active >= $queues;
 }
 
 /**
@@ -482,7 +503,7 @@ function poller_too_much_workers() {
 function poller_active_workers() {
        $workers = q("SELECT COUNT(*) AS `processes` FROM `process` WHERE `command` = 'poller.php'");
 
-       return($workers[0]["processes"]);
+       return $workers[0]["processes"];
 }
 
 /**
@@ -503,36 +524,37 @@ function poller_passing_slow(&$highest_priority) {
                INNER JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid`");
 
        // No active processes at all? Fine
-       if (!dbm::is_result($r))
-               return(false);
-
+       if (!dbm::is_result($r)) {
+               return false;
+       }
        $priorities = array();
-       foreach ($r AS $line)
+       foreach ($r AS $line) {
                $priorities[] = $line["priority"];
-
+       }
        // Should not happen
-       if (count($priorities) == 0)
-               return(false);
-
+       if (count($priorities) == 0) {
+               return false;
+       }
        $highest_priority = min($priorities);
 
        // The highest process is already the slowest one?
        // Then we quit
-       if ($highest_priority == PRIORITY_NEGLIGIBLE)
-               return(false);
-
+       if ($highest_priority == PRIORITY_NEGLIGIBLE) {
+               return false;
+       }
        $high = 0;
-       foreach ($priorities AS $priority)
-               if ($priority == $highest_priority)
+       foreach ($priorities AS $priority) {
+               if ($priority == $highest_priority) {
                        ++$high;
-
+               }
+       }
        logger("Highest priority: ".$highest_priority." Total processes: ".count($priorities)." Count high priority processes: ".$high, LOGGER_DEBUG);
        $passing_slow = (($high/count($priorities)) > (2/3));
 
-       if ($passing_slow)
+       if ($passing_slow) {
                logger("Passing slower processes than priority ".$highest_priority, LOGGER_DEBUG);
-
-       return($passing_slow);
+       }
+       return $passing_slow;
 }
 
 /**
@@ -546,7 +568,7 @@ function poller_worker_process() {
        $highest_priority = 0;
 
        if (poller_passing_slow($highest_priority)) {
-               dba::e('LOCK TABLES `workerqueue` WRITE');
+               dba::lock('workerqueue');
 
                // Are there waiting processes with a higher priority than the currently highest?
                $r = q("SELECT * FROM `workerqueue`
@@ -568,7 +590,7 @@ function poller_worker_process() {
                        return $r;
                }
        } else {
-               dba::e('LOCK TABLES `workerqueue` WRITE');
+               dba::lock('workerqueue');
        }
 
        // If there is no result (or we shouldn't pass lower processes) we check without priority limit
@@ -578,7 +600,7 @@ function poller_worker_process() {
 
        // We only unlock the tables here, when we got no data
        if (!dbm::is_result($r)) {
-               dba::e('UNLOCK TABLES');
+               dba::unlock();
        }
 
        return $r;
@@ -598,7 +620,7 @@ function poller_claim_process($queue) {
 
        $success = dba::update('workerqueue', array('executed' => datetime_convert(), 'pid' => $mypid),
                        array('id' => $queue["id"], 'pid' => 0));
-       dba::e('UNLOCK TABLES');
+       dba::unlock();
 
        if (!$success) {
                logger("Couldn't update queue entry ".$queue["id"]." - skip this execution", LOGGER_DEBUG);
@@ -729,5 +751,7 @@ if (array_search(__file__,get_included_files())===0){
 
        get_app()->end_process();
 
+       Lock::remove('poller_worker');
+
        killme();
 }
index cde256a4034f266dafd4061db8c9de2042a2f308..1112969f2705ee3a9f773aceded4c0dfef8558e1 100644 (file)
@@ -17,7 +17,7 @@ function pubsubpublish_run(&$argv, &$argc){
 
                foreach ($r as $rr) {
                        logger("Publish feed to ".$rr["callback_url"], LOGGER_DEBUG);
-                       proc_run(PRIORITY_HIGH, 'include/pubsubpublish.php', $rr["id"]);
+                       proc_run(array('priority' => PRIORITY_HIGH, 'dont_fork' => true), 'include/pubsubpublish.php', $rr["id"]);
                }
        }
 
index dbed4604735e7eaedf0b672f665c4e7d6ac8be4b..cf5ed9a8156c28cd3e9a33fa8e449789f30c343a 100644 (file)
@@ -27,7 +27,7 @@ function queue_run(&$argv, &$argc){
                logger('queue: start');
 
                // Handling the pubsubhubbub requests
-               proc_run(PRIORITY_HIGH,'include/pubsubpublish.php');
+               proc_run(array('priority' => PRIORITY_HIGH, 'dont_fork' => true), 'include/pubsubpublish.php');
 
                $r = q("SELECT `queue`.*, `contact`.`name`, `contact`.`uid` FROM `queue`
                        INNER JOIN `contact` ON `queue`.`cid` = `contact`.`id`
@@ -51,7 +51,7 @@ function queue_run(&$argv, &$argc){
                if (dbm::is_result($r)) {
                        foreach ($r as $q_item) {
                                logger('Call queue for id '.$q_item['id']);
-                               proc_run(PRIORITY_LOW, "include/queue.php", $q_item['id']);
+                               proc_run(array('priority' => PRIORITY_LOW, 'dont_fork' => true), "include/queue.php", $q_item['id']);
                        }
                }
                return;
index fbac08cc9736b080bb394ac2b643055a49c072e3..7a39e388be8bba407ef3ba73fe11a39c1ea070ce 100644 (file)
@@ -1995,10 +1995,11 @@ function get_gcontact_id($contact) {
        if (in_array($contact["network"], array(NETWORK_DFRN, NETWORK_DIASPORA, NETWORK_OSTATUS)))
                $contact["url"] = clean_contact_url($contact["url"]);
 
-       $r = q("SELECT `id`, `last_contact`, `last_failure`, `network` FROM `gcontact` WHERE `nurl` = '%s' LIMIT 2",
+       dba::lock('gcontact');
+       $r = q("SELECT `id`, `last_contact`, `last_failure`, `network` FROM `gcontact` WHERE `nurl` = '%s' LIMIT 1",
                dbesc(normalise_link($contact["url"])));
 
-       if ($r) {
+       if (dbm::is_result($r)) {
                $gcontact_id = $r[0]["id"];
 
                // Update every 90 days
@@ -2036,17 +2037,13 @@ function get_gcontact_id($contact) {
                        $doprobing = in_array($r[0]["network"], array(NETWORK_DFRN, NETWORK_DIASPORA, NETWORK_OSTATUS, ""));
                }
        }
+       dba::unlock();
 
        if ($doprobing) {
                logger("Last Contact: ". $last_contact_str." - Last Failure: ".$last_failure_str." - Checking: ".$contact["url"], LOGGER_DEBUG);
                proc_run(PRIORITY_LOW, 'include/gprobe.php', bin2hex($contact["url"]));
        }
 
-       if ((dbm::is_result($r)) AND (count($r) > 1) AND ($gcontact_id > 0) AND ($contact["url"] != ""))
-        q("DELETE FROM `gcontact` WHERE `nurl` = '%s' AND `id` != %d",
-               dbesc(normalise_link($contact["url"])),
-               intval($gcontact_id));
-
        return $gcontact_id;
 }
 
index d671c5f1abf3ffbff872cd47924e911ecfe1da45..26cfcaadb1021f0527ddd4e85b78c3b60d739309 100644 (file)
@@ -797,6 +797,8 @@ class App {
         * @return bool Is the limit reached?
         */
        function max_processes_reached() {
+               // Deactivated, needs more investigating if this check really makes sense
+               return false;
 
                if ($this->is_backend()) {
                        $process = 'backend';
diff --git a/src/Util/Lock.php b/src/Util/Lock.php
new file mode 100644 (file)
index 0000000..26ccdc9
--- /dev/null
@@ -0,0 +1,159 @@
+<?php
+
+namespace Friendica\Util;
+
+/**
+ * @file src/Util/Lock.php
+ * @brief Functions for preventing parallel execution of functions
+ *
+ */
+
+use Friendica\Core\Config;
+use Memcache;
+use dba;
+use dbm;
+
+/**
+ * @brief This class contain Functions for preventing parallel execution of functions
+ */
+class Lock {
+       /**
+        * @brief Check for memcache and open a connection if configured
+        *
+        * @return object|boolean The memcache object - or "false" if not successful
+        */
+       private static function connectMemcache() {
+               if (!function_exists('memcache_connect')) {
+                       return false;
+               }
+
+               if (!Config::get('system', 'memcache')) {
+                       return false;
+               }
+
+               $memcache_host = Config::get('system', 'memcache_host', '127.0.0.1');
+               $memcache_port = Config::get('system', 'memcache_port', 11211);
+
+               $memcache = new Memcache;
+
+               if (!$memcache->connect($memcache_host, $memcache_port)) {
+                       return false;
+               }
+
+               return $memcache;
+       }
+
+       /**
+        * @brief Sets a lock for a given name
+        *
+        * @param string $fn_name Name of the lock
+        * @param integer $timeout Seconds until we give up
+        *
+        * @return boolean Was the lock successful?
+        */
+       public static function set($fn_name, $timeout = 120) {
+               $got_lock = false;
+               $start = time();
+
+               $memcache = self::connectMemcache();
+               if (is_object($memcache)) {
+                       $wait_sec = 0.2;
+                       $cachekey = get_app()->get_hostname().";lock:".$fn_name;
+
+                       do {
+                               $lock = $memcache->get($cachekey);
+
+                               if (!is_bool($lock)) {
+                                       $pid = (int)$lock;
+
+                                       // When the process id isn't used anymore, we can safely claim the lock for us.
+                                       // Or we do want to lock something that was already locked by us.
+                                       if (!posix_kill($pid, 0) OR ($pid == getmypid())) {
+                                               $lock = false;
+                                       }
+                               }
+                               if (is_bool($lock)) {
+                                       $memcache->set($cachekey, getmypid(), MEMCACHE_COMPRESSED, 300);
+                                       $got_lock = true;
+                               }
+                               if (!$got_lock AND ($timeout > 0)) {
+                                       usleep($wait_sec * 1000000);
+                               }
+                       } while (!$got_lock AND ((time() - $start) < $timeout));
+
+                       return $got_lock;
+               }
+
+               $wait_sec = 2;
+
+               do {
+                       dba::lock('locks');
+                       $lock = dba::select('locks', array('locked', 'pid'), array('name' => $fn_name), array('limit' => 1));
+
+                       if (dbm::is_result($lock)) {
+                               if ($lock['locked']) {
+                                       // When the process id isn't used anymore, we can safely claim the lock for us.
+                                       if (!posix_kill($lock['pid'], 0)) {
+                                               $lock['locked'] = false;
+                                       }
+                                       // We want to lock something that was already locked by us? So we got the lock.
+                                       if ($lock['pid'] == getmypid()) {
+                                               $got_lock = true;
+                                       }
+                               }
+                               if (!$lock['locked']) {
+                                       dba::update('locks', array('locked' => true, 'pid' => getmypid()), array('name' => $fn_name));
+                                       $got_lock = true;
+                               }
+                       } elseif (!dbm::is_result($lock)) {
+                               dba::insert('locks', array('name' => $fn_name, 'locked' => true, 'pid' => getmypid()));
+                               $got_lock = true;
+                       }
+
+                       dba::unlock();
+
+                       if (!$got_lock AND ($timeout > 0)) {
+                               sleep($wait_sec);
+                       }
+               } while (!$got_lock AND ((time() - $start) < $timeout));
+
+               return $got_lock;
+       }
+
+       /**
+        * @brief Removes a lock if it was set by us
+        *
+        * @param string $fn_name Name of the lock
+        */
+       public static function remove($fn_name) {
+               $memcache = self::connectMemcache();
+               if (is_object($memcache)) {
+                       $cachekey = get_app()->get_hostname().";lock:".$fn_name;
+                       $lock = $memcache->get($cachekey);
+
+                       if (!is_bool($lock)) {
+                               if ((int)$lock == getmypid()) {
+                                       $memcache->delete($cachekey);
+                               }
+                       }
+                       return;
+               }
+
+               dba::update('locks', array('locked' => false, 'pid' => 0), array('name' => $fn_name, 'pid' => getmypid()));
+               return;
+       }
+
+       /**
+        * @brief Removes all lock that were set by us
+        */
+       public static function removeAll() {
+               $memcache = self::connectMemcache();
+               if (is_object($memcache)) {
+                       // We cannot delete all cache entries, but this doesn't matter with memcache
+                       return;
+               }
+
+               dba::update('locks', array('locked' => false, 'pid' => 0), array('pid' => getmypid()));
+               return;
+       }
+}
index 7561a9af19546e0d388dd2750da1683fb22f088b..76620a48a4629eb4b26ac5169798714bac0064b1 100644 (file)
@@ -1,6 +1,6 @@
 <?php
 
-define('UPDATE_VERSION' , 1227);
+define('UPDATE_VERSION' , 1228);
 
 /**
  *