Only add nodes to the routing table that have responded to a request.
authorCameron Dale <camrdale@gmail.com>
Tue, 29 Apr 2008 00:44:45 +0000 (17:44 -0700)
committerCameron Dale <camrdale@gmail.com>
Tue, 29 Apr 2008 00:44:45 +0000 (17:44 -0700)
If they haven't, ping them before adding them.
Still ping the old node first to see if it's stale.
Move the temporary functions for ping/join to be module functions.

apt_p2p_Khashmir/DHT.py
apt_p2p_Khashmir/khashmir.py
apt_p2p_Khashmir/ktable.py

index af99f053c689800cfce81f18a2ba26170f2ab069..cc3660c5679149e78e68447cacb3ab3f13418234 100644 (file)
@@ -338,7 +338,7 @@ class TestSimpleDHT(unittest.TestCase):
                     'MAX_FAILURES': 3, 'LOCAL_OK': True,
                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
                     'KRPC_TIMEOUT': 14, 'KRPC_INITIAL_DELAY': 2,
-                    'KEY_EXPIRE': 3600, 'SPEW': False, }
+                    'KEY_EXPIRE': 3600, 'SPEW': True, }
 
     def setUp(self):
         self.a = DHT()
@@ -459,7 +459,7 @@ class TestMultiDHT(unittest.TestCase):
                     'MAX_FAILURES': 3, 'LOCAL_OK': True,
                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
                     'KRPC_TIMEOUT': 14, 'KRPC_INITIAL_DELAY': 2,
-                    'KEY_EXPIRE': 3600, 'SPEW': False, }
+                    'KEY_EXPIRE': 3600, 'SPEW': True, }
 
     def setUp(self):
         self.l = []
index 294940c545c1cb5fa13af1fb3aa68ea7d1a747f9..408a7e974b0f32a900a4c255d38fa59411ed6cc5 100644 (file)
@@ -164,7 +164,7 @@ class KhashmirBase(protocol.Factory):
             (optional, defaults to doing nothing with the results)
         @type errback: C{method}
         @param errback: the method to call if an error occurs
-            (optional, defaults to calling the callback with None)
+            (optional, defaults to calling the callback with the error)
         """
         n = self.Node(NULL_ID, host, port)
         self.sendJoin(n, callback=callback, errback=errback)
@@ -190,7 +190,7 @@ class KhashmirBase(protocol.Factory):
         
         If all you have is a host/port, then use L{addContact}, which calls this
         method after receiving the PONG from the remote node. The reason for
-        the seperation is we can't insert a node into the table without its
+        the separation is we can't insert a node into the table without its
         node ID. That means of course the node passed into this method needs
         to be a properly formed Node object with a valid ID.
 
@@ -202,30 +202,57 @@ class KhashmirBase(protocol.Factory):
         """
         # Don't add any local nodes to the routing table
         if not self.config['LOCAL_OK'] and isLocal.match(node.host):
+            log.msg('Not adding local node to table: %s/%s' % (node.host, node.port))
             return
-
+        
         old = self.table.insertNode(node, contacted=contacted)
