X-Git-Url: https://git.mxchange.org/?p=quix0rs-apt-p2p.git;a=blobdiff_plain;f=apt_p2p_Khashmir%2FDHT.py;h=af247302f9ed6dc0b3838bbef09d2ebf681d54f8;hp=f773482695428b03e044153e3b2e2b3e8a6be8e1;hb=151de5a26973474d2b1a8fc3f071615c09e9a62d;hpb=7a84d9fb17076695aba3c0f5a32c6487bdd3f059 diff --git a/apt_p2p_Khashmir/DHT.py b/apt_p2p_Khashmir/DHT.py index f773482..af24730 100644 --- a/apt_p2p_Khashmir/DHT.py +++ b/apt_p2p_Khashmir/DHT.py @@ -16,6 +16,7 @@ from zope.interface import implements from apt_p2p.interfaces import IDHT, IDHTStats, IDHTStatsFactory from khashmir import Khashmir from bencode import bencode, bdecode +from khash import HASH_LENGTH try: from twisted.web2 import channel, server, resource, http, http_headers @@ -105,15 +106,16 @@ class DHT: self.bootstrap_node = self.config_parser.getboolean(section, 'BOOTSTRAP_NODE') for k in self.config_parser.options(section): # The numbers in the config file - if k in ['K', 'HASH_LENGTH', 'CONCURRENT_REQS', 'STORE_REDUNDANCY', + if k in ['CONCURRENT_REQS', 'STORE_REDUNDANCY', 'RETRIEVE_VALUES', 'MAX_FAILURES', 'PORT']: self.config[k] = self.config_parser.getint(section, k) # The times in the config file elif k in ['CHECKPOINT_INTERVAL', 'MIN_PING_INTERVAL', - 'BUCKET_STALENESS', 'KEY_EXPIRE']: + 'BUCKET_STALENESS', 'KEY_EXPIRE', + 'KRPC_TIMEOUT', 'KRPC_INITIAL_DELAY']: self.config[k] = self.config_parser.gettime(section, k) # The booleans in the config file - elif k in ['SPEW']: + elif k in ['SPEW', 'LOCAL_OK']: self.config[k] = self.config_parser.getboolean(section, k) # Everything else is a string else: @@ -230,34 +232,24 @@ class DHT: self.joined = False self.khashmir.shutdown() - def _normKey(self, key, bits=None, bytes=None): + def _normKey(self, key): """Normalize the length of keys used in the DHT.""" - bits = self.config["HASH_LENGTH"] - if bits is not None: - bytes = (bits - 1) // 8 + 1 - else: - if bytes is None: - raise DHTError, "you must specify one of bits or bytes for normalization" - # Extend short keys with null bytes - if len(key) < bytes: - key = key + '\000'*(bytes - len(key)) + if len(key) < HASH_LENGTH: + key = key + '\000'*(HASH_LENGTH - len(key)) # Truncate long keys - elif len(key) > bytes: - key = key[:bytes] + elif len(key) > HASH_LENGTH: + key = key[:HASH_LENGTH] return key def getValue(self, key): """See L{apt_p2p.interfaces.IDHT}.""" - d = defer.Deferred() - if self.config is None: - d.errback(DHTError("configuration not loaded")) - return d + return defer.fail(DHTError("configuration not loaded")) if not self.joined: - d.errback(DHTError("have not joined a network yet")) - return d + return defer.fail(DHTError("have not joined a network yet")) + d = defer.Deferred() key = self._normKey(key) if key not in self.retrieving: @@ -283,15 +275,12 @@ class DHT: def storeValue(self, key, value): """See L{apt_p2p.interfaces.IDHT}.""" - d = defer.Deferred() - if self.config is None: - d.errback(DHTError("configuration not loaded")) - return d + return defer.fail(DHTError("configuration not loaded")) if not self.joined: - d.errback(DHTError("have not joined a network yet")) - return d + return defer.fail(DHTError("have not joined a network yet")) + d = defer.Deferred() key = self._normKey(key) bvalue = bencode(value) @@ -343,12 +332,13 @@ class TestSimpleDHT(unittest.TestCase): """Simple 2-node unit tests for the DHT.""" timeout = 50 - DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160, + DHT_DEFAULTS = {'PORT': 9977, 'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4, 'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000, - 'MAX_FAILURES': 3, + 'MAX_FAILURES': 3, 'LOCAL_OK': True, 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600, - 'KEY_EXPIRE': 3600, 'SPEW': False, } + 'KRPC_TIMEOUT': 9, 'KRPC_INITIAL_DELAY': 2, + 'KEY_EXPIRE': 3600, 'SPEW': True, } def setUp(self): self.a = DHT() @@ -367,14 +357,15 @@ class TestSimpleDHT(unittest.TestCase): d = self.a.join() return d - def test_failed_join(self): + def no_krpc_errors(self, result): from krpc import KrpcError + self.flushLoggedErrors(KrpcError) + return result + + def test_failed_join(self): d = self.b.join() reactor.callLater(30, self.a.join) - def no_errors(result, self = self): - self.flushLoggedErrors(KrpcError) - return result - d.addCallback(no_errors) + d.addCallback(self.no_krpc_errors) return d def node_join(self, result): @@ -382,11 +373,14 @@ class TestSimpleDHT(unittest.TestCase): return d def test_join(self): - self.lastDefer = defer.Deferred() d = self.a.join() d.addCallback(self.node_join) - d.addCallback(self.lastDefer.callback) - return self.lastDefer + return d + + def test_timeout_retransmit(self): + d = self.b.join() + reactor.callLater(4, self.a.join) + return d def test_normKey(self): h = self.a._normKey('12345678901234567890') @@ -457,14 +451,15 @@ class TestSimpleDHT(unittest.TestCase): class TestMultiDHT(unittest.TestCase): """More complicated 20-node tests for the DHT.""" - timeout = 80 + timeout = 100 num = 20 - DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160, + DHT_DEFAULTS = {'PORT': 9977, 'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4, 'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000, - 'MAX_FAILURES': 3, + 'MAX_FAILURES': 3, 'LOCAL_OK': True, 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600, - 'KEY_EXPIRE': 3600, 'SPEW': False, } + 'KRPC_TIMEOUT': 9, 'KRPC_INITIAL_DELAY': 2, + 'KEY_EXPIRE': 3600, 'SPEW': True, } def setUp(self): self.l = []