X-Git-Url: https://git.mxchange.org/?p=quix0rs-apt-p2p.git;a=blobdiff_plain;f=apt_p2p%2FPeerManager.py;h=cfc0f78103e5a2ce8fed2f0084d71cf2c8f253c3;hp=68a3c32a49304ab243840dbde2b4fbf09660b9d4;hb=95453ed929f4b9faa3ddb4702ceea9d01784e0e6;hpb=1b2b271f65329a6bfaf7a5c935b9971834662865 diff --git a/apt_p2p/PeerManager.py b/apt_p2p/PeerManager.py index 68a3c32..cfc0f78 100644 --- a/apt_p2p/PeerManager.py +++ b/apt_p2p/PeerManager.py @@ -14,197 +14,14 @@ from twisted.web2 import stream from twisted.web2.http import Response, splitHostPort from HTTPDownloader import Peer +from Streams import GrowingFileStream, StreamToFile from util import uncompact from Hash import PIECE_SIZE from apt_p2p_Khashmir.bencode import bdecode +from apt_p2p_conf import config -class GrowingFileStream(stream.FileStream): - """Modified to stream data from a file as it becomes available. - - @ivar CHUNK_SIZE: the maximum size of chunks of data to send at a time - @ivar deferred: waiting for the result of the last read attempt - @ivar available: the number of bytes that are currently available to read - @ivar position: the current position in the file where the next read will begin - @ivar finished: True when no more data will be coming available - """ - - CHUNK_SIZE = 4*1024 - - def __init__(self, f, length = None): - stream.FileStream.__init__(self, f) - self.length = length - self.deferred = None - self.available = 0L - self.position = 0L - self.finished = False - - def updateAvailable(self, newlyAvailable): - """Update the number of bytes that are available. - - Call it with 0 to trigger reading of a fully read file. - - @param newlyAvailable: the number of bytes that just became available - """ - assert not self.finished - self.available += newlyAvailable - - # If a read is pending, let it go - if self.deferred and self.position < self.available: - # Try to read some data from the file - length = self.available - self.position - readSize = min(length, self.CHUNK_SIZE) - self.f.seek(self.position) - b = self.f.read(readSize) - bytesRead = len(b) - - # Check if end of file was reached - if bytesRead: - self.position += bytesRead - deferred = self.deferred - self.deferred = None - deferred.callback(b) - - def allAvailable(self): - """Indicate that no more data will be coming available.""" - self.finished = True - - # If a read is pending, let it go - if self.deferred: - if self.position < self.available: - # Try to read some data from the file - length = self.available - self.position - readSize = min(length, self.CHUNK_SIZE) - self.f.seek(self.position) - b = self.f.read(readSize) - bytesRead = len(b) - - # Check if end of file was reached - if bytesRead: - self.position += bytesRead - deferred = self.deferred - self.deferred = None - deferred.callback(b) - else: - # We're done - deferred = self.deferred - self.deferred = None - deferred.callback(None) - else: - # We're done - deferred = self.deferred - self.deferred = None - deferred.callback(None) - - def read(self, sendfile=False): - assert not self.deferred, "A previous read is still deferred." - - if self.f is None: - return None - - length = self.available - self.position - readSize = min(length, self.CHUNK_SIZE) - - # If we don't have any available, we're done or deferred - if readSize <= 0: - if self.finished: - return None - else: - self.deferred = defer.Deferred() - return self.deferred - - # Try to read some data from the file - self.f.seek(self.position) - b = self.f.read(readSize) - bytesRead = len(b) - if not bytesRead: - # End of file was reached, we're done or deferred - if self.finished: - return None - else: - self.deferred = defer.Deferred() - return self.deferred - else: - self.position += bytesRead - return b - -class StreamToFile: - """Save a stream to a partial file and hash it. - - @type stream: L{twisted.web2.stream.IByteStream} - @ivar stream: the input stream being read - @type outFile: L{twisted.python.filepath.FilePath} - @ivar outFile: the file being written - @type hash: C{sha1} - @ivar hash: the hash object for the data - @type position: C{int} - @ivar position: the current file position to write the next data to - @type length: C{int} - @ivar length: the position in the file to not write beyond - @type doneDefer: L{twisted.internet.defer.Deferred} - @ivar doneDefer: the deferred that will fire when done writing - """ - - def __init__(self, inputStream, outFile, start = 0, length = None): - """Initializes the file. - - @type inputStream: L{twisted.web2.stream.IByteStream} - @param inputStream: the input stream to read from - @type outFile: L{twisted.python.filepath.FilePath} - @param outFile: the file to write to - @type start: C{int} - @param start: the file position to start writing at - (optional, defaults to the start of the file) - @type length: C{int} - @param length: the maximum amount of data to write to the file - (optional, defaults to not limiting the writing to the file - """ - self.stream = inputStream - self.outFile = outFile - self.hash = sha.new() - self.position = start - self.length = None - if length is not None: - self.length = start + length - self.doneDefer = None - - def run(self): - """Start the streaming. - - @rtype: L{twisted.internet.defer.Deferred} - """ - log.msg('Started streaming %r bytes to file at position %d' % (self.length, self.position)) - self.doneDefer = stream.readStream(self.stream, self._gotData) - self.doneDefer.addCallbacks(self._done, self._error) - return self.doneDefer - - def _gotData(self, data): - """Process the received data.""" - if self.outFile.closed: - raise Exception, "outFile was unexpectedly closed" - - if data is None: - raise Exception, "Data is None?" - - # Make sure we don't go too far - if self.length is not None and self.position + len(data) > self.length: - data = data[:(self.length - self.position)] - - # Write and hash the streamed data - self.outFile.seek(self.position) - self.outFile.write(data) - self.hash.update(data) - self.position += len(data) - - def _done(self, result): - """Return the result.""" - log.msg('Streaming is complete') - return self.hash.digest() - - def _error(self, err): - """Log the error.""" - log.msg('Streaming error') - log.err(err) - return err +class PeerError(Exception): + """An error occurred downloading from peers.""" class FileDownload: """Manage a download from a list of peers or a mirror. @@ -261,6 +78,8 @@ class FileDownload: self.compact_peers = compact_peers self.path = '/~/' + quote_plus(hash.expected()) + self.defer = None + self.mirror_path = None self.pieces = None self.started = False @@ -312,7 +131,7 @@ class FileDownload: if max_found == no_pieces: # The file is not split into pieces log.msg('No pieces were found for the file') - self.pieces = [] + self.pieces = [self.hash.expected()] self.startDownload() elif max_found == max(pieces_string.values()): # Small number of pieces in a string @@ -353,24 +172,28 @@ class FileDownload: # del self.peers[site] # Start the DHT lookup - lookupDefer = self.manager.dht.getValue(key) - lookupDefer.addCallback(self._getDHTPieces, key) + lookupDefer = self.manager.dht.get(key) + lookupDefer.addBoth(self._getDHTPieces, key) def _getDHTPieces(self, results, key): """Check the retrieved values.""" - for result in results: - # Make sure the hash matches the key - result_hash = sha.new(result.get('t', '')).digest() - if result_hash == key: - pieces = result['t'] - self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)] - log.msg('Retrieved %d piece hashes from the DHT' % len(self.pieces)) - self.startDownload() - return + if isinstance(results, list): + for result in results: + # Make sure the hash matches the key + result_hash = sha.new(result.get('t', '')).digest() + if result_hash == key: + pieces = result['t'] + self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)] + log.msg('Retrieved %d piece hashes from the DHT' % len(self.pieces)) + self.startDownload() + return + + log.msg('Could not retrieve the piece hashes from the DHT') + else: + log.msg('Looking up piece hashes in the DHT resulted in an error: %r' % (result, )) # Continue without the piece hashes - log.msg('Could not retrieve the piece hashes from the DHT') - self.pieces = [] + self.pieces = [None for x in xrange(0, self.hash.expSize, PIECE_SIZE)] self.startDownload() def getPeerPieces(self, key, failedSite = None): @@ -392,7 +215,6 @@ class FileDownload: if self.pieces is None: # Send a request to one or more peers - log.msg('Checking for a peer to request piece hashes from') for site in self.peers: if self.peers[site].get('failed', False) != True: log.msg('Sending a piece hash request to %r' % (site, )) @@ -403,14 +225,13 @@ class FileDownload: **{'callbackArgs': (key, site), 'errbackArgs': (key, site)}) self.outstanding += 1 - if self.outstanding >= 3: + if self.outstanding >= 4: break - log.msg('Done sending piece hash requests for now, %d outstanding' % self.outstanding) if self.pieces is None and self.outstanding <= 0: # Continue without the piece hashes log.msg('Could not retrieve the piece hashes from the peers') - self.pieces = [] + self.pieces = [None for x in xrange(0, self.hash.expSize, PIECE_SIZE)] self.startDownload() def _getPeerPieces(self, response, key, site): @@ -418,7 +239,6 @@ class FileDownload: log.msg('Got a piece hash response %d from %r' % (response.code, site)) if response.code != 200: # Request failed, try a different peer - log.msg('Did not like response %d from %r' % (response.code, site)) self.getPeerPieces(key, site) else: # Read the response stream to a string @@ -483,37 +303,41 @@ class FileDownload: log.msg('Starting to download %s' % self.path) self.started = True - assert self.pieces is not None, "You must initialize the piece hashes first" + assert self.pieces, "You must initialize the piece hashes first" self.peerlist = [self.peers[site]['peer'] for site in self.peers] + # Use the mirror if there are few peers + if len(self.peerlist) < config.getint('DEFAULT', 'MIN_DOWNLOAD_PEERS'): + parsed = urlparse(self.mirror) + if parsed[0] == "http": + site = splitHostPort(parsed[0], parsed[1]) + self.mirror_path = urlunparse(('', '') + parsed[2:]) + peer = self.manager.getPeer(site, mirror = True) + self.peerlist.append(peer) + # Special case if there's only one good peer left - if len(self.peerlist) == 1: - log.msg('Downloading from peer %r' % (self.peerlist[0], )) - self.defer.callback(self.peerlist[0].get(self.path)) - return +# if len(self.peerlist) == 1: +# log.msg('Downloading from peer %r' % (self.peerlist[0], )) +# self.defer.callback(self.peerlist[0].get(self.path)) +# return - # Start sending the return file - self.stream = GrowingFileStream(self.file, self.hash.expSize) - resp = Response(200, {}, self.stream) - self.defer.callback(resp) - # Begin to download the pieces self.outstanding = 0 self.nextFinish = 0 - if self.pieces: - self.completePieces = [False for piece in self.pieces] - else: - self.completePieces = [False] + self.completePieces = [False for piece in self.pieces] self.getPieces() #{ Downloading the pieces def getPieces(self): """Download the next pieces from the peers.""" - log.msg('Checking for more piece requests to send') + if self.file.closed: + log.msg('Download has been aborted for %s' % self.path) + self.stream.allAvailable(remove = True) + return + self.sort() piece = self.nextFinish while self.outstanding < 4 and self.peerlist and piece < len(self.completePieces): - log.msg('Checking piece %d' % piece) if self.completePieces[piece] == False: # Send a request to the highest ranked peer peer = self.peerlist.pop() @@ -521,25 +345,26 @@ class FileDownload: log.msg('Sending a request for piece %d to peer %r' % (piece, peer)) self.outstanding += 1 - if self.pieces: - df = peer.getRange(self.path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1) + path = self.path + if peer.mirror: + path = self.mirror_path + if len(self.completePieces) > 1: + df = peer.getRange(path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1) else: - df = peer.get(self.path) + df = peer.get(path) reactor.callLater(0, df.addCallbacks, *(self._getPiece, self._getError), **{'callbackArgs': (piece, peer), 'errbackArgs': (piece, peer)}) piece += 1 - log.msg('Finished checking pieces, %d outstanding, next piece %d of %d' % (self.outstanding, self.nextFinish, len(self.completePieces))) # Check if we're done if self.outstanding <= 0 and self.nextFinish >= len(self.completePieces): - log.msg('We seem to be done with all pieces') - self.stream.allAvailable() + log.msg('Download is complete for %s' % self.path) + self.stream.allAvailable(remove = True) def _getPiece(self, response, piece, peer): """Process the retrieved headers from the peer.""" - log.msg('Got response for piece %d from peer %r' % (piece, peer)) if ((len(self.completePieces) > 1 and response.code != 206) or (response.code not in (200, 206))): # Request failed, try a different peer @@ -549,14 +374,31 @@ class FileDownload: if response.stream and response.stream.length: stream.readAndDiscard(response.stream) else: + if self.defer: + # Start sending the return file + df = self.defer + self.defer = None + self.stream = GrowingFileStream(self.file, self.hash.expSize) + + # Get the headers from the peer's response + headers = {} + if response.headers.hasHeader('last-modified'): + headers['last-modified'] = response.headers.getHeader('last-modified') + resp = Response(200, headers, self.stream) + df.callback(resp) + # Read the response stream to the file log.msg('Streaming piece %d from peer %r' % (piece, peer)) if response.code == 206: - df = StreamToFile(response.stream, self.file, piece*PIECE_SIZE, PIECE_SIZE).run() + df = StreamToFile(self.hash.newPieceHasher(), response.stream, + self.file, piece*PIECE_SIZE, PIECE_SIZE).run() else: - df = StreamToFile(response.stream, self.file).run() - df.addCallbacks(self._gotPiece, self._gotError, - callbackArgs=(piece, peer), errbackArgs=(piece, peer)) + df = StreamToFile(self.hash.newHasher(), response.stream, + self.file).run() + reactor.callLater(0, df.addCallbacks, + *(self._gotPiece, self._gotError), + **{'callbackArgs': (piece, peer), + 'errbackArgs': (piece, peer)}) self.outstanding -= 1 self.peerlist.append(peer) @@ -571,16 +413,14 @@ class FileDownload: self.getPieces() log.err(err) - def _gotPiece(self, response, piece, peer): + def _gotPiece(self, hash, piece, peer): """Process the retrieved piece from the peer.""" - log.msg('Finished streaming piece %d from peer %r: %r' % (piece, peer, response)) - if ((self.pieces and response != self.pieces[piece]) or - (len(self.pieces) == 0 and response != self.hash.expected())): + if self.pieces[piece] and hash.digest() != self.pieces[piece]: # Hash doesn't match log.msg('Hash error for piece %d from peer %r' % (piece, peer)) peer.hashError('Piece received from peer does not match expected') self.completePieces[piece] = False - elif self.pieces: + else: # Successfully completed one of several pieces log.msg('Finished with piece %d from peer %r' % (piece, peer)) self.completePieces[piece] = True @@ -588,18 +428,12 @@ class FileDownload: self.completePieces[self.nextFinish] == True): self.nextFinish += 1 self.stream.updateAvailable(PIECE_SIZE) - else: - # Whole download (only one piece) is complete - log.msg('Piece %d from peer %r is the last piece' % (piece, peer)) - self.completePieces[piece] = True - self.nextFinish = 1 - self.stream.updateAvailable(2**30) self.getPieces() def _gotError(self, err, piece, peer): """Piece download failed, try again.""" - log.msg('Error streaming piece %d from peer %r: %r' % (piece, peer, response)) + log.msg('Error streaming piece %d from peer %r: %r' % (piece, peer, err)) log.err(err) self.completePieces[piece] = False self.getPieces() @@ -609,19 +443,22 @@ class PeerManager: @type cache_dir: L{twisted.python.filepath.FilePath} @ivar cache_dir: the directory to use for storing all files - @type dht: L{interfaces.IDHT} + @type dht: L{DHTManager.DHT} @ivar dht: the DHT instance + @type stats: L{stats.StatsLogger} + @ivar stats: the statistics logger to record sent data to @type clients: C{dictionary} @ivar clients: the available peers that have been previously contacted """ - def __init__(self, cache_dir, dht): + def __init__(self, cache_dir, dht, stats): """Initialize the instance.""" self.cache_dir = cache_dir self.cache_dir.restat(False) if not self.cache_dir.exists(): self.cache_dir.makedirs() self.dht = dht + self.stats = stats self.clients = {} def get(self, hash, mirror, peers = [], method="GET", modtime=None): @@ -647,26 +484,30 @@ class PeerManager: assert parsed[0] == "http", "Only HTTP is supported, not '%s'" % parsed[0] site = splitHostPort(parsed[0], parsed[1]) path = urlunparse(('', '') + parsed[2:]) - peer = self.getPeer(site) + peer = self.getPeer(site, mirror = True) return peer.get(path, method, modtime) - elif len(peers) == 1: - site = uncompact(peers[0]['c']) - log.msg('Downloading from peer %r' % (site, )) - path = '/~/' + quote_plus(hash.expected()) - peer = self.getPeer(site) - return peer.get(path) +# elif len(peers) == 1: +# site = uncompact(peers[0]['c']) +# log.msg('Downloading from peer %r' % (site, )) +# path = '/~/' + quote_plus(hash.expected()) +# peer = self.getPeer(site) +# return peer.get(path) else: tmpfile = self.cache_dir.child(hash.hexexpected()) return FileDownload(self, hash, mirror, peers, tmpfile).run() - def getPeer(self, site): + def getPeer(self, site, mirror = False): """Create a new peer if necessary and return it. @type site: (C{string}, C{int}) @param site: the IP address and port of the peer + @param mirror: whether the peer is actually a mirror + (optional, defaults to False) """ if site not in self.clients: - self.clients[site] = Peer(site[0], site[1]) + self.clients[site] = Peer(site[0], site[1], self.stats) + if mirror: + self.clients[site].mirror = True return self.clients[site] def close(self):