]> git.mxchange.org Git - simgear.git/commitdiff
Parallel sync of items.
authorJames Turner <zakalawe@mac.com>
Wed, 25 Sep 2013 15:31:10 +0000 (16:31 +0100)
committerJames Turner <zakalawe@mac.com>
Wed, 25 Sep 2013 15:31:10 +0000 (16:31 +0100)
When using built-in sync code, separate items into distinct slots.
Slots process items sequentially, but each slot works in parallel (using
a single shared HTTP engine). This allows tiles to be synced in parallel
with airports/shared models data, greatly increasing responsiveness
to get tiles synced on initial launch.

simgear/scene/tsync/terrasync.cxx

index b71ee62c0f5145336f07c20b7272b63a2cae298a..bf25d207652b701ee88658b00451c55dfb2d75fa 100644 (file)
@@ -63,7 +63,7 @@
 #include <simgear/props/props_io.hxx>
 #include <simgear/io/HTTPClient.hxx>
 #include <simgear/io/SVNRepository.hxx>
-
+#include <simgear/structure/exception.hxx>
 
 static const bool svn_built_in_available = true;
 
@@ -108,9 +108,9 @@ bool hasWhitespace(string path)
 }
 
 ///////////////////////////////////////////////////////////////////////////////
-// WaitingTile ////////////////////////////////////////////////////////////////
+// WaitingSyncItem ////////////////////////////////////////////////////////////
 ///////////////////////////////////////////////////////////////////////////////
-class  WaitingSyncItem
+class WaitingSyncItem
 {
 public:
     enum Type
@@ -121,21 +121,65 @@ public:
         SharedModels
     };
     
+    WaitingSyncItem() :
+        _dir(),
+        _type(Stop),
+        _refreshScenery(false)
+    {
+    }
+    
     WaitingSyncItem(string dir, Type ty) :
         _dir(dir),
         _type(ty),
         _refreshScenery(false)
     {}
         
-    bool setRefresh() 
+    void setRefresh()
     { _refreshScenery = true; }
         
-    const string _dir;
-    const Type _type;
-    
+    string _dir;
+    Type _type;
     bool _refreshScenery;
 };
 
+///////////////////////////////////////////////////////////////////////////////
+
+/**
+ * @brief SyncSlot encapsulates a queue of sync items we will fetch
+ * serially. Multiple slots exist to sync different types of item in
+ * parallel.
+ */
+class SyncSlot
+{
+public:
+    WaitingSyncItem currentItem;
+    bool isNewDirectory;
+    std::queue<WaitingSyncItem> queue;
+    std::auto_ptr<SVNRepository> repository;
+    SGTimeStamp stamp;
+};
+
+static const int SYNC_SLOT_TILES = 0; ///< Terrain and Objects sync
+static const int SYNC_SLOT_SHARED_DATA = 1; /// shared Models and Airport data
+static const int NUM_SYNC_SLOTS = 2;
+
+/**
+ * @brief translate a sync item type into one of the available slots.
+ * This provides the scheduling / balancing / prioritising between slots.
+ */
+static unsigned int syncSlotForType(WaitingSyncItem::Type ty)
+{
+    switch (ty) {
+    case WaitingSyncItem::Tile: return SYNC_SLOT_TILES;
+    case WaitingSyncItem::SharedModels:
+    case WaitingSyncItem::AirportData:
+        return SYNC_SLOT_SHARED_DATA;
+    default:
+        return SYNC_SLOT_SHARED_DATA;
+    }
+}
+
+
 ///////////////////////////////////////////////////////////////////////////////
 // SGTerraSync::SvnThread /////////////////////////////////////////////////////
 ///////////////////////////////////////////////////////////////////////////////
@@ -178,21 +222,27 @@ public:
    volatile int  _cache_hits;
 private:
    virtual void run();
-   bool syncTree(const char* dir, bool& isNewDirectory);
-   bool syncTreeExternal(const char* dir);
+    
+    // external model run and helpers
+    void runExternal();
+    void syncPathExternal(const WaitingSyncItem& next);
+    bool runExternalSyncCommand(const char* dir);
+
+    // internal mode run and helpers
+    void runInternal();
+    void updateSyncSlot(SyncSlot& slot);
+
+    // commond helpers between both internal and external models
     
     bool isPathCached(const WaitingSyncItem& next) const;
-    void syncPath(const WaitingSyncItem& next);
+    void initCompletedTilesPersistentCache();
+    void writeCompletedTilesPersistentCache() const;
+    void updated(const WaitingSyncItem& item, bool isNewDirectory);
+    void fail(const WaitingSyncItem& failedItem);
     
-   void initCompletedTilesPersistentCache();
-   void writeCompletedTilesPersistentCache() const;
-   
-   bool syncTreeInternal(const char* dir);
-   
-   bool _use_built_in;
-   HTTP::Client _http;
-   std::auto_ptr<SVNRepository> _repository;
-
+    bool _use_built_in;
+    HTTP::Client _http;
+    SyncSlot _syncSlots[NUM_SYNC_SLOTS];
 
    volatile bool _is_dirty;
    volatile bool _stop;
