]> git.mxchange.org Git - quix0rs-apt-p2p.git/blobdiff - apt_p2p_Khashmir/khashmir.py
Make the DHT timeouts configuration parameters.
[quix0rs-apt-p2p.git] / apt_p2p_Khashmir / khashmir.py
index 554608feda1494ba7e1bfedf98379d9f5096caaa..7abed574dee3b59d78115a94f76c3bd9a1d24821 100644 (file)
@@ -80,7 +80,7 @@ class KhashmirBase(protocol.Factory):
         self.node = self._loadSelfNode('', self.port)
         self.table = KTable(self.node, config)
         self.token_secrets = [newID()]
-        self.stats = StatsLogger(self.table, self.store, self.config)
+        self.stats = StatsLogger(self.table, self.store)
         
         # Start listening
         self.udp = krpc.hostbroker(self, self.stats, config)
@@ -162,7 +162,7 @@ class KhashmirBase(protocol.Factory):
         n = self.Node(NULL_ID, host, port)
         self.sendJoin(n, callback=callback, errback=errback)
 
-    def findNode(self, id, callback, errback=None):
+    def findNode(self, id, callback):
         """Find the contact info for the K closest nodes in the global table.
         
         @type id: C{string}
@@ -170,26 +170,13 @@ class KhashmirBase(protocol.Factory):
         @type callback: C{method}
         @param callback: the method to call with the results, it must take 1
             parameter, the list of K closest nodes
-        @type errback: C{method}
-        @param errback: the method to call if an error occurs
-            (optional, defaults to doing nothing when an error occurs)
         """
-        # Get K nodes out of local table/cache
-        nodes = self.table.findNodes(id)
-        nodes = [copy(node) for node in nodes]
-        d = Deferred()
-        if errback:
-            d.addCallbacks(callback, errback)
-        else:
-            d.addCallback(callback)
+        # Start with our node
+        nodes = [copy(self.node)]
 
-        # If the target ID was found
-        if len(nodes) == 1 and nodes[0].id == id:
-            d.callback(nodes)
-        else:
-            # Start the finding nodes action
-            state = FindNode(self, id, d.callback, self.config, self.stats)
-            reactor.callLater(0, state.goWithNodes, nodes)
+        # Start the finding nodes action
+        state = FindNode(self, id, callback, self.config, self.stats)
+        reactor.callLater(0, state.goWithNodes, nodes)
     
     def insertNode(self, node, contacted = True):
         """Try to insert a node in our local table, pinging oldest contact if necessary.
@@ -211,14 +198,16 @@ class KhashmirBase(protocol.Factory):
             (datetime.now() - old.lastSeen) > 
              timedelta(seconds=self.config['MIN_PING_INTERVAL'])):
             
-            def _staleNodeHandler(err, oldnode = old, newnode = node, self = self):
+            def _staleNodeHandler(err, oldnode = old, newnode = node, self = self, start = datetime.now()):
                 """The pinged node never responded, so replace it."""
                 log.msg("ping failed (%s) %s/%s" % (self.config['PORT'], oldnode.host, oldnode.port))
                 log.err(err)
+                self.stats.completedAction('ping', start)
                 self.table.replaceStaleNode(oldnode, newnode)
             
-            def _notStaleNodeHandler(dict, old=old, self=self):
+            def _notStaleNodeHandler(dict, old = old, self = self, start = datetime.now()):
                 """Got a pong from the old node, so update it."""
+                self.stats.completedAction('ping', start)
                 if dict['id'] == old.id:
                     self.table.justSeenNode(old.id)
             
@@ -241,17 +230,19 @@ class KhashmirBase(protocol.Factory):
             (optional, defaults to calling the callback with None)
         """
 
