]> git.mxchange.org Git - simgear.git/commitdiff
Threadsafe terrasync state updates/reading.
authorJames Turner <zakalawe@mac.com>
Wed, 8 Jun 2016 14:27:47 +0000 (15:27 +0100)
committerRoland Haeder <roland@mxchange.org>
Sat, 13 Aug 2016 08:21:16 +0000 (10:21 +0200)
simgear/scene/tsync/terrasync.cxx

index da49af48b1092cd30e14d442fc1c53e9a4979859..bc8b14ac23f51a34a78dc40b5976c2687f08daea 100644 (file)
@@ -55,6 +55,9 @@
 #include <simgear/misc/sg_path.hxx>
 #include <simgear/misc/strutils.hxx>
 #include <simgear/threads/SGQueue.hxx>
+#include <simgear/threads/SGThread.hxx>
+#include <simgear/threads/SGGuard.hxx>
+
 #include <simgear/misc/sg_dir.hxx>
 #include <simgear/debug/BufferedLogCallback.hxx>
 #include <simgear/props/props_io.hxx>
@@ -154,7 +157,8 @@ class SyncSlot
 public:
     SyncSlot() :
         isNewDirectory(false),
-        busy(false)
+        busy(false),
+        pendingKBytes(0)
     {}
 
     SyncItem currentItem;
@@ -191,6 +195,36 @@ static unsigned int syncSlotForType(SyncItem::Type ty)
     }
 }
 
+struct TerrasyncThreadState
+{
+    TerrasyncThreadState() :
+        _busy(false),
+        _stalled(false),
+        _fail_count(0),
+        _updated_tile_count(0),
+        _success_count(0),
+        _consecutive_errors(0),
+        _allowed_errors(6),
+        _cache_hits(0),
+        _transfer_rate(0),
+        _total_kb_downloaded(0),
+        _totalKbPending(0)
+    {}
+
+    bool _busy;
+    bool _stalled;
+    int  _fail_count;
+    int  _updated_tile_count;
+    int  _success_count;
+    int  _consecutive_errors;
+    int  _allowed_errors;
+    int  _cache_hits;
+    int _transfer_rate;
+    // kbytes, not bytes, because bytes might overflow 2^31
+    int _total_kb_downloaded;
+    unsigned int _totalKbPending;
+
+};
 
 ///////////////////////////////////////////////////////////////////////////////
 // SGTerraSync::WorkerThread //////////////////////////////////////////////////
@@ -204,10 +238,31 @@ public:
    void stop();
    bool start();
 
-    bool isIdle() {return !_busy; }
+    bool isIdle()
+    {
+        SGGuard<SGMutex> g(_stateLock);
+        return !_state._busy;
+    }
+
+    bool isRunning()
+    {
+         SGGuard<SGMutex> g(_stateLock);
+        return _running;
+    }
+
+    bool isStalled()
+    {
+        SGGuard<SGMutex> g(_stateLock);
+        return _state._stalled;
+    }
+
    void request(const SyncItem& dir) {waitingTiles.push_front(dir);}
-   bool isDirty() { bool r = _is_dirty;_is_dirty = false;return r;}
-   bool hasNewTiles() { return !_freshTiles.empty();}
+
+    bool hasNewTiles()
+    {
+        return !_freshTiles.empty();
+    }
+
    SyncItem getNewTile() { return _freshTiles.pop_front();}
 
    void   setHTTPServer(const std::string& server)
@@ -217,28 +272,36 @@ public:
 
    void   setLocalDir(string dir)           { _local_dir    = stripPath(dir);}
    string getLocalDir()                     { return _local_dir;}
-   void   setAllowedErrorCount(int errors)  {_allowed_errors = errors;}
 
-   void   setCachePath(const SGPath& p)     {_persistentCachePath = p;}
-   void   setCacheHits(unsigned int hits)   {_cache_hits = hits;}
-
-   volatile bool _active;
-   volatile bool _running;
-   volatile bool _busy;
-   volatile bool _stalled;
-   volatile int  _fail_count;
-   volatile int  _updated_tile_count;
-   volatile int  _success_count;
-   volatile int  _consecutive_errors;
-   volatile int  _allowed_errors;
-   volatile int  _cache_hits;
-   volatile int _transfer_rate;
-   // kbytes, not bytes, because bytes might overflow 2^31
-   volatile int _total_kb_downloaded;
+   void   setAllowedErrorCount(int errors)
+    {
+        SGGuard<SGMutex> g(_stateLock);
+        _state._allowed_errors = errors;
+    }
 
