Also make the get and store functions call errbacks if not joined.
Also modify the callers of the get and store functions to respond well
to errbacks.
Also add a unittest for the rejoining functionality.
-Retry when joining the DHT.
-
-If a join node can not be reached when the program is started, it will
-currently give up and quit. Instead, it should try and join
-periodically every few minutes until it is successful.
-
-
Add statistics gathering to the peer downloading.
Statistics are needed of how much has been uploaded, downloaded from
Add statistics gathering to the peer downloading.
Statistics are needed of how much has been uploaded, downloaded from
# Start the DHT lookup
lookupDefer = self.manager.dht.getValue(key)
# Start the DHT lookup
lookupDefer = self.manager.dht.getValue(key)
- lookupDefer.addCallback(self._getDHTPieces, key)
+ lookupDefer.addBoth(self._getDHTPieces, key)
def _getDHTPieces(self, results, key):
"""Check the retrieved values."""
def _getDHTPieces(self, results, key):
"""Check the retrieved values."""
- for result in results:
- # Make sure the hash matches the key
- result_hash = sha.new(result.get('t', '')).digest()
- if result_hash == key:
- pieces = result['t']
- self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
- log.msg('Retrieved %d piece hashes from the DHT' % len(self.pieces))
- self.startDownload()
- return
+ if isinstance(results, list):
+ for result in results:
+ # Make sure the hash matches the key
+ result_hash = sha.new(result.get('t', '')).digest()
+ if result_hash == key:
+ pieces = result['t']
+ self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
+ log.msg('Retrieved %d piece hashes from the DHT' % len(self.pieces))
+ self.startDownload()
+ return
+
+ log.msg('Could not retrieve the piece hashes from the DHT')
+ else:
+ log.msg('Looking up piece hashes in the DHT resulted in an error: %r' % (result, ))
# Continue without the piece hashes
# Continue without the piece hashes
- log.msg('Could not retrieve the piece hashes from the DHT')
self.pieces = [None for x in xrange(0, self.hash.expSize, PIECE_SIZE)]
self.startDownload()
self.pieces = [None for x in xrange(0, self.hash.expSize, PIECE_SIZE)]
self.startDownload()
log.msg('Looking up hash in DHT for file: %s' % url)
key = hash.expected()
lookupDefer = self.dht.getValue(key)
log.msg('Looking up hash in DHT for file: %s' % url)
key = hash.expected()
lookupDefer = self.dht.getValue(key)
- lookupDefer.addCallback(self.lookupHash_done, hash, url, d)
+ lookupDefer.addBoth(self.lookupHash_done, hash, url, d)
def lookupHash_done(self, values, hash, url, d):
"""Start the download of the file.
def lookupHash_done(self, values, hash, url, d):
"""Start the download of the file.
@param values: the returned values from the DHT containing peer
download information
"""
@param values: the returned values from the DHT containing peer
download information
"""
- if not values:
- log.msg('Peers for %s were not found' % url)
+ if not isinstance(values, list) or not values:
+ if not isinstance(values, list):
+ log.msg('DHT lookup for %s failed with error %r' % (url, values))
+ else:
+ log.msg('Peers for %s were not found' % url)
getDefer = self.peers.get(hash, url)
getDefer.addCallback(self.cache.save_file, hash, url)
getDefer.addErrback(self.cache.save_error, url)
getDefer = self.peers.get(hash, url)
getDefer.addCallback(self.cache.save_file, hash, url)
getDefer.addErrback(self.cache.save_error, url)
value['l'] = sha.new(''.join(pieces)).digest()
storeDefer = self.dht.storeValue(key, value)
value['l'] = sha.new(''.join(pieces)).digest()
storeDefer = self.dht.storeValue(key, value)
- storeDefer.addCallback(self.store_done, hash)
+ storeDefer.addCallbacks(self.store_done, self.store_error,
+ callbackArgs = (hash, ), errbackArgs = (hash.digest(), ))
return storeDefer
def store_done(self, result, hash):
return storeDefer
def store_done(self, result, hash):
value = {'t': ''.join(pieces)}
storeDefer = self.dht.storeValue(key, value)
value = {'t': ''.join(pieces)}
storeDefer = self.dht.storeValue(key, value)
- storeDefer.addCallback(self.store_torrent_done, key)
+ storeDefer.addCallbacks(self.store_torrent_done, self.store_error,
+ callbackArgs = (key, ), errbackArgs = (key, ))
return storeDefer
return result
return storeDefer
return result
"""Adding the file to the DHT is complete, and so is the workflow."""
log.msg('Added torrent string %s to the DHT: %r' % (b2a_hex(key), result))
return result
"""Adding the file to the DHT is complete, and so is the workflow."""
log.msg('Added torrent string %s to the DHT: %r' % (b2a_hex(key), result))
return result
+
+ def store_error(self, err, key):
+ """Adding to the DHT failed."""
+ log.msg('An error occurred adding %s to the DHT: %r' % (b2a_hex(key), err))
+ return err
\ No newline at end of file
\ No newline at end of file
@ivar joined: whether the DHT network has been successfully joined
@type outstandingJoins: C{int}
@ivar outstandingJoins: the number of bootstrap nodes that have yet to respond
@ivar joined: whether the DHT network has been successfully joined
@type outstandingJoins: C{int}
@ivar outstandingJoins: the number of bootstrap nodes that have yet to respond
+ @type next_rejoin: C{int}
+ @ivar next_rejoin: the number of seconds before retrying the next join
@type foundAddrs: C{list} of (C{string}, C{int})
@ivar foundAddrs: the IP address an port that were returned by bootstrap nodes
@type storing: C{dictionary}
@type foundAddrs: C{list} of (C{string}, C{int})
@ivar foundAddrs: the IP address an port that were returned by bootstrap nodes
@type storing: C{dictionary}
self.bootstrap = []
self.bootstrap_node = False
self.joining = None
self.bootstrap = []
self.bootstrap_node = False
self.joining = None
self.joined = False
self.outstandingJoins = 0
self.joined = False
self.outstandingJoins = 0
self.foundAddrs = []
self.storing = {}
self.retrieving = {}
self.foundAddrs = []
self.storing = {}
self.retrieving = {}
else:
self.config[k] = self.config_parser.get(section, k)
else:
self.config[k] = self.config_parser.get(section, k)
- def join(self):
- """See L{apt_p2p.interfaces.IDHT}."""
- if self.config is None:
- raise DHTError, "configuration not loaded"
+ def join(self, deferred = None):
+ """See L{apt_p2p.interfaces.IDHT}.
+
+ @param deferred: the deferred to callback when the join is complete
+ (optional, defaults to creating a new deferred and returning it)
+ """
+ # Check for multiple simultaneous joins
- raise DHTError, "a join is already in progress"
+ if deferred:
+ deferred.errback(DHTError("a join is already in progress"))
+ return
+ else:
+ raise DHTError, "a join is already in progress"
+
+ if deferred:
+ self.joining = deferred
+ else:
+ self.joining = defer.Deferred()
+
+ if self.config is None:
+ self.joining.errback(DHTError("configuration not loaded"))
+ return self.joining
# Create the new khashmir instance
# Create the new khashmir instance
- self.khashmir = Khashmir(self.config, self.cache_dir)
-
- self.joining = defer.Deferred()
+ if not self.khashmir:
+ self.khashmir = Khashmir(self.config, self.cache_dir)
+
for node in self.bootstrap:
host, port = node.rsplit(':', 1)
port = int(port)
for node in self.bootstrap:
host, port = node.rsplit(':', 1)
port = int(port)
def _join_complete(self, result):
"""End the joining process and return the addresses found for this node."""
def _join_complete(self, result):
"""End the joining process and return the addresses found for this node."""
- if not self.joined and len(result) > 0:
+ if not self.joined and len(result) > 1:
self.joined = True
if self.joining and self.outstandingJoins <= 0:
df = self.joining
self.joined = True
if self.joining and self.outstandingJoins <= 0:
df = self.joining
self.joined = True
df.callback(self.foundAddrs)
else:
self.joined = True
df.callback(self.foundAddrs)
else:
- df.errback(DHTError('could not find any nodes to bootstrap to'))
+ # Try to join later using exponential backoff delays
+ log.msg('Join failed, retrying in %d seconds' % self.next_rejoin)
+ reactor.callLater(self.next_rejoin, self.join, df)
+ self.next_rejoin *= 2
def getAddrs(self):
"""Get the list of addresses returned by bootstrap nodes for this node."""
def getAddrs(self):
"""Get the list of addresses returned by bootstrap nodes for this node."""
def getValue(self, key):
"""See L{apt_p2p.interfaces.IDHT}."""
def getValue(self, key):
"""See L{apt_p2p.interfaces.IDHT}."""
- raise DHTError, "configuration not loaded"
+ d.errback(DHTError("configuration not loaded"))
+ return d
- raise DHTError, "have not joined a network yet"
+ d.errback(DHTError("have not joined a network yet"))
+ return d
if key not in self.retrieving:
self.khashmir.valueForKey(key, self._getValue)
self.retrieving.setdefault(key, []).append(d)
if key not in self.retrieving:
self.khashmir.valueForKey(key, self._getValue)
self.retrieving.setdefault(key, []).append(d)
def storeValue(self, key, value):
"""See L{apt_p2p.interfaces.IDHT}."""
def storeValue(self, key, value):
"""See L{apt_p2p.interfaces.IDHT}."""
- raise DHTError, "configuration not loaded"
+ d.errback(DHTError("configuration not loaded"))
+ return d
- raise DHTError, "have not joined a network yet"
+ d.errback(DHTError("have not joined a network yet"))
+ return d
key = self._normKey(key)
bvalue = bencode(value)
key = self._normKey(key)
bvalue = bencode(value)
if key in self.storing and bvalue in self.storing[key]:
raise DHTError, "already storing that key with the same value"
if key in self.storing and bvalue in self.storing[key]:
raise DHTError, "already storing that key with the same value"
self.khashmir.storeValueForKey(key, bvalue, self._storeValue)
self.storing.setdefault(key, {})[bvalue] = d
return d
self.khashmir.storeValueForKey(key, bvalue, self._storeValue)
self.storing.setdefault(key, {})[bvalue] = d
return d
class TestSimpleDHT(unittest.TestCase):
"""Simple 2-node unit tests for the DHT."""
class TestSimpleDHT(unittest.TestCase):
"""Simple 2-node unit tests for the DHT."""
DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
def test_bootstrap_join(self):
d = self.a.join()
return d
def test_bootstrap_join(self):
d = self.a.join()
return d
+
+ def test_failed_join(self):
+ from krpc import KrpcError
+ d = self.b.join()
+ reactor.callLater(30, self.a.join)
+ def no_errors(result, self = self):
+ self.flushLoggedErrors(KrpcError)
+ return result
+ d.addCallback(no_errors)
+ return d
def node_join(self, result):
d = self.b.join()
def node_join(self, result):
d = self.b.join()
class TestMultiDHT(unittest.TestCase):
"""More complicated 20-node tests for the DHT."""
class TestMultiDHT(unittest.TestCase):
"""More complicated 20-node tests for the DHT."""
num = 20
DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
num = 20
DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,