]> git.mxchange.org Git - quix0rs-apt-p2p.git/blobdiff - apt_p2p_Khashmir/khashmir.py
Make the DHT timeouts configuration parameters.
[quix0rs-apt-p2p.git] / apt_p2p_Khashmir / khashmir.py
index 230de1fb3a18124b30a5b74291e1facb8948290e..7abed574dee3b59d78115a94f76c3bd9a1d24821 100644 (file)
@@ -9,10 +9,12 @@ warnings.simplefilter("ignore", DeprecationWarning)
 from datetime import datetime, timedelta
 from random import randrange, shuffle
 from sha import sha
+from copy import copy
 import os
 
 from twisted.internet.defer import Deferred
 from twisted.internet import protocol, reactor
+from twisted.python import log
 from twisted.trial import unittest
 
 from db import DB
@@ -78,7 +80,7 @@ class KhashmirBase(protocol.Factory):
         self.node = self._loadSelfNode('', self.port)
         self.table = KTable(self.node, config)
         self.token_secrets = [newID()]
-        self.stats = StatsLogger(self.table, self.store, self.config)
+        self.stats = StatsLogger(self.table, self.store)
         
         # Start listening
         self.udp = krpc.hostbroker(self, self.stats, config)
@@ -160,7 +162,7 @@ class KhashmirBase(protocol.Factory):
         n = self.Node(NULL_ID, host, port)
         self.sendJoin(n, callback=callback, errback=errback)
 
-    def findNode(self, id, callback, errback=None):
+    def findNode(self, id, callback):
         """Find the contact info for the K closest nodes in the global table.
         
         @type id: C{string}
@@ -168,25 +170,13 @@ class KhashmirBase(protocol.Factory):
         @type callback: C{method}
         @param callback: the method to call with the results, it must take 1
             parameter, the list of K closest nodes
-        @type errback: C{method}
-        @param errback: the method to call if an error occurs
-            (optional, defaults to doing nothing when an error occurs)
         """
-        # Get K nodes out of local table/cache
-        nodes = self.table.findNodes(id)
-        d = Deferred()
-        if errback:
-            d.addCallbacks(callback, errback)
-        else:
-            d.addCallback(callback)
+        # Start with our node
+        nodes = [copy(self.node)]
 
-        # If the target ID was found
-        if len(nodes) == 1 and nodes[0].id == id:
-            d.callback(nodes)
-        else:
-            # Start the finding nodes action
-            state = FindNode(self, id, d.callback, self.config, self.stats)
-            reactor.callLater(0, state.goWithNodes, nodes)
+        # Start the finding nodes action
+        state = FindNode(self, id, callback, self.config, self.stats)
+        reactor.callLater(0, state.goWithNodes, nodes)
     
     def insertNode(self, node, contacted = True):
         """Try to insert a node in our local table, pinging oldest contact if necessary.
@@ -208,13 +198,16 @@ class KhashmirBase(protocol.Factory):
             (datetime.now() - old.lastSeen) > 
              timedelta(seconds=self.config['MIN_PING_INTERVAL'])):
             
-            def _staleNodeHandler(failure, oldnode = old, newnode = node):
+            def _staleNodeHandler(err, oldnode = old, newnode = node, self = self, start = datetime.now()):
                 """The pinged node never responded, so replace it."""
+                log.msg("ping failed (%s) %s/%s" % (self.config['PORT'], oldnode.host, oldnode.port))
+                log.err(err)
+                self.stats.completedAction('ping', start)
                 self.table.replaceStaleNode(oldnode, newnode)
             
-            def _notStaleNodeHandler(dict, old=old):
+            def _notStaleNodeHandler(dict, old = old, self = self, start = datetime.now()):
                 """Got a pong from the old node, so update it."""
-                dict = dict['rsp']
+                self.stats.completedAction('ping', start)
                 if dict['id'] == old.id:
                     self.table.justSeenNode(old.id)
             
@@ -237,16 +230,20 @@ class KhashmirBase(protocol.Factory):
             (optional, defaults to calling the callback with None)
         """
 
-        def _pongHandler(dict, node=node, self=self, callback=callback):
+        def _pongHandler(dict, node=node, self=self, callback=callback, start = datetime.now()):
             """Node responded properly, callback with response."""
