]> git.mxchange.org Git - quix0rs-apt-p2p.git/blobdiff - apt_p2p_Khashmir/khashmir.py
Use the version number in the Khashmir node ID.
[quix0rs-apt-p2p.git] / apt_p2p_Khashmir / khashmir.py
index c89870e86c88d56ce2713c99c6e6bcd2bacc80cf..f711a6a62a315964a339b03085d809079b0a2b6d 100644 (file)
@@ -15,6 +15,7 @@ from copy import copy
 import os, re
 
 from twisted.internet.defer import Deferred
+from twisted.internet.base import DelayedCall
 from twisted.internet import protocol, reactor
 from twisted.python import log
 from twisted.trial import unittest
@@ -41,6 +42,9 @@ class KhashmirBase(protocol.Factory):
     @ivar _Node: the knode implementation to use for this class of DHT
     @type config: C{dictionary}
     @ivar config: the configuration parameters for the DHT
+    @type pinging: C{dictionary}
+    @ivar pinging: the node's that are currently being pinged, keys are the
+        node id's, values are the Deferred or DelayedCall objects
     @type port: C{int}
     @ivar port: the port to listen on
     @type store: L{db.DB}
@@ -73,6 +77,7 @@ class KhashmirBase(protocol.Factory):
             (optional, defaults to the /tmp directory)
         """
         self.config = None
+        self.pinging = {}
         self.setup(config, cache_dir)
         
     def setup(self, config, cache_dir):
@@ -118,8 +123,8 @@ class KhashmirBase(protocol.Factory):
     def _loadSelfNode(self, host, port):
         """Create this node, loading any previously saved one."""
         id = self.store.getSelfNode()
-        if not id:
-            id = newID()
+        if not id or not id.endswith(self.config['VERSION']):
+            id = newID(self.config['VERSION'])
         return self._Node(id, host, port)
         
     def checkpoint(self):
@@ -180,6 +185,9 @@ class KhashmirBase(protocol.Factory):
         @param callback: the method to call with the results, it must take 1
             parameter, the list of K closest nodes
         """
+        # Mark the bucket as having been accessed
+        self.table.touch(id)
+        
         # Start with our node
         nodes = [copy(self.node)]
 
@@ -214,47 +222,76 @@ class KhashmirBase(protocol.Factory):
              timedelta(seconds=self.config['MIN_PING_INTERVAL'])):
             
             # Bucket is full, check to see if old node is still available
-            self.stats.startedAction('ping')
-            df = old.ping(self.node.id)
-            df.addCallbacks(self._freshNodeHandler, self._staleNodeHandler,
-                            callbackArgs = (old, datetime.now()),
-                            errbackArgs = (old, datetime.now(), node, contacted))
+            df = self.sendPing(old)
+            df.addErrback(self._staleNodeHandler, old, node, contacted)
         elif not old and not contacted:
             # There's room, we just need to contact the node first
-            self.stats.startedAction('ping')
-            df = node.ping(self.node.id)
-            # Convert the returned contact info into a node
-            df.addCallback(self._pongHandler, datetime.now())
-            # Try adding the contacted node
-            df.addCallbacks(self.insertNode, self._pongError,
-                            errbackArgs = (node, datetime.now()))
-
-    def _freshNodeHandler(self, dict, old, start):
-        """Got a pong from the old node, so update it."""
-        self.stats.completedAction('ping', start)
-        if dict['id'] == old.id:
-            self.table.justSeenNode(old.id)
-    
-    def _staleNodeHandler(self, err, old, start, node, contacted):
+            df = self.sendPing(node)
+            # Also schedule a future ping to make sure the node works
+            def rePing(newnode, self = self):
+                if newnode.id not in self.pinging:
+                    self.pinging[newnode.id] = reactor.callLater(self.config['MIN_PING_INTERVAL'],
+                                                                 self.sendPing, newnode)
+                return newnode
+            df.addCallback(rePing)
+
+    def _staleNodeHandler(self, err, old, node, contacted):
         """The pinged node never responded, so replace it."""
-        log.msg("action ping failed on %s/%s: %s" % (old.host, old.port, err.getErrorMessage()))
-        self.stats.completedAction('ping', start)
         self.table.invalidateNode(old)
         self.insertNode(node, contacted)
+        return err
+    
+    def nodeFailed(self, node):
+        """Mark a node as having failed a request and schedule a future check.
+        
+        @type node: L{node.Node}
+        @param node: the new node to try and insert
+        """
+        exists = self.table.nodeFailed(node)
+        
+        # If in the table, schedule a ping, if one isn't already sent/scheduled
+        if exists and node.id not in self.pinging:
+            self.pinging[node.id] = reactor.callLater(self.config['MIN_PING_INTERVAL'],
+                                                      self.sendPing, node)
     
