X-Git-Url: https://git.mxchange.org/?p=quix0rs-apt-p2p.git;a=blobdiff_plain;f=apt_p2p_Khashmir%2Fkhashmir.py;h=f083da4763dc25f5a96fea8b50be9ca7488a89a7;hp=5db5ed8ab48015c9c73073f9c8ceab8eac652701;hb=a5dd904be839e2b2896483724d6238b5a970b5de;hpb=593a583cc5076643890a811d2fb1a5e0548f290c diff --git a/apt_p2p_Khashmir/khashmir.py b/apt_p2p_Khashmir/khashmir.py index 5db5ed8..f083da4 100644 --- a/apt_p2p_Khashmir/khashmir.py +++ b/apt_p2p_Khashmir/khashmir.py @@ -1,7 +1,9 @@ -## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved -# see LICENSE.txt for license information -"""The main Khashmir program.""" +"""The main Khashmir program. + +@var isLocal: a compiled regular expression suitable for testing if an + IP address is from a known local or private range +""" import warnings warnings.simplefilter("ignore", DeprecationWarning) @@ -10,7 +12,7 @@ from datetime import datetime, timedelta from random import randrange, shuffle from sha import sha from copy import copy -import os +import os, re from twisted.internet.defer import Deferred from twisted.internet import protocol, reactor @@ -25,6 +27,13 @@ from actions import FindNode, FindValue, GetValue, StoreValue from stats import StatsLogger import krpc +isLocal = re.compile('^(192\.168\.[0-9]{1,3}\.[0-9]{1,3})|'+ + '(10\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})|'+ + '(172\.0?1[6-9]\.[0-9]{1,3}\.[0-9]{1,3})|'+ + '(172\.0?2[0-9]\.[0-9]{1,3}\.[0-9]{1,3})|'+ + '(172\.0?3[0-1]\.[0-9]{1,3}\.[0-9]{1,3})|'+ + '(127\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})$') + class KhashmirBase(protocol.Factory): """The base Khashmir class, with base functionality and find node, no key-value mappings. @@ -157,7 +166,7 @@ class KhashmirBase(protocol.Factory): (optional, defaults to doing nothing with the results) @type errback: C{method} @param errback: the method to call if an error occurs - (optional, defaults to calling the callback with None) + (optional, defaults to calling the callback with the error) """ n = self.Node(NULL_ID, host, port) self.sendJoin(n, callback=callback, errback=errback) @@ -183,7 +192,7 @@ class KhashmirBase(protocol.Factory): If all you have is a host/port, then use L{addContact}, which calls this method after receiving the PONG from the remote node. The reason for - the seperation is we can't insert a node into the table without its + the separation is we can't insert a node into the table without its node ID. That means of course the node passed into this method needs to be a properly formed Node object with a valid ID. @@ -193,28 +202,59 @@ class KhashmirBase(protocol.Factory): @param contacted: whether the new node is known to be good, i.e. responded to a request (optional, defaults to True) """ + # Don't add any local nodes to the routing table + if not self.config['LOCAL_OK'] and isLocal.match(node.host): + log.msg('Not adding local node to table: %s/%s' % (node.host, node.port)) + return + old = self.table.insertNode(node, contacted=contacted) - if (old and old.id != self.node.id and + + if (isinstance(old, self._Node) and old.id != self.node.id and (datetime.now() - old.lastSeen) > timedelta(seconds=self.config['MIN_PING_INTERVAL'])): - def _staleNodeHandler(err, oldnode = old, newnode = node, self = self, start = datetime.now()): - """The pinged node never responded, so replace it.""" - log.msg("action ping failed on %s/%s: %s" % (oldnode.host, oldnode.port, err.getErrorMessage())) - self.stats.completedAction('ping', start) - self.table.replaceStaleNode(oldnode, newnode) - - def _notStaleNodeHandler(dict, old = old, self = self, start = datetime.now()): - """Got a pong from the old node, so update it.""" - self.stats.completedAction('ping', start) - if dict['id'] == old.id: - self.table.justSeenNode(old.id) - # Bucket is full, check to see if old node is still available self.stats.startedAction('ping') df = old.ping(self.node.id) - df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler) + df.addCallbacks(self._freshNodeHandler, self._staleNodeHandler, + callbackArgs = (old, datetime.now()), + errbackArgs = (old, datetime.now(), node, contacted)) + elif not old and not contacted: + # There's room, we just need to contact the node first + self.stats.startedAction('ping') + df = node.ping(self.node.id) + # Convert the returned contact info into a node + df.addCallback(self._pongHandler, datetime.now()) + # Try adding the contacted node + df.addCallbacks(self.insertNode, self._pongError, + errbackArgs = (node, datetime.now())) + + def _freshNodeHandler(self, dict, old, start): + """Got a pong from the old node, so update it.""" + self.stats.completedAction('ping', start) + if dict['id'] == old.id: + self.table.justSeenNode(old.id) + + def _staleNodeHandler(self, err, old, start, node, contacted): + """The pinged node never responded, so replace it.""" + log.msg("action ping failed on %s/%s: %s" % (old.host, old.port, err.getErrorMessage())) + self.stats.completedAction('ping', start) + self.table.invalidateNode(old) + self.insertNode(node, contacted) + + def _pongHandler(self, dict, start): + """Node responded properly, change response into a node to insert.""" + self.stats.completedAction('ping', start) + # Create the node using the returned contact info + n = self.Node(dict['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1]) + return n + def _pongError(self, err, node, start): + """Error occurred, fail node and errback or callback with error.""" + log.msg("action ping failed on %s/%s: %s" % (node.host, node.port, err.getErrorMessage())) + self.stats.completedAction('ping', start) + self.table.nodeFailed(node) + def sendJoin(self, node, callback=None, errback=None): """Join the DHT by pinging a bootstrap node. @@ -226,31 +266,33 @@ class KhashmirBase(protocol.Factory): (optional, defaults to doing nothing with the results) @type errback: C{method} @param errback: the method to call if an error occurs - (optional, defaults to calling the callback with None) + (optional, defaults to calling the callback with the error) """ - - def _pongHandler(dict, node=node, self=self, callback=callback, start = datetime.now()): - """Node responded properly, callback with response.""" - n = self.Node(dict['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1]) - self.stats.completedAction('join', start) - reactor.callLater(0, self.insertNode, n) - if callback: - callback((dict['ip_addr'], dict['port'])) - - def _defaultPong(err, node=node, self=self, callback=callback, errback=errback, start = datetime.now()): - """Error occurred, fail node and errback or callback with error.""" - log.msg("action join failed on %s/%s: %s" % (node.host, node.port, err.getErrorMessage())) - self.stats.completedAction('join', start) - self.table.nodeFailed(node) - if errback: - errback() - elif callback: - callback(None) - + if errback is None: + errback = callback self.stats.startedAction('join') df = node.join(self.node.id) - df.addCallbacks(_pongHandler, _defaultPong) - + df.addCallbacks(self._joinHandler, self._joinError, + callbackArgs = (node, datetime.now()), + errbackArgs = (node, datetime.now())) + if callback: + df.addCallbacks(callback, errback) + + def _joinHandler(self, dict, node, start): + """Node responded properly, extract the response.""" + self.stats.completedAction('join', start) + # Create the node using the returned contact info + n = self.Node(dict['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1]) + reactor.callLater(0, self.insertNode, n) + return (dict['ip_addr'], dict['port']) + + def _joinError(self, err, node, start): + """Error occurred, fail node.""" + log.msg("action join failed on %s/%s: %s" % (node.host, node.port, err.getErrorMessage())) + self.stats.completedAction('join', start) + self.table.nodeFailed(node) + return err + def findCloseNodes(self, callback=lambda a: None): """Perform a findNode on the ID one away from our own. @@ -517,12 +559,12 @@ class SimpleTests(unittest.TestCase): timeout = 10 DHT_DEFAULTS = {'PORT': 9977, - 'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4, + 'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 8, 'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000, - 'MAX_FAILURES': 3, + 'MAX_FAILURES': 3, 'LOCAL_OK': True, 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600, - 'KRPC_TIMEOUT': 14, 'KRPC_INITIAL_DELAY': 2, - 'KEY_EXPIRE': 3600, 'SPEW': False, } + 'KRPC_TIMEOUT': 9, 'KRPC_INITIAL_DELAY': 2, + 'KEY_EXPIRE': 3600, 'SPEW': True, } def setUp(self): d = self.DHT_DEFAULTS.copy() @@ -540,10 +582,10 @@ class SimpleTests(unittest.TestCase): def testAddContact(self): self.failUnlessEqual(len(self.a.table.buckets), 1) - self.failUnlessEqual(len(self.a.table.buckets[0].l), 0) + self.failUnlessEqual(len(self.a.table.buckets[0].nodes), 0) self.failUnlessEqual(len(self.b.table.buckets), 1) - self.failUnlessEqual(len(self.b.table.buckets[0].l), 0) + self.failUnlessEqual(len(self.b.table.buckets[0].nodes), 0) self.a.addContact('127.0.0.1', 4045) reactor.iterate() @@ -552,9 +594,9 @@ class SimpleTests(unittest.TestCase): reactor.iterate() self.failUnlessEqual(len(self.a.table.buckets), 1) - self.failUnlessEqual(len(self.a.table.buckets[0].l), 1) + self.failUnlessEqual(len(self.a.table.buckets[0].nodes), 1) self.failUnlessEqual(len(self.b.table.buckets), 1) - self.failUnlessEqual(len(self.b.table.buckets[0].l), 1) + self.failUnlessEqual(len(self.b.table.buckets[0].nodes), 1) def testStoreRetrieve(self): self.a.addContact('127.0.0.1', 4045) @@ -591,12 +633,12 @@ class MultiTest(unittest.TestCase): timeout = 30 num = 20 DHT_DEFAULTS = {'PORT': 9977, - 'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4, + 'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 8, 'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000, - 'MAX_FAILURES': 3, + 'MAX_FAILURES': 3, 'LOCAL_OK': True, 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600, - 'KRPC_TIMEOUT': 14, 'KRPC_INITIAL_DELAY': 2, - 'KEY_EXPIRE': 3600, 'SPEW': False, } + 'KRPC_TIMEOUT': 9, 'KRPC_INITIAL_DELAY': 2, + 'KEY_EXPIRE': 3600, 'SPEW': True, } def _done(self, val): self.done = 1