X-Git-Url: https://git.mxchange.org/?p=quix0rs-apt-p2p.git;a=blobdiff_plain;f=apt_p2p%2FPeerManager.py;h=cfc0f78103e5a2ce8fed2f0084d71cf2c8f253c3;hp=deeb668a54c4ae670a4e501c5e106fe7b6a300ff;hb=95453ed929f4b9faa3ddb4702ceea9d01784e0e6;hpb=b8e34cf19e0fca0443497ae461cb1b27a0d94b19 diff --git a/apt_p2p/PeerManager.py b/apt_p2p/PeerManager.py index deeb668..cfc0f78 100644 --- a/apt_p2p/PeerManager.py +++ b/apt_p2p/PeerManager.py @@ -8,219 +8,21 @@ from binascii import b2a_hex, a2b_hex import sha from twisted.internet import reactor, defer -from twisted.python import log, filepath +from twisted.python import log from twisted.trial import unittest from twisted.web2 import stream from twisted.web2.http import Response, splitHostPort from HTTPDownloader import Peer +from Streams import GrowingFileStream, StreamToFile from util import uncompact from Hash import PIECE_SIZE from apt_p2p_Khashmir.bencode import bdecode from apt_p2p_conf import config - class PeerError(Exception): """An error occurred downloading from peers.""" -class GrowingFileStream(stream.FileStream): - """Modified to stream data from a file as it becomes available. - - @ivar CHUNK_SIZE: the maximum size of chunks of data to send at a time - @ivar deferred: waiting for the result of the last read attempt - @ivar available: the number of bytes that are currently available to read - @ivar position: the current position in the file where the next read will begin - @ivar finished: True when no more data will be coming available - """ - - CHUNK_SIZE = 4*1024 - - def __init__(self, f, length = None): - stream.FileStream.__init__(self, f) - self.length = length - self.deferred = None - self.available = 0L - self.position = 0L - self.finished = False - - def updateAvailable(self, newlyAvailable): - """Update the number of bytes that are available. - - Call it with 0 to trigger reading of a fully read file. - - @param newlyAvailable: the number of bytes that just became available - """ - assert not self.finished - self.available += newlyAvailable - - # If a read is pending, let it go - if self.deferred and self.position < self.available: - # Try to read some data from the file - length = self.available - self.position - readSize = min(length, self.CHUNK_SIZE) - self.f.seek(self.position) - b = self.f.read(readSize) - bytesRead = len(b) - - # Check if end of file was reached - if bytesRead: - self.position += bytesRead - deferred = self.deferred - self.deferred = None - deferred.callback(b) - - def allAvailable(self): - """Indicate that no more data will be coming available.""" - self.finished = True - - # If a read is pending, let it go - if self.deferred: - if self.position < self.available: - # Try to read some data from the file - length = self.available - self.position - readSize = min(length, self.CHUNK_SIZE) - self.f.seek(self.position) - b = self.f.read(readSize) - bytesRead = len(b) - - # Check if end of file was reached - if bytesRead: - self.position += bytesRead - deferred = self.deferred - self.deferred = None - deferred.callback(b) - else: - # We're done - self._close() - deferred = self.deferred - self.deferred = None - deferred.callback(None) - else: - # We're done - self._close() - deferred = self.deferred - self.deferred = None - deferred.callback(None) - - def read(self, sendfile=False): - assert not self.deferred, "A previous read is still deferred." - - if self.f is None: - return None - - length = self.available - self.position - readSize = min(length, self.CHUNK_SIZE) - - # If we don't have any available, we're done or deferred - if readSize <= 0: - if self.finished: - self._close() - return None - else: - self.deferred = defer.Deferred() - return self.deferred - - # Try to read some data from the file - self.f.seek(self.position) - b = self.f.read(readSize) - bytesRead = len(b) - if not bytesRead: - # End of file was reached, we're done or deferred - if self.finished: - self._close() - return None - else: - self.deferred = defer.Deferred() - return self.deferred - else: - self.position += bytesRead - return b - - def _close(self): - """Close the temporary file and remove it.""" - self.f.close() - filepath.FilePath(self.f.name).remove() - self.f = None - -class StreamToFile: - """Save a stream to a partial file and hash it. - - @type stream: L{twisted.web2.stream.IByteStream} - @ivar stream: the input stream being read - @type outFile: L{twisted.python.filepath.FilePath} - @ivar outFile: the file being written - @type hasher: hashing object, e.g. C{sha1} - @ivar hasher: the hash object for the data - @type position: C{int} - @ivar position: the current file position to write the next data to - @type length: C{int} - @ivar length: the position in the file to not write beyond - @type doneDefer: L{twisted.internet.defer.Deferred} - @ivar doneDefer: the deferred that will fire when done writing - """ - - def __init__(self, hasher, inputStream, outFile, start = 0, length = None): - """Initializes the file. - - @type hasher: hashing object, e.g. C{sha1} - @param hasher: the hash object for the data - @type inputStream: L{twisted.web2.stream.IByteStream} - @param inputStream: the input stream to read from - @type outFile: L{twisted.python.filepath.FilePath} - @param outFile: the file to write to - @type start: C{int} - @param start: the file position to start writing at - (optional, defaults to the start of the file) - @type length: C{int} - @param length: the maximum amount of data to write to the file - (optional, defaults to not limiting the writing to the file - """ - self.stream = inputStream - self.outFile = outFile - self.hasher = hasher - self.position = start - self.length = None - if length is not None: - self.length = start + length - self.doneDefer = None - - def run(self): - """Start the streaming. - - @rtype: L{twisted.internet.defer.Deferred} - """ - self.doneDefer = stream.readStream(self.stream, self._gotData) - self.doneDefer.addCallbacks(self._done, self._error) - return self.doneDefer - - def _gotData(self, data): - """Process the received data.""" - if self.outFile.closed: - raise PeerError, "outFile was unexpectedly closed" - - if data is None: - raise PeerError, "Data is None?" - - # Make sure we don't go too far - if self.length is not None and self.position + len(data) > self.length: - data = data[:(self.length - self.position)] - - # Write and hash the streamed data - self.outFile.seek(self.position) - self.outFile.write(data) - self.hasher.update(data) - self.position += len(data) - - def _done(self, result): - """Return the result.""" - return self.hasher.digest() - - def _error(self, err): - """Log the error.""" - log.msg('Streaming error') - log.err(err) - return err - class FileDownload: """Manage a download from a list of peers or a mirror. @@ -370,7 +172,7 @@ class FileDownload: # del self.peers[site] # Start the DHT lookup - lookupDefer = self.manager.dht.getValue(key) + lookupDefer = self.manager.dht.get(key) lookupDefer.addBoth(self._getDHTPieces, key) def _getDHTPieces(self, results, key): @@ -528,6 +330,11 @@ class FileDownload: #{ Downloading the pieces def getPieces(self): """Download the next pieces from the peers.""" + if self.file.closed: + log.msg('Download has been aborted for %s' % self.path) + self.stream.allAvailable(remove = True) + return + self.sort() piece = self.nextFinish while self.outstanding < 4 and self.peerlist and piece < len(self.completePieces): @@ -554,7 +361,7 @@ class FileDownload: # Check if we're done if self.outstanding <= 0 and self.nextFinish >= len(self.completePieces): log.msg('Download is complete for %s' % self.path) - self.stream.allAvailable() + self.stream.allAvailable(remove = True) def _getPiece(self, response, piece, peer): """Process the retrieved headers from the peer.""" @@ -606,9 +413,9 @@ class FileDownload: self.getPieces() log.err(err) - def _gotPiece(self, response, piece, peer): + def _gotPiece(self, hash, piece, peer): """Process the retrieved piece from the peer.""" - if self.pieces[piece] and response != self.pieces[piece]: + if self.pieces[piece] and hash.digest() != self.pieces[piece]: # Hash doesn't match log.msg('Hash error for piece %d from peer %r' % (piece, peer)) peer.hashError('Piece received from peer does not match expected') @@ -626,7 +433,7 @@ class FileDownload: def _gotError(self, err, piece, peer): """Piece download failed, try again.""" - log.msg('Error streaming piece %d from peer %r: %r' % (piece, peer, response)) + log.msg('Error streaming piece %d from peer %r: %r' % (piece, peer, err)) log.err(err) self.completePieces[piece] = False self.getPieces() @@ -636,7 +443,7 @@ class PeerManager: @type cache_dir: L{twisted.python.filepath.FilePath} @ivar cache_dir: the directory to use for storing all files - @type dht: L{interfaces.IDHT} + @type dht: L{DHTManager.DHT} @ivar dht: the DHT instance @type stats: L{stats.StatsLogger} @ivar stats: the statistics logger to record sent data to