]> git.mxchange.org Git - quix0rs-apt-p2p.git/blobdiff - apt_p2p_Khashmir/khashmir.py
Don't add local IP addresses to the routing table (with config option to override).
[quix0rs-apt-p2p.git] / apt_p2p_Khashmir / khashmir.py
index 7946523da0f266d3efe18aa5b55edc52474e5cc2..294940c545c1cb5fa13af1fb3aa68ea7d1a747f9 100644 (file)
@@ -1,7 +1,9 @@
-## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
-# see LICENSE.txt for license information
 
 
-"""The main Khashmir program."""
+"""The main Khashmir program.
+
+@var isLocal: a compiled regular expression suitable for testing if an
+    IP address is from a known local or private range
+"""
 
 import warnings
 warnings.simplefilter("ignore", DeprecationWarning)
 
 import warnings
 warnings.simplefilter("ignore", DeprecationWarning)
@@ -10,7 +12,7 @@ from datetime import datetime, timedelta
 from random import randrange, shuffle
 from sha import sha
 from copy import copy
 from random import randrange, shuffle
 from sha import sha
 from copy import copy
-import os
+import os, re
 
 from twisted.internet.defer import Deferred
 from twisted.internet import protocol, reactor
 
 from twisted.internet.defer import Deferred
 from twisted.internet import protocol, reactor
@@ -25,6 +27,11 @@ from actions import FindNode, FindValue, GetValue, StoreValue
 from stats import StatsLogger
 import krpc
 
 from stats import StatsLogger
 import krpc
 
+isLocal = re.compile('^(192\.168\.[0-9]{1,3}\.[0-9]{1,3})|'+
+                     '(10\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})|'+
+                     '(172\.0?([1][6-9])|([2][0-9])|([3][0-1])\.[0-9]{1,3}\.[0-9]{1,3})|'+
+                     '(127\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})$')
+
 class KhashmirBase(protocol.Factory):
     """The base Khashmir class, with base functionality and find node, no key-value mappings.
     
 class KhashmirBase(protocol.Factory):
     """The base Khashmir class, with base functionality and find node, no key-value mappings.
     
@@ -80,7 +87,7 @@ class KhashmirBase(protocol.Factory):
         self.node = self._loadSelfNode('', self.port)
         self.table = KTable(self.node, config)
         self.token_secrets = [newID()]
         self.node = self._loadSelfNode('', self.port)
         self.table = KTable(self.node, config)
         self.token_secrets = [newID()]
-        self.stats = StatsLogger(self.table, self.store, self.config)
+        self.stats = StatsLogger(self.table, self.store)
         
         # Start listening
         self.udp = krpc.hostbroker(self, self.stats, config)
         
         # Start listening
         self.udp = krpc.hostbroker(self, self.stats, config)
@@ -162,7 +169,7 @@ class KhashmirBase(protocol.Factory):
         n = self.Node(NULL_ID, host, port)
         self.sendJoin(n, callback=callback, errback=errback)
 
         n = self.Node(NULL_ID, host, port)
         self.sendJoin(n, callback=callback, errback=errback)
 
-    def findNode(self, id, callback, errback=None):
+    def findNode(self, id, callback):
         """Find the contact info for the K closest nodes in the global table.
         
         @type id: C{string}
         """Find the contact info for the K closest nodes in the global table.
         
         @type id: C{string}
@@ -170,21 +177,12 @@ class KhashmirBase(protocol.Factory):
         @type callback: C{method}
         @param callback: the method to call with the results, it must take 1
             parameter, the list of K closest nodes
         @type callback: C{method}
         @param callback: the method to call with the results, it must take 1
             parameter, the list of K closest nodes
-        @type errback: C{method}
-        @param errback: the method to call if an error occurs
-            (optional, defaults to doing nothing when an error occurs)
         """
         # Start with our node
         nodes = [copy(self.node)]
 
         """
         # Start with our node
         nodes = [copy(self.node)]
 
-        d = Deferred()
-        if errback:
-            d.addCallbacks(callback, errback)
-        else:
-            d.addCallback(callback)
-
         # Start the finding nodes action
         # Start the finding nodes action
-        state = FindNode(self, id, d.callback, self.config, self.stats)
+        state = FindNode(self, id, callback, self.config, self.stats)
         reactor.callLater(0, state.goWithNodes, nodes)
     
     def insertNode(self, node, contacted = True):
         reactor.callLater(0, state.goWithNodes, nodes)
     
     def insertNode(self, node, contacted = True):
@@ -202,19 +200,24 @@ class KhashmirBase(protocol.Factory):
         @param contacted: whether the new node is known to be good, i.e.
             responded to a request (optional, defaults to True)
         """
         @param contacted: whether the new node is known to be good, i.e.
             responded to a request (optional, defaults to True)
         """
+        # Don't add any local nodes to the routing table
+        if not self.config['LOCAL_OK'] and isLocal.match(node.host):
+            return
+
         old = self.table.insertNode(node, contacted=contacted)
         if (old and old.id != self.node.id and
             (datetime.now() - old.lastSeen) > 
              timedelta(seconds=self.config['MIN_PING_INTERVAL'])):
             
         old = self.table.insertNode(node, contacted=contacted)
         if (old and old.id != self.node.id and
             (datetime.now() - old.lastSeen) > 
              timedelta(seconds=self.config['MIN_PING_INTERVAL'])):
             