-    unsigned int _totalKbPending;
+   void   setCachePath(const SGPath& p)     {_persistentCachePath = p;}
+   void   setCacheHits(unsigned int hits)
+    {
+        SGGuard<SGMutex> g(_stateLock);
+        _state._cache_hits = hits;
+    }
 
+    TerrasyncThreadState threadsafeCopyState()
+    {
+        TerrasyncThreadState st;
+        {
+            SGGuard<SGMutex> g(_stateLock);
+            st = _state;
+        }
+        return st;
+    }
 private:
+    void incrementCacheHits()
+    {
+        SGGuard<SGMutex> g(_stateLock);
+        _state._cache_hits++;
+    }
+
    virtual void run();
 
     // internal mode run and helpers
@@ -257,8 +320,7 @@ private:
     HTTP::Client _http;
     SyncSlot _syncSlots[NUM_SYNC_SLOTS];
 
-   volatile bool _is_dirty;
-   volatile bool _stop;
+    bool _stop, _running;
    SGBlockingDeque <SyncItem> waitingTiles;
 
    TileAgeCache _completedTiles;
@@ -268,24 +330,14 @@ private:
    string _local_dir;
    SGPath _persistentCachePath;
    string _httpServer;
+
+    TerrasyncThreadState _state;
+    SGMutex _stateLock;
 };
 
 SGTerraSync::WorkerThread::WorkerThread() :
-    _active(false),
-    _running(false),
-    _busy(false),
-    _stalled(false),
-    _fail_count(0),
-    _updated_tile_count(0),
-    _success_count(0),
-    _consecutive_errors(0),
-    _allowed_errors(6),
-    _cache_hits(0),
-    _transfer_rate(0),
-    _total_kb_downloaded(0),
-    _totalKbPending(0),
-    _is_dirty(false),
-    _stop(false)
+    _stop(false),
+    _running(false)
 {
     _http.setUserAgent("terrascenery-" SG_STRINGIZE(SIMGEAR_VERSION));
 }
@@ -295,28 +347,31 @@ void SGTerraSync::WorkerThread::stop()
     // drop any pending requests
     waitingTiles.clear();
 
-    if (!_running)
+    if (!isRunning())
         return;
 
     // set stop flag and wake up the thread with an empty request
-    _stop = true;
+    {
+        SGGuard<SGMutex> g(_stateLock);
+        _stop = true;
+    }
+
     SyncItem w(string(), SyncItem::Stop);
     request(w);
     join();
-    _running = false;
 }
 
 bool SGTerraSync::WorkerThread::start()
 {
-    if (_running)
+    if (isRunning())
         return false;
 
     if (_local_dir=="")
     {
         SG_LOG(SG_TERRASYNC,SG_ALERT,
                "Cannot start scenery download. Local cache directory is undefined.");
-        _fail_count++;
-        _stalled = true;
+        _state._fail_count++;
+        _state._stalled = true;
         return false;
     }
 
@@ -326,8 +381,8 @@ bool SGTerraSync::WorkerThread::start()
         SG_LOG(SG_TERRASYNC,SG_ALERT,
                "Cannot start scenery download. Directory '" << _local_dir <<
                "' does not exist. Set correct directory path or create directory folder.");
-        _fail_count++;
-        _stalled = true;
+        _state._fail_count++;
+        _state._stalled = true;
         return false;
     }
 
@@ -337,18 +392,13 @@ bool SGTerraSync::WorkerThread::start()
         SG_LOG(SG_TERRASYNC,SG_ALERT,
                "Cannot start scenery download. Directory '" << _local_dir <<
                "' contains the base package. Use a separate directory.");
-        _fail_count++;
-        _stalled = true;
+        _state._fail_count++;
+        _state._stalled = true;
         return false;
     }
 
