Increase the concurrency of DHT requests to 8.
[quix0rs-apt-p2p.git] / apt_p2p_Khashmir / khashmir.py
index 294940c..f083da4 100644 (file)
@@ -29,7 +29,9 @@ import krpc
 
 isLocal = re.compile('^(192\.168\.[0-9]{1,3}\.[0-9]{1,3})|'+
                      '(10\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})|'+
-                     '(172\.0?([1][6-9])|([2][0-9])|([3][0-1])\.[0-9]{1,3}\.[0-9]{1,3})|'+
+                     '(172\.0?1[6-9]\.[0-9]{1,3}\.[0-9]{1,3})|'+
+                     '(172\.0?2[0-9]\.[0-9]{1,3}\.[0-9]{1,3})|'+
+                     '(172\.0?3[0-1]\.[0-9]{1,3}\.[0-9]{1,3})|'+
                      '(127\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})$')
 
 class KhashmirBase(protocol.Factory):
@@ -164,7 +166,7 @@ class KhashmirBase(protocol.Factory):
             (optional, defaults to doing nothing with the results)
         @type errback: C{method}
         @param errback: the method to call if an error occurs
-            (optional, defaults to calling the callback with None)
+            (optional, defaults to calling the callback with the error)
         """
         n = self.Node(NULL_ID, host, port)
         self.sendJoin(n, callback=callback, errback=errback)
@@ -190,7 +192,7 @@ class KhashmirBase(protocol.Factory):
         
         If all you have is a host/port, then use L{addContact}, which calls this
         method after receiving the PONG from the remote node. The reason for
-        the seperation is we can't insert a node into the table without its
+        the separation is we can't insert a node into the table without its
         node ID. That means of course the node passed into this method needs
         to be a properly formed Node object with a valid ID.
 
@@ -202,30 +204,57 @@ class KhashmirBase(protocol.Factory):
         """
         # Don't add any local nodes to the routing table
         if not self.config['LOCAL_OK'] and isLocal.match(node.host):
+            log.msg('Not adding local node to table: %s/%s' % (node.host, node.port))
             return
-
+        
         old = self.table.insertNode(node, contacted=contacted)
