From: James Turner Date: Wed, 25 Sep 2013 15:31:10 +0000 (+0100) Subject: Parallel sync of items. X-Git-Url: https://git.mxchange.org/?a=commitdiff_plain;h=db98c7440e56bec04ec44ad2eb0de4d3e2460481;p=simgear.git Parallel sync of items. When using built-in sync code, separate items into distinct slots. Slots process items sequentially, but each slot works in parallel (using a single shared HTTP engine). This allows tiles to be synced in parallel with airports/shared models data, greatly increasing responsiveness to get tiles synced on initial launch. --- diff --git a/simgear/scene/tsync/terrasync.cxx b/simgear/scene/tsync/terrasync.cxx index b71ee62c..bf25d207 100644 --- a/simgear/scene/tsync/terrasync.cxx +++ b/simgear/scene/tsync/terrasync.cxx @@ -63,7 +63,7 @@ #include #include #include - +#include static const bool svn_built_in_available = true; @@ -108,9 +108,9 @@ bool hasWhitespace(string path) } /////////////////////////////////////////////////////////////////////////////// -// WaitingTile //////////////////////////////////////////////////////////////// +// WaitingSyncItem //////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////// -class WaitingSyncItem +class WaitingSyncItem { public: enum Type @@ -121,21 +121,65 @@ public: SharedModels }; + WaitingSyncItem() : + _dir(), + _type(Stop), + _refreshScenery(false) + { + } + WaitingSyncItem(string dir, Type ty) : _dir(dir), _type(ty), _refreshScenery(false) {} - bool setRefresh() + void setRefresh() { _refreshScenery = true; } - const string _dir; - const Type _type; - + string _dir; + Type _type; bool _refreshScenery; }; +/////////////////////////////////////////////////////////////////////////////// + +/** + * @brief SyncSlot encapsulates a queue of sync items we will fetch + * serially. Multiple slots exist to sync different types of item in + * parallel. + */ +class SyncSlot +{ +public: + WaitingSyncItem currentItem; + bool isNewDirectory; + std::queue queue; + std::auto_ptr repository; + SGTimeStamp stamp; +}; + +static const int SYNC_SLOT_TILES = 0; ///< Terrain and Objects sync +static const int SYNC_SLOT_SHARED_DATA = 1; /// shared Models and Airport data +static const int NUM_SYNC_SLOTS = 2; + +/** + * @brief translate a sync item type into one of the available slots. + * This provides the scheduling / balancing / prioritising between slots. + */ +static unsigned int syncSlotForType(WaitingSyncItem::Type ty) +{ + switch (ty) { + case WaitingSyncItem::Tile: return SYNC_SLOT_TILES; + case WaitingSyncItem::SharedModels: + case WaitingSyncItem::AirportData: + return SYNC_SLOT_SHARED_DATA; + default: + return SYNC_SLOT_SHARED_DATA; + } +} + + /////////////////////////////////////////////////////////////////////////////// // SGTerraSync::SvnThread ///////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////// @@ -178,21 +222,27 @@ public: volatile int _cache_hits; private: virtual void run(); - bool syncTree(const char* dir, bool& isNewDirectory); - bool syncTreeExternal(const char* dir); + + // external model run and helpers + void runExternal(); + void syncPathExternal(const WaitingSyncItem& next); + bool runExternalSyncCommand(const char* dir); + + // internal mode run and helpers + void runInternal(); + void updateSyncSlot(SyncSlot& slot); + + // commond helpers between both internal and external models bool isPathCached(const WaitingSyncItem& next) const; - void syncPath(const WaitingSyncItem& next); + void initCompletedTilesPersistentCache(); + void writeCompletedTilesPersistentCache() const; + void updated(const WaitingSyncItem& item, bool isNewDirectory); + void fail(const WaitingSyncItem& failedItem); - void initCompletedTilesPersistentCache(); - void writeCompletedTilesPersistentCache() const; - - bool syncTreeInternal(const char* dir); - - bool _use_built_in; - HTTP::Client _http; - std::auto_ptr _repository; - + bool _use_built_in; + HTTP::Client _http; + SyncSlot _syncSlots[NUM_SYNC_SLOTS]; volatile bool _is_dirty; volatile bool _stop; @@ -330,66 +380,7 @@ bool SGTerraSync::SvnThread::start() return true; } -// sync one directory tree -bool SGTerraSync::SvnThread::syncTree(const char* dir, bool& isNewDirectory) -{ - int rc; - SGPath path( _local_dir ); - - path.append( dir ); - isNewDirectory = !path.exists(); - if (isNewDirectory) - { - rc = path.create_dir( 0755 ); - if (rc) - { - SG_LOG(SG_TERRAIN,SG_ALERT, - "Cannot create directory '" << dir << "', return code = " << rc ); - return false; - } - } - - if (_use_built_in) - return syncTreeInternal(dir); - else - return syncTreeExternal(dir); -} - -bool SGTerraSync::SvnThread::syncTreeInternal(const char* dir) -{ - ostringstream command; - command << _svn_server << "/" << dir; - - SGPath path(_local_dir); - path.append(dir); - _repository.reset(new SVNRepository(path, &_http)); - _repository->setBaseUrl(command.str()); - - SGTimeStamp st; - st.stamp(); - SG_LOG(SG_IO, SG_DEBUG, "terrasync: will sync " << command.str()); - _repository->update(); - - bool result = true; - while (!_stop && _repository->isDoingSync()) { - _http.update(100); - } - - if (_repository->failure() == SVNRepository::SVN_ERROR_NOT_FOUND) { - // this is fine, but maybe we should use a different return code - // in the future to higher layers can distuinguish this case - } else if (_repository->failure() != SVNRepository::SVN_NO_ERROR) { - result = false; - } else { - SG_LOG(SG_IO, SG_DEBUG, "sync of " << command.str() << " finished (" - << st.elapsedMSec() << " msec"); - } - - _repository.reset(); - return result; -} - -bool SGTerraSync::SvnThread::syncTreeExternal(const char* dir) +bool SGTerraSync::SvnThread::runExternalSyncCommand(const char* dir) { ostringstream buf; SGPath localPath( _local_dir ); @@ -446,9 +437,22 @@ bool SGTerraSync::SvnThread::syncTreeExternal(const char* dir) void SGTerraSync::SvnThread::run() { _active = true; - initCompletedTilesPersistentCache(); + + + if (_use_built_in) { + runInternal(); + } else { + runExternal(); + } + _active = false; + _running = false; + _is_dirty = true; +} + +void SGTerraSync::SvnThread::runExternal() +{ while (!_stop) { WaitingSyncItem next = waitingTiles.pop_front(); @@ -462,7 +466,7 @@ void SGTerraSync::SvnThread::run() continue; } - syncPath(next); + syncPathExternal(next); if ((_allowed_errors >= 0)&& (_consecutive_errors >= _allowed_errors)) @@ -470,11 +474,114 @@ void SGTerraSync::SvnThread::run() _stalled = true; _stop = true; } + } // of thread running loop +} + +void SGTerraSync::SvnThread::syncPathExternal(const WaitingSyncItem& next) +{ + _busy = true; + SGPath path( _local_dir ); + path.append( next._dir ); + bool isNewDirectory = !path.exists(); + + try { + if (isNewDirectory) { + int rc = path.create_dir( 0755 ); + if (rc) { + SG_LOG(SG_TERRAIN,SG_ALERT, + "Cannot create directory '" << path << "', return code = " << rc ); + throw sg_exception("Cannot create directory for terrasync", path.str()); + } + } + + if (!runExternalSyncCommand(next._dir.c_str())) { + throw sg_exception("Running external sync command failed"); + } + } catch (sg_exception& e) { + fail(next); + return; } + + updated(next, isNewDirectory); +} - _active = false; - _running = false; - _is_dirty = true; +void SGTerraSync::SvnThread::updateSyncSlot(SyncSlot &slot) +{ + if (slot.repository.get()) { + if (slot.repository->isDoingSync()) { + return; // easy, still working + } + + // check result + SVNRepository::ResultCode res = slot.repository->failure(); + if (res == SVNRepository::SVN_ERROR_NOT_FOUND) { + // this is fine, but maybe we should use a different return code + // in the future to higher layers can distinguish this case + } else if (res != SVNRepository::SVN_NO_ERROR) { + fail(slot.currentItem); + } else { + updated(slot.currentItem, slot.isNewDirectory); + SG_LOG(SG_IO, SG_DEBUG, "sync of " << slot.repository->baseUrl() << " finished (" + << slot.stamp.elapsedMSec() << " msec"); + } + + // whatever happened, we're done with this repository instance + slot.repository.reset(); + } + + // init and start sync of the next repository + if (!slot.queue.empty()) { + slot.currentItem = slot.queue.front(); + slot.queue.pop(); + + SGPath path(_local_dir); + path.append(slot.currentItem._dir); + slot.isNewDirectory = !path.exists(); + if (slot.isNewDirectory) { + int rc = path.create_dir( 0755 ); + if (rc) { + SG_LOG(SG_TERRAIN,SG_ALERT, + "Cannot create directory '" << path << "', return code = " << rc ); + fail(slot.currentItem); + return; + } + } // of creating directory step + + slot.repository.reset(new SVNRepository(path, &_http)); + slot.repository->setBaseUrl(_svn_server + "/" + slot.currentItem._dir); + slot.repository->update(); + + slot.stamp.stamp(); + SG_LOG(SG_IO, SG_DEBUG, "sync of " << slot.repository->baseUrl() << " started"); + } +} + +void SGTerraSync::SvnThread::runInternal() +{ + while (!_stop) { + _http.update(100); + if (_stop) + break; + + // drain the waiting tiles queue into the sync slot queues. + while (!waitingTiles.empty()) { + WaitingSyncItem next = waitingTiles.pop_front(); + if (isPathCached(next)) { + _cache_hits++; + SG_LOG(SG_TERRAIN, SG_DEBUG, + "Cache hit for: '" << next._dir << "'"); + continue; + } + + unsigned int slot = syncSlotForType(next._type); + _syncSlots[slot].queue.push(next); + } + + // update each sync slot in turn + for (unsigned int slot=0; slot < NUM_SYNC_SLOTS; ++slot) { + updateSyncSlot(_syncSlots[slot]); + } + } // of thread running loop } bool SGTerraSync::SvnThread::isPathCached(const WaitingSyncItem& next) const @@ -495,40 +602,36 @@ bool SGTerraSync::SvnThread::isPathCached(const WaitingSyncItem& next) const return (ii->second > now); } -void SGTerraSync::SvnThread::syncPath(const WaitingSyncItem& next) +void SGTerraSync::SvnThread::fail(const WaitingSyncItem& failedItem) { - bool isNewDirectory = false; time_t now = time(0); - - _busy = true; - if (!syncTree(next._dir.c_str(),isNewDirectory)) - { - _consecutive_errors++; - _fail_count++; - _completedTiles[ next._dir ] = now + UpdateInterval::FailedAttempt; - } - else - { - _consecutive_errors = 0; - _success_count++; - SG_LOG(SG_TERRAIN,SG_INFO, - "Successfully synchronized directory '" << next._dir << "'"); - if (next._refreshScenery) + _consecutive_errors++; + _fail_count++; + _completedTiles[ failedItem._dir ] = now + UpdateInterval::FailedAttempt; + _busy = false; +} + +void SGTerraSync::SvnThread::updated(const WaitingSyncItem& item, bool isNewDirectory) +{ + time_t now = time(0); + _consecutive_errors = 0; + _success_count++; + SG_LOG(SG_TERRAIN,SG_INFO, + "Successfully synchronized directory '" << item._dir << "'"); + if (item._refreshScenery) { + // updated a tile + _updated_tile_count++; + if (isNewDirectory) { - // updated a tile - _updated_tile_count++; - if (isNewDirectory) - { - // for now only report new directories to refresh display - // (i.e. only when ocean needs to be replaced with actual data) - _freshTiles.push_back(next); - _is_dirty = true; - } + // for now only report new directories to refresh display + // (i.e. only when ocean needs to be replaced with actual data) + _freshTiles.push_back(item); + _is_dirty = true; } - - _completedTiles[ next._dir ] = now + UpdateInterval::SuccessfulAttempt; - writeCompletedTilesPersistentCache(); } + + _completedTiles[ item._dir ] = now + UpdateInterval::SuccessfulAttempt; + writeCompletedTilesPersistentCache(); _busy = false; }