X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=apt_p2p_Khashmir%2FDHT.py;h=f11eefc8305f87f12c10a85b5703d942ee3ed08e;hb=d63ad7d7b1c9e5567bd28450197ef810dc5c5475;hp=edb626d7b338116bad8493bb9a267d6ec44eb7b1;hpb=1e0537c366d9c95b4cda1b105fbcca52a9dbbb3a;p=quix0rs-apt-p2p.git diff --git a/apt_p2p_Khashmir/DHT.py b/apt_p2p_Khashmir/DHT.py index edb626d..f11eefc 100644 --- a/apt_p2p_Khashmir/DHT.py +++ b/apt_p2p_Khashmir/DHT.py @@ -45,6 +45,8 @@ class DHT: @ivar joined: whether the DHT network has been successfully joined @type outstandingJoins: C{int} @ivar outstandingJoins: the number of bootstrap nodes that have yet to respond + @type next_rejoin: C{int} + @ivar next_rejoin: the number of seconds before retrying the next join @type foundAddrs: C{list} of (C{string}, C{int}) @ivar foundAddrs: the IP address an port that were returned by bootstrap nodes @type storing: C{dictionary} @@ -79,8 +81,10 @@ class DHT: self.bootstrap = [] self.bootstrap_node = False self.joining = None + self.khashmir = None self.joined = False self.outstandingJoins = 0 + self.next_rejoin = 20 self.foundAddrs = [] self.storing = {} self.retrieving = {} @@ -115,17 +119,33 @@ class DHT: else: self.config[k] = self.config_parser.get(section, k) - def join(self): - """See L{apt_p2p.interfaces.IDHT}.""" - if self.config is None: - raise DHTError, "configuration not loaded" + def join(self, deferred = None): + """See L{apt_p2p.interfaces.IDHT}. + + @param deferred: the deferred to callback when the join is complete + (optional, defaults to creating a new deferred and returning it) + """ + # Check for multiple simultaneous joins if self.joining: - raise DHTError, "a join is already in progress" + if deferred: + deferred.errback(DHTError("a join is already in progress")) + return + else: + raise DHTError, "a join is already in progress" + + if deferred: + self.joining = deferred + else: + self.joining = defer.Deferred() + + if self.config is None: + self.joining.errback(DHTError("configuration not loaded")) + return self.joining # Create the new khashmir instance - self.khashmir = Khashmir(self.config, self.cache_dir) - - self.joining = defer.Deferred() + if not self.khashmir: + self.khashmir = Khashmir(self.config, self.cache_dir) + for node in self.bootstrap: host, port = node.rsplit(':', 1) port = int(port) @@ -168,7 +188,7 @@ class DHT: def _join_complete(self, result): """End the joining process and return the addresses found for this node.""" - if not self.joined and len(result) > 0: + if not self.joined and len(result) > 1: self.joined = True if self.joining and self.outstandingJoins <= 0: df = self.joining @@ -177,7 +197,10 @@ class DHT: self.joined = True df.callback(self.foundAddrs) else: - df.errback(DHTError('could not find any nodes to bootstrap to')) + # Try to join later using exponential backoff delays + log.msg('Join failed, retrying in %d seconds' % self.next_rejoin) + reactor.callLater(self.next_rejoin, self.join, df) + self.next_rejoin *= 2 def getAddrs(self): """Get the list of addresses returned by bootstrap nodes for this node.""" @@ -214,14 +237,17 @@ class DHT: def getValue(self, key): """See L{apt_p2p.interfaces.IDHT}.""" + d = defer.Deferred() + if self.config is None: - raise DHTError, "configuration not loaded" + d.errback(DHTError("configuration not loaded")) + return d if not self.joined: - raise DHTError, "have not joined a network yet" + d.errback(DHTError("have not joined a network yet")) + return d key = self._normKey(key) - d = defer.Deferred() if key not in self.retrieving: self.khashmir.valueForKey(key, self._getValue) self.retrieving.setdefault(key, []).append(d) @@ -245,10 +271,14 @@ class DHT: def storeValue(self, key, value): """See L{apt_p2p.interfaces.IDHT}.""" + d = defer.Deferred() + if self.config is None: - raise DHTError, "configuration not loaded" + d.errback(DHTError("configuration not loaded")) + return d if not self.joined: - raise DHTError, "have not joined a network yet" + d.errback(DHTError("have not joined a network yet")) + return d key = self._normKey(key) bvalue = bencode(value) @@ -256,7 +286,6 @@ class DHT: if key in self.storing and bvalue in self.storing[key]: raise DHTError, "already storing that key with the same value" - d = defer.Deferred() self.khashmir.storeValueForKey(key, bvalue, self._storeValue) self.storing.setdefault(key, {})[bvalue] = d return d @@ -301,7 +330,7 @@ class DHT: class TestSimpleDHT(unittest.TestCase): """Simple 2-node unit tests for the DHT.""" - timeout = 2 + timeout = 50 DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160, 'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4, 'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000, @@ -325,6 +354,16 @@ class TestSimpleDHT(unittest.TestCase): def test_bootstrap_join(self): d = self.a.join() return d + + def test_failed_join(self): + from krpc import KrpcError + d = self.b.join() + reactor.callLater(30, self.a.join) + def no_errors(result, self = self): + self.flushLoggedErrors(KrpcError) + return result + d.addCallback(no_errors) + return d def node_join(self, result): d = self.b.join() @@ -406,7 +445,7 @@ class TestSimpleDHT(unittest.TestCase): class TestMultiDHT(unittest.TestCase): """More complicated 20-node tests for the DHT.""" - timeout = 60 + timeout = 80 num = 20 DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160, 'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,