-    def _pongHandler(self, dict, start):
-        """Node responded properly, change response into a node to insert."""
+    def sendPing(self, node):
+        """Ping the node to see if it's still alive.
+        
+        @type node: L{node.Node}
+        @param node: the node to send the join to
+        """
+        # Check for a ping already underway
+        if (isinstance(self.pinging.get(node.id, None), DelayedCall) and
+            self.pinging[node.id].active()):
+            self.pinging[node.id].cancel()
+        elif isinstance(self.pinging.get(node.id, None), Deferred):
+            return self.pinging[node.id]
+
+        self.stats.startedAction('ping')
+        df = node.ping(self.node.id)
+        self.pinging[node.id] = df
+        df.addCallbacks(self._pingHandler, self._pingError,
+                        callbackArgs = (node, datetime.now()),
+                        errbackArgs = (node, datetime.now()))
+        return df
+
+    def _pingHandler(self, dict, node, start):
+        """Node responded properly, update it and return the node object."""
         self.stats.completedAction('ping', start)
+        del self.pinging[node.id]
         # Create the node using the returned contact info
         n = self.Node(dict['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
+        reactor.callLater(0, self.insertNode, n)
         return n
 
-    def _pongError(self, err, node, start):
-        """Error occurred, fail node and errback or callback with error."""
+    def _pingError(self, err, node, start):
+        """Error occurred, fail node."""
         log.msg("action ping failed on %s/%s: %s" % (node.host, node.port, err.getErrorMessage()))
         self.stats.completedAction('ping', start)
-        self.table.nodeFailed(node)
-    
+        del self.pinging[node.id]
+        self.nodeFailed(node)
+        return err
+        
     def sendJoin(self, node, callback=None, errback=None):
         """Join the DHT by pinging a bootstrap node.
         
@@ -290,7 +327,7 @@ class KhashmirBase(protocol.Factory):
         """Error occurred, fail node."""
         log.msg("action join failed on %s/%s: %s" % (node.host, node.port, err.getErrorMessage()))
         self.stats.completedAction('join', start)
-        self.table.nodeFailed(node)
+        self.nodeFailed(node)
         return err
         
     def findCloseNodes(self, callback=lambda a: None):
@@ -331,6 +368,10 @@ class KhashmirBase(protocol.Factory):
             self.next_checkpoint.cancel()
         except:
             pass
+        for nodeid in self.pinging.keys():
+            if isinstance(self.pinging[nodeid], DelayedCall) and self.pinging[nodeid].active():
+                self.pinging[nodeid].cancel()
+                del self.pinging[nodeid]
         self.store.close()
     
     def getStats(self):
@@ -405,6 +446,9 @@ class KhashmirRead(KhashmirBase):
         @param callback: the method to call with the results, it must take 1
             parameter, the list of nodes with values
         """
+        # Mark the bucket as having been accessed
+        self.table.touch(key)
+        
         # Start with ourself
         nodes = [copy(self.node)]
         
@@ -558,9 +602,9 @@ class Khashmir(KhashmirWrite):
 class SimpleTests(unittest.TestCase):
     
     timeout = 10
-    DHT_DEFAULTS = {'PORT': 9977,
-                    'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
-                    'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
+    DHT_DEFAULTS = {'VERSION': 'A000', 'PORT': 9977,
+                    'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 8,
+                    'STORE_REDUNDANCY': 6, 'RETRIEVE_VALUES': -10000,
                     'MAX_FAILURES': 3, 'LOCAL_OK': True,
                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
                     'KRPC_TIMEOUT': 9, 'KRPC_INITIAL_DELAY': 2,
@@ -592,6 +636,10 @@ class SimpleTests(unittest.TestCase):
         reactor.iterate()
         reactor.iterate()
         reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
 
         self.failUnlessEqual(len(self.a.table.buckets), 1)
         self.failUnlessEqual(len(self.a.table.buckets[0].nodes), 1)
@@ -620,6 +668,20 @@ class SimpleTests(unittest.TestCase):
         reactor.iterate()
         reactor.iterate()
         reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
 
     def _cb(self, key, val):
         if not val:
@@ -632,9 +694,9 @@ class MultiTest(unittest.TestCase):
     
     timeout = 30
     num = 20
-    DHT_DEFAULTS = {'PORT': 9977,
-                    'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
-                    'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
+    DHT_DEFAULTS = {'VERSION': 'A000', 'PORT': 9977,
+                    'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 8,
+                    'STORE_REDUNDANCY': 6, 'RETRIEVE_VALUES': -10000,
                     'MAX_FAILURES': 3, 'LOCAL_OK': True,
                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
                     'KRPC_TIMEOUT': 9, 'KRPC_INITIAL_DELAY': 2,
@@ -660,6 +722,9 @@ class MultiTest(unittest.TestCase):
             reactor.iterate()
             reactor.iterate()
             reactor.iterate() 
+            reactor.iterate()
+            reactor.iterate()
+            reactor.iterate() 
             
         for i in self.l:
             self.done = 0