From 695f86cd055c50859f1a9919e96756fb3aa4c429 Mon Sep 17 00:00:00 2001 From: Cameron Dale Date: Wed, 9 Apr 2008 17:27:29 -0700 Subject: [PATCH] More multiple peer downloading work, finished but still untested. --- apt_p2p/HTTPDownloader.py | 2 - apt_p2p/PeerManager.py | 214 ++++++++++++++++++++++++++++++-------- 2 files changed, 169 insertions(+), 47 deletions(-) diff --git a/apt_p2p/HTTPDownloader.py b/apt_p2p/HTTPDownloader.py index 2a53897..e3eb66e 100644 --- a/apt_p2p/HTTPDownloader.py +++ b/apt_p2p/HTTPDownloader.py @@ -105,8 +105,6 @@ class Peer(ClientFactory): req = self.response_queue.pop(0) log.msg('%s of %s completed with code %d' % (req.method, req.uri, resp.code)) self._completed += 1 - if resp.code >= 400: - self._errors += 1 now = datetime.now() self._responseTimes.append((now, now - req.submissionTime)) self._lastResponse = (now, resp.stream.length) diff --git a/apt_p2p/PeerManager.py b/apt_p2p/PeerManager.py index 90a35c5..0f64e8f 100644 --- a/apt_p2p/PeerManager.py +++ b/apt_p2p/PeerManager.py @@ -11,7 +11,7 @@ from twisted.internet import reactor, defer from twisted.python import log from twisted.trial import unittest from twisted.web2 import stream -from twisted.web2.http import splitHostPort +from twisted.web2.http import Response, splitHostPort from HTTPDownloader import Peer from util import uncompact @@ -65,7 +65,7 @@ class GrowingFileStream(stream.FileStream): deferred.callback(b) def allAvailable(self): - """Indicate that no more data is coming available.""" + """Indicate that no more data will be coming available.""" self.finished = True # If a read is pending, let it go @@ -124,74 +124,126 @@ class GrowingFileStream(stream.FileStream): return b class StreamToFile(defer.Deferred): - """Saves a stream to a file. + """Save a stream to a partial file and hash it. @type stream: L{twisted.web2.stream.IByteStream} @ivar stream: the input stream being read @type outFile: L{twisted.python.filepath.FilePath} @ivar outFile: the file being written - @type hash: L{Hash.HashObject} - @ivar hash: the hash object for the file + @type hash: C{sha1} + @ivar hash: the hash object for the data + @type position: C{int} + @ivar position: the current file position to write the next data to @type length: C{int} - @ivar length: the length of the original (compressed) file + @ivar length: the position in the file to not write beyond @type doneDefer: L{twisted.internet.defer.Deferred} - @ivar doneDefer: the deferred that will fire when done streaming + @ivar doneDefer: the deferred that will fire when done writing """ - def __init__(self, inputStream, outFile, hash, start, length): + def __init__(self, inputStream, outFile, start = 0, length = None): """Initializes the file. @type inputStream: L{twisted.web2.stream.IByteStream} @param inputStream: the input stream to read from @type outFile: L{twisted.python.filepath.FilePath} @param outFile: the file to write to - @type hash: L{Hash.HashObject} - @param hash: the hash object to use for the file + @type start: C{int} + @param start: the file position to start writing at + (optional, defaults to the start of the file) + @type length: C{int} + @param length: the maximum amount of data to write to the file + (optional, defaults to not limiting the writing to the file """ self.stream = inputStream self.outFile = outFile - self.hash = hash - self.hash.new() - self.length = self.stream.length + self.hash = sha.new() + self.position = start + self.length = None + if length is not None: + self.length = start + length self.doneDefer = None def run(self): - """Start the streaming.""" - self.doneDefer = stream.readStream(self.stream, _gotData) + """Start the streaming. + + @rtype: L{twisted.internet.defer.Deferred} + """ + self.doneDefer = stream.readStream(self.stream, self._gotData) self.doneDefer.addCallbacks(self._done, self._error) return self.doneDefer - def _done(self): - """Close all the output files, return the result.""" - if not self.outFile.closed: - self.outFile.close() - self.hash.digest() - self.doneDefer.callback(self.hash) - def _gotData(self, data): + """Process the received data.""" if self.outFile.closed: - return + raise Exception, "outFile was unexpectedly closed" if data is None: - self._done() + raise Exception, "Data is None?" + + # Make sure we don't go too far + if self.length is not None and self.position + len(data) > self.length: + data = data[:(self.length - self.position)] # Write and hash the streamed data + self.outFile.seek(self.position) self.outFile.write(data) self.hash.update(data) + self.position += len(data) + def _done(self, result): + """Return the result.""" + return self.hash.digest() + + def _error(self, err): + """Log the error.""" + log.err(err) + return err + class FileDownload: """Manage a download from a list of peers or a mirror. - + @type manager: L{PeerManager} + @ivar manager: the manager to send requests for peers to + @type hash: L{Hash.HashObject} + @ivar hash: the hash object containing the expected hash for the file + @ivar mirror: the URI of the file on the mirror + @type compact_peers: C{list} of C{dictionary} + @ivar compact_peers: a list of the peer info where the file can be found + @type file: C{file} + @ivar file: the open file to right the download to + @type path: C{string} + @ivar path: the path to request from peers to access the file + @type pieces: C{list} of C{string} + @ivar pieces: the hashes of the pieces in the file + @type started: C{boolean} + @ivar started: whether the download has begun yet + @type defer: L{twisted.internet.defer.Deferred} + @ivar defer: the deferred that will callback with the result of the download + @type peers: C{dictionary} + @ivar peers: information about each of the peers available to download from + @type outstanding: C{int} + @ivar outstanding: the number of requests to peers currently outstanding + @type peerlist: C{list} of L{HTTPDownloader.Peer} + @ivar peerlist: the sorted list of peers for this download + @type stream: L{GrowingFileStream} + @ivar stream: the stream of resulting data from the download + @type nextFinish: C{int} + @ivar nextFinish: the next piece that is needed to finish for the stream + @type completePieces: C{list} of C{boolean} or L{HTTPDownloader.Peer} + @ivar completePieces: one per piece, will be False if no requests are + outstanding for the piece, True if the piece has been successfully + downloaded, or the Peer that a request for this piece has been sent """ def __init__(self, manager, hash, mirror, compact_peers, file): """Initialize the instance and check for piece hashes. + @type manager: L{PeerManager} + @param manager: the manager to send requests for peers to @type hash: L{Hash.HashObject} @param hash: the hash object containing the expected hash for the file @param mirror: the URI of the file on the mirror - @type compact_peers: C{list} of C{string} + @type compact_peers: C{list} of C{dictionary} @param compact_peers: a list of the peer info where the file can be found @type file: L{twisted.python.filepath.FilePath} @param file: the temporary file to use to store the downloaded file @@ -259,6 +311,7 @@ class FileDownload: # Find the most popular piece string if num == max_found: self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)] + log.msg('Peer info contained %d piece hashes' % len(self.pieces)) self.startDownload() break elif max_found == max(pieces_hash.values()): @@ -412,39 +465,110 @@ class FileDownload: self.defer.callback(self.peerlist[0].get(self.path)) return - self.sort() + # Start sending the return file + self.stream = GrowingFileStream(self.file) + resp = Response(200, {}, self.stream) + self.defer.callback(resp) + + # Begin to download the pieces self.outstanding = 0 - self.next_piece = 0 + self.nextFinish = 0 + if self.pieces: + self.completePieces = [False for piece in self.pieces] + else: + self.completePieces = [False] + self.getPieces() - while self.outstanding < 3 and self.peerlist and self.next_piece < len(self.pieces): - peer = self.peerlist.pop() - piece = self.next_piece - self.next_piece += 1 - - self.outstanding += 1 - df = peer.getRange(self.path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1) - df.addCallbacks(self._gotPiece, self._gotError, - callbackArgs=(piece, peer), errbackArgs=(piece, peer)) + #{ Downloading the pieces + def getPieces(self): + """Download the next pieces from the peers.""" + self.sort() + piece = self.nextFinish + while self.outstanding < 4 and self.peerlist and piece < len(self.completePieces): + if self.completePieces[piece] == False: + # Send a request to the highest ranked peer + peer = self.peerlist.pop() + self.completePieces[piece] = peer + + self.outstanding += 1 + if self.pieces: + df = peer.getRange(self.path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1) + else: + df = peer.get(self.path) + reactor.callLater(0, df.addCallbacks, + *(self._getPiece, self._getError), + **{'callbackArgs': (piece, peer), + 'errbackArgs': (piece, peer)}) + piece += 1 + + # Check if we're don + if self.outstanding <= 0 and self.nextFinish >= len(self.completePieces): + self.stream.allAvailable() - def _gotPiece(self, response, piece, peer): - """Process the retrieved piece from the peer.""" - if response.code != 206: + def _getPiece(self, response, piece, peer): + """Process the retrieved headers from the peer.""" + if ((len(self.completePieces) > 1 and response.code != 206) or + (response.code not in (200, 206))): # Request failed, try a different peer - self.getPeerPieces(key, site) + peer.hashError() + self.completePieces[piece] = False + if response.stream and response.stream.length: + stream.readAndDiscard(response.stream) else: # Read the response stream to the file - df = StreamToFile(response.stream, self.file, self.hash, piece*PIECE_SIZE, PIECE_SIZE).run() - df.addCallbacks(self._gotPeerPieces, self._gotPeerError, - callbackArgs=(key, site), errbackArgs=(key, site)) + if response.code == 206: + df = StreamToFile(response.stream, self.file, piece*PIECE_SIZE, PIECE_SIZE).run() + else: + df = StreamToFile(response.stream, self.file).run() + df.addCallbacks(self._gotPiece, self._gotError, + callbackArgs=(piece, peer), errbackArgs=(piece, peer)) - def _gotError(self, err, piece, peer): + self.outstanding -= 1 + self.peerlist.append(peer) + self.getPieces() + + def _getError(self, err, piece, peer): """Peer failed, try again.""" + self.outstanding -= 1 + self.peerlist.append(peer) + self.completePieces[piece] = False + self.getPieces() log.err(err) + def _gotPiece(self, response, piece, peer): + """Process the retrieved piece from the peer.""" + if ((self.pieces and response != self.pieces[piece]) or + (len(self.pieces) == 0 and response == self.hash.expected())): + # Hash doesn't match + peer.hashError() + self.completePieces[piece] = False + elif self.pieces: + # Successfully completed one of several pieces + self.completePieces[piece] = True + while (self.nextFinish < len(self.completePieces) and + self.completePieces[self.nextFinish] == True): + self.nextFinish += 1 + self.stream.updateAvailable(PIECE_SIZE) + else: + # Whole download (only one piece) is complete + self.completePieces[piece] = True + self.stream.updateAvailable(2**30) + + self.getPieces() + + def _gotError(self, err, piece, peer): + """Piece download failed, try again.""" + log.err(err) + self.completePieces[piece] = False + self.getPieces() class PeerManager: """Manage a set of peers and the requests to them. + @type cache_dir: L{twisted.python.filepath.FilePath} + @ivar cache_dir: the directory to use for storing all files + @type dht: L{interfaces.IDHT} + @ivar dht: the DHT instance @type clients: C{dictionary} @ivar clients: the available peers that have been previously contacted """ -- 2.39.5