# Start the DHT lookup
lookupDefer = self.manager.dht.getValue(key)
- lookupDefer.addCallback(self._getDHTPieces, key)
+ lookupDefer.addBoth(self._getDHTPieces, key)
def _getDHTPieces(self, results, key):
"""Check the retrieved values."""
- for result in results:
- # Make sure the hash matches the key
- result_hash = sha.new(result.get('t', '')).digest()
- if result_hash == key:
- pieces = result['t']
- self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
- log.msg('Retrieved %d piece hashes from the DHT' % len(self.pieces))
- self.startDownload()
- return
+ if isinstance(results, list):
+ for result in results:
+ # Make sure the hash matches the key
+ result_hash = sha.new(result.get('t', '')).digest()
+ if result_hash == key:
+ pieces = result['t']
+ self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
+ log.msg('Retrieved %d piece hashes from the DHT' % len(self.pieces))
+ self.startDownload()
+ return
+
+ log.msg('Could not retrieve the piece hashes from the DHT')
+ else:
+ log.msg('Looking up piece hashes in the DHT resulted in an error: %r' % (result, ))
# Continue without the piece hashes
- log.msg('Could not retrieve the piece hashes from the DHT')
self.pieces = [None for x in xrange(0, self.hash.expSize, PIECE_SIZE)]
self.startDownload()
if parsed[0] == "http":
site = splitHostPort(parsed[0], parsed[1])
self.mirror_path = urlunparse(('', '') + parsed[2:])
- peer = self.manager.getPeer(site)
- peer.mirror = True
+ peer = self.manager.getPeer(site, mirror = True)
self.peerlist.append(peer)
# Special case if there's only one good peer left
# Read the response stream to the file
log.msg('Streaming piece %d from peer %r' % (piece, peer))
if response.code == 206:
- df = StreamToFile(response.stream, self.file, piece*PIECE_SIZE, PIECE_SIZE).run()
+ df = StreamToFile(response.stream, self.file, piece*PIECE_SIZE,
+ PIECE_SIZE).run()
else:
df = StreamToFile(response.stream, self.file).run()
- df.addCallbacks(self._gotPiece, self._gotError,
- callbackArgs=(piece, peer), errbackArgs=(piece, peer))
+ reactor.callLater(0, df.addCallbacks,
+ *(self._gotPiece, self._gotError),
+ **{'callbackArgs': (piece, peer),
+ 'errbackArgs': (piece, peer)})
self.outstanding -= 1
self.peerlist.append(peer)
@ivar cache_dir: the directory to use for storing all files
@type dht: L{interfaces.IDHT}
@ivar dht: the DHT instance
+ @type stats: L{stats.StatsLogger}
+ @ivar stats: the statistics logger to record sent data to
@type clients: C{dictionary}
@ivar clients: the available peers that have been previously contacted
"""
- def __init__(self, cache_dir, dht):
+ def __init__(self, cache_dir, dht, stats):
"""Initialize the instance."""
self.cache_dir = cache_dir
self.cache_dir.restat(False)
if not self.cache_dir.exists():
self.cache_dir.makedirs()
self.dht = dht
+ self.stats = stats
self.clients = {}
def get(self, hash, mirror, peers = [], method="GET", modtime=None):
assert parsed[0] == "http", "Only HTTP is supported, not '%s'" % parsed[0]
site = splitHostPort(parsed[0], parsed[1])
path = urlunparse(('', '') + parsed[2:])
- peer = self.getPeer(site)
- peer.mirror = True
+ peer = self.getPeer(site, mirror = True)
return peer.get(path, method, modtime)
# elif len(peers) == 1:
# site = uncompact(peers[0]['c'])
tmpfile = self.cache_dir.child(hash.hexexpected())
return FileDownload(self, hash, mirror, peers, tmpfile).run()
- def getPeer(self, site):
+ def getPeer(self, site, mirror = False):
"""Create a new peer if necessary and return it.
@type site: (C{string}, C{int})
@param site: the IP address and port of the peer
+ @param mirror: whether the peer is actually a mirror
+ (optional, defaults to False)
"""
if site not in self.clients:
- self.clients[site] = Peer(site[0], site[1])
+ self.clients[site] = Peer(site[0], site[1], self.stats)
+ if mirror:
+ self.clients[site].mirror = True
return self.clients[site]
def close(self):