X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=apt_p2p%2FPeerManager.py;h=d4c7aace9ad04dce45fceb1ac944c61803a7cc96;hb=3effe84f68e83e838bca36ae5b15069053c97e1b;hp=e9d1ecb590702de992bec32c14f3803fa0d6b6b9;hpb=c429a67c05afa54e5fe44607e5fe7c09fd35e81a;p=quix0rs-apt-p2p.git diff --git a/apt_p2p/PeerManager.py b/apt_p2p/PeerManager.py index e9d1ecb..d4c7aac 100644 --- a/apt_p2p/PeerManager.py +++ b/apt_p2p/PeerManager.py @@ -19,6 +19,10 @@ from Hash import PIECE_SIZE from apt_p2p_Khashmir.bencode import bdecode from apt_p2p_conf import config + +class PeerError(Exception): + """An error occurred downloading from peers.""" + class GrowingFileStream(stream.FileStream): """Modified to stream data from a file as it becomes available. @@ -181,10 +185,10 @@ class StreamToFile: def _gotData(self, data): """Process the received data.""" if self.outFile.closed: - raise Exception, "outFile was unexpectedly closed" + raise PeerError, "outFile was unexpectedly closed" if data is None: - raise Exception, "Data is None?" + raise PeerError, "Data is None?" # Make sure we don't go too far if self.length is not None and self.position + len(data) > self.length: @@ -356,22 +360,26 @@ class FileDownload: # Start the DHT lookup lookupDefer = self.manager.dht.getValue(key) - lookupDefer.addCallback(self._getDHTPieces, key) + lookupDefer.addBoth(self._getDHTPieces, key) def _getDHTPieces(self, results, key): """Check the retrieved values.""" - for result in results: - # Make sure the hash matches the key - result_hash = sha.new(result.get('t', '')).digest() - if result_hash == key: - pieces = result['t'] - self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)] - log.msg('Retrieved %d piece hashes from the DHT' % len(self.pieces)) - self.startDownload() - return + if isinstance(results, list): + for result in results: + # Make sure the hash matches the key + result_hash = sha.new(result.get('t', '')).digest() + if result_hash == key: + pieces = result['t'] + self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)] + log.msg('Retrieved %d piece hashes from the DHT' % len(self.pieces)) + self.startDownload() + return + + log.msg('Could not retrieve the piece hashes from the DHT') + else: + log.msg('Looking up piece hashes in the DHT resulted in an error: %r' % (result, )) # Continue without the piece hashes - log.msg('Could not retrieve the piece hashes from the DHT') self.pieces = [None for x in xrange(0, self.hash.expSize, PIECE_SIZE)] self.startDownload() @@ -394,7 +402,6 @@ class FileDownload: if self.pieces is None: # Send a request to one or more peers - log.msg('Checking for a peer to request piece hashes from') for site in self.peers: if self.peers[site].get('failed', False) != True: log.msg('Sending a piece hash request to %r' % (site, )) @@ -408,7 +415,6 @@ class FileDownload: if self.outstanding >= 4: break - log.msg('Done sending piece hash requests for now, %d outstanding' % self.outstanding) if self.pieces is None and self.outstanding <= 0: # Continue without the piece hashes log.msg('Could not retrieve the piece hashes from the peers') @@ -420,7 +426,6 @@ class FileDownload: log.msg('Got a piece hash response %d from %r' % (response.code, site)) if response.code != 200: # Request failed, try a different peer - log.msg('Did not like response %d from %r' % (response.code, site)) self.getPeerPieces(key, site) else: # Read the response stream to a string @@ -494,8 +499,7 @@ class FileDownload: if parsed[0] == "http": site = splitHostPort(parsed[0], parsed[1]) self.mirror_path = urlunparse(('', '') + parsed[2:]) - peer = self.manager.getPeer(site) - peer.mirror = True + peer = self.manager.getPeer(site, mirror = True) self.peerlist.append(peer) # Special case if there's only one good peer left @@ -518,11 +522,9 @@ class FileDownload: #{ Downloading the pieces def getPieces(self): """Download the next pieces from the peers.""" - log.msg('Checking for more piece requests to send') self.sort() piece = self.nextFinish while self.outstanding < 4 and self.peerlist and piece < len(self.completePieces): - log.msg('Checking piece %d' % piece) if self.completePieces[piece] == False: # Send a request to the highest ranked peer peer = self.peerlist.pop() @@ -540,7 +542,6 @@ class FileDownload: 'errbackArgs': (piece, peer)}) piece += 1 - log.msg('Finished checking pieces, %d outstanding, next piece %d of %d' % (self.outstanding, self.nextFinish, len(self.completePieces))) # Check if we're done if self.outstanding <= 0 and self.nextFinish >= len(self.completePieces): log.msg('We seem to be done with all pieces') @@ -561,11 +562,14 @@ class FileDownload: # Read the response stream to the file log.msg('Streaming piece %d from peer %r' % (piece, peer)) if response.code == 206: - df = StreamToFile(response.stream, self.file, piece*PIECE_SIZE, PIECE_SIZE).run() + df = StreamToFile(response.stream, self.file, piece*PIECE_SIZE, + PIECE_SIZE).run() else: df = StreamToFile(response.stream, self.file).run() - df.addCallbacks(self._gotPiece, self._gotError, - callbackArgs=(piece, peer), errbackArgs=(piece, peer)) + reactor.callLater(0, df.addCallbacks, + *(self._gotPiece, self._gotError), + **{'callbackArgs': (piece, peer), + 'errbackArgs': (piece, peer)}) self.outstanding -= 1 self.peerlist.append(peer) @@ -613,17 +617,20 @@ class PeerManager: @ivar cache_dir: the directory to use for storing all files @type dht: L{interfaces.IDHT} @ivar dht: the DHT instance + @type stats: L{stats.StatsLogger} + @ivar stats: the statistics logger to record sent data to @type clients: C{dictionary} @ivar clients: the available peers that have been previously contacted """ - def __init__(self, cache_dir, dht): + def __init__(self, cache_dir, dht, stats): """Initialize the instance.""" self.cache_dir = cache_dir self.cache_dir.restat(False) if not self.cache_dir.exists(): self.cache_dir.makedirs() self.dht = dht + self.stats = stats self.clients = {} def get(self, hash, mirror, peers = [], method="GET", modtime=None): @@ -649,8 +656,7 @@ class PeerManager: assert parsed[0] == "http", "Only HTTP is supported, not '%s'" % parsed[0] site = splitHostPort(parsed[0], parsed[1]) path = urlunparse(('', '') + parsed[2:]) - peer = self.getPeer(site) - peer.mirror = True + peer = self.getPeer(site, mirror = True) return peer.get(path, method, modtime) # elif len(peers) == 1: # site = uncompact(peers[0]['c']) @@ -662,14 +668,18 @@ class PeerManager: tmpfile = self.cache_dir.child(hash.hexexpected()) return FileDownload(self, hash, mirror, peers, tmpfile).run() - def getPeer(self, site): + def getPeer(self, site, mirror = False): """Create a new peer if necessary and return it. @type site: (C{string}, C{int}) @param site: the IP address and port of the peer + @param mirror: whether the peer is actually a mirror + (optional, defaults to False) """ if site not in self.clients: - self.clients[site] = Peer(site[0], site[1]) + self.clients[site] = Peer(site[0], site[1], self.stats) + if mirror: + self.clients[site].mirror = True return self.clients[site] def close(self):