#include <simgear/misc/sg_path.hxx>
#include <simgear/misc/strutils.hxx>
#include <simgear/threads/SGQueue.hxx>
+#include <simgear/threads/SGThread.hxx>
+#include <simgear/threads/SGGuard.hxx>
+
#include <simgear/misc/sg_dir.hxx>
#include <simgear/debug/BufferedLogCallback.hxx>
#include <simgear/props/props_io.hxx>
public:
SyncSlot() :
isNewDirectory(false),
- busy(false)
+ busy(false),
+ pendingKBytes(0)
{}
SyncItem currentItem;
}
}
+struct TerrasyncThreadState
+{
+ TerrasyncThreadState() :
+ _busy(false),
+ _stalled(false),
+ _fail_count(0),
+ _updated_tile_count(0),
+ _success_count(0),
+ _consecutive_errors(0),
+ _allowed_errors(6),
+ _cache_hits(0),
+ _transfer_rate(0),
+ _total_kb_downloaded(0),
+ _totalKbPending(0)
+ {}
+
+ bool _busy;
+ bool _stalled;
+ int _fail_count;
+ int _updated_tile_count;
+ int _success_count;
+ int _consecutive_errors;
+ int _allowed_errors;
+ int _cache_hits;
+ int _transfer_rate;
+ // kbytes, not bytes, because bytes might overflow 2^31
+ int _total_kb_downloaded;
+ unsigned int _totalKbPending;
+
+};
///////////////////////////////////////////////////////////////////////////////
// SGTerraSync::WorkerThread //////////////////////////////////////////////////
void stop();
bool start();
- bool isIdle() {return !_busy; }
+ bool isIdle()
+ {
+ SGGuard<SGMutex> g(_stateLock);
+ return !_state._busy;
+ }
+
+ bool isRunning()
+ {
+ SGGuard<SGMutex> g(_stateLock);
+ return _running;
+ }
+
+ bool isStalled()
+ {
+ SGGuard<SGMutex> g(_stateLock);
+ return _state._stalled;
+ }
+
void request(const SyncItem& dir) {waitingTiles.push_front(dir);}
- bool isDirty() { bool r = _is_dirty;_is_dirty = false;return r;}
- bool hasNewTiles() { return !_freshTiles.empty();}
+
+ bool hasNewTiles()
+ {
+ return !_freshTiles.empty();
+ }
+
SyncItem getNewTile() { return _freshTiles.pop_front();}
void setHTTPServer(const std::string& server)
void setLocalDir(string dir) { _local_dir = stripPath(dir);}
string getLocalDir() { return _local_dir;}
- void setAllowedErrorCount(int errors) {_allowed_errors = errors;}
- void setCachePath(const SGPath& p) {_persistentCachePath = p;}
- void setCacheHits(unsigned int hits) {_cache_hits = hits;}
-
- volatile bool _active;
- volatile bool _running;
- volatile bool _busy;
- volatile bool _stalled;
- volatile int _fail_count;
- volatile int _updated_tile_count;
- volatile int _success_count;
- volatile int _consecutive_errors;
- volatile int _allowed_errors;
- volatile int _cache_hits;
- volatile int _transfer_rate;
- // kbytes, not bytes, because bytes might overflow 2^31
- volatile int _total_kb_downloaded;
+ void setAllowedErrorCount(int errors)
+ {
+ SGGuard<SGMutex> g(_stateLock);
+ _state._allowed_errors = errors;
+ }
- unsigned int _totalKbPending;
+ void setCachePath(const SGPath& p) {_persistentCachePath = p;}
+ void setCacheHits(unsigned int hits)
+ {
+ SGGuard<SGMutex> g(_stateLock);
+ _state._cache_hits = hits;
+ }
+ TerrasyncThreadState threadsafeCopyState()
+ {
+ TerrasyncThreadState st;
+ {
+ SGGuard<SGMutex> g(_stateLock);
+ st = _state;
+ }
+ return st;
+ }
private:
+ void incrementCacheHits()
+ {
+ SGGuard<SGMutex> g(_stateLock);
+ _state._cache_hits++;
+ }
+
virtual void run();
// internal mode run and helpers
HTTP::Client _http;
SyncSlot _syncSlots[NUM_SYNC_SLOTS];
- volatile bool _is_dirty;
- volatile bool _stop;
+ bool _stop, _running;
SGBlockingDeque <SyncItem> waitingTiles;
TileAgeCache _completedTiles;
string _local_dir;
SGPath _persistentCachePath;
string _httpServer;
+
+ TerrasyncThreadState _state;
+ SGMutex _stateLock;
};
SGTerraSync::WorkerThread::WorkerThread() :
- _active(false),
- _running(false),
- _busy(false),
- _stalled(false),
- _fail_count(0),
- _updated_tile_count(0),
- _success_count(0),
- _consecutive_errors(0),
- _allowed_errors(6),
- _cache_hits(0),
- _transfer_rate(0),
- _total_kb_downloaded(0),
- _totalKbPending(0),
- _is_dirty(false),
- _stop(false)
+ _stop(false),
+ _running(false)
{
_http.setUserAgent("terrascenery-" SG_STRINGIZE(SIMGEAR_VERSION));
}
// drop any pending requests
waitingTiles.clear();
- if (!_running)
+ if (!isRunning())
return;
// set stop flag and wake up the thread with an empty request
- _stop = true;
+ {
+ SGGuard<SGMutex> g(_stateLock);
+ _stop = true;
+ }
+
SyncItem w(string(), SyncItem::Stop);
request(w);
join();
- _running = false;
}
bool SGTerraSync::WorkerThread::start()
{
- if (_running)
+ if (isRunning())
return false;
if (_local_dir=="")
{
SG_LOG(SG_TERRASYNC,SG_ALERT,
"Cannot start scenery download. Local cache directory is undefined.");
- _fail_count++;
- _stalled = true;
+ _state._fail_count++;
+ _state._stalled = true;
return false;
}
SG_LOG(SG_TERRASYNC,SG_ALERT,
"Cannot start scenery download. Directory '" << _local_dir <<
"' does not exist. Set correct directory path or create directory folder.");
- _fail_count++;
- _stalled = true;
+ _state._fail_count++;
+ _state._stalled = true;
return false;
}
SG_LOG(SG_TERRASYNC,SG_ALERT,
"Cannot start scenery download. Directory '" << _local_dir <<
"' contains the base package. Use a separate directory.");
- _fail_count++;
- _stalled = true;
+ _state._fail_count++;
+ _state._stalled = true;
return false;
}
- _fail_count = 0;
- _updated_tile_count = 0;
- _success_count = 0;
- _consecutive_errors = 0;
_stop = false;
- _stalled = false;
- _running = true;
+ _state = TerrasyncThreadState(); // clean state
string status = "Using built-in HTTP support.";
void SGTerraSync::WorkerThread::run()
{
- _active = true;
+ {
+ SGGuard<SGMutex> g(_stateLock);
+ _running = true;
+ }
+
initCompletedTilesPersistentCache();
if (_httpServer == "automatic" ) {
runInternal();
- _active = false;
- _running = false;
- _is_dirty = true;
+ {
+ SGGuard<SGMutex> g(_stateLock);
+ _running = false;
+ }
+
}
void SGTerraSync::WorkerThread::updateSyncSlot(SyncSlot &slot)
SG_LOG(SG_TERRASYNC, SG_INFO, "HTTP request count:" << _http.hasActiveRequests());
slot.nextWarnTimeout += 10000;
}
- slot.pendingKBytes = slot.repository->bytesToDownload();
#endif
+ // convert bytes to kbytes here
+ slot.pendingKBytes = (slot.repository->bytesToDownload() >> 10);
return; // easy, still working
}
SG_LOG(SG_TERRASYNC, SG_WARN, "failure doing HTTP update" << e.getFormattedMessage());
}
- _transfer_rate = _http.transferRateBytesPerSec();
- // convert from bytes to kbytes
- _total_kb_downloaded = static_cast<int>(_http.totalBytesDownloaded() / 1024);
+ {
+ SGGuard<SGMutex> g(_stateLock);
+ _state._transfer_rate = _http.transferRateBytesPerSec();
+ // convert from bytes to kbytes
+ _state._total_kb_downloaded = static_cast<int>(_http.totalBytesDownloaded() / 1024);
+ }
if (_stop)
break;
SyncItem next = waitingTiles.pop_front();
SyncItem::Status cacheStatus = isPathCached(next);
if (cacheStatus != SyncItem::Invalid) {
- _cache_hits++;
+ incrementCacheHits();
SG_LOG(SG_TERRASYNC, SG_DEBUG, "\nTerraSync Cache hit for: '" << next._dir << "'");
next._status = cacheStatus;
_freshTiles.push_back(next);
- _is_dirty = true;
continue;
}
anySlotBusy |= _syncSlots[slot].busy;
}
- _totalKbPending = newPendingCount; // approximately atomic update
- _busy = anySlotBusy;
+ {
+ SGGuard<SGMutex> g(_stateLock);
+ _state._totalKbPending = newPendingCount; // approximately atomic update
+ _state._busy = anySlotBusy;
+ }
+
if (!anySlotBusy) {
// wait on the blocking deque here, otherwise we spin
// the loop very fast, since _http::update with no connections
void SGTerraSync::WorkerThread::fail(SyncItem failedItem)
{
+ SGGuard<SGMutex> g(_stateLock);
time_t now = time(0);
- _consecutive_errors++;
- _fail_count++;
+ _state._consecutive_errors++;
+ _state._fail_count++;
failedItem._status = SyncItem::Failed;
_freshTiles.push_back(failedItem);
SG_LOG(SG_TERRASYNC,SG_INFO,
"Failed to sync'" << failedItem._dir << "'");
_completedTiles[ failedItem._dir ] = now + UpdateInterval::FailedAttempt;
- _is_dirty = true;
}
void SGTerraSync::WorkerThread::notFound(SyncItem item)
time_t now = time(0);
item._status = SyncItem::NotFound;
_freshTiles.push_back(item);
- _is_dirty = true;
_notFoundItems[ item._dir ] = now + UpdateInterval::SuccessfulAttempt;
writeCompletedTilesPersistentCache();
}
void SGTerraSync::WorkerThread::updated(SyncItem item, bool isNewDirectory)
{
- time_t now = time(0);
- _consecutive_errors = 0;
- _success_count++;
- SG_LOG(SG_TERRASYNC,SG_INFO,
- "Successfully synchronized directory '" << item._dir << "'");
+ {
+ SGGuard<SGMutex> g(_stateLock);
+ time_t now = time(0);
+ _state._consecutive_errors = 0;
+ _state._success_count++;
+ SG_LOG(SG_TERRASYNC,SG_INFO,
+ "Successfully synchronized directory '" << item._dir << "'");
+
+ item._status = SyncItem::Updated;
+ if (item._type == SyncItem::Tile) {
+ _state._updated_tile_count++;
+ }
- item._status = SyncItem::Updated;
- if (item._type == SyncItem::Tile) {
- _updated_tile_count++;
+ _freshTiles.push_back(item);
+ _completedTiles[ item._dir ] = now + UpdateInterval::SuccessfulAttempt;
}
- _freshTiles.push_back(item);
- _completedTiles[ item._dir ] = now + UpdateInterval::SuccessfulAttempt;
- _is_dirty = true;
writeCompletedTilesPersistentCache();
}
void SGTerraSync::reinit()
{
// do not reinit when enabled and we're already up and running
- if ((_terraRoot->getBoolValue("enabled",false))&&
- (_workerThread->_active && _workerThread->_running))
+ if ((_terraRoot->getBoolValue("enabled",false)) && _workerThread->isRunning())
{
return;
}
}
}
- _stalledNode->setBoolValue(_workerThread->_stalled);
+ _stalledNode->setBoolValue(_workerThread->isStalled());
}
void SGTerraSync::bind()
}
_bound = true;
- _tiedProperties.Tie( _terraRoot->getNode("busy", true), (bool*) &_workerThread->_busy );
- _tiedProperties.Tie( _terraRoot->getNode("active", true), (bool*) &_workerThread->_active );
- _tiedProperties.Tie( _terraRoot->getNode("update-count", true), (int*) &_workerThread->_success_count );
- _tiedProperties.Tie( _terraRoot->getNode("error-count", true), (int*) &_workerThread->_fail_count );
- _tiedProperties.Tie( _terraRoot->getNode("tile-count", true), (int*) &_workerThread->_updated_tile_count );
- _tiedProperties.Tie( _terraRoot->getNode("cache-hits", true), (int*) &_workerThread->_cache_hits );
- _tiedProperties.Tie( _terraRoot->getNode("transfer-rate-bytes-sec", true), (int*) &_workerThread->_transfer_rate );
-
- // use kbytes here because propety doesn't support 64-bit and we might conceivably
- // download more than 2G in a single session
- _tiedProperties.Tie( _terraRoot->getNode("downloaded-kbytes", true), (int*) &_workerThread->_total_kb_downloaded );
-
- _tiedProperties.Tie( _terraRoot->getNode("pending-kbytes", true), (int*) &_workerThread->_totalKbPending );
-
_terraRoot->getNode("busy", true)->setAttribute(SGPropertyNode::WRITE,false);
_terraRoot->getNode("active", true)->setAttribute(SGPropertyNode::WRITE,false);
_terraRoot->getNode("tile-count", true)->setAttribute(SGPropertyNode::WRITE,false);
_terraRoot->getNode("use-built-in-svn", true)->setAttribute(SGPropertyNode::USERARCHIVE,false);
_terraRoot->getNode("use-svn", true)->setAttribute(SGPropertyNode::USERARCHIVE,false);
+
// stalled is used as a signal handler (to connect listeners triggering GUI pop-ups)
_stalledNode = _terraRoot->getNode("stalled", true);
- _stalledNode->setBoolValue(_workerThread->_stalled);
+ _stalledNode->setBoolValue(_workerThread->isStalled());
_stalledNode->setAttribute(SGPropertyNode::PRESERVE,true);
}
void SGTerraSync::update(double)
{
- static SGBucket bucket;
- if (_workerThread->isDirty())
- {
- if (!_workerThread->_active)
- {
- if (_workerThread->_stalled)
- {
- SG_LOG(SG_TERRASYNC,SG_ALERT,
- "Automatic scenery download/synchronization stalled. Too many errors.");
- }
- else
- {
- // not really an alert - just always show this message
- SG_LOG(SG_TERRASYNC,SG_ALERT,
- "Automatic scenery download/synchronization has stopped.");
- }
- _stalledNode->setBoolValue(_workerThread->_stalled);
- }
+ TerrasyncThreadState copiedState(_workerThread->threadsafeCopyState());
- while (_workerThread->hasNewTiles())
- {
- SyncItem next = _workerThread->getNewTile();
+ _terraRoot->setBoolValue("busy", copiedState._busy);
+ _terraRoot->setIntValue("update-count", copiedState._success_count);
+ _terraRoot->setIntValue("error-count", copiedState._fail_count);
+ _terraRoot->setIntValue("tile-count", copiedState._updated_tile_count);
+ _terraRoot->setIntValue("cache-hits", copiedState._cache_hits);
+ _terraRoot->setIntValue("transfer-rate-bytes-sec", copiedState._transfer_rate);
+ _terraRoot->setIntValue("downloaded-kbytes", copiedState._total_kb_downloaded);
+ _terraRoot->setIntValue("pending-kbytes", copiedState._totalKbPending);
- if ((next._type == SyncItem::Tile) || (next._type == SyncItem::AIData)) {
- _activeTileDirs.erase(next._dir);
- }
- } // of freshly synced items
- }
+ _stalledNode->setBoolValue(_workerThread->isStalled());
+ _terraRoot->setBoolValue("active", _workerThread->isRunning());
+
+ while (_workerThread->hasNewTiles())
+ {
+ SyncItem next = _workerThread->getNewTile();
+
+ if ((next._type == SyncItem::Tile) || (next._type == SyncItem::AIData)) {
+ _activeTileDirs.erase(next._dir);
+ }
+ } // of freshly synced items
}
bool SGTerraSync::isIdle() {return _workerThread->isIdle();}
bool SGTerraSync::isTileDirPending(const std::string& sceneryDir) const
{
- if (!_workerThread->_running) {
+ if (!_workerThread->isRunning()) {
return false;
}
bool SGTerraSync::isDataDirPending(const std::string& dataDir) const
{
- if (!_workerThread->_running) {
+ if (!_workerThread->isRunning()) {
return false;
}