-            n = self.Node(dict['rsp']['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
+            n = self.Node(dict['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
+            self.stats.completedAction('join', start)
             self.insertNode(n)
             if callback:
-                callback((dict['rsp']['ip_addr'], dict['rsp']['port']))
+                callback((dict['ip_addr'], dict['port']))
 
-        def _defaultPong(err, node=node, table=self.table, callback=callback, errback=errback):
+        def _defaultPong(err, node=node, self=self, callback=callback, errback=errback, start = datetime.now()):
             """Error occurred, fail node and errback or callback with error."""
-            table.nodeFailed(node)
+            log.msg("join failed (%s) %s/%s" % (self.config['PORT'], node.host, node.port))
+            log.err(err)
+            self.stats.completedAction('join', start)
+            self.table.nodeFailed(node)
             if errback:
                 errback()
             elif callback:
@@ -256,7 +253,7 @@ class KhashmirBase(protocol.Factory):
         df = node.join(self.node.id)
         df.addCallbacks(_pongHandler, _defaultPong)
 
-    def findCloseNodes(self, callback=lambda a: None, errback = None):
+    def findCloseNodes(self, callback=lambda a: None):
         """Perform a findNode on the ID one away from our own.
 
         This will allow us to populate our table with nodes on our network
@@ -267,12 +264,9 @@ class KhashmirBase(protocol.Factory):
         @param callback: the method to call with the results, it must take 1
             parameter, the list of K closest nodes
             (optional, defaults to doing nothing with the results)
-        @type errback: C{method}
-        @param errback: the method to call if an error occurs
-            (optional, defaults to doing nothing when an error occurs)
         """
         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
-        self.findNode(id, callback, errback)
+        self.findNode(id, callback)
 
     def refreshTable(self, force = False):
         """Check all the buckets for those that need refreshing.
@@ -304,7 +298,7 @@ class KhashmirBase(protocol.Factory):
         return self.stats.formatHTML()
 
     #{ Remote interface
-    def krpc_ping(self, id, _krpc_sender):
+    def krpc_ping(self, id, _krpc_sender = None):
         """Pong with our ID.
         
         @type id: C{string}
@@ -312,12 +306,13 @@ class KhashmirBase(protocol.Factory):
         @type _krpc_sender: (C{string}, C{int})
         @param _krpc_sender: the sender node's IP address and port
         """
-        n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
-        self.insertNode(n, contacted = False)
+        if _krpc_sender is not None:
+            n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
+            self.insertNode(n, contacted = False)
 
         return {"id" : self.node.id}
         
-    def krpc_join(self, id, _krpc_sender):
+    def krpc_join(self, id, _krpc_sender = None):
         """Add the node by responding with its address and port.
         
         @type id: C{string}
@@ -325,12 +320,15 @@ class KhashmirBase(protocol.Factory):
         @type _krpc_sender: (C{string}, C{int})
         @param _krpc_sender: the sender node's IP address and port
         """
-        n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
-        self.insertNode(n, contacted = False)
+        if _krpc_sender is not None:
+            n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
+            self.insertNode(n, contacted = False)
+        else:
+            _krpc_sender = ('127.0.0.1', self.port)
 
         return {"ip_addr" : _krpc_sender[0], "port" : _krpc_sender[1], "id" : self.node.id}
         
-    def krpc_find_node(self, target, id, _krpc_sender):
+    def krpc_find_node(self, id, target, _krpc_sender = None):
         """Find the K closest nodes to the target in the local routing table.
         
         @type target: C{string}
@@ -340,8 +338,11 @@ class KhashmirBase(protocol.Factory):
         @type _krpc_sender: (C{string}, C{int})
         @param _krpc_sender: the sender node's IP address and port
         """
-        n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
-        self.insertNode(n, contacted = False)
+        if _krpc_sender is not None:
+            n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
+            self.insertNode(n, contacted = False)
+        else:
+            _krpc_sender = ('127.0.0.1', self.port)
 
         nodes = self.table.findNodes(target)
         nodes = map(lambda node: node.contactInfo(), nodes)
@@ -355,7 +356,7 @@ class KhashmirRead(KhashmirBase):
     _Node = KNodeRead
 
     #{ Local interface
-    def findValue(self, key, callback, errback=None):
+    def findValue(self, key, callback):
         """Get the nodes that have values for the key from the global table.
         
         @type key: C{string}
@@ -363,20 +364,12 @@ class KhashmirRead(KhashmirBase):
         @type callback: C{method}
         @param callback: the method to call with the results, it must take 1
             parameter, the list of nodes with values
-        @type errback: C{method}
-        @param errback: the method to call if an error occurs
-            (optional, defaults to doing nothing when an error occurs)
         """
-        # Get K nodes out of local table/cache
-        nodes = self.table.findNodes(key)
-        d = Deferred()
-        if errback:
-            d.addCallbacks(callback, errback)
-        else:
-            d.addCallback(callback)
-
+        # Start with ourself
+        nodes = [copy(self.node)]
+        
         # Search for others starting with the locally found ones
-        state = FindValue(self, key, d.callback, self.config, self.stats)
+        state = FindValue(self, key, callback, self.config, self.stats)
         reactor.callLater(0, state.goWithNodes, nodes)
 
     def valueForKey(self, key, callback, searchlocal = True):
@@ -393,24 +386,25 @@ class KhashmirRead(KhashmirBase):
         @type searchlocal: C{boolean}
         @param searchlocal: whether to also look for any local values
         """
-        # Get any local values
-        if searchlocal:
-            l = self.store.retrieveValues(key)
-            if len(l) > 0:
-                reactor.callLater(0, callback, key, l)
-        else:
-            l = []
 
-        def _getValueForKey(nodes, key=key, local_values=l, response=callback, self=self):
+        def _getValueForKey(nodes, key=key, response=callback, self=self, searchlocal=searchlocal):
             """Use the found nodes to send requests for values to."""
-            state = GetValue(self, key, local_values, self.config['RETRIEVE_VALUES'], response, self.config, self.stats)
+            # Get any local values
+            if searchlocal:
+                l = self.store.retrieveValues(key)
+                if len(l) > 0:
+                    node = copy(self.node)
+                    node.updateNumValues(len(l))
+                    nodes = nodes + [node]
+
+            state = GetValue(self, key, self.config['RETRIEVE_VALUES'], response, self.config, self.stats)
             reactor.callLater(0, state.goWithNodes, nodes)
             
         # First lookup nodes that have values for the key
         self.findValue(key, _getValueForKey)
 
     #{ Remote interface
-    def krpc_find_value(self, key, id, _krpc_sender):
+    def krpc_find_value(self, id, key, _krpc_sender = None):
         """Find the number of values stored locally for the key, and the K closest nodes.
         
         @type key: C{string}
@@ -420,15 +414,16 @@ class KhashmirRead(KhashmirBase):
         @type _krpc_sender: (C{string}, C{int})
         @param _krpc_sender: the sender node's IP address and port
         """
-        n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
-        self.insertNode(n, contacted = False)
+        if _krpc_sender is not None:
+            n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
+            self.insertNode(n, contacted = False)
     
         nodes = self.table.findNodes(key)
         nodes = map(lambda node: node.contactInfo(), nodes)
         num_values = self.store.countValues(key)
         return {'nodes' : nodes, 'num' : num_values, "id": self.node.id}
 
-    def krpc_get_value(self, key, num, id, _krpc_sender):
+    def krpc_get_value(self, id, key, num, _krpc_sender = None):
         """Retrieve the values stored locally for the key.
         
         @type key: C{string}
@@ -441,8 +436,9 @@ class KhashmirRead(KhashmirBase):
         @type _krpc_sender: (C{string}, C{int})
         @param _krpc_sender: the sender node's IP address and port
         """
-        n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
-        self.insertNode(n, contacted = False)
+        if _krpc_sender is not None:
+            n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
+            self.insertNode(n, contacted = False)
     
         l = self.store.retrieveValues(key)
         if num == 0 or num >= len(l):
@@ -487,7 +483,7 @@ class KhashmirWrite(KhashmirRead):
         self.findNode(key, _storeValueForKey)
                     
     #{ Remote interface
-    def krpc_store_value(self, key, value, token, id, _krpc_sender):
+    def krpc_store_value(self, id, key, value, token, _krpc_sender = None):
         """Store the value locally with the key.
         
         @type key: C{string}
@@ -500,8 +496,12 @@ class KhashmirWrite(KhashmirRead):
         @type _krpc_sender: (C{string}, C{int})
         @param _krpc_sender: the sender node's IP address and port
         """
-        n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
-        self.insertNode(n, contacted = False)
+        if _krpc_sender is not None:
+            n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
+            self.insertNode(n, contacted = False)
+        else:
+            _krpc_sender = ('127.0.0.1', self.port)
+
         for secret in self.token_secrets:
             this_token = sha(secret + _krpc_sender[0]).digest()
             if token == this_token:
@@ -518,11 +518,12 @@ class Khashmir(KhashmirWrite):
 class SimpleTests(unittest.TestCase):
     
     timeout = 10
-    DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
+    DHT_DEFAULTS = {'PORT': 9977,
                     'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
                     'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
                     'MAX_FAILURES': 3,
                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
+                    'KRPC_TIMEOUT': 14, 'KRPC_INITIAL_DELAY': 2,
                     'KEY_EXPIRE': 3600, 'SPEW': False, }
 
     def setUp(self):
@@ -591,11 +592,12 @@ class MultiTest(unittest.TestCase):
     
     timeout = 30
     num = 20
-    DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
+    DHT_DEFAULTS = {'PORT': 9977,
                     'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
                     'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
                     'MAX_FAILURES': 3,
                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
+                    'KRPC_TIMEOUT': 14, 'KRPC_INITIAL_DELAY': 2,
                     'KEY_EXPIRE': 3600, 'SPEW': False, }
 
     def _done(self, val):