]> git.mxchange.org Git - quix0rs-apt-p2p.git/blobdiff - apt_p2p_Khashmir/khashmir.py
Update the version number and changelog for the initial release.
[quix0rs-apt-p2p.git] / apt_p2p_Khashmir / khashmir.py
index 8860c6719491e67c6267a1e483267587bb5dc3d2..5db5ed8ab48015c9c73073f9c8ceab8eac652701 100644 (file)
@@ -80,7 +80,7 @@ class KhashmirBase(protocol.Factory):
         self.node = self._loadSelfNode('', self.port)
         self.table = KTable(self.node, config)
         self.token_secrets = [newID()]
-        self.stats = StatsLogger(self.table, self.store, self.config)
+        self.stats = StatsLogger(self.table, self.store)
         
         # Start listening
         self.udp = krpc.hostbroker(self, self.stats, config)
@@ -198,14 +198,15 @@ class KhashmirBase(protocol.Factory):
             (datetime.now() - old.lastSeen) > 
              timedelta(seconds=self.config['MIN_PING_INTERVAL'])):
             
-            def _staleNodeHandler(err, oldnode = old, newnode = node, self = self):
+            def _staleNodeHandler(err, oldnode = old, newnode = node, self = self, start = datetime.now()):
                 """The pinged node never responded, so replace it."""
-                log.msg("ping failed (%s) %s/%s" % (self.config['PORT'], oldnode.host, oldnode.port))
-                log.err(err)
+                log.msg("action ping failed on %s/%s: %s" % (oldnode.host, oldnode.port, err.getErrorMessage()))
+                self.stats.completedAction('ping', start)
                 self.table.replaceStaleNode(oldnode, newnode)
             
-            def _notStaleNodeHandler(dict, old=old, self=self):
+            def _notStaleNodeHandler(dict, old = old, self = self, start = datetime.now()):
                 """Got a pong from the old node, so update it."""
+                self.stats.completedAction('ping', start)
                 if dict['id'] == old.id:
                     self.table.justSeenNode(old.id)
             
@@ -228,17 +229,18 @@ class KhashmirBase(protocol.Factory):
             (optional, defaults to calling the callback with None)
         """
 
-        def _pongHandler(dict, node=node, self=self, callback=callback):
+        def _pongHandler(dict, node=node, self=self, callback=callback, start = datetime.now()):
             """Node responded properly, callback with response."""
             n = self.Node(dict['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
-            self.insertNode(n)
+            self.stats.completedAction('join', start)
+            reactor.callLater(0, self.insertNode, n)
             if callback:
                 callback((dict['ip_addr'], dict['port']))
 
-        def _defaultPong(err, node=node, self=self, callback=callback, errback=errback):
+        def _defaultPong(err, node=node, self=self, callback=callback, errback=errback, start = datetime.now()):
             """Error occurred, fail node and errback or callback with error."""
-            log.msg("join failed (%s) %s/%s" % (self.config['PORT'], node.host, node.port))
-            log.err(err)
+            log.msg("action join failed on %s/%s: %s" % (node.host, node.port, err.getErrorMessage()))
+            self.stats.completedAction('join', start)
             self.table.nodeFailed(node)
             if errback:
                 errback()
@@ -304,7 +306,7 @@ class KhashmirBase(protocol.Factory):
         """
         if _krpc_sender is not None:
             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
-            self.insertNode(n, contacted = False)
+            reactor.callLater(0, self.insertNode, n, False)
 
         return {"id" : self.node.id}
         
@@ -318,7 +320,7 @@ class KhashmirBase(protocol.Factory):
         """
         if _krpc_sender is not None:
             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
-            self.insertNode(n, contacted = False)
+            reactor.callLater(0, self.insertNode, n, False)
         else:
             _krpc_sender = ('127.0.0.1', self.port)
 
@@ -336,7 +338,7 @@ class KhashmirBase(protocol.Factory):
         """
         if _krpc_sender is not None:
             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
-            self.insertNode(n, contacted = False)
+            reactor.callLater(0, self.insertNode, n, False)
         else:
             _krpc_sender = ('127.0.0.1', self.port)
 
@@ -412,7 +414,7 @@ class KhashmirRead(KhashmirBase):
         """
         if _krpc_sender is not None:
             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
-            self.insertNode(n, contacted = False)
+            reactor.callLater(0, self.insertNode, n, False)
     
         nodes = self.table.findNodes(key)
         nodes = map(lambda node: node.contactInfo(), nodes)
@@ -434,7 +436,7 @@ class KhashmirRead(KhashmirBase):
         """
         if _krpc_sender is not None:
             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
-            self.insertNode(n, contacted = False)
+            reactor.callLater(0, self.insertNode, n, False)
     
         l = self.store.retrieveValues(key)
         if num == 0 or num >= len(l):
@@ -494,7 +496,7 @@ class KhashmirWrite(KhashmirRead):
         """
         if _krpc_sender is not None:
             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
-            self.insertNode(n, contacted = False)
+            reactor.callLater(0, self.insertNode, n, False)
         else:
             _krpc_sender = ('127.0.0.1', self.port)
 
@@ -514,11 +516,12 @@ class Khashmir(KhashmirWrite):
 class SimpleTests(unittest.TestCase):
     
     timeout = 10
-    DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
+    DHT_DEFAULTS = {'PORT': 9977,
                     'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
                     'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
                     'MAX_FAILURES': 3,
                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
+                    'KRPC_TIMEOUT': 14, 'KRPC_INITIAL_DELAY': 2,
                     'KEY_EXPIRE': 3600, 'SPEW': False, }
 
     def setUp(self):
@@ -587,11 +590,12 @@ class MultiTest(unittest.TestCase):
     
     timeout = 30
     num = 20
-    DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
+    DHT_DEFAULTS = {'PORT': 9977,
                     'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
                     'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
                     'MAX_FAILURES': 3,
                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
+                    'KRPC_TIMEOUT': 14, 'KRPC_INITIAL_DELAY': 2,
                     'KEY_EXPIRE': 3600, 'SPEW': False, }
 
     def _done(self, val):