@@ -330,66 +380,7 @@ bool SGTerraSync::SvnThread::start()
     return true;
 }
 
-// sync one directory tree
-bool SGTerraSync::SvnThread::syncTree(const char* dir, bool& isNewDirectory)
-{
-    int rc;
-    SGPath path( _local_dir );
-
-    path.append( dir );
-    isNewDirectory = !path.exists();
-    if (isNewDirectory)
-    {
-        rc = path.create_dir( 0755 );
-        if (rc)
-        {
-            SG_LOG(SG_TERRAIN,SG_ALERT,
-                   "Cannot create directory '" << dir << "', return code = " << rc );
-            return false;
-        }
-    }
-
-    if (_use_built_in)
-        return syncTreeInternal(dir);
-    else
-        return syncTreeExternal(dir);
-}
-    
-bool SGTerraSync::SvnThread::syncTreeInternal(const char* dir)
-{
-    ostringstream command;
-    command << _svn_server << "/" << dir;
-
-    SGPath path(_local_dir);
-    path.append(dir);
-    _repository.reset(new SVNRepository(path, &_http));
-    _repository->setBaseUrl(command.str());
-    
-    SGTimeStamp st;
-    st.stamp();
-    SG_LOG(SG_IO, SG_DEBUG, "terrasync: will sync " << command.str());
-    _repository->update();
-    
-    bool result = true;
-    while (!_stop && _repository->isDoingSync()) {
-        _http.update(100);
-    }
-    
-    if (_repository->failure() == SVNRepository::SVN_ERROR_NOT_FOUND) {
-        // this is fine, but maybe we should use a different return code
-        // in the future to higher layers can distuinguish this case
-    } else if (_repository->failure() != SVNRepository::SVN_NO_ERROR) {
-        result = false;
-    } else {
-        SG_LOG(SG_IO, SG_DEBUG, "sync of " << command.str() << " finished ("
-            << st.elapsedMSec() << " msec");
-    }
-    
-    _repository.reset();
-    return result;
-}
-
-bool SGTerraSync::SvnThread::syncTreeExternal(const char* dir)
+bool SGTerraSync::SvnThread::runExternalSyncCommand(const char* dir)
 {
     ostringstream buf;
     SGPath localPath( _local_dir );
@@ -446,9 +437,22 @@ bool SGTerraSync::SvnThread::syncTreeExternal(const char* dir)
 void SGTerraSync::SvnThread::run()
 {
     _active = true;
-    
     initCompletedTilesPersistentCache();
+
+    
+    if (_use_built_in) {
+        runInternal();
+    } else {
+        runExternal();
+    }
     
+    _active = false;
+    _running = false;
+    _is_dirty = true;
+}
+
+void SGTerraSync::SvnThread::runExternal()
+{
     while (!_stop)
     {
         WaitingSyncItem next = waitingTiles.pop_front();
@@ -462,7 +466,7 @@ void SGTerraSync::SvnThread::run()
             continue;
         }
         
-        syncPath(next);
+        syncPathExternal(next);
 
         if ((_allowed_errors >= 0)&&
             (_consecutive_errors >= _allowed_errors))
@@ -470,11 +474,114 @@ void SGTerraSync::SvnThread::run()
             _stalled = true;
             _stop = true;
         }
+    } // of thread running loop
+}
+
+void SGTerraSync::SvnThread::syncPathExternal(const WaitingSyncItem& next)
+{
+    _busy = true;
+    SGPath path( _local_dir );
+    path.append( next._dir );
+    bool isNewDirectory = !path.exists();
+    
+    try {
+        if (isNewDirectory) {
+            int rc = path.create_dir( 0755 );
+            if (rc) {
+                SG_LOG(SG_TERRAIN,SG_ALERT,
+                       "Cannot create directory '" << path << "', return code = " << rc );
+                throw sg_exception("Cannot create directory for terrasync", path.str());
+            }
+        }
+        
+        if (!runExternalSyncCommand(next._dir.c_str())) {
+            throw sg_exception("Running external sync command failed");
+        }
+    } catch (sg_exception& e) {
+        fail(next);
+        return;
     }
+    
+    updated(next, isNewDirectory);
+}
 