-        if (old and old.id != self.node.id and
+
+        if (isinstance(old, self._Node) and old.id != self.node.id and
             (datetime.now() - old.lastSeen) > 
              timedelta(seconds=self.config['MIN_PING_INTERVAL'])):
             
-            def _staleNodeHandler(err, oldnode = old, newnode = node, self = self, start = datetime.now()):
-                """The pinged node never responded, so replace it."""
-                log.msg("action ping failed on %s/%s: %s" % (oldnode.host, oldnode.port, err.getErrorMessage()))
-                self.stats.completedAction('ping', start)
-                self.table.replaceStaleNode(oldnode, newnode)
-            
-            def _notStaleNodeHandler(dict, old = old, self = self, start = datetime.now()):
-                """Got a pong from the old node, so update it."""
-                self.stats.completedAction('ping', start)
-                if dict['id'] == old.id:
-                    self.table.justSeenNode(old.id)
-            
             # Bucket is full, check to see if old node is still available
             self.stats.startedAction('ping')
             df = old.ping(self.node.id)
-            df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
+            df.addCallbacks(self._freshNodeHandler, self._staleNodeHandler,
+                            callbackArgs = (old, datetime.now()),
+                            errbackArgs = (old, datetime.now(), node, contacted))
+        elif not old and not contacted:
+            # There's room, we just need to contact the node first
+            self.stats.startedAction('ping')
+            df = node.ping(self.node.id)
+            # Convert the returned contact info into a node
+            df.addCallback(self._pongHandler, datetime.now())
+            # Try adding the contacted node
+            df.addCallbacks(self.insertNode, self._pongError,
+                            errbackArgs = (node, datetime.now()))
+
+    def _freshNodeHandler(self, dict, old, start):
+        """Got a pong from the old node, so update it."""
+        self.stats.completedAction('ping', start)
+        if dict['id'] == old.id:
+            self.table.justSeenNode(old.id)
+    
+    def _staleNodeHandler(self, err, old, start, node, contacted):
+        """The pinged node never responded, so replace it."""
+        log.msg("action ping failed on %s/%s: %s" % (old.host, old.port, err.getErrorMessage()))
+        self.stats.completedAction('ping', start)
+        self.table.invalidateNode(old)
+        self.insertNode(node, contacted)
+    
+    def _pongHandler(self, dict, start):
+        """Node responded properly, change response into a node to insert."""
+        self.stats.completedAction('ping', start)
+        # Create the node using the returned contact info
+        n = self.Node(dict['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
+        return n
 
+    def _pongError(self, err, node, start):
+        """Error occurred, fail node and errback or callback with error."""
+        log.msg("action ping failed on %s/%s: %s" % (node.host, node.port, err.getErrorMessage()))
+        self.stats.completedAction('ping', start)
+        self.table.nodeFailed(node)
+    
     def sendJoin(self, node, callback=None, errback=None):
         """Join the DHT by pinging a bootstrap node.
         
@@ -237,31 +264,33 @@ class KhashmirBase(protocol.Factory):
             (optional, defaults to doing nothing with the results)
         @type errback: C{method}
         @param errback: the method to call if an error occurs
-            (optional, defaults to calling the callback with None)
+            (optional, defaults to calling the callback with the error)
         """
-
-        def _pongHandler(dict, node=node, self=self, callback=callback, start = datetime.now()):
-            """Node responded properly, callback with response."""
-            n = self.Node(dict['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
-            self.stats.completedAction('join', start)
-            reactor.callLater(0, self.insertNode, n)
-            if callback:
-                callback((dict['ip_addr'], dict['port']))
-
-        def _defaultPong(err, node=node, self=self, callback=callback, errback=errback, start = datetime.now()):
-            """Error occurred, fail node and errback or callback with error."""
-            log.msg("action join failed on %s/%s: %s" % (node.host, node.port, err.getErrorMessage()))
-            self.stats.completedAction('join', start)
-            self.table.nodeFailed(node)
-            if errback:
-                errback()
-            elif callback:
-                callback(None)
-        
+        if errback is None:
+            errback = callback
         self.stats.startedAction('join')
         df = node.join(self.node.id)
-        df.addCallbacks(_pongHandler, _defaultPong)
-
+        df.addCallbacks(self._joinHandler, self._joinError,
+                        callbackArgs = (node, datetime.now()),
+                        errbackArgs = (node, datetime.now()))
+        if callback:
+            df.addCallbacks(callback, errback)
+
+    def _joinHandler(self, dict, node, start):
+        """Node responded properly, extract the response."""
+        self.stats.completedAction('join', start)
+        # Create the node using the returned contact info
+        n = self.Node(dict['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
+        reactor.callLater(0, self.insertNode, n)
+        return (dict['ip_addr'], dict['port'])
+
+    def _joinError(self, err, node, start):
+        """Error occurred, fail node."""
+        log.msg("action join failed on %s/%s: %s" % (node.host, node.port, err.getErrorMessage()))
+        self.stats.completedAction('join', start)
+        self.table.nodeFailed(node)
+        return err
+        
     def findCloseNodes(self, callback=lambda a: None):
         """Perform a findNode on the ID one away from our own.
 
@@ -533,7 +562,7 @@ class SimpleTests(unittest.TestCase):
                     'MAX_FAILURES': 3, 'LOCAL_OK': True,
                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
                     'KRPC_TIMEOUT': 14, 'KRPC_INITIAL_DELAY': 2,
-                    'KEY_EXPIRE': 3600, 'SPEW': False, }
+                    'KEY_EXPIRE': 3600, 'SPEW': True, }
 
     def setUp(self):
         d = self.DHT_DEFAULTS.copy()
@@ -607,7 +636,7 @@ class MultiTest(unittest.TestCase):
                     'MAX_FAILURES': 3, 'LOCAL_OK': True,
                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
                     'KRPC_TIMEOUT': 14, 'KRPC_INITIAL_DELAY': 2,
-                    'KEY_EXPIRE': 3600, 'SPEW': False, }
+                    'KEY_EXPIRE': 3600, 'SPEW': True, }
 
     def _done(self, val):
         self.done = 1
index 007ff1c998a756c085e1e0be1b17674f42d8b68a..e91c0db9843cf250fb5ab46bc0226b21150be314 100644 (file)
@@ -173,6 +173,7 @@ class KTable:
             # Remove the stale node
             del(self.buckets[i].l[it])
             removed = True
+            log.msg('Removed node from routing table: %s/%s' % (stale.host, stale.port))
         
         # Insert the new node
         if new and self._bucketIndexForInt(new.num) == i and len(self.buckets[i].l) < K:
@@ -183,7 +184,8 @@ class KTable:
     def insertNode(self, node, contacted = True):
         """Try to insert a node in the routing table.
         
-        This inserts the node, returning None if successful, otherwise returns
+        This inserts the node, returning True if successful, False if the
+        node could have been added if it responds to a ping, otherwise returns
         the oldest node in the bucket if it's full. The caller is then
         responsible for pinging the returned node and calling replaceStaleNode
         if it doesn't respond. contacted means that yes, we contacted THEM and
@@ -194,11 +196,13 @@ class KTable:
         @type contacted: C{boolean}
         @param contacted: whether the new node is known to be good, i.e.
             responded to a request (optional, defaults to True)
-        @rtype: L{node.Node}
-        @return: None if successful (the bucket wasn't full), otherwise returns the oldest node in the bucket
+        @rtype: L{node.Node} or C{boolean}
+        @return: True if successful (the bucket wasn't full), False if the
+            node could have been added if it was contacted, otherwise
+            returns the oldest node in the bucket
         """
         assert node.id != NULL_ID
-        if node.id == self.node.id: return
+        if node.id == self.node.id: return True
 
         # Get the bucket for this node
         i = self._bucketIndexForInt(node.num)
@@ -219,16 +223,18 @@ class KTable:
                 # utilizing this nodes new contact info
                 self.buckets[i].l.append(node)
                 self.buckets[i].touch()
-            return
+            return True
         
         # We don't have this node, check to see if the bucket is full
         if len(self.buckets[i].l) < K:
             # Not full, append this node and return
             if contacted:
                 node.updateLastSeen()
-            self.buckets[i].l.append(node)
-            self.buckets[i].touch()
-            return
+                self.buckets[i].l.append(node)
+                self.buckets[i].touch()
+                log.msg('Added node to routing table: %s/%s' % (node.host, node.port))
+                return True
+            return False
             
         # Bucket is full, check to see if the local node is not in the bucket
         if not (self.buckets[i].min <= self.node < self.buckets[i].max):