-        def _pongHandler(dict, node=node, self=self, callback=callback):
+        def _pongHandler(dict, node=node, self=self, callback=callback, start = datetime.now()):
             """Node responded properly, callback with response."""
             n = self.Node(dict['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
+            self.stats.completedAction('join', start)
             self.insertNode(n)
             if callback:
                 callback((dict['ip_addr'], dict['port']))
 
-        def _defaultPong(err, node=node, self=self, callback=callback, errback=errback):
+        def _defaultPong(err, node=node, self=self, callback=callback, errback=errback, start = datetime.now()):
             """Error occurred, fail node and errback or callback with error."""
             log.msg("join failed (%s) %s/%s" % (self.config['PORT'], node.host, node.port))
             log.err(err)
+            self.stats.completedAction('join', start)
             self.table.nodeFailed(node)
             if errback:
                 errback()
@@ -262,7 +253,7 @@ class KhashmirBase(protocol.Factory):
         df = node.join(self.node.id)
         df.addCallbacks(_pongHandler, _defaultPong)
 
-    def findCloseNodes(self, callback=lambda a: None, errback = None):
+    def findCloseNodes(self, callback=lambda a: None):
         """Perform a findNode on the ID one away from our own.
 
         This will allow us to populate our table with nodes on our network
@@ -273,12 +264,9 @@ class KhashmirBase(protocol.Factory):
         @param callback: the method to call with the results, it must take 1
             parameter, the list of K closest nodes
             (optional, defaults to doing nothing with the results)
-        @type errback: C{method}
-        @param errback: the method to call if an error occurs
-            (optional, defaults to doing nothing when an error occurs)
         """
         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
-        self.findNode(id, callback, errback)
+        self.findNode(id, callback)
 
     def refreshTable(self, force = False):
         """Check all the buckets for those that need refreshing.
@@ -368,7 +356,7 @@ class KhashmirRead(KhashmirBase):
     _Node = KNodeRead
 
     #{ Local interface
-    def findValue(self, key, callback, errback=None):
+    def findValue(self, key, callback):
         """Get the nodes that have values for the key from the global table.
         
         @type key: C{string}
@@ -376,21 +364,12 @@ class KhashmirRead(KhashmirBase):
         @type callback: C{method}
         @param callback: the method to call with the results, it must take 1
             parameter, the list of nodes with values
-        @type errback: C{method}
-        @param errback: the method to call if an error occurs
-            (optional, defaults to doing nothing when an error occurs)
         """
-        # Get K nodes out of local table/cache
-        nodes = self.table.findNodes(key)
-        nodes = [copy(node) for node in nodes]
-        d = Deferred()
-        if errback:
-            d.addCallbacks(callback, errback)
-        else:
-            d.addCallback(callback)
-
+        # Start with ourself
+        nodes = [copy(self.node)]
+        
         # Search for others starting with the locally found ones
-        state = FindValue(self, key, d.callback, self.config, self.stats)
+        state = FindValue(self, key, callback, self.config, self.stats)
         reactor.callLater(0, state.goWithNodes, nodes)
 
     def valueForKey(self, key, callback, searchlocal = True):
@@ -407,17 +386,18 @@ class KhashmirRead(KhashmirBase):
         @type searchlocal: C{boolean}
         @param searchlocal: whether to also look for any local values
         """
-        # Get any local values
-        if searchlocal:
-            l = self.store.retrieveValues(key)
-            if len(l) > 0:
-                reactor.callLater(0, callback, key, l)
-        else:
-            l = []
 
-        def _getValueForKey(nodes, key=key, local_values=l, response=callback, self=self):
+        def _getValueForKey(nodes, key=key, response=callback, self=self, searchlocal=searchlocal):
             """Use the found nodes to send requests for values to."""
-            state = GetValue(self, key, local_values, self.config['RETRIEVE_VALUES'], response, self.config, self.stats)
+            # Get any local values
+            if searchlocal:
+                l = self.store.retrieveValues(key)
+                if len(l) > 0:
+                    node = copy(self.node)
+                    node.updateNumValues(len(l))
+                    nodes = nodes + [node]
+
+            state = GetValue(self, key, self.config['RETRIEVE_VALUES'], response, self.config, self.stats)
             reactor.callLater(0, state.goWithNodes, nodes)
             
         # First lookup nodes that have values for the key
@@ -538,11 +518,12 @@ class Khashmir(KhashmirWrite):
 class SimpleTests(unittest.TestCase):
     
     timeout = 10
-    DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
+    DHT_DEFAULTS = {'PORT': 9977,
                     'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
                     'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
                     'MAX_FAILURES': 3,
                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
+                    'KRPC_TIMEOUT': 14, 'KRPC_INITIAL_DELAY': 2,
                     'KEY_EXPIRE': 3600, 'SPEW': False, }
 
     def setUp(self):
@@ -611,11 +592,12 @@ class MultiTest(unittest.TestCase):
     
     timeout = 30
     num = 20
-    DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
+    DHT_DEFAULTS = {'PORT': 9977,
                     'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
                     'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
                     'MAX_FAILURES': 3,
                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
+                    'KRPC_TIMEOUT': 14, 'KRPC_INITIAL_DELAY': 2,
                     'KEY_EXPIRE': 3600, 'SPEW': False, }
 
     def _done(self, val):