-    _active = false;
-    _running = false;
-    _is_dirty = true;
+void SGTerraSync::SvnThread::updateSyncSlot(SyncSlot &slot)
+{
+    if (slot.repository.get()) {
+        if (slot.repository->isDoingSync()) {
+            return; // easy, still working
+        }
+        
+        // check result
+        SVNRepository::ResultCode res = slot.repository->failure();
+        if (res == SVNRepository::SVN_ERROR_NOT_FOUND) {
+            // this is fine, but maybe we should use a different return code
+            // in the future to higher layers can distinguish this case
+        } else if (res != SVNRepository::SVN_NO_ERROR) {
+            fail(slot.currentItem);
+        } else {
+            updated(slot.currentItem, slot.isNewDirectory);
+            SG_LOG(SG_IO, SG_DEBUG, "sync of " << slot.repository->baseUrl() << " finished ("
+                   << slot.stamp.elapsedMSec() << " msec");
+        }
+
+        // whatever happened, we're done with this repository instance
+        slot.repository.reset();
+    }
+    
+    // init and start sync of the next repository
+    if (!slot.queue.empty()) {
+        slot.currentItem = slot.queue.front();
+        slot.queue.pop();
+        
+        SGPath path(_local_dir);
+        path.append(slot.currentItem._dir);
+        slot.isNewDirectory = !path.exists();
+        if (slot.isNewDirectory) {
+            int rc = path.create_dir( 0755 );
+            if (rc) {
+                SG_LOG(SG_TERRAIN,SG_ALERT,
+                       "Cannot create directory '" << path << "', return code = " << rc );
+                fail(slot.currentItem);
+                return;
+            }
+        } // of creating directory step
+        
+        slot.repository.reset(new SVNRepository(path, &_http));
+        slot.repository->setBaseUrl(_svn_server + "/" + slot.currentItem._dir);
+        slot.repository->update();
+        
+        slot.stamp.stamp();
+        SG_LOG(SG_IO, SG_DEBUG, "sync of " << slot.repository->baseUrl() << " started");
+    }
+}
+
+void SGTerraSync::SvnThread::runInternal()
+{
+    while (!_stop) {
+        _http.update(100);
+        if (_stop)
+            break;
+        // drain the waiting tiles queue into the sync slot queues.
+        while (!waitingTiles.empty()) {
+            WaitingSyncItem next = waitingTiles.pop_front();
+            if (isPathCached(next)) {
+                _cache_hits++;
+                SG_LOG(SG_TERRAIN, SG_DEBUG,
+                       "Cache hit for: '" << next._dir << "'");
+                continue;
+            }
+
+            unsigned int slot = syncSlotForType(next._type);
+            _syncSlots[slot].queue.push(next);
+        }
+       
+        // update each sync slot in turn
+        for (unsigned int slot=0; slot < NUM_SYNC_SLOTS; ++slot) {
+            updateSyncSlot(_syncSlots[slot]);
+        }
+    } // of thread running loop
 }
 
 bool SGTerraSync::SvnThread::isPathCached(const WaitingSyncItem& next) const
@@ -495,40 +602,36 @@ bool SGTerraSync::SvnThread::isPathCached(const WaitingSyncItem& next) const
     return (ii->second > now);
 }
 
-void SGTerraSync::SvnThread::syncPath(const WaitingSyncItem& next)
+void SGTerraSync::SvnThread::fail(const WaitingSyncItem& failedItem)
 {
-    bool isNewDirectory = false;
     time_t now = time(0);
-    
-    _busy = true;
-    if (!syncTree(next._dir.c_str(),isNewDirectory))
-    {
-        _consecutive_errors++;
-        _fail_count++;
-        _completedTiles[ next._dir ] = now + UpdateInterval::FailedAttempt;
-    }
-    else
-    {
-        _consecutive_errors = 0;
-        _success_count++;
-        SG_LOG(SG_TERRAIN,SG_INFO,
-               "Successfully synchronized directory '" << next._dir << "'");
-        if (next._refreshScenery)
+    _consecutive_errors++;
+    _fail_count++;
+    _completedTiles[ failedItem._dir ] = now + UpdateInterval::FailedAttempt;
+    _busy = false;
+}
+
+void SGTerraSync::SvnThread::updated(const WaitingSyncItem& item, bool isNewDirectory)
+{
+    time_t now = time(0);
+    _consecutive_errors = 0;
+    _success_count++;
+    SG_LOG(SG_TERRAIN,SG_INFO,
+           "Successfully synchronized directory '" << item._dir << "'");
+    if (item._refreshScenery) {
+        // updated a tile
+        _updated_tile_count++;
+        if (isNewDirectory)
         {
-            // updated a tile
-            _updated_tile_count++;
-            if (isNewDirectory)
-            {
-                // for now only report new directories to refresh display
-                // (i.e. only when ocean needs to be replaced with actual data)
-                _freshTiles.push_back(next);
-                _is_dirty = true;
-            }
+            // for now only report new directories to refresh display
+            // (i.e. only when ocean needs to be replaced with actual data)
+            _freshTiles.push_back(item);
+            _is_dirty = true;
         }
-        
-        _completedTiles[ next._dir ] = now + UpdateInterval::SuccessfulAttempt;
-        writeCompletedTilesPersistentCache();
     }
+    
+    _completedTiles[ item._dir ] = now + UpdateInterval::SuccessfulAttempt;
+    writeCompletedTilesPersistentCache();
     _busy = false;
 }