-    _fail_count = 0;
-    _updated_tile_count = 0;
-    _success_count = 0;
-    _consecutive_errors = 0;
     _stop = false;
-    _stalled = false;
-    _running = true;
+    _state = TerrasyncThreadState(); // clean state
 
     string status = "Using built-in HTTP support.";
 
@@ -365,7 +415,11 @@ bool SGTerraSync::WorkerThread::start()
 
 void SGTerraSync::WorkerThread::run()
 {
-    _active = true;
+    {
+         SGGuard<SGMutex> g(_stateLock);
+        _running = true;
+    }
+
     initCompletedTilesPersistentCache();
 
     if (_httpServer == "automatic" ) {
@@ -429,9 +483,11 @@ void SGTerraSync::WorkerThread::run()
 
     runInternal();
 
-    _active = false;
-    _running = false;
-    _is_dirty = true;
+    {
+        SGGuard<SGMutex> g(_stateLock);
+        _running = false;
+    }
+
 }
 
 void SGTerraSync::WorkerThread::updateSyncSlot(SyncSlot &slot)
@@ -444,8 +500,9 @@ void SGTerraSync::WorkerThread::updateSyncSlot(SyncSlot &slot)
                 SG_LOG(SG_TERRASYNC, SG_INFO, "HTTP request count:" << _http.hasActiveRequests());
                 slot.nextWarnTimeout += 10000;
             }
-            slot.pendingKBytes = slot.repository->bytesToDownload();
 #endif
+            // convert bytes to kbytes here
+            slot.pendingKBytes = (slot.repository->bytesToDownload() >> 10);
             return; // easy, still working
         }
 
@@ -518,9 +575,12 @@ void SGTerraSync::WorkerThread::runInternal()
             SG_LOG(SG_TERRASYNC, SG_WARN, "failure doing HTTP update" << e.getFormattedMessage());
         }
 
-        _transfer_rate = _http.transferRateBytesPerSec();
-        // convert from bytes to kbytes
-        _total_kb_downloaded = static_cast<int>(_http.totalBytesDownloaded() / 1024);
+        {
+            SGGuard<SGMutex> g(_stateLock);
+            _state._transfer_rate = _http.transferRateBytesPerSec();
+            // convert from bytes to kbytes
+            _state._total_kb_downloaded = static_cast<int>(_http.totalBytesDownloaded() / 1024);
+        }
 
         if (_stop)
             break;
@@ -530,11 +590,10 @@ void SGTerraSync::WorkerThread::runInternal()
             SyncItem next = waitingTiles.pop_front();
             SyncItem::Status cacheStatus = isPathCached(next);
             if (cacheStatus != SyncItem::Invalid) {
-                _cache_hits++;
+                incrementCacheHits();
                 SG_LOG(SG_TERRASYNC, SG_DEBUG, "\nTerraSync Cache hit for: '" << next._dir << "'");
                 next._status = cacheStatus;
                 _freshTiles.push_back(next);
-                _is_dirty = true;
                 continue;
             }
 
@@ -552,8 +611,12 @@ void SGTerraSync::WorkerThread::runInternal()
             anySlotBusy |= _syncSlots[slot].busy;
         }
 
