]> git.mxchange.org Git - quix0rs-apt-p2p.git/blobdiff - apt_p2p_Khashmir/khashmir.py
Increase the concurrency of DHT requests to 8.
[quix0rs-apt-p2p.git] / apt_p2p_Khashmir / khashmir.py
index 15237a2338c141b5a0a6b46992a0fdbddb87bc75..f083da4763dc25f5a96fea8b50be9ca7488a89a7 100644 (file)
@@ -1,7 +1,9 @@
-## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
-# see LICENSE.txt for license information
 
 
-"""The main Khashmir program."""
+"""The main Khashmir program.
+
+@var isLocal: a compiled regular expression suitable for testing if an
+    IP address is from a known local or private range
+"""
 
 import warnings
 warnings.simplefilter("ignore", DeprecationWarning)
 
 import warnings
 warnings.simplefilter("ignore", DeprecationWarning)
@@ -10,7 +12,7 @@ from datetime import datetime, timedelta
 from random import randrange, shuffle
 from sha import sha
 from copy import copy
 from random import randrange, shuffle
 from sha import sha
 from copy import copy
-import os
+import os, re
 
 from twisted.internet.defer import Deferred
 from twisted.internet import protocol, reactor
 
 from twisted.internet.defer import Deferred
 from twisted.internet import protocol, reactor
@@ -25,6 +27,13 @@ from actions import FindNode, FindValue, GetValue, StoreValue
 from stats import StatsLogger
 import krpc
 
 from stats import StatsLogger
 import krpc
 
+isLocal = re.compile('^(192\.168\.[0-9]{1,3}\.[0-9]{1,3})|'+
+                     '(10\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})|'+
+                     '(172\.0?1[6-9]\.[0-9]{1,3}\.[0-9]{1,3})|'+
+                     '(172\.0?2[0-9]\.[0-9]{1,3}\.[0-9]{1,3})|'+
+                     '(172\.0?3[0-1]\.[0-9]{1,3}\.[0-9]{1,3})|'+
+                     '(127\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})$')
+
 class KhashmirBase(protocol.Factory):
     """The base Khashmir class, with base functionality and find node, no key-value mappings.
     
 class KhashmirBase(protocol.Factory):
     """The base Khashmir class, with base functionality and find node, no key-value mappings.
     
@@ -157,7 +166,7 @@ class KhashmirBase(protocol.Factory):
             (optional, defaults to doing nothing with the results)
         @type errback: C{method}
         @param errback: the method to call if an error occurs
             (optional, defaults to doing nothing with the results)
         @type errback: C{method}
         @param errback: the method to call if an error occurs
-            (optional, defaults to calling the callback with None)
+            (optional, defaults to calling the callback with the error)
         """
         n = self.Node(NULL_ID, host, port)
         self.sendJoin(n, callback=callback, errback=errback)
         """
         n = self.Node(NULL_ID, host, port)
         self.sendJoin(n, callback=callback, errback=errback)
@@ -183,7 +192,7 @@ class KhashmirBase(protocol.Factory):
         
         If all you have is a host/port, then use L{addContact}, which calls this
         method after receiving the PONG from the remote node. The reason for
         
         If all you have is a host/port, then use L{addContact}, which calls this
         method after receiving the PONG from the remote node. The reason for
-        the seperation is we can't insert a node into the table without its
+        the separation is we can't insert a node into the table without its
         node ID. That means of course the node passed into this method needs
         to be a properly formed Node object with a valid ID.
 
         node ID. That means of course the node passed into this method needs
         to be a properly formed Node object with a valid ID.
 
@@ -193,28 +202,59 @@ class KhashmirBase(protocol.Factory):
         @param contacted: whether the new node is known to be good, i.e.
             responded to a request (optional, defaults to True)
         """
         @param contacted: whether the new node is known to be good, i.e.
             responded to a request (optional, defaults to True)
         """
+        # Don't add any local nodes to the routing table
+        if not self.config['LOCAL_OK'] and isLocal.match(node.host):
+            log.msg('Not adding local node to table: %s/%s' % (node.host, node.port))
+            return
+        
         old = self.table.insertNode(node, contacted=contacted)
         old = self.table.insertNode(node, contacted=contacted)