-            def _staleNodeHandler(err, oldnode = old, newnode = node, self = self):
+            def _staleNodeHandler(err, oldnode = old, newnode = node, self = self, start = datetime.now()):
                 """The pinged node never responded, so replace it."""
                 """The pinged node never responded, so replace it."""
-                log.msg("ping failed (%s) %s/%s" % (self.config['PORT'], oldnode.host, oldnode.port))
-                log.err(err)
+                log.msg("action ping failed on %s/%s: %s" % (oldnode.host, oldnode.port, err.getErrorMessage()))
+                self.stats.completedAction('ping', start)
                 self.table.replaceStaleNode(oldnode, newnode)
             
                 self.table.replaceStaleNode(oldnode, newnode)
             
-            def _notStaleNodeHandler(dict, old=old, self=self):
+            def _notStaleNodeHandler(dict, old = old, self = self, start = datetime.now()):
                 """Got a pong from the old node, so update it."""
                 """Got a pong from the old node, so update it."""
+                self.stats.completedAction('ping', start)
                 if dict['id'] == old.id:
                     self.table.justSeenNode(old.id)
             
                 if dict['id'] == old.id:
                     self.table.justSeenNode(old.id)
             
@@ -237,17 +240,18 @@ class KhashmirBase(protocol.Factory):
             (optional, defaults to calling the callback with None)
         """
 
             (optional, defaults to calling the callback with None)
         """
 
