From 9351d55c9cac37f5967e0a75883af7a93ea44a92 Mon Sep 17 00:00:00 2001 From: James Turner Date: Wed, 8 Jun 2016 15:27:47 +0100 Subject: [PATCH] Threadsafe terrasync state updates/reading. --- simgear/scene/tsync/terrasync.cxx | 301 +++++++++++++++++------------- 1 file changed, 173 insertions(+), 128 deletions(-) diff --git a/simgear/scene/tsync/terrasync.cxx b/simgear/scene/tsync/terrasync.cxx index da49af48..bc8b14ac 100644 --- a/simgear/scene/tsync/terrasync.cxx +++ b/simgear/scene/tsync/terrasync.cxx @@ -55,6 +55,9 @@ #include #include #include +#include +#include + #include #include #include @@ -154,7 +157,8 @@ class SyncSlot public: SyncSlot() : isNewDirectory(false), - busy(false) + busy(false), + pendingKBytes(0) {} SyncItem currentItem; @@ -191,6 +195,36 @@ static unsigned int syncSlotForType(SyncItem::Type ty) } } +struct TerrasyncThreadState +{ + TerrasyncThreadState() : + _busy(false), + _stalled(false), + _fail_count(0), + _updated_tile_count(0), + _success_count(0), + _consecutive_errors(0), + _allowed_errors(6), + _cache_hits(0), + _transfer_rate(0), + _total_kb_downloaded(0), + _totalKbPending(0) + {} + + bool _busy; + bool _stalled; + int _fail_count; + int _updated_tile_count; + int _success_count; + int _consecutive_errors; + int _allowed_errors; + int _cache_hits; + int _transfer_rate; + // kbytes, not bytes, because bytes might overflow 2^31 + int _total_kb_downloaded; + unsigned int _totalKbPending; + +}; /////////////////////////////////////////////////////////////////////////////// // SGTerraSync::WorkerThread ////////////////////////////////////////////////// @@ -204,10 +238,31 @@ public: void stop(); bool start(); - bool isIdle() {return !_busy; } + bool isIdle() + { + SGGuard g(_stateLock); + return !_state._busy; + } + + bool isRunning() + { + SGGuard g(_stateLock); + return _running; + } + + bool isStalled() + { + SGGuard g(_stateLock); + return _state._stalled; + } + void request(const SyncItem& dir) {waitingTiles.push_front(dir);} - bool isDirty() { bool r = _is_dirty;_is_dirty = false;return r;} - bool hasNewTiles() { return !_freshTiles.empty();} + + bool hasNewTiles() + { + return !_freshTiles.empty(); + } + SyncItem getNewTile() { return _freshTiles.pop_front();} void setHTTPServer(const std::string& server) @@ -217,28 +272,36 @@ public: void setLocalDir(string dir) { _local_dir = stripPath(dir);} string getLocalDir() { return _local_dir;} - void setAllowedErrorCount(int errors) {_allowed_errors = errors;} - void setCachePath(const SGPath& p) {_persistentCachePath = p;} - void setCacheHits(unsigned int hits) {_cache_hits = hits;} - - volatile bool _active; - volatile bool _running; - volatile bool _busy; - volatile bool _stalled; - volatile int _fail_count; - volatile int _updated_tile_count; - volatile int _success_count; - volatile int _consecutive_errors; - volatile int _allowed_errors; - volatile int _cache_hits; - volatile int _transfer_rate; - // kbytes, not bytes, because bytes might overflow 2^31 - volatile int _total_kb_downloaded; + void setAllowedErrorCount(int errors) + { + SGGuard g(_stateLock); + _state._allowed_errors = errors; + } - unsigned int _totalKbPending; + void setCachePath(const SGPath& p) {_persistentCachePath = p;} + void setCacheHits(unsigned int hits) + { + SGGuard g(_stateLock); + _state._cache_hits = hits; + } + TerrasyncThreadState threadsafeCopyState() + { + TerrasyncThreadState st; + { + SGGuard g(_stateLock); + st = _state; + } + return st; + } private: + void incrementCacheHits() + { + SGGuard g(_stateLock); + _state._cache_hits++; + } + virtual void run(); // internal mode run and helpers @@ -257,8 +320,7 @@ private: HTTP::Client _http; SyncSlot _syncSlots[NUM_SYNC_SLOTS]; - volatile bool _is_dirty; - volatile bool _stop; + bool _stop, _running; SGBlockingDeque waitingTiles; TileAgeCache _completedTiles; @@ -268,24 +330,14 @@ private: string _local_dir; SGPath _persistentCachePath; string _httpServer; + + TerrasyncThreadState _state; + SGMutex _stateLock; }; SGTerraSync::WorkerThread::WorkerThread() : - _active(false), - _running(false), - _busy(false), - _stalled(false), - _fail_count(0), - _updated_tile_count(0), - _success_count(0), - _consecutive_errors(0), - _allowed_errors(6), - _cache_hits(0), - _transfer_rate(0), - _total_kb_downloaded(0), - _totalKbPending(0), - _is_dirty(false), - _stop(false) + _stop(false), + _running(false) { _http.setUserAgent("terrascenery-" SG_STRINGIZE(SIMGEAR_VERSION)); } @@ -295,28 +347,31 @@ void SGTerraSync::WorkerThread::stop() // drop any pending requests waitingTiles.clear(); - if (!_running) + if (!isRunning()) return; // set stop flag and wake up the thread with an empty request - _stop = true; + { + SGGuard g(_stateLock); + _stop = true; + } + SyncItem w(string(), SyncItem::Stop); request(w); join(); - _running = false; } bool SGTerraSync::WorkerThread::start() { - if (_running) + if (isRunning()) return false; if (_local_dir=="") { SG_LOG(SG_TERRASYNC,SG_ALERT, "Cannot start scenery download. Local cache directory is undefined."); - _fail_count++; - _stalled = true; + _state._fail_count++; + _state._stalled = true; return false; } @@ -326,8 +381,8 @@ bool SGTerraSync::WorkerThread::start() SG_LOG(SG_TERRASYNC,SG_ALERT, "Cannot start scenery download. Directory '" << _local_dir << "' does not exist. Set correct directory path or create directory folder."); - _fail_count++; - _stalled = true; + _state._fail_count++; + _state._stalled = true; return false; } @@ -337,18 +392,13 @@ bool SGTerraSync::WorkerThread::start() SG_LOG(SG_TERRASYNC,SG_ALERT, "Cannot start scenery download. Directory '" << _local_dir << "' contains the base package. Use a separate directory."); - _fail_count++; - _stalled = true; + _state._fail_count++; + _state._stalled = true; return false; } - _fail_count = 0; - _updated_tile_count = 0; - _success_count = 0; - _consecutive_errors = 0; _stop = false; - _stalled = false; - _running = true; + _state = TerrasyncThreadState(); // clean state string status = "Using built-in HTTP support."; @@ -365,7 +415,11 @@ bool SGTerraSync::WorkerThread::start() void SGTerraSync::WorkerThread::run() { - _active = true; + { + SGGuard g(_stateLock); + _running = true; + } + initCompletedTilesPersistentCache(); if (_httpServer == "automatic" ) { @@ -429,9 +483,11 @@ void SGTerraSync::WorkerThread::run() runInternal(); - _active = false; - _running = false; - _is_dirty = true; + { + SGGuard g(_stateLock); + _running = false; + } + } void SGTerraSync::WorkerThread::updateSyncSlot(SyncSlot &slot) @@ -444,8 +500,9 @@ void SGTerraSync::WorkerThread::updateSyncSlot(SyncSlot &slot) SG_LOG(SG_TERRASYNC, SG_INFO, "HTTP request count:" << _http.hasActiveRequests()); slot.nextWarnTimeout += 10000; } - slot.pendingKBytes = slot.repository->bytesToDownload(); #endif + // convert bytes to kbytes here + slot.pendingKBytes = (slot.repository->bytesToDownload() >> 10); return; // easy, still working } @@ -518,9 +575,12 @@ void SGTerraSync::WorkerThread::runInternal() SG_LOG(SG_TERRASYNC, SG_WARN, "failure doing HTTP update" << e.getFormattedMessage()); } - _transfer_rate = _http.transferRateBytesPerSec(); - // convert from bytes to kbytes - _total_kb_downloaded = static_cast(_http.totalBytesDownloaded() / 1024); + { + SGGuard g(_stateLock); + _state._transfer_rate = _http.transferRateBytesPerSec(); + // convert from bytes to kbytes + _state._total_kb_downloaded = static_cast(_http.totalBytesDownloaded() / 1024); + } if (_stop) break; @@ -530,11 +590,10 @@ void SGTerraSync::WorkerThread::runInternal() SyncItem next = waitingTiles.pop_front(); SyncItem::Status cacheStatus = isPathCached(next); if (cacheStatus != SyncItem::Invalid) { - _cache_hits++; + incrementCacheHits(); SG_LOG(SG_TERRASYNC, SG_DEBUG, "\nTerraSync Cache hit for: '" << next._dir << "'"); next._status = cacheStatus; _freshTiles.push_back(next); - _is_dirty = true; continue; } @@ -552,8 +611,12 @@ void SGTerraSync::WorkerThread::runInternal() anySlotBusy |= _syncSlots[slot].busy; } - _totalKbPending = newPendingCount; // approximately atomic update - _busy = anySlotBusy; + { + SGGuard g(_stateLock); + _state._totalKbPending = newPendingCount; // approximately atomic update + _state._busy = anySlotBusy; + } + if (!anySlotBusy) { // wait on the blocking deque here, otherwise we spin // the loop very fast, since _http::update with no connections @@ -587,15 +650,15 @@ SyncItem::Status SGTerraSync::WorkerThread::isPathCached(const SyncItem& next) c void SGTerraSync::WorkerThread::fail(SyncItem failedItem) { + SGGuard g(_stateLock); time_t now = time(0); - _consecutive_errors++; - _fail_count++; + _state._consecutive_errors++; + _state._fail_count++; failedItem._status = SyncItem::Failed; _freshTiles.push_back(failedItem); SG_LOG(SG_TERRASYNC,SG_INFO, "Failed to sync'" << failedItem._dir << "'"); _completedTiles[ failedItem._dir ] = now + UpdateInterval::FailedAttempt; - _is_dirty = true; } void SGTerraSync::WorkerThread::notFound(SyncItem item) @@ -608,27 +671,29 @@ void SGTerraSync::WorkerThread::notFound(SyncItem item) time_t now = time(0); item._status = SyncItem::NotFound; _freshTiles.push_back(item); - _is_dirty = true; _notFoundItems[ item._dir ] = now + UpdateInterval::SuccessfulAttempt; writeCompletedTilesPersistentCache(); } void SGTerraSync::WorkerThread::updated(SyncItem item, bool isNewDirectory) { - time_t now = time(0); - _consecutive_errors = 0; - _success_count++; - SG_LOG(SG_TERRASYNC,SG_INFO, - "Successfully synchronized directory '" << item._dir << "'"); + { + SGGuard g(_stateLock); + time_t now = time(0); + _state._consecutive_errors = 0; + _state._success_count++; + SG_LOG(SG_TERRASYNC,SG_INFO, + "Successfully synchronized directory '" << item._dir << "'"); + + item._status = SyncItem::Updated; + if (item._type == SyncItem::Tile) { + _state._updated_tile_count++; + } - item._status = SyncItem::Updated; - if (item._type == SyncItem::Tile) { - _updated_tile_count++; + _freshTiles.push_back(item); + _completedTiles[ item._dir ] = now + UpdateInterval::SuccessfulAttempt; } - _freshTiles.push_back(item); - _completedTiles[ item._dir ] = now + UpdateInterval::SuccessfulAttempt; - _is_dirty = true; writeCompletedTilesPersistentCache(); } @@ -747,8 +812,7 @@ void SGTerraSync::shutdown() void SGTerraSync::reinit() { // do not reinit when enabled and we're already up and running - if ((_terraRoot->getBoolValue("enabled",false))&& - (_workerThread->_active && _workerThread->_running)) + if ((_terraRoot->getBoolValue("enabled",false)) && _workerThread->isRunning()) { return; } @@ -769,7 +833,7 @@ void SGTerraSync::reinit() } } - _stalledNode->setBoolValue(_workerThread->_stalled); + _stalledNode->setBoolValue(_workerThread->isStalled()); } void SGTerraSync::bind() @@ -779,20 +843,6 @@ void SGTerraSync::bind() } _bound = true; - _tiedProperties.Tie( _terraRoot->getNode("busy", true), (bool*) &_workerThread->_busy ); - _tiedProperties.Tie( _terraRoot->getNode("active", true), (bool*) &_workerThread->_active ); - _tiedProperties.Tie( _terraRoot->getNode("update-count", true), (int*) &_workerThread->_success_count ); - _tiedProperties.Tie( _terraRoot->getNode("error-count", true), (int*) &_workerThread->_fail_count ); - _tiedProperties.Tie( _terraRoot->getNode("tile-count", true), (int*) &_workerThread->_updated_tile_count ); - _tiedProperties.Tie( _terraRoot->getNode("cache-hits", true), (int*) &_workerThread->_cache_hits ); - _tiedProperties.Tie( _terraRoot->getNode("transfer-rate-bytes-sec", true), (int*) &_workerThread->_transfer_rate ); - - // use kbytes here because propety doesn't support 64-bit and we might conceivably - // download more than 2G in a single session - _tiedProperties.Tie( _terraRoot->getNode("downloaded-kbytes", true), (int*) &_workerThread->_total_kb_downloaded ); - - _tiedProperties.Tie( _terraRoot->getNode("pending-kbytes", true), (int*) &_workerThread->_totalKbPending ); - _terraRoot->getNode("busy", true)->setAttribute(SGPropertyNode::WRITE,false); _terraRoot->getNode("active", true)->setAttribute(SGPropertyNode::WRITE,false); @@ -801,9 +851,10 @@ void SGTerraSync::bind() _terraRoot->getNode("tile-count", true)->setAttribute(SGPropertyNode::WRITE,false); _terraRoot->getNode("use-built-in-svn", true)->setAttribute(SGPropertyNode::USERARCHIVE,false); _terraRoot->getNode("use-svn", true)->setAttribute(SGPropertyNode::USERARCHIVE,false); + // stalled is used as a signal handler (to connect listeners triggering GUI pop-ups) _stalledNode = _terraRoot->getNode("stalled", true); - _stalledNode->setBoolValue(_workerThread->_stalled); + _stalledNode->setBoolValue(_workerThread->isStalled()); _stalledNode->setAttribute(SGPropertyNode::PRESERVE,true); } @@ -821,34 +872,28 @@ void SGTerraSync::unbind() void SGTerraSync::update(double) { - static SGBucket bucket; - if (_workerThread->isDirty()) - { - if (!_workerThread->_active) - { - if (_workerThread->_stalled) - { - SG_LOG(SG_TERRASYNC,SG_ALERT, - "Automatic scenery download/synchronization stalled. Too many errors."); - } - else - { - // not really an alert - just always show this message - SG_LOG(SG_TERRASYNC,SG_ALERT, - "Automatic scenery download/synchronization has stopped."); - } - _stalledNode->setBoolValue(_workerThread->_stalled); - } + TerrasyncThreadState copiedState(_workerThread->threadsafeCopyState()); - while (_workerThread->hasNewTiles()) - { - SyncItem next = _workerThread->getNewTile(); + _terraRoot->setBoolValue("busy", copiedState._busy); + _terraRoot->setIntValue("update-count", copiedState._success_count); + _terraRoot->setIntValue("error-count", copiedState._fail_count); + _terraRoot->setIntValue("tile-count", copiedState._updated_tile_count); + _terraRoot->setIntValue("cache-hits", copiedState._cache_hits); + _terraRoot->setIntValue("transfer-rate-bytes-sec", copiedState._transfer_rate); + _terraRoot->setIntValue("downloaded-kbytes", copiedState._total_kb_downloaded); + _terraRoot->setIntValue("pending-kbytes", copiedState._totalKbPending); - if ((next._type == SyncItem::Tile) || (next._type == SyncItem::AIData)) { - _activeTileDirs.erase(next._dir); - } - } // of freshly synced items - } + _stalledNode->setBoolValue(_workerThread->isStalled()); + _terraRoot->setBoolValue("active", _workerThread->isRunning()); + + while (_workerThread->hasNewTiles()) + { + SyncItem next = _workerThread->getNewTile(); + + if ((next._type == SyncItem::Tile) || (next._type == SyncItem::AIData)) { + _activeTileDirs.erase(next._dir); + } + } // of freshly synced items } bool SGTerraSync::isIdle() {return _workerThread->isIdle();} @@ -897,7 +942,7 @@ bool SGTerraSync::scheduleTile(const SGBucket& bucket) bool SGTerraSync::isTileDirPending(const std::string& sceneryDir) const { - if (!_workerThread->_running) { + if (!_workerThread->isRunning()) { return false; } @@ -926,7 +971,7 @@ void SGTerraSync::scheduleDataDir(const std::string& dataDir) bool SGTerraSync::isDataDirPending(const std::string& dataDir) const { - if (!_workerThread->_running) { + if (!_workerThread->isRunning()) { return false; } -- 2.39.5