-        if (old and old.id != self.node.id and
+
+        if (isinstance(old, self._Node) and old.id != self.node.id and
             (datetime.now() - old.lastSeen) > 
              timedelta(seconds=self.config['MIN_PING_INTERVAL'])):
             
             (datetime.now() - old.lastSeen) > 
              timedelta(seconds=self.config['MIN_PING_INTERVAL'])):
             
-            def _staleNodeHandler(err, oldnode = old, newnode = node, self = self, start = datetime.now()):
-                """The pinged node never responded, so replace it."""
-                log.msg("action ping failed on %s/%s: %s" % (oldnode.host, oldnode.port, err.getErrorMessage()))
-                self.stats.completedAction('ping', start)
-                self.table.replaceStaleNode(oldnode, newnode)
-            
-            def _notStaleNodeHandler(dict, old = old, self = self, start = datetime.now()):
-                """Got a pong from the old node, so update it."""
-                self.stats.completedAction('ping', start)
-                if dict['id'] == old.id:
-                    self.table.justSeenNode(old.id)
-            
             # Bucket is full, check to see if old node is still available
             self.stats.startedAction('ping')
             df = old.ping(self.node.id)
             # Bucket is full, check to see if old node is still available
             self.stats.startedAction('ping')
             df = old.ping(self.node.id)