-        def _pongHandler(dict, node=node, self=self, callback=callback):
+        def _pongHandler(dict, node=node, self=self, callback=callback, start = datetime.now()):
             """Node responded properly, callback with response."""
             n = self.Node(dict['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
             """Node responded properly, callback with response."""
             n = self.Node(dict['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
-            self.insertNode(n)
+            self.stats.completedAction('join', start)
+            reactor.callLater(0, self.insertNode, n)
             if callback:
                 callback((dict['ip_addr'], dict['port']))
 
             if callback:
                 callback((dict['ip_addr'], dict['port']))
 
-        def _defaultPong(err, node=node, self=self, callback=callback, errback=errback):
+        def _defaultPong(err, node=node, self=self, callback=callback, errback=errback, start = datetime.now()):
             """Error occurred, fail node and errback or callback with error."""
             """Error occurred, fail node and errback or callback with error."""
-            log.msg("join failed (%s) %s/%s" % (self.config['PORT'], node.host, node.port))
-            log.err(err)
+            log.msg("action join failed on %s/%s: %s" % (node.host, node.port, err.getErrorMessage()))
+            self.stats.completedAction('join', start)
             self.table.nodeFailed(node)
             if errback:
                 errback()
             self.table.nodeFailed(node)
             if errback:
                 errback()
@@ -258,7 +262,7 @@ class KhashmirBase(protocol.Factory):
         df = node.join(self.node.id)
         df.addCallbacks(_pongHandler, _defaultPong)
 
         df = node.join(self.node.id)
         df.addCallbacks(_pongHandler, _defaultPong)
 
-    def findCloseNodes(self, callback=lambda a: None, errback = None):
+    def findCloseNodes(self, callback=lambda a: None):
         """Perform a findNode on the ID one away from our own.
 
         This will allow us to populate our table with nodes on our network
         """Perform a findNode on the ID one away from our own.
 
         This will allow us to populate our table with nodes on our network
@@ -269,12 +273,9 @@ class KhashmirBase(protocol.Factory):
         @param callback: the method to call with the results, it must take 1
             parameter, the list of K closest nodes
             (optional, defaults to doing nothing with the results)
         @param callback: the method to call with the results, it must take 1
             parameter, the list of K closest nodes
             (optional, defaults to doing nothing with the results)
-        @type errback: C{method}
-        @param errback: the method to call if an error occurs
-            (optional, defaults to doing nothing when an error occurs)
         """
         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
         """
         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
-        self.findNode(id, callback, errback)
+        self.findNode(id, callback)
 
     def refreshTable(self, force = False):
         """Check all the buckets for those that need refreshing.
 
     def refreshTable(self, force = False):
         """Check all the buckets for those that need refreshing.
@@ -316,7 +317,7 @@ class KhashmirBase(protocol.Factory):
         """
         if _krpc_sender is not None:
             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
         """
         if _krpc_sender is not None:
             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
-            self.insertNode(n, contacted = False)
+            reactor.callLater(0, self.insertNode, n, False)
 
         return {"id" : self.node.id}
         
 
         return {"id" : self.node.id}
         
@@ -330,7 +331,7 @@ class KhashmirBase(protocol.Factory):
         """
         if _krpc_sender is not None:
             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
         """
         if _krpc_sender is not None:
             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
-            self.insertNode(n, contacted = False)
+            reactor.callLater(0, self.insertNode, n, False)
         else:
             _krpc_sender = ('127.0.0.1', self.port)
 
         else:
             _krpc_sender = ('127.0.0.1', self.port)
 
@@ -348,7 +349,7 @@ class KhashmirBase(protocol.Factory):
         """
         if _krpc_sender is not None:
             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
         """
         if _krpc_sender is not None:
             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
-            self.insertNode(n, contacted = False)
+            reactor.callLater(0, self.insertNode, n, False)
         else:
             _krpc_sender = ('127.0.0.1', self.port)
 
         else:
             _krpc_sender = ('127.0.0.1', self.port)
 
@@ -364,7 +365,7 @@ class KhashmirRead(KhashmirBase):
     _Node = KNodeRead
 
     #{ Local interface
     _Node = KNodeRead
 
     #{ Local interface
-    def findValue(self, key, callback, errback=None):
+    def findValue(self, key, callback):
         """Get the nodes that have values for the key from the global table.
         
         @type key: C{string}
         """Get the nodes that have values for the key from the global table.
         
         @type key: C{string}
@@ -372,21 +373,12 @@ class KhashmirRead(KhashmirBase):
         @type callback: C{method}
         @param callback: the method to call with the results, it must take 1
             parameter, the list of nodes with values
         @type callback: C{method}
         @param callback: the method to call with the results, it must take 1
             parameter, the list of nodes with values
-        @type errback: C{method}
-        @param errback: the method to call if an error occurs
-            (optional, defaults to doing nothing when an error occurs)
         """
         # Start with ourself
         nodes = [copy(self.node)]
         
         """
         # Start with ourself
         nodes = [copy(self.node)]
         
-        d = Deferred()
-        if errback:
-            d.addCallbacks(callback, errback)
-        else:
-            d.addCallback(callback)
-
         # Search for others starting with the locally found ones
         # Search for others starting with the locally found ones
-        state = FindValue(self, key, d.callback, self.config, self.stats)
+        state = FindValue(self, key, callback, self.config, self.stats)
         reactor.callLater(0, state.goWithNodes, nodes)
 
     def valueForKey(self, key, callback, searchlocal = True):
         reactor.callLater(0, state.goWithNodes, nodes)
 
     def valueForKey(self, key, callback, searchlocal = True):
@@ -433,7 +425,7 @@ class KhashmirRead(KhashmirBase):
         """
         if _krpc_sender is not None:
             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
         """
         if _krpc_sender is not None:
             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
-            self.insertNode(n, contacted = False)
+            reactor.callLater(0, self.insertNode, n, False)
     
         nodes = self.table.findNodes(key)
         nodes = map(lambda node: node.contactInfo(), nodes)
     
         nodes = self.table.findNodes(key)
         nodes = map(lambda node: node.contactInfo(), nodes)
@@ -455,7 +447,7 @@ class KhashmirRead(KhashmirBase):
         """
         if _krpc_sender is not None:
             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
         """
         if _krpc_sender is not None:
             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
-            self.insertNode(n, contacted = False)
+            reactor.callLater(0, self.insertNode, n, False)
     
         l = self.store.retrieveValues(key)
         if num == 0 or num >= len(l):
     
         l = self.store.retrieveValues(key)
         if num == 0 or num >= len(l):
@@ -515,7 +507,7 @@ class KhashmirWrite(KhashmirRead):
         """
         if _krpc_sender is not None:
             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
         """
         if _krpc_sender is not None:
             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
-            self.insertNode(n, contacted = False)
+            reactor.callLater(0, self.insertNode, n, False)
         else:
             _krpc_sender = ('127.0.0.1', self.port)
 
         else:
             _krpc_sender = ('127.0.0.1', self.port)
 
@@ -535,11 +527,12 @@ class Khashmir(KhashmirWrite):
 class SimpleTests(unittest.TestCase):
     
     timeout = 10
 class SimpleTests(unittest.TestCase):
     
     timeout = 10
-    DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
+    DHT_DEFAULTS = {'PORT': 9977,
                     'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
                     'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
                     'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
                     'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
-                    'MAX_FAILURES': 3,
+                    'MAX_FAILURES': 3, 'LOCAL_OK': True,
                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
+                    'KRPC_TIMEOUT': 14, 'KRPC_INITIAL_DELAY': 2,
                     'KEY_EXPIRE': 3600, 'SPEW': False, }
 
     def setUp(self):
                     'KEY_EXPIRE': 3600, 'SPEW': False, }
 
     def setUp(self):
@@ -608,11 +601,12 @@ class MultiTest(unittest.TestCase):
     
     timeout = 30
     num = 20
     
     timeout = 30
     num = 20
-    DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
+    DHT_DEFAULTS = {'PORT': 9977,
                     'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
                     'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
                     'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
                     'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
-                    'MAX_FAILURES': 3,
+                    'MAX_FAILURES': 3, 'LOCAL_OK': True,
                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
+                    'KRPC_TIMEOUT': 14, 'KRPC_INITIAL_DELAY': 2,
                     'KEY_EXPIRE': 3600, 'SPEW': False, }
 
     def _done(self, val):
                     'KEY_EXPIRE': 3600, 'SPEW': False, }
 
     def _done(self, val):