When a node fails, schedule a future ping to check again.
authorCameron Dale <camrdale@gmail.com>
Sat, 10 May 2008 00:18:20 +0000 (17:18 -0700)
committerCameron Dale <camrdale@gmail.com>
Sat, 10 May 2008 00:18:20 +0000 (17:18 -0700)
Should help to eliminate bad nodes faster.
To facilitate, added a sendPing() to Khashmir.
insertNode now uses sendPing() rather than its own routines.
Pinging is now stateful, so that multiple pings are not sent simultaneously.

apt_p2p_Khashmir/DHT.py
apt_p2p_Khashmir/actions.py
apt_p2p_Khashmir/khashmir.py
apt_p2p_Khashmir/ktable.py

index 1236ebc..ddcaa7a 100644 (file)
@@ -477,7 +477,7 @@ class TestMultiDHT(unittest.TestCase):
         if next_node + 1 < len(self.l):
             d.addCallback(self.node_join, next_node + 1)
         else:
-            d.addCallback(self.lastDefer.callback)
+            reactor.callLater(1, d.addCallback, self.lastDefer.callback)
     
     def test_join(self):
         self.timeout = 2
index 865561c..c49959f 100644 (file)
@@ -207,7 +207,7 @@ class ActionBase:
         """Receive an error from a remote node."""
         log.msg("action %s failed on %s/%s: %s" % (self.action, node.host, node.port, err.getErrorMessage()))
         if node.id != self.caller.node.id:
-            self.caller.table.nodeFailed(node)
+            self.caller.nodeFailed(node)
         self.failed[node.id] = 1
         if self.outstanding.has_key(node.id):
             self.outstanding_results -= self.outstanding[node.id]
index 968ddeb..607c49b 100644 (file)
@@ -15,6 +15,7 @@ from copy import copy
 import os, re
 
 from twisted.internet.defer import Deferred
+from twisted.internet.base import DelayedCall
 from twisted.internet import protocol, reactor
 from twisted.python import log
 from twisted.trial import unittest
@@ -41,6 +42,9 @@ class KhashmirBase(protocol.Factory):
     @ivar _Node: the knode implementation to use for this class of DHT
     @type config: C{dictionary}
     @ivar config: the configuration parameters for the DHT
+    @type pinging: C{dictionary}
+    @ivar pinging: the nodes that are currently being pinged; keys are the
+        node ids, values are the Deferred or DelayedCall objects
     @type port: C{int}
     @ivar port: the port to listen on
     @type store: L{db.DB}
@@ -73,6 +77,7 @@ class KhashmirBase(protocol.Factory):
             (optional, defaults to the /tmp directory)
         """
         self.config = None
+        self.pinging = {}
         self.setup(config, cache_dir)
         
     def setup(self, config, cache_dir):
@@ -217,47 +222,69 @@ class KhashmirBase(protocol.Factory):
              timedelta(seconds=self.config['MIN_PING_INTERVAL'])):
             
             # Bucket is full, check to see if old node is still available
-            self.stats.startedAction('ping')
-            df = old.ping(self.node.id)
-            df.addCallbacks(self._freshNodeHandler, self._staleNodeHandler,
-                            callbackArgs = (old, datetime.now()),
-                            errbackArgs = (old, datetime.now(), node, contacted))
+            df = self.sendPing(old)
+            df.addErrback(self._staleNodeHandler, old, node, contacted)
         elif not old and not contacted:
             # There's room, we just need to contact the node first
-            self.stats.startedAction('ping')
-            df = node.ping(self.node.id)
-            # Convert the returned contact info into a node
-            df.addCallback(self._pongHandler, datetime.now())
-            # Try adding the contacted node
-            df.addCallbacks(self.insertNode, self._pongError,
-                            errbackArgs = (node, datetime.now()))
-
-    def _freshNodeHandler(self, dict, old, start):
-        """Got a pong from the old node, so update it."""
-        self.stats.completedAction('ping', start)
-        if dict['id'] == old.id:
-            self.table.justSeenNode(old.id)
-    
-    def _staleNodeHandler(self, err, old, start, node, contacted):
+            self.sendPing(node)
+
+    def _staleNodeHandler(self, err, old, node, contacted):
         """The pinged node never responded, so replace it."""
-        log.msg("action ping failed on %s/%s: %s" % (old.host, old.port, err.getErrorMessage()))
-        self.stats.completedAction('ping', start)
         self.table.invalidateNode(old)
         self.insertNode(node, contacted)
+        return err
+    
+    def nodeFailed(self, node):
+        """Mark a node as having failed a request and schedule a future check.
+        
+        @type node: L{node.Node}
+        @param node: the node that failed to respond to a request
+        """
+        exists = self.table.nodeFailed(node)
+        
+        # If in the table, schedule a ping, if one isn't already sent/scheduled
+        if exists and node.id not in self.pinging:
+            self.pinging[node.id] = reactor.callLater(self.config['MIN_PING_INTERVAL'],
+                                                      self.sendPing, node)
     
-    def _pongHandler(self, dict, start):
-        """Node responded properly, change response into a node to insert."""
+    def sendPing(self, node):
+        """Ping the node to see if it's still alive.
+        
+        @type node: L{node.Node}
+        @param node: the node to send the ping to
+        """
+        # Check for a ping already underway
+        if (isinstance(self.pinging.get(node.id, None), DelayedCall) and
+            self.pinging[node.id].active()):
+            self.pinging[node.id].cancel()
+        elif isinstance(self.pinging.get(node.id, None), Deferred):
+            return self.pinging[node.id]
+
+        self.stats.startedAction('ping')
+        df = node.ping(self.node.id)
+        self.pinging[node.id] = df
+        df.addCallbacks(self._pingHandler, self._pingError,
+                        callbackArgs = (node, datetime.now()),
+                        errbackArgs = (node, datetime.now()))
+        return df
+
+    def _pingHandler(self, dict, node, start):
+        """Node responded properly, update it and return the node object."""
         self.stats.completedAction('ping', start)
+        del self.pinging[node.id]
         # Create the node using the returned contact info
         n = self.Node(dict['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
+        reactor.callLater(0, self.insertNode, n)
         return n
 
-    def _pongError(self, err, node, start):
-        """Error occurred, fail node and errback or callback with error."""
+    def _pingError(self, err, node, start):
+        """Error occurred, fail node."""
         log.msg("action ping failed on %s/%s: %s" % (node.host, node.port, err.getErrorMessage()))
         self.stats.completedAction('ping', start)
-        self.table.nodeFailed(node)
-    
+        del self.pinging[node.id]
+        self.nodeFailed(node)
+        return err
+        
     def sendJoin(self, node, callback=None, errback=None):
         """Join the DHT by pinging a bootstrap node.
         