-            df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
+            df.addCallbacks(self._freshNodeHandler, self._staleNodeHandler,
+                            callbackArgs = (old, datetime.now()),
+                            errbackArgs = (old, datetime.now(), node, contacted))
+        elif not old and not contacted:
+            # There's room, we just need to contact the node first
+            self.stats.startedAction('ping')
+            df = node.ping(self.node.id)
+            # Convert the returned contact info into a node
+            df.addCallback(self._pongHandler, datetime.now())
+            # Try adding the contacted node
+            df.addCallbacks(self.insertNode, self._pongError,
+                            errbackArgs = (node, datetime.now()))
+
+    def _freshNodeHandler(self, dict, old, start):
+        """Got a pong from the old node, so update it."""
+        self.stats.completedAction('ping', start)
+        if dict['id'] == old.id:
+            self.table.justSeenNode(old.id)
+    
+    def _staleNodeHandler(self, err, old, start, node, contacted):
+        """The pinged node never responded, so replace it."""
+        log.msg("action ping failed on %s/%s: %s" % (old.host, old.port, err.getErrorMessage()))
+        self.stats.completedAction('ping', start)
+        self.table.invalidateNode(old)
+        self.insertNode(node, contacted)
+    
+    def _pongHandler(self, dict, start):
+        """Node responded properly, change response into a node to insert."""
+        self.stats.completedAction('ping', start)
+        # Create the node using the returned contact info
+        n = self.Node(dict['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
+        return n
 
 
+    def _pongError(self, err, node, start):
+        """Error occurred, fail node and errback or callback with error."""
+        log.msg("action ping failed on %s/%s: %s" % (node.host, node.port, err.getErrorMessage()))
+        self.stats.completedAction('ping', start)
+        self.table.nodeFailed(node)
+    
     def sendJoin(self, node, callback=None, errback=None):
         """Join the DHT by pinging a bootstrap node.
         
     def sendJoin(self, node, callback=None, errback=None):
         """Join the DHT by pinging a bootstrap node.
         
@@ -226,31 +266,33 @@ class KhashmirBase(protocol.Factory):
             (optional, defaults to doing nothing with the results)
         @type errback: C{method}
         @param errback: the method to call if an error occurs
             (optional, defaults to doing nothing with the results)
         @type errback: C{method}
         @param errback: the method to call if an error occurs
-            (optional, defaults to calling the callback with None)
+            (optional, defaults to calling the callback with the error)
         """
         """
-
-        def _pongHandler(dict, node=node, self=self, callback=callback, start = datetime.now()):
-            """Node responded properly, callback with response."""
-            n = self.Node(dict['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
-            self.stats.completedAction('join', start)
-            self.insertNode(n)
-            if callback:
-                callback((dict['ip_addr'], dict['port']))
-
-        def _defaultPong(err, node=node, self=self, callback=callback, errback=errback, start = datetime.now()):
-            """Error occurred, fail node and errback or callback with error."""
-            log.msg("action join failed on %s/%s: %s" % (node.host, node.port, err.getErrorMessage()))
-            self.stats.completedAction('join', start)
-            self.table.nodeFailed(node)
-            if errback:
-                errback()
-            elif callback:
-                callback(None)
-        
+        if errback is None:
+            errback = callback
         self.stats.startedAction('join')
         df = node.join(self.node.id)
         self.stats.startedAction('join')
         df = node.join(self.node.id)
-        df.addCallbacks(_pongHandler, _defaultPong)
-
+        df.addCallbacks(self._joinHandler, self._joinError,
+                        callbackArgs = (node, datetime.now()),
+                        errbackArgs = (node, datetime.now()))
+        if callback:
+            df.addCallbacks(callback, errback)
+
+    def _joinHandler(self, dict, node, start):
+        """Node responded properly, extract the response."""
+        self.stats.completedAction('join', start)
+        # Create the node using the returned contact info
+        n = self.Node(dict['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
+        reactor.callLater(0, self.insertNode, n)
+        return (dict['ip_addr'], dict['port'])
+
+    def _joinError(self, err, node, start):
+        """Error occurred, fail node."""
+        log.msg("action join failed on %s/%s: %s" % (node.host, node.port, err.getErrorMessage()))
+        self.stats.completedAction('join', start)
+        self.table.nodeFailed(node)
+        return err
+        
     def findCloseNodes(self, callback=lambda a: None):
         """Perform a findNode on the ID one away from our own.
 
     def findCloseNodes(self, callback=lambda a: None):
         """Perform a findNode on the ID one away from our own.
 
@@ -306,7 +348,7 @@ class KhashmirBase(protocol.Factory):
         """
         if _krpc_sender is not None:
             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
         """
         if _krpc_sender is not None:
             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
-            self.insertNode(n, contacted = False)
+            reactor.callLater(0, self.insertNode, n, False)
 
         return {"id" : self.node.id}
         
 
         return {"id" : self.node.id}
         
@@ -320,7 +362,7 @@ class KhashmirBase(protocol.Factory):
         """
         if _krpc_sender is not None:
             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
         """
         if _krpc_sender is not None:
             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
-            self.insertNode(n, contacted = False)
+            reactor.callLater(0, self.insertNode, n, False)
         else:
             _krpc_sender = ('127.0.0.1', self.port)
 
         else:
             _krpc_sender = ('127.0.0.1', self.port)
 
@@ -338,7 +380,7 @@ class KhashmirBase(protocol.Factory):
         """
         if _krpc_sender is not None:
             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
         """
         if _krpc_sender is not None:
             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
-            self.insertNode(n, contacted = False)
+            reactor.callLater(0, self.insertNode, n, False)
         else:
             _krpc_sender = ('127.0.0.1', self.port)
 
         else:
             _krpc_sender = ('127.0.0.1', self.port)
 
@@ -414,7 +456,7 @@ class KhashmirRead(KhashmirBase):
         """
         if _krpc_sender is not None:
             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
         """
         if _krpc_sender is not None:
             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
-            self.insertNode(n, contacted = False)
+            reactor.callLater(0, self.insertNode, n, False)
     
         nodes = self.table.findNodes(key)
         nodes = map(lambda node: node.contactInfo(), nodes)
     
         nodes = self.table.findNodes(key)
         nodes = map(lambda node: node.contactInfo(), nodes)
@@ -436,7 +478,7 @@ class KhashmirRead(KhashmirBase):
         """
         if _krpc_sender is not None:
             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
         """
         if _krpc_sender is not None:
             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
-            self.insertNode(n, contacted = False)
+            reactor.callLater(0, self.insertNode, n, False)
     
         l = self.store.retrieveValues(key)
         if num == 0 or num >= len(l):
     
         l = self.store.retrieveValues(key)
         if num == 0 or num >= len(l):
@@ -496,7 +538,7 @@ class KhashmirWrite(KhashmirRead):
         """
         if _krpc_sender is not None:
             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
         """
         if _krpc_sender is not None:
             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
-            self.insertNode(n, contacted = False)
+            reactor.callLater(0, self.insertNode, n, False)
         else:
             _krpc_sender = ('127.0.0.1', self.port)
 
         else:
             _krpc_sender = ('127.0.0.1', self.port)
 
@@ -517,12 +559,12 @@ class SimpleTests(unittest.TestCase):
     
     timeout = 10
     DHT_DEFAULTS = {'PORT': 9977,
     
     timeout = 10
     DHT_DEFAULTS = {'PORT': 9977,
-                    'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
+                    'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 8,
                     'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
                     'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
-                    'MAX_FAILURES': 3,
+                    'MAX_FAILURES': 3, 'LOCAL_OK': True,
                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
-                    'KRPC_TIMEOUT': 14, 'KRPC_INITIAL_DELAY': 2,
-                    'KEY_EXPIRE': 3600, 'SPEW': False, }
+                    'KRPC_TIMEOUT': 9, 'KRPC_INITIAL_DELAY': 2,
+                    'KEY_EXPIRE': 3600, 'SPEW': True, }
 
     def setUp(self):
         d = self.DHT_DEFAULTS.copy()
 
     def setUp(self):
         d = self.DHT_DEFAULTS.copy()
@@ -540,10 +582,10 @@ class SimpleTests(unittest.TestCase):
 
     def testAddContact(self):
         self.failUnlessEqual(len(self.a.table.buckets), 1)
 
     def testAddContact(self):
         self.failUnlessEqual(len(self.a.table.buckets), 1)
-        self.failUnlessEqual(len(self.a.table.buckets[0].l), 0)
+        self.failUnlessEqual(len(self.a.table.buckets[0].nodes), 0)
 
         self.failUnlessEqual(len(self.b.table.buckets), 1)
 
         self.failUnlessEqual(len(self.b.table.buckets), 1)
-        self.failUnlessEqual(len(self.b.table.buckets[0].l), 0)
+        self.failUnlessEqual(len(self.b.table.buckets[0].nodes), 0)
 
         self.a.addContact('127.0.0.1', 4045)
         reactor.iterate()
 
         self.a.addContact('127.0.0.1', 4045)
         reactor.iterate()
@@ -552,9 +594,9 @@ class SimpleTests(unittest.TestCase):
         reactor.iterate()
 
         self.failUnlessEqual(len(self.a.table.buckets), 1)
         reactor.iterate()
 
         self.failUnlessEqual(len(self.a.table.buckets), 1)
-        self.failUnlessEqual(len(self.a.table.buckets[0].l), 1)
+        self.failUnlessEqual(len(self.a.table.buckets[0].nodes), 1)
         self.failUnlessEqual(len(self.b.table.buckets), 1)
         self.failUnlessEqual(len(self.b.table.buckets), 1)
-        self.failUnlessEqual(len(self.b.table.buckets[0].l), 1)
+        self.failUnlessEqual(len(self.b.table.buckets[0].nodes), 1)
 
     def testStoreRetrieve(self):
         self.a.addContact('127.0.0.1', 4045)
 
     def testStoreRetrieve(self):
         self.a.addContact('127.0.0.1', 4045)
@@ -591,12 +633,12 @@ class MultiTest(unittest.TestCase):
     timeout = 30
     num = 20
     DHT_DEFAULTS = {'PORT': 9977,
     timeout = 30
     num = 20
     DHT_DEFAULTS = {'PORT': 9977,
-                    'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
+                    'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 8,
                     'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
                     'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
-                    'MAX_FAILURES': 3,
+                    'MAX_FAILURES': 3, 'LOCAL_OK': True,
                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
-                    'KRPC_TIMEOUT': 14, 'KRPC_INITIAL_DELAY': 2,
-                    'KEY_EXPIRE': 3600, 'SPEW': False, }
+                    'KRPC_TIMEOUT': 9, 'KRPC_INITIAL_DELAY': 2,
+                    'KEY_EXPIRE': 3600, 'SPEW': True, }
 
     def _done(self, val):
         self.done = 1
 
     def _done(self, val):
         self.done = 1