-        _totalKbPending = newPendingCount; // approximately atomic update
-        _busy = anySlotBusy;
+        {
+            SGGuard<SGMutex> g(_stateLock);
+            _state._totalKbPending = newPendingCount; // approximately atomic update
+            _state._busy = anySlotBusy;
+        }
+
         if (!anySlotBusy) {
             // wait on the blocking deque here, otherwise we spin
             // the loop very fast, since _http::update with no connections
@@ -587,15 +650,15 @@ SyncItem::Status SGTerraSync::WorkerThread::isPathCached(const SyncItem& next) c
 
 void SGTerraSync::WorkerThread::fail(SyncItem failedItem)
 {
+    SGGuard<SGMutex> g(_stateLock);
     time_t now = time(0);
-    _consecutive_errors++;
-    _fail_count++;
+    _state._consecutive_errors++;
+    _state._fail_count++;
     failedItem._status = SyncItem::Failed;
     _freshTiles.push_back(failedItem);
     SG_LOG(SG_TERRASYNC,SG_INFO,
            "Failed to sync'" << failedItem._dir << "'");
     _completedTiles[ failedItem._dir ] = now + UpdateInterval::FailedAttempt;
-    _is_dirty = true;
 }
 
 void SGTerraSync::WorkerThread::notFound(SyncItem item)
@@ -608,27 +671,29 @@ void SGTerraSync::WorkerThread::notFound(SyncItem item)
     time_t now = time(0);
     item._status = SyncItem::NotFound;
     _freshTiles.push_back(item);
-    _is_dirty = true;
     _notFoundItems[ item._dir ] = now + UpdateInterval::SuccessfulAttempt;
     writeCompletedTilesPersistentCache();
 }
 
 void SGTerraSync::WorkerThread::updated(SyncItem item, bool isNewDirectory)
 {
-    time_t now = time(0);
-    _consecutive_errors = 0;
-    _success_count++;
-    SG_LOG(SG_TERRASYNC,SG_INFO,
-           "Successfully synchronized directory '" << item._dir << "'");
+    {
+        SGGuard<SGMutex> g(_stateLock);
+        time_t now = time(0);
+        _state._consecutive_errors = 0;
+        _state._success_count++;
+        SG_LOG(SG_TERRASYNC,SG_INFO,
+               "Successfully synchronized directory '" << item._dir << "'");
+
+        item._status = SyncItem::Updated;
+        if (item._type == SyncItem::Tile) {
+            _state._updated_tile_count++;
+        }
 
-    item._status = SyncItem::Updated;
-    if (item._type == SyncItem::Tile) {
-        _updated_tile_count++;
+        _freshTiles.push_back(item);
+        _completedTiles[ item._dir ] = now + UpdateInterval::SuccessfulAttempt;
     }
 
-    _freshTiles.push_back(item);
-    _completedTiles[ item._dir ] = now + UpdateInterval::SuccessfulAttempt;
-    _is_dirty = true;
     writeCompletedTilesPersistentCache();
 }
 
@@ -747,8 +812,7 @@ void SGTerraSync::shutdown()
 void SGTerraSync::reinit()
 {
     // do not reinit when enabled and we're already up and running
-    if ((_terraRoot->getBoolValue("enabled",false))&&
-         (_workerThread->_active && _workerThread->_running))
+    if ((_terraRoot->getBoolValue("enabled",false)) && _workerThread->isRunning())
     {
         return;
     }
@@ -769,7 +833,7 @@ void SGTerraSync::reinit()
         }
     }
 
-    _stalledNode->setBoolValue(_workerThread->_stalled);
+    _stalledNode->setBoolValue(_workerThread->isStalled());
 }
 
 void SGTerraSync::bind()
@@ -779,20 +843,6 @@ void SGTerraSync::bind()
     }
 
     _bound = true;
-    _tiedProperties.Tie( _terraRoot->getNode("busy", true), (bool*) &_workerThread->_busy );
-    _tiedProperties.Tie( _terraRoot->getNode("active", true), (bool*) &_workerThread->_active );
-    _tiedProperties.Tie( _terraRoot->getNode("update-count", true), (int*) &_workerThread->_success_count );
-    _tiedProperties.Tie( _terraRoot->getNode("error-count", true), (int*) &_workerThread->_fail_count );
-    _tiedProperties.Tie( _terraRoot->getNode("tile-count", true), (int*) &_workerThread->_updated_tile_count );
-    _tiedProperties.Tie( _terraRoot->getNode("cache-hits", true), (int*) &_workerThread->_cache_hits );
-    _tiedProperties.Tie( _terraRoot->getNode("transfer-rate-bytes-sec", true), (int*) &_workerThread->_transfer_rate );
-
-    // use kbytes here because propety doesn't support 64-bit and we might conceivably
-    // download more than 2G in a single session
-    _tiedProperties.Tie( _terraRoot->getNode("downloaded-kbytes", true), (int*) &_workerThread->_total_kb_downloaded );
-
-    _tiedProperties.Tie( _terraRoot->getNode("pending-kbytes", true), (int*) &_workerThread->_totalKbPending );
-
 
     _terraRoot->getNode("busy", true)->setAttribute(SGPropertyNode::WRITE,false);
     _terraRoot->getNode("active", true)->setAttribute(SGPropertyNode::WRITE,false);