@@ -293,7 +320,7 @@ class KhashmirBase(protocol.Factory):
         """Error occurred, fail node."""
         log.msg("action join failed on %s/%s: %s" % (node.host, node.port, err.getErrorMessage()))
         self.stats.completedAction('join', start)
-        self.table.nodeFailed(node)
+        self.nodeFailed(node)
         return err
         
     def findCloseNodes(self, callback=lambda a: None):
@@ -334,6 +361,9 @@ class KhashmirBase(protocol.Factory):
             self.next_checkpoint.cancel()
         except:
             pass
+        # Cancel any scheduled future pings (iterate values: the dict maps
+        # node ids to DelayedCall/Deferred objects, not the calls themselves)
+        for call in self.pinging.values():
+            if isinstance(call, DelayedCall) and call.active():
+                call.cancel()
         self.store.close()
     
     def getStats(self):
@@ -598,6 +628,10 @@ class SimpleTests(unittest.TestCase):
         reactor.iterate()
         reactor.iterate()
         reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
 
         self.failUnlessEqual(len(self.a.table.buckets), 1)
         self.failUnlessEqual(len(self.a.table.buckets[0].nodes), 1)
@@ -626,6 +660,20 @@ class SimpleTests(unittest.TestCase):
         reactor.iterate()
         reactor.iterate()
         reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
 
     def _cb(self, key, val):
         if not val:
@@ -666,6 +714,9 @@ class MultiTest(unittest.TestCase):
             reactor.iterate()
             reactor.iterate()
             reactor.iterate() 
+            reactor.iterate()
+            reactor.iterate()
+            reactor.iterate() 
             
         for i in self.l:
             self.done = 0
index 199c626..201d494 100644 (file)
@@ -262,7 +262,10 @@ class KTable:
         self.replaceStaleNode(n)
     
     def nodeFailed(self, node):
-        """Mark a node as having failed once, and remove it if it has failed too much."""
+        """Mark a node as having failed once, and remove it if it has failed too much.
+        
+        @return: whether the node is in the routing table
+        """
         # Get the bucket number
         num = self._nodeNum(node)
         i = self._bucketIndexForInt(num)
@@ -271,11 +274,13 @@ class KTable:
         try:
             n = self.buckets[i].node(num)
         except ValueError:
-            return None
+            return False
         else:
             # The node is in the bucket
             if n.msgFailed() >= self.config['MAX_FAILURES']:
                 self.invalidateNode(n)
+                return False
+            return True
                         
 class KBucket:
     """Single bucket of nodes in a kademlia-like routing table.