-        if (old and old.id != self.node.id and
+
+        if (isinstance(old, self._Node) and old.id != self.node.id and
             (datetime.now() - old.lastSeen) > 
              timedelta(seconds=self.config['MIN_PING_INTERVAL'])):
             
-            def _staleNodeHandler(err, oldnode = old, newnode = node, self = self, start = datetime.now()):
-                """The pinged node never responded, so replace it."""
-                log.msg("action ping failed on %s/%s: %s" % (oldnode.host, oldnode.port, err.getErrorMessage()))
-                self.stats.completedAction('ping', start)
-                self.table.replaceStaleNode(oldnode, newnode)
-            
-            def _notStaleNodeHandler(dict, old = old, self = self, start = datetime.now()):
-                """Got a pong from the old node, so update it."""
-                self.stats.completedAction('ping', start)
-                if dict['id'] == old.id:
-                    self.table.justSeenNode(old.id)
-            
             # Bucket is full, check to see if old node is still available
             self.stats.startedAction('ping')
             df = old.ping(self.node.id)
-            df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
+            df.addCallbacks(self._freshNodeHandler, self._staleNodeHandler,
+                            callbackArgs = (old, datetime.now()),
+                            errbackArgs = (old, datetime.now(), node, contacted))
+        elif not old and not contacted:
+            # There's room, we just need to contact the node first
+            self.stats.startedAction('ping')
+            df = node.ping(self.node.id)
+            # Convert the returned contact info into a node
+            df.addCallback(self._pongHandler, datetime.now())
+            # Try adding the contacted node
+            df.addCallbacks(self.insertNode, self._pongError,
+                            errbackArgs = (node, datetime.now()))
+
+    def _freshNodeHandler(self, dict, old, start):
+        """Got a pong from the old node, so update it."""
+        self.stats.completedAction('ping', start)
+        if dict['id'] == old.id:
+            self.table.justSeenNode(old.id)
+    
+    def _staleNodeHandler(self, err, old, start, node, contacted):
+        """The pinged node never responded, so replace it."""
+        log.msg("action ping failed on %s/%s: %s" % (old.host, old.port, err.getErrorMessage()))
+        self.stats.completedAction('ping', start)
+        self.table.invalidateNode(old)
+        self.insertNode(node, contacted)
+    
+    def _pongHandler(self, dict, start):
+        """Node responded properly, change response into a node to insert."""
+        self.stats.completedAction('ping', start)
+        # Create the node using the returned contact info
+        n = self.Node(dict['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
+        return n
 
+    def _pongError(self, err, node, start):
+        """Error occurred, fail node and errback or callback with error."""
+        log.msg("action ping failed on %s/%s: %s" % (node.host, node.port, err.getErrorMessage()))
+        self.stats.completedAction('ping', start)
+        self.table.nodeFailed(node)
+    
     def sendJoin(self, node, callback=None, errback=None):
         """Join the DHT by pinging a bootstrap node.
         
@@ -237,31 +266,33 @@ class KhashmirBase(protocol.Factory):
             (optional, defaults to doing nothing with the results)
         @type errback: C{method}
         @param errback: the method to call if an error occurs
-            (optional, defaults to calling the callback with None)
+            (optional, defaults to calling the callback with the error)
         """
-
-        def _pongHandler(dict, node=node, self=self, callback=callback, start = datetime.now()):
-            """Node responded properly, callback with response."""
-            n = self.Node(dict['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
-            self.stats.completedAction('join', start)
-            reactor.callLater(0, self.insertNode, n)
-            if callback:
-                callback((dict['ip_addr'], dict['port']))
-
-        def _defaultPong(err, node=node, self=self, callback=callback, errback=errback, start = datetime.now()):
-            """Error occurred, fail node and errback or callback with error."""
-            log.msg("action join failed on %s/%s: %s" % (node.host, node.port, err.getErrorMessage()))
-            self.stats.completedAction('join', start)
-            self.table.nodeFailed(node)
-            if errback:
-                errback()
-            elif callback:
-                callback(None)
-        
+        if errback is None:
+            errback = callback
         self.stats.startedAction('join')
         df = node.join(self.node.id)
-        df.addCallbacks(_pongHandler, _defaultPong)
-
+        df.addCallbacks(self._joinHandler, self._joinError,
+                        callbackArgs = (node, datetime.now()),
+                        errbackArgs = (node, datetime.now()))
+        if callback:
+            df.addCallbacks(callback, errback)
+
+    def _joinHandler(self, dict, node, start):
+        """Node responded properly, extract the response."""
+        self.stats.completedAction('join', start)
+        # Create the node using the returned contact info
+        n = self.Node(dict['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
+        reactor.callLater(0, self.insertNode, n)
+        return (dict['ip_addr'], dict['port'])
+
+    def _joinError(self, err, node, start):
+        """Error occurred, fail node."""
+        log.msg("action join failed on %s/%s: %s" % (node.host, node.port, err.getErrorMessage()))
+        self.stats.completedAction('join', start)
+        self.table.nodeFailed(node)
+        return err
+        
     def findCloseNodes(self, callback=lambda a: None):
         """Perform a findNode on the ID one away from our own.
 
@@ -528,12 +559,12 @@ class SimpleTests(unittest.TestCase):
     
     timeout = 10
     DHT_DEFAULTS = {'PORT': 9977,
-                    'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
+                    'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 8,
                     'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
                     'MAX_FAILURES': 3, 'LOCAL_OK': True,
                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
-                    'KRPC_TIMEOUT': 14, 'KRPC_INITIAL_DELAY': 2,
-                    'KEY_EXPIRE': 3600, 'SPEW': False, }
+                    'KRPC_TIMEOUT': 9, 'KRPC_INITIAL_DELAY': 2,
+                    'KEY_EXPIRE': 3600, 'SPEW': True, }
 
     def setUp(self):
         d = self.DHT_DEFAULTS.copy()
@@ -551,10 +582,10 @@ class SimpleTests(unittest.TestCase):
 
     def testAddContact(self):
         self.failUnlessEqual(len(self.a.table.buckets), 1)
-        self.failUnlessEqual(len(self.a.table.buckets[0].l), 0)
+        self.failUnlessEqual(len(self.a.table.buckets[0].nodes), 0)
 
         self.failUnlessEqual(len(self.b.table.buckets), 1)
-        self.failUnlessEqual(len(self.b.table.buckets[0].l), 0)
+        self.failUnlessEqual(len(self.b.table.buckets[0].nodes), 0)
 
         self.a.addContact('127.0.0.1', 4045)
         reactor.iterate()
@@ -563,9 +594,9 @@ class SimpleTests(unittest.TestCase):
         reactor.iterate()
 
         self.failUnlessEqual(len(self.a.table.buckets), 1)
-        self.failUnlessEqual(len(self.a.table.buckets[0].l), 1)
+        self.failUnlessEqual(len(self.a.table.buckets[0].nodes), 1)
         self.failUnlessEqual(len(self.b.table.buckets), 1)
-        self.failUnlessEqual(len(self.b.table.buckets[0].l), 1)
+        self.failUnlessEqual(len(self.b.table.buckets[0].nodes), 1)
 
     def testStoreRetrieve(self):
         self.a.addContact('127.0.0.1', 4045)
@@ -602,12 +633,12 @@ class MultiTest(unittest.TestCase):
     timeout = 30
     num = 20
     DHT_DEFAULTS = {'PORT': 9977,
-                    'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
+                    'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 8,
                     'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
                     'MAX_FAILURES': 3, 'LOCAL_OK': True,
                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
-                    'KRPC_TIMEOUT': 14, 'KRPC_INITIAL_DELAY': 2,
-                    'KEY_EXPIRE': 3600, 'SPEW': False, }
+                    'KRPC_TIMEOUT': 9, 'KRPC_INITIAL_DELAY': 2,
+                    'KEY_EXPIRE': 3600, 'SPEW': True, }
 
     def _done(self, val):
         self.done = 1