@@ -801,9 +851,10 @@ void SGTerraSync::bind()
     _terraRoot->getNode("tile-count", true)->setAttribute(SGPropertyNode::WRITE,false);
     _terraRoot->getNode("use-built-in-svn", true)->setAttribute(SGPropertyNode::USERARCHIVE,false);
     _terraRoot->getNode("use-svn", true)->setAttribute(SGPropertyNode::USERARCHIVE,false);
+
     // stalled is used as a signal handler (to connect listeners triggering GUI pop-ups)
     _stalledNode = _terraRoot->getNode("stalled", true);
-    _stalledNode->setBoolValue(_workerThread->_stalled);
+    _stalledNode->setBoolValue(_workerThread->isStalled());
     _stalledNode->setAttribute(SGPropertyNode::PRESERVE,true);
 }
 
@@ -821,34 +872,28 @@ void SGTerraSync::unbind()
 
 void SGTerraSync::update(double)
 {
-    static SGBucket bucket;
-    if (_workerThread->isDirty())
-    {
-        if (!_workerThread->_active)
-        {
-            if (_workerThread->_stalled)
-            {
-                SG_LOG(SG_TERRASYNC,SG_ALERT,
-                       "Automatic scenery download/synchronization stalled. Too many errors.");
-            }
-            else
-            {
-                // not really an alert - just always show this message
-                SG_LOG(SG_TERRASYNC,SG_ALERT,
-                        "Automatic scenery download/synchronization has stopped.");
-            }
-            _stalledNode->setBoolValue(_workerThread->_stalled);
-        }
+    TerrasyncThreadState copiedState(_workerThread->threadsafeCopyState());
 
-        while (_workerThread->hasNewTiles())
-        {
-            SyncItem next = _workerThread->getNewTile();
+    _terraRoot->setBoolValue("busy", copiedState._busy);
+    _terraRoot->setIntValue("update-count", copiedState._success_count);
+    _terraRoot->setIntValue("error-count", copiedState._fail_count);
+    _terraRoot->setIntValue("tile-count", copiedState._updated_tile_count);
+    _terraRoot->setIntValue("cache-hits", copiedState._cache_hits);
+    _terraRoot->setIntValue("transfer-rate-bytes-sec", copiedState._transfer_rate);
+    _terraRoot->setIntValue("downloaded-kbytes", copiedState._total_kb_downloaded);
+    _terraRoot->setIntValue("pending-kbytes", copiedState._totalKbPending);
 
-            if ((next._type == SyncItem::Tile) || (next._type == SyncItem::AIData)) {
-                _activeTileDirs.erase(next._dir);
-            }
-        } // of freshly synced items
-    }
+    _stalledNode->setBoolValue(_workerThread->isStalled());
+    _terraRoot->setBoolValue("active", _workerThread->isRunning());
+
+    while (_workerThread->hasNewTiles())
+    {
+        SyncItem next = _workerThread->getNewTile();
+
+        if ((next._type == SyncItem::Tile) || (next._type == SyncItem::AIData)) {
+            _activeTileDirs.erase(next._dir);
+        }
+    } // of freshly synced items
 }
 
 bool SGTerraSync::isIdle() {return _workerThread->isIdle();}
@@ -897,7 +942,7 @@ bool SGTerraSync::scheduleTile(const SGBucket& bucket)
 
 bool SGTerraSync::isTileDirPending(const std::string& sceneryDir) const
 {
-    if (!_workerThread->_running) {
+    if (!_workerThread->isRunning()) {
         return false;
     }
 
@@ -926,7 +971,7 @@ void SGTerraSync::scheduleDataDir(const std::string& dataDir)
 
 bool SGTerraSync::isDataDirPending(const std::string& dataDir) const
 {
-    if (!_workerThread->_running) {
+    if (!_workerThread->isRunning()) {
         return false;
     }