When a node fails, schedule a future ping to check again.
[quix0rs-apt-p2p.git] / apt_p2p_Khashmir / khashmir.py
index 968ddeb63a6914a68d50919fe7083fd52a31582a..607c49bebabe50f6effdd0f85bd6c52fdfbfb764 100644 (file)
@@ -15,6 +15,7 @@ from copy import copy
 import os, re
 
 from twisted.internet.defer import Deferred
+from twisted.internet.base import DelayedCall
 from twisted.internet import protocol, reactor
 from twisted.python import log
 from twisted.trial import unittest
@@ -41,6 +42,9 @@ class KhashmirBase(protocol.Factory):
     @ivar _Node: the knode implementation to use for this class of DHT
     @type config: C{dictionary}
     @ivar config: the configuration parameters for the DHT
+    @type pinging: C{dictionary}
+    @ivar pinging: the node's that are currently being pinged, keys are the
+        node id's, values are the Deferred or DelayedCall objects
     @type port: C{int}
     @ivar port: the port to listen on
     @type store: L{db.DB}
@@ -73,6 +77,7 @@ class KhashmirBase(protocol.Factory):
             (optional, defaults to the /tmp directory)
         """
         self.config = None
+        self.pinging = {}
         self.setup(config, cache_dir)
         
     def setup(self, config, cache_dir):
@@ -217,47 +222,69 @@ class KhashmirBase(protocol.Factory):
              timedelta(seconds=self.config['MIN_PING_INTERVAL'])):
             
             # Bucket is full, check to see if old node is still available
-            self.stats.startedAction('ping')
-            df = old.ping(self.node.id)
-            df.addCallbacks(self._freshNodeHandler, self._staleNodeHandler,
-                            callbackArgs = (old, datetime.now()),
-                            errbackArgs = (old, datetime.now(), node, contacted))
+            df = self.sendPing(old)
+            df.addErrback(self._staleNodeHandler, old, node, contacted)
         elif not old and not contacted:
             # There's room, we just need to contact the node first
-            self.stats.startedAction('ping')
-            df = node.ping(self.node.id)
-            # Convert the returned contact info into a node
-            df.addCallback(self._pongHandler, datetime.now())
-            # Try adding the contacted node
-            df.addCallbacks(self.insertNode, self._pongError,
-                            errbackArgs = (node, datetime.now()))
-
-    def _freshNodeHandler(self, dict, old, start):
-        """Got a pong from the old node, so update it."""
-        self.stats.completedAction('ping', start)
-        if dict['id'] == old.id:
-            self.table.justSeenNode(old.id)
-    
-    def _staleNodeHandler(self, err, old, start, node, contacted):
+            self.sendPing(node)
+
+    def _staleNodeHandler(self, err, old, node, contacted):
         """The pinged node never responded, so replace it."""
-        log.msg("action ping failed on %s/%s: %s" % (old.host, old.port, err.getErrorMessage()))
-        self.stats.completedAction('ping', start)
         self.table.invalidateNode(old)
         self.insertNode(node, contacted)
+        return err
+    
+    def nodeFailed(self, node):
+        """Mark a node as having failed a request and schedule a future check.
+        
+        @type node: L{node.Node}
+        @param node: the new node to try and insert
+        """
+        exists = self.table.nodeFailed(node)
+        
+        # If in the table, schedule a ping, if one isn't already sent/scheduled
+        if exists and node.id not in self.pinging:
+            self.pinging[node,id] = reactor.callLater(self.config['MIN_PING_INTERVAL'],
+                                                      self.sendPing, node)
     
-    def _pongHandler(self, dict, start):
-        """Node responded properly, change response into a node to insert."""
+    def sendPing(self, node):
+        """Ping the node to see if it's still alive.
+        
+        @type node: L{node.Node}
+        @param node: the node to send the join to
+        """
+        # Check for a ping already underway
+        if (isinstance(self.pinging.get(node.id, None), DelayedCall) and
+            self.pinging[node.id].active()):
+            self.pinging[node.id].cancel()
+        elif isinstance(self.pinging.get(node.id, None), Deferred):
+            return self.pinging[node.id]
+
+        self.stats.startedAction('ping')
+        df = node.ping(self.node.id)
+        self.pinging[node.id] = df
+        df.addCallbacks(self._pingHandler, self._pingError,
+                        callbackArgs = (node, datetime.now()),
+                        errbackArgs = (node, datetime.now()))
+        return df
+
+    def _pingHandler(self, dict, node, start):
+        """Node responded properly, update it and return the node object."""
         self.stats.completedAction('ping', start)
+        del self.pinging[node.id]
         # Create the node using the returned contact info
         n = self.Node(dict['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
+        reactor.callLater(0, self.insertNode, n)
         return n
 
-    def _pongError(self, err, node, start):
-        """Error occurred, fail node and errback or callback with error."""
+    def _pingError(self, err, node, start):
+        """Error occurred, fail node."""
         log.msg("action ping failed on %s/%s: %s" % (node.host, node.port, err.getErrorMessage()))
         self.stats.completedAction('ping', start)
-        self.table.nodeFailed(node)
-    
+        del self.pinging[node.id]
+        self.nodeFailed(node)
+        return err
+        
     def sendJoin(self, node, callback=None, errback=None):
         """Join the DHT by pinging a bootstrap node.
         
@@ -293,7 +320,7 @@ class KhashmirBase(protocol.Factory):
         """Error occurred, fail node."""
         log.msg("action join failed on %s/%s: %s" % (node.host, node.port, err.getErrorMessage()))
         self.stats.completedAction('join', start)
-        self.table.nodeFailed(node)
+        self.nodeFailed(node)
         return err
         
     def findCloseNodes(self, callback=lambda a: None):
@@ -334,6 +361,9 @@ class KhashmirBase(protocol.Factory):
             self.next_checkpoint.cancel()
         except:
             pass
+        for call in self.pinging:
+            if isinstance(call, DelayedCall) and call.active():
+                call.cancel()
         self.store.close()
     
     def getStats(self):
@@ -598,6 +628,10 @@ class SimpleTests(unittest.TestCase):
         reactor.iterate()
         reactor.iterate()
         reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
 
         self.failUnlessEqual(len(self.a.table.buckets), 1)
         self.failUnlessEqual(len(self.a.table.buckets[0].nodes), 1)
@@ -626,6 +660,20 @@ class SimpleTests(unittest.TestCase):
         reactor.iterate()
         reactor.iterate()
         reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
 
     def _cb(self, key, val):
         if not val:
@@ -666,6 +714,9 @@ class MultiTest(unittest.TestCase):
             reactor.iterate()
             reactor.iterate()
             reactor.iterate() 
+            reactor.iterate()
+            reactor.iterate()
+            reactor.iterate() 
             
         for i in self.l:
             self.done = 0