X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;ds=sidebyside;f=apt_p2p_Khashmir%2FDHT.py;h=af247302f9ed6dc0b3838bbef09d2ebf681d54f8;hb=17b7fb5f34a147cf61191d857a4735ec9e331842;hp=399babffa274df3218cdbb88b7b2cc85e907a5e1;hpb=7b1167d8ce780312d3689c9309c7e9c64060c085;p=quix0rs-apt-p2p.git diff --git a/apt_p2p_Khashmir/DHT.py b/apt_p2p_Khashmir/DHT.py index 399babf..af24730 100644 --- a/apt_p2p_Khashmir/DHT.py +++ b/apt_p2p_Khashmir/DHT.py @@ -13,9 +13,16 @@ from twisted.python import log from twisted.trial import unittest from zope.interface import implements -from apt_p2p.interfaces import IDHT +from apt_p2p.interfaces import IDHT, IDHTStats, IDHTStatsFactory from khashmir import Khashmir from bencode import bencode, bdecode +from khash import HASH_LENGTH + +try: + from twisted.web2 import channel, server, resource, http, http_headers + _web2 = True +except ImportError: + _web2 = False khashmir_dir = 'apt-p2p-Khashmir' @@ -39,6 +46,8 @@ class DHT: @ivar joined: whether the DHT network has been successfully joined @type outstandingJoins: C{int} @ivar outstandingJoins: the number of bootstrap nodes that have yet to respond + @type next_rejoin: C{int} + @ivar next_rejoin: the number of seconds before retrying the next join @type foundAddrs: C{list} of (C{string}, C{int}) @ivar foundAddrs: the IP address an port that were returned by bootstrap nodes @type storing: C{dictionary} @@ -51,6 +60,8 @@ class DHT: @type retrieved: C{dictionary} @ivar retrieved: keys are the keys for which getValue requests are active, values are list of the values returned so far + @type factory: L{twisted.web2.channel.HTTPFactory} + @ivar factory: the factory to use to serve HTTP requests for statistics @type config_parser: L{apt_p2p.apt_p2p_conf.AptP2PConfigParser} @ivar config_parser: the configuration info for the main program @type section: C{string} @@ -59,8 +70,11 @@ class DHT: @ivar khashmir: the khashmir DHT instance to use """ - implements(IDHT) - + if _web2: + implements(IDHT, IDHTStats, IDHTStatsFactory) + else: + implements(IDHT, IDHTStats) + def __init__(self): """Initialize the DHT.""" self.config = None @@ -68,12 +82,15 @@ class DHT: self.bootstrap = [] self.bootstrap_node = False self.joining = None + self.khashmir = None self.joined = False self.outstandingJoins = 0 + self.next_rejoin = 20 self.foundAddrs = [] self.storing = {} self.retrieving = {} self.retrieved = {} + self.factory = None def loadConfig(self, config, section): """See L{apt_p2p.interfaces.IDHT}.""" @@ -89,48 +106,77 @@ class DHT: self.bootstrap_node = self.config_parser.getboolean(section, 'BOOTSTRAP_NODE') for k in self.config_parser.options(section): # The numbers in the config file - if k in ['K', 'HASH_LENGTH', 'CONCURRENT_REQS', 'STORE_REDUNDANCY', + if k in ['CONCURRENT_REQS', 'STORE_REDUNDANCY', 'RETRIEVE_VALUES', 'MAX_FAILURES', 'PORT']: self.config[k] = self.config_parser.getint(section, k) # The times in the config file elif k in ['CHECKPOINT_INTERVAL', 'MIN_PING_INTERVAL', - 'BUCKET_STALENESS', 'KEY_EXPIRE']: + 'BUCKET_STALENESS', 'KEY_EXPIRE', + 'KRPC_TIMEOUT', 'KRPC_INITIAL_DELAY']: self.config[k] = self.config_parser.gettime(section, k) # The booleans in the config file - elif k in ['SPEW']: + elif k in ['SPEW', 'LOCAL_OK']: self.config[k] = self.config_parser.getboolean(section, k) # Everything else is a string else: self.config[k] = self.config_parser.get(section, k) - def join(self): - """See L{apt_p2p.interfaces.IDHT}.""" - if self.config is None: - raise DHTError, "configuration not loaded" + def join(self, deferred = None): + """See L{apt_p2p.interfaces.IDHT}. + + @param deferred: the deferred to callback when the join is complete + (optional, defaults to creating a new deferred and returning it) + """ + # Check for multiple simultaneous joins if self.joining: - raise DHTError, "a join is already in progress" + if deferred: + deferred.errback(DHTError("a join is already in progress")) + return + else: + raise DHTError, "a join is already in progress" + + if deferred: + self.joining = deferred + else: + self.joining = defer.Deferred() + + if self.config is None: + self.joining.errback(DHTError("configuration not loaded")) + return self.joining # Create the new khashmir instance - self.khashmir = Khashmir(self.config, self.cache_dir) - - self.joining = defer.Deferred() + if not self.khashmir: + self.khashmir = Khashmir(self.config, self.cache_dir) + + self.outstandingJoins = 0 for node in self.bootstrap: host, port = node.rsplit(':', 1) port = int(port) + self.outstandingJoins += 1 # Translate host names into IP addresses if isIPAddress(host): self._join_gotIP(host, port) else: - reactor.resolve(host).addCallback(self._join_gotIP, port) + reactor.resolve(host).addCallbacks(self._join_gotIP, + self._join_resolveFailed, + callbackArgs = (port, ), + errbackArgs = (host, port)) return self.joining def _join_gotIP(self, ip, port): """Join the DHT using a single bootstrap nodes IP address.""" - self.outstandingJoins += 1 self.khashmir.addContact(ip, port, self._join_single, self._join_error) + def _join_resolveFailed(self, err, host, port): + """Failed to lookup the IP address of the bootstrap node.""" + log.msg('Failed to find an IP address for host: (%r, %r)' % (host, port)) + log.err(err) + self.outstandingJoins -= 1 + if self.outstandingJoins <= 0: + self.khashmir.findCloseNodes(self._join_complete) + def _join_single(self, addr): """Process the response from the bootstrap node. @@ -140,7 +186,7 @@ class DHT: if addr: self.foundAddrs.append(addr) if addr or self.outstandingJoins <= 0: - self.khashmir.findCloseNodes(self._join_complete, self._join_complete) + self.khashmir.findCloseNodes(self._join_complete) log.msg('Got back from bootstrap node: %r' % (addr,)) def _join_error(self, failure = None): @@ -152,11 +198,11 @@ class DHT: self.outstandingJoins -= 1 log.msg("bootstrap node could not be reached") if self.outstandingJoins <= 0: - self.khashmir.findCloseNodes(self._join_complete, self._join_complete) + self.khashmir.findCloseNodes(self._join_complete) def _join_complete(self, result): """End the joining process and return the addresses found for this node.""" - if not self.joined and len(result) > 0: + if not self.joined and isinstance(result, list) and len(result) > 1: self.joined = True if self.joining and self.outstandingJoins <= 0: df = self.joining @@ -165,7 +211,10 @@ class DHT: self.joined = True df.callback(self.foundAddrs) else: - df.errback(DHTError('could not find any nodes to bootstrap to')) + # Try to join later using exponential backoff delays + log.msg('Join failed, retrying in %d seconds' % self.next_rejoin) + reactor.callLater(self.next_rejoin, self.join, df) + self.next_rejoin *= 2 def getAddrs(self): """Get the list of addresses returned by bootstrap nodes for this node.""" @@ -183,33 +232,26 @@ class DHT: self.joined = False self.khashmir.shutdown() - def _normKey(self, key, bits=None, bytes=None): + def _normKey(self, key): """Normalize the length of keys used in the DHT.""" - bits = self.config["HASH_LENGTH"] - if bits is not None: - bytes = (bits - 1) // 8 + 1 - else: - if bytes is None: - raise DHTError, "you must specify one of bits or bytes for normalization" - # Extend short keys with null bytes - if len(key) < bytes: - key = key + '\000'*(bytes - len(key)) + if len(key) < HASH_LENGTH: + key = key + '\000'*(HASH_LENGTH - len(key)) # Truncate long keys - elif len(key) > bytes: - key = key[:bytes] + elif len(key) > HASH_LENGTH: + key = key[:HASH_LENGTH] return key def getValue(self, key): """See L{apt_p2p.interfaces.IDHT}.""" if self.config is None: - raise DHTError, "configuration not loaded" + return defer.fail(DHTError("configuration not loaded")) if not self.joined: - raise DHTError, "have not joined a network yet" + return defer.fail(DHTError("have not joined a network yet")) + d = defer.Deferred() key = self._normKey(key) - d = defer.Deferred() if key not in self.retrieving: self.khashmir.valueForKey(key, self._getValue) self.retrieving.setdefault(key, []).append(d) @@ -234,17 +276,17 @@ class DHT: def storeValue(self, key, value): """See L{apt_p2p.interfaces.IDHT}.""" if self.config is None: - raise DHTError, "configuration not loaded" + return defer.fail(DHTError("configuration not loaded")) if not self.joined: - raise DHTError, "have not joined a network yet" + return defer.fail(DHTError("have not joined a network yet")) + d = defer.Deferred() key = self._normKey(key) bvalue = bencode(value) if key in self.storing and bvalue in self.storing[key]: raise DHTError, "already storing that key with the same value" - d = defer.Deferred() self.khashmir.storeValueForKey(key, bvalue, self._storeValue) self.storing.setdefault(key, {})[bvalue] = d return d @@ -260,17 +302,43 @@ class DHT: del self.storing[key][bvalue] if len(self.storing[key].keys()) == 0: del self.storing[key] + + def getStats(self): + """See L{apt_p2p.interfaces.IDHTStats}.""" + return self.khashmir.getStats() + + def getStatsFactory(self): + """See L{apt_p2p.interfaces.IDHTStatsFactory}.""" + assert _web2, "NOT IMPLEMENTED: twisted.web2 must be installed to use the stats factory." + if self.factory is None: + # Create a simple HTTP factory for stats + class StatsResource(resource.Resource): + def __init__(self, manager): + self.manager = manager + def render(self, ctx): + return http.Response( + 200, + {'content-type': http_headers.MimeType('text', 'html')}, + '\n\n' + self.manager.getStats() + '\n\n') + def locateChild(self, request, segments): + log.msg('Got HTTP stats request from %s' % (request.remoteAddr, )) + return self, () + + self.factory = channel.HTTPFactory(server.Site(StatsResource(self))) + return self.factory + class TestSimpleDHT(unittest.TestCase): """Simple 2-node unit tests for the DHT.""" - timeout = 2 - DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160, + timeout = 50 + DHT_DEFAULTS = {'PORT': 9977, 'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4, 'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000, - 'MAX_FAILURES': 3, + 'MAX_FAILURES': 3, 'LOCAL_OK': True, 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600, - 'KEY_EXPIRE': 3600, 'SPEW': False, } + 'KRPC_TIMEOUT': 9, 'KRPC_INITIAL_DELAY': 2, + 'KEY_EXPIRE': 3600, 'SPEW': True, } def setUp(self): self.a = DHT() @@ -288,17 +356,31 @@ class TestSimpleDHT(unittest.TestCase): def test_bootstrap_join(self): d = self.a.join() return d + + def no_krpc_errors(self, result): + from krpc import KrpcError + self.flushLoggedErrors(KrpcError) + return result + + def test_failed_join(self): + d = self.b.join() + reactor.callLater(30, self.a.join) + d.addCallback(self.no_krpc_errors) + return d def node_join(self, result): d = self.b.join() return d def test_join(self): - self.lastDefer = defer.Deferred() d = self.a.join() d.addCallback(self.node_join) - d.addCallback(self.lastDefer.callback) - return self.lastDefer + return d + + def test_timeout_retransmit(self): + d = self.b.join() + reactor.callLater(4, self.a.join) + return d def test_normKey(self): h = self.a._normKey('12345678901234567890') @@ -369,14 +451,15 @@ class TestSimpleDHT(unittest.TestCase): class TestMultiDHT(unittest.TestCase): """More complicated 20-node tests for the DHT.""" - timeout = 60 + timeout = 100 num = 20 - DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160, + DHT_DEFAULTS = {'PORT': 9977, 'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4, 'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000, - 'MAX_FAILURES': 3, + 'MAX_FAILURES': 3, 'LOCAL_OK': True, 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600, - 'KEY_EXPIRE': 3600, 'SPEW': False, } + 'KRPC_TIMEOUT': 9, 'KRPC_INITIAL_DELAY': 2, + 'KEY_EXPIRE': 3600, 'SPEW': True, } def setUp(self): self.l = []