From: Philipp Holzer Date: Tue, 26 Jun 2018 20:31:04 +0000 (+0200) Subject: Lock abstraction (like the Cache) X-Git-Url: https://git.mxchange.org/?a=commitdiff_plain;h=0218d16335cd4b873c3edd59fbcc110306d87e71;p=friendica.git Lock abstraction (like the Cache) - adding interface - adding seperate drivers - moving Lock to the Core package --- diff --git a/src/Core/Lock.php b/src/Core/Lock.php new file mode 100644 index 0000000000..9e02d14fcb --- /dev/null +++ b/src/Core/Lock.php @@ -0,0 +1,97 @@ +=')) { + self::$driver = new Lock\SemaphoreLockDriver(); + } elseif (Config::get('system', 'cache_driver', 'database') == 'memcache') { + self::$driver = new Lock\MemcacheLockDriver(); + } else { + self::$driver = new Lock\DatabaseLockDriver(); + } + } + } + + /** + * Returns the current cache driver + * + * @return Lock\ILockDriver; + */ + private static function getDriver() + { + if (self::$driver === null) { + self::init(); + } + + return self::$driver; + } + + /** + * @brief Acquires a lock for a given name + * + * @param string $key Name of the lock + * @param integer $timeout Seconds until we give up + * + * @return boolean Was the lock successful? + */ + public static function acquireLock($key, $timeout = 120) + { + return self::getDriver()->acquireLock($key, $timeout); + } + + /** + * @brief Releases a lock if it was set by us + * + * @param string $key Name of the lock + * @return mixed + */ + public static function releaseLock($key) + { + return self::getDriver()->releaseLock($key); + } + + /** + * @brief Releases all lock that were set by us + * @return void + */ + public static function releaseAll() + { + self::getDriver()->releaseAll(); + } +} diff --git a/src/Core/Lock/DatabaseLockDriver.php b/src/Core/Lock/DatabaseLockDriver.php new file mode 100644 index 0000000000..8761a1479e --- /dev/null +++ b/src/Core/Lock/DatabaseLockDriver.php @@ -0,0 +1,83 @@ + $key]); + + if (DBM::is_result($lock)) { + if ($lock['locked']) { + // When the process id isn't used anymore, we can safely claim the lock for us. + if (!posix_kill($lock['pid'], 0)) { + $lock['locked'] = false; + } + // We want to lock something that was already locked by us? So we got the lock. + if ($lock['pid'] == getmypid()) { + $got_lock = true; + } + } + if (!$lock['locked']) { + dba::update('locks', ['locked' => true, 'pid' => getmypid()], ['name' => $key]); + $got_lock = true; + } + } elseif (!DBM::is_result($lock)) { + dba::insert('locks', ['name' => $key, 'locked' => true, 'pid' => getmypid()]); + $got_lock = true; + } + + dba::unlock(); + + if (!$got_lock && ($timeout > 0)) { + usleep(rand(100000, 2000000)); + } + } while (!$got_lock && ((time() - $start) < $timeout)); + + return $got_lock; + } + + /** + * @brief Removes a lock if it was set by us + * + * @param string $key Name of the lock + * + * @return mixed + */ + public function releaseLock($key) + { + dba::update('locks', ['locked' => false, 'pid' => 0], ['name' => $key, 'pid' => getmypid()]); + + return; + } + + /** + * @brief Removes all lock that were set by us + * + * @return void + */ + public function releaseAll() + { + dba::update('locks', ['locked' => false, 'pid' => 0], ['pid' => getmypid()]); + } +} diff --git a/src/Core/Lock/ILockDriver.php b/src/Core/Lock/ILockDriver.php new file mode 100644 index 0000000000..b066361be6 --- /dev/null +++ b/src/Core/Lock/ILockDriver.php @@ -0,0 +1,38 @@ + + */ +interface ILockDriver +{ + /** + * + * @brief Acquires a lock for a given name + * + * @param string $key The Name of the lock + * @param integer $timeout Seconds until we give up + * + * @return boolean Was the lock successful? + */ + public function acquireLock($key, $timeout = 120); + + /** + * @brief Releases a lock if it was set by us + * + * @param string $key Name of the lock + * + * @return mixed + */ + public function releaseLock($key); + + /** + * @brief Releases all lock that were set by us + * + * @return void + */ + public function releaseAll(); +} \ No newline at end of file diff --git a/src/Core/Lock/MemcacheLockDriver.php b/src/Core/Lock/MemcacheLockDriver.php new file mode 100644 index 0000000000..1796d81366 --- /dev/null +++ b/src/Core/Lock/MemcacheLockDriver.php @@ -0,0 +1,86 @@ +get_hostname() . ";lock:" . $key; + + do { + // We only lock to be sure that nothing happens at exactly the same time + dba::lock('locks'); + $lock = Cache::get($cachekey); + + if (!is_bool($lock)) { + $pid = (int)$lock; + + // When the process id isn't used anymore, we can safely claim the lock for us. + // Or we do want to lock something that was already locked by us. + if (!posix_kill($pid, 0) || ($pid == getmypid())) { + $lock = false; + } + } + if (is_bool($lock)) { + Cache::set($cachekey, getmypid(), 300); + $got_lock = true; + } + + dba::unlock(); + + if (!$got_lock && ($timeout > 0)) { + usleep(rand(10000, 200000)); + } + } while (!$got_lock && ((time() - $start) < $timeout)); + + return $got_lock; + } + + /** + * @brief Removes a lock if it was set by us + * + * @param string $key Name of the lock + * + * @return mixed + */ + public function releaseLock($key) + { + $cachekey = get_app()->get_hostname() . ";lock:" . $key; + $lock = Cache::get($cachekey); + + if (!is_bool($lock)) { + if ((int)$lock == getmypid()) { + Cache::delete($cachekey); + } + } + + return; + } + + /** + * @brief Removes all lock that were set by us + * + * @return void + */ + public function releaseAll() + { + // We cannot delete all cache entries, but this doesn't matter with memcache + return; + } +} \ No newline at end of file diff --git a/src/Core/Lock/SemaphoreLockDriver.php b/src/Core/Lock/SemaphoreLockDriver.php new file mode 100644 index 0000000000..93911a3d10 --- /dev/null +++ b/src/Core/Lock/SemaphoreLockDriver.php @@ -0,0 +1,82 @@ +=')) { + self::$semaphore[$key] = sem_get(self::semaphoreKey($key)); + if (self::$semaphore[$key]) { + return sem_acquire(self::$semaphore[$key], ($timeout == 0)); + } + } + } + + /** + * @brief Removes a lock if it was set by us + * + * @param string $key Name of the lock + * + * @return mixed + */ + public function releaseLock($key) + { + if (function_exists('sem_get') && version_compare(PHP_VERSION, '5.6.1', '>=')) { + if (empty(self::$semaphore[$key])) { + return false; + } else { + $success = @sem_release(self::$semaphore[$key]); + unset(self::$semaphore[$key]); + return $success; + } + } + } + + /** + * @brief Removes all lock that were set by us + * + * @return void + */ + public function releaseAll() + { + // not needed/supported + return; + } +} \ No newline at end of file diff --git a/src/Core/Worker.php b/src/Core/Worker.php index c965e05830..b8a9021d0f 100644 --- a/src/Core/Worker.php +++ b/src/Core/Worker.php @@ -10,7 +10,6 @@ use Friendica\Core\System; use Friendica\Database\DBM; use Friendica\Model\Process; use Friendica\Util\DateTimeFormat; -use Friendica\Util\Lock; use Friendica\Util\Network; use dba; @@ -108,16 +107,16 @@ class Worker } // If possible we will fetch new jobs for this worker - if (!$refetched && Lock::set('worker_process', 0)) { + if (!$refetched && Lock::acquireLock('worker_process', 0)) { $stamp = (float)microtime(true); $refetched = self::findWorkerProcesses($passing_slow); self::$db_duration += (microtime(true) - $stamp); - Lock::remove('worker_process'); + Lock::releaseLock('worker_process'); } } // To avoid the quitting of multiple workers only one worker at a time will execute the check - if (Lock::set('worker', 0)) { + if (Lock::acquireLock('worker', 0)) { $stamp = (float)microtime(true); // Count active workers and compare them with a maximum value that depends on the load if (self::tooMuchWorkers()) { @@ -130,7 +129,7 @@ class Worker logger('Memory limit reached, quitting.', LOGGER_DEBUG); return; } - Lock::remove('worker'); + Lock::releaseLock('worker'); self::$db_duration += (microtime(true) - $stamp); } @@ -883,7 +882,7 @@ class Worker dba::close($r); $stamp = (float)microtime(true); - if (!Lock::set('worker_process')) { + if (!Lock::acquireLock('worker_process')) { return false; } self::$lock_duration = (microtime(true) - $stamp); @@ -892,7 +891,7 @@ class Worker $found = self::findWorkerProcesses($passing_slow); self::$db_duration += (microtime(true) - $stamp); - Lock::remove('worker_process'); + Lock::releaseLock('worker_process'); if ($found) { $r = dba::select('workerqueue', [], ['pid' => getmypid(), 'done' => false]); @@ -1097,13 +1096,13 @@ class Worker } // If there is a lock then we don't have to check for too much worker - if (!Lock::set('worker', 0)) { + if (!Lock::acquireLock('worker', 0)) { return true; } // If there are already enough workers running, don't fork another one $quit = self::tooMuchWorkers(); - Lock::remove('worker'); + Lock::releaseLock('worker'); if ($quit) { return true; diff --git a/src/Protocol/OStatus.php b/src/Protocol/OStatus.php index 2c826221e7..69975c5753 100644 --- a/src/Protocol/OStatus.php +++ b/src/Protocol/OStatus.php @@ -19,7 +19,6 @@ use Friendica\Model\User; use Friendica\Network\Probe; use Friendica\Object\Image; use Friendica\Util\DateTimeFormat; -use Friendica\Util\Lock; use Friendica\Util\Network; use Friendica\Util\XML; use dba; @@ -513,9 +512,9 @@ class OStatus logger("Item with uri ".$item["uri"]." is from a blocked contact.", LOGGER_DEBUG); } else { // We are having duplicated entries. Hopefully this solves it. - if (Lock::set('ostatus_process_item_insert')) { + if (Lock::acquireLock('ostatus_process_item_insert')) { $ret = Item::insert($item); - Lock::remove('ostatus_process_item_insert'); + Lock::releaseLock('ostatus_process_item_insert'); logger("Item with uri ".$item["uri"]." for user ".$importer["uid"].' stored. Return value: '.$ret); } else { $ret = Item::insert($item); diff --git a/src/Util/Lock.php b/src/Util/Lock.php deleted file mode 100644 index eba264ad93..0000000000 --- a/src/Util/Lock.php +++ /dev/null @@ -1,211 +0,0 @@ -connect($memcache_host, $memcache_port)) { - return false; - } - - return $memcache; - } - - /** - * @brief Creates a semaphore key - * - * @param string $fn_name Name of the lock - * - * @return ressource the semaphore key - */ - private static function semaphoreKey($fn_name) - { - $temp = get_temppath(); - - $file = $temp.'/'.$fn_name.'.sem'; - - if (!file_exists($file)) { - file_put_contents($file, $fn_name); - } - - return ftok($file, 'f'); - } - - /** - * @brief Sets a lock for a given name - * - * @param string $fn_name Name of the lock - * @param integer $timeout Seconds until we give up - * - * @return boolean Was the lock successful? - */ - public static function set($fn_name, $timeout = 120) - { - $got_lock = false; - $start = time(); - - // The second parameter for "sem_acquire" doesn't exist before 5.6.1 - if (function_exists('sem_get') && version_compare(PHP_VERSION, '5.6.1', '>=')) { - self::$semaphore[$fn_name] = sem_get(self::semaphoreKey($fn_name)); - if (self::$semaphore[$fn_name]) { - return sem_acquire(self::$semaphore[$fn_name], ($timeout == 0)); - } - } - - $memcache = self::connectMemcache(); - if (is_object($memcache)) { - $cachekey = get_app()->get_hostname().";lock:".$fn_name; - - do { - // We only lock to be sure that nothing happens at exactly the same time - dba::lock('locks'); - $lock = $memcache->get($cachekey); - - if (!is_bool($lock)) { - $pid = (int)$lock; - - // When the process id isn't used anymore, we can safely claim the lock for us. - // Or we do want to lock something that was already locked by us. - if (!posix_kill($pid, 0) || ($pid == getmypid())) { - $lock = false; - } - } - if (is_bool($lock)) { - $memcache->set($cachekey, getmypid(), MEMCACHE_COMPRESSED, 300); - $got_lock = true; - } - - dba::unlock(); - - if (!$got_lock && ($timeout > 0)) { - usleep(rand(10000, 200000)); - } - } while (!$got_lock && ((time() - $start) < $timeout)); - - return $got_lock; - } - - do { - dba::lock('locks'); - $lock = dba::selectFirst('locks', ['locked', 'pid'], ['name' => $fn_name]); - - if (DBM::is_result($lock)) { - if ($lock['locked']) { - // When the process id isn't used anymore, we can safely claim the lock for us. - if (!posix_kill($lock['pid'], 0)) { - $lock['locked'] = false; - } - // We want to lock something that was already locked by us? So we got the lock. - if ($lock['pid'] == getmypid()) { - $got_lock = true; - } - } - if (!$lock['locked']) { - dba::update('locks', ['locked' => true, 'pid' => getmypid()], ['name' => $fn_name]); - $got_lock = true; - } - } elseif (!DBM::is_result($lock)) { - dba::insert('locks', ['name' => $fn_name, 'locked' => true, 'pid' => getmypid()]); - $got_lock = true; - } - - dba::unlock(); - - if (!$got_lock && ($timeout > 0)) { - usleep(rand(100000, 2000000)); - } - } while (!$got_lock && ((time() - $start) < $timeout)); - - return $got_lock; - } - - /** - * @brief Removes a lock if it was set by us - * - * @param string $fn_name Name of the lock - * @return mixed - */ - public static function remove($fn_name) - { - if (function_exists('sem_get') && version_compare(PHP_VERSION, '5.6.1', '>=')) { - if (empty(self::$semaphore[$fn_name])) { - return false; - } else { - $success = @sem_release(self::$semaphore[$fn_name]); - unset(self::$semaphore[$fn_name]); - return $success; - } - } - - $memcache = self::connectMemcache(); - if (is_object($memcache)) { - $cachekey = get_app()->get_hostname().";lock:".$fn_name; - $lock = $memcache->get($cachekey); - - if (!is_bool($lock)) { - if ((int)$lock == getmypid()) { - $memcache->delete($cachekey); - } - } - return; - } - - dba::update('locks', ['locked' => false, 'pid' => 0], ['name' => $fn_name, 'pid' => getmypid()]); - return; - } - - /** - * @brief Removes all lock that were set by us - * @return void - */ - public static function removeAll() - { - $memcache = self::connectMemcache(); - if (is_object($memcache)) { - // We cannot delete all cache entries, but this doesn't matter with memcache - return; - } - - dba::update('locks', ['locked' => false, 'pid' => 0], ['pid' => getmypid()]); - return; - } -}