From: Cameron Dale Date: Fri, 25 Apr 2008 05:18:29 +0000 (-0700) Subject: Move all streams to new Streams module and replace ProxyFileStream with GrowingFileSt... X-Git-Url: https://git.mxchange.org/?a=commitdiff_plain;h=05422476cb06c6ccd2def7709a251e618e1eafb3;p=quix0rs-apt-p2p.git Move all streams to new Streams module and replace ProxyFileStream with GrowingFileStream. Instead of ProxyFileStream, now use a combination of GrowingFileStream and StreamToFile like the PeerManager does to download and upload. It's mostly working, only some errors when aborting apt-get in test 'a'. Also added a new test 'c' for downloading out of order, which is why ProxyFileStream can not be used. --- diff --git a/apt_p2p/CacheManager.py b/apt_p2p/CacheManager.py index b991093..42f89d9 100644 --- a/apt_p2p/CacheManager.py +++ b/apt_p2p/CacheManager.py @@ -5,9 +5,6 @@ @var DECOMPRESS_FILES: a list of file names that need to be decompressed """ -from bz2 import BZ2Decompressor -from zlib import decompressobj, MAX_WBITS -from gzip import FCOMMENT, FEXTRA, FHCRC, FNAME, FTEXT from urlparse import urlparse import os @@ -15,9 +12,9 @@ from twisted.python import log from twisted.python.filepath import FilePath from twisted.internet import defer, reactor from twisted.trial import unittest -from twisted.web2 import stream from twisted.web2.http import splitHostPort +from Streams import GrowingFileStream, StreamToFile from Hash import HashObject from apt_p2p_conf import config @@ -27,184 +24,6 @@ DECOMPRESS_FILES = ['release', 'sources', 'packages'] class CacheError(Exception): """Error occurred downloading a file to the cache.""" -class ProxyFileStream(stream.SimpleStream): - """Saves a stream to a file while providing a new stream. - - Also optionally decompresses the file while it is being downloaded. - - @type stream: L{twisted.web2.stream.IByteStream} - @ivar stream: the input stream being read - @type outFile: L{twisted.python.filepath.FilePath} - @ivar outFile: the file being written - @type hash: L{Hash.HashObject} - @ivar hash: the hash object for the file - @type gzfile: C{file} - @ivar gzfile: the open file to write decompressed gzip data to - @type gzdec: L{zlib.decompressobj} - @ivar gzdec: the decompressor to use for the compressed gzip data - @type gzheader: C{boolean} - @ivar gzheader: whether the gzip header still needs to be removed from - the zlib compressed data - @type bz2file: C{file} - @ivar bz2file: the open file to write decompressed bz2 data to - @type bz2dec: L{bz2.BZ2Decompressor} - @ivar bz2dec: the decompressor to use for the compressed bz2 data - @type length: C{int} - @ivar length: the length of the original (compressed) file - @type doneDefer: L{twisted.internet.defer.Deferred} - @ivar doneDefer: the deferred that will fire when done streaming - - @group Stream implementation: read, close - - """ - - def __init__(self, stream, outFile, hash, decompress = None, decFile = None): - """Initializes the proxy. - - @type stream: L{twisted.web2.stream.IByteStream} - @param stream: the input stream to read from - @type outFile: L{twisted.python.filepath.FilePath} - @param outFile: the file to write to - @type hash: L{Hash.HashObject} - @param hash: the hash object to use for the file - @type decompress: C{string} - @param decompress: also decompress the file as this type - (currently only '.gz' and '.bz2' are supported) - @type decFile: C{twisted.python.FilePath} - @param decFile: the file to write the decompressed data to - """ - self.stream = stream - self.outFile = outFile.open('w') - self.hash = hash - self.hash.new() - self.gzfile = None - self.bz2file = None - if decompress == ".gz": - self.gzheader = True - self.gzfile = decFile.open('w') - self.gzdec = decompressobj(-MAX_WBITS) - elif decompress == ".bz2": - self.bz2file = decFile.open('w') - self.bz2dec = BZ2Decompressor() - self.length = self.stream.length - self.doneDefer = defer.Deferred() - - def _done(self): - """Close all the output files, return the result.""" - if not self.outFile.closed: - self.outFile.close() - self.hash.digest() - if self.gzfile: - # Finish the decompression - data_dec = self.gzdec.flush() - self.gzfile.write(data_dec) - self.gzfile.close() - self.gzfile = None - if self.bz2file: - self.bz2file.close() - self.bz2file = None - - def _error(self, err): - """Close all the output files, return the error.""" - if not self.outFile.closed: - self._done() - self.stream.close() - self.doneDefer.errback(err) - - def read(self): - """Read some data from the stream.""" - if self.outFile.closed: - return None - - # Read data from the stream, deal with the possible deferred - data = self.stream.read() - if isinstance(data, defer.Deferred): - data.addCallbacks(self._write, self._error) - return data - - self._write(data) - return data - - def _write(self, data): - """Write the stream data to the file and return it for others to use. - - Also optionally decompresses it. - """ - if data is None: - if not self.outFile.closed: - self._done() - self.doneDefer.callback(self.hash) - return data - - # Write and hash the streamed data - self.outFile.write(data) - self.hash.update(data) - - if self.gzfile: - # Decompress the zlib portion of the file - if self.gzheader: - # Remove the gzip header junk - self.gzheader = False - new_data = self._remove_gzip_header(data) - dec_data = self.gzdec.decompress(new_data) - else: - dec_data = self.gzdec.decompress(data) - self.gzfile.write(dec_data) - if self.bz2file: - # Decompress the bz2 file - dec_data = self.bz2dec.decompress(data) - self.bz2file.write(dec_data) - - return data - - def _remove_gzip_header(self, data): - """Remove the gzip header from the zlib compressed data.""" - # Read, check & discard the header fields - if data[:2] != '\037\213': - raise IOError, 'Not a gzipped file' - if ord(data[2]) != 8: - raise IOError, 'Unknown compression method' - flag = ord(data[3]) - # modtime = self.fileobj.read(4) - # extraflag = self.fileobj.read(1) - # os = self.fileobj.read(1) - - skip = 10 - if flag & FEXTRA: - # Read & discard the extra field - xlen = ord(data[10]) - xlen = xlen + 256*ord(data[11]) - skip = skip + 2 + xlen - if flag & FNAME: - # Read and discard a null-terminated string containing the filename - while True: - if not data[skip] or data[skip] == '\000': - break - skip += 1 - skip += 1 - if flag & FCOMMENT: - # Read and discard a null-terminated string containing a comment - while True: - if not data[skip] or data[skip] == '\000': - break - skip += 1 - skip += 1 - if flag & FHCRC: - skip += 2 # Read & discard the 16-bit header CRC - - return data[skip:] - - def close(self): - """Clean everything up and return None to future reads.""" - log.msg('ProxyFileStream was prematurely closed after only %d/%d bytes' % (self.hash.size, self.length)) - if self.hash.size < self.length: - self._error(CacheError('Prematurely closed, all data was not written')) - elif not self.outFile.closed: - self._done() - self.doneDefer.callback(self.hash) - self.length = 0 - self.stream.close() - class CacheManager: """Manages all downloaded files and requests for cached objects. @@ -377,16 +196,21 @@ class CacheManager: # Create the new stream from the old one. orig_stream = response.stream - response.stream = ProxyFileStream(orig_stream, destFile, hash, ext, decFile) - response.stream.doneDefer.addCallback(self._save_complete, url, destFile, - response.headers.getHeader('Last-Modified'), - decFile) - response.stream.doneDefer.addErrback(self._save_error, url, destFile, decFile) + f = destFile.open('w+') + new_stream = GrowingFileStream(f, orig_stream.length) + hash.new() + df = StreamToFile(hash, orig_stream, f, notify = new_stream.updateAvailable, + decompress = ext, decFile = decFile).run() + df.addCallback(self._save_complete, url, destFile, new_stream, + response.headers.getHeader('Last-Modified'), decFile) + df.addErrback(self._save_error, url, destFile, new_stream, decFile) + response.stream = new_stream # Return the modified response with the new stream return response - def _save_complete(self, hash, url, destFile, modtime = None, decFile = None): + def _save_complete(self, hash, url, destFile, destStream = None, + modtime = None, decFile = None): """Update the modification time and inform the main program. @type hash: L{Hash.HashObject} @@ -394,6 +218,8 @@ class CacheManager: @param url: the URI of the actual mirror request @type destFile: C{twisted.python.FilePath} @param destFile: the file where the download was written to + @type destStream: L{Streams.GrowingFileStream} + @param destStream: the stream to notify that all data is available @type modtime: C{int} @param modtime: the modified time of the cached file (seconds since epoch) (optional, defaults to not setting the modification time of the file) @@ -403,6 +229,8 @@ class CacheManager: """ result = hash.verify() if result or result is None: + if destStream: + destStream.allAvailable() if modtime: os.utime(destFile.path, (modtime, modtime)) @@ -424,22 +252,26 @@ class CacheManager: decHash = HashObject() ext_len = len(destFile.path) - len(decFile.path) df = decHash.hashInThread(decFile) - df.addCallback(self._save_complete, url[:-ext_len], decFile, modtime) + df.addCallback(self._save_complete, url[:-ext_len], decFile, modtime = modtime) df.addErrback(self._save_error, url[:-ext_len], decFile) else: log.msg("Hashes don't match %s != %s: %s" % (hash.hexexpected(), hash.hexdigest(), url)) - destFile.remove() + if destStream: + destStream.allAvailable(remove = True) if decFile: decFile.remove() - def _save_error(self, failure, url, destFile, decFile = None): + def _save_error(self, failure, url, destFile, destStream = None, decFile = None): """Remove the destination files.""" log.msg('Error occurred downloading %s' % url) log.err(failure) - destFile.restat(False) - if destFile.exists(): - log.msg('Removing the incomplete file: %s' % destFile.path) - destFile.remove() + if destStream: + destStream.allAvailable(remove = True) + else: + destFile.restat(False) + if destFile.exists(): + log.msg('Removing the incomplete file: %s' % destFile.path) + destFile.remove() if decFile: decFile.restat(False) if decFile.exists(): diff --git a/apt_p2p/HTTPServer.py b/apt_p2p/HTTPServer.py index 0c17d3f..652198f 100644 --- a/apt_p2p/HTTPServer.py +++ b/apt_p2p/HTTPServer.py @@ -13,6 +13,7 @@ from twisted.trial import unittest from twisted.python.filepath import FilePath from policies import ThrottlingFactory, ThrottlingProtocol, ProtocolWrapper +from Streams import UploadStream, FileUploadStream, PiecesUploadStream from apt_p2p_conf import config from apt_p2p_Khashmir.bencode import bencode @@ -86,61 +87,21 @@ class FileDownloader(static.File): return self.__class__(path, self.manager, self.defaultType, self.ignoredExts, self.processors, self.indexNames[:]) -class UploadStream: - """Identifier for streams that are uploaded to peers.""" - -class FileUploaderStream(stream.FileStream, UploadStream): - """Modified to make it suitable for streaming to peers. - - Streams the file in small chunks to make it easier to throttle the - streaming to peers. - - @ivar CHUNK_SIZE: the size of chunks of data to send at a time - """ - - CHUNK_SIZE = 4*1024 - - def read(self, sendfile=False): - if self.f is None: - return None - - length = self.length - if length == 0: - self.f = None - return None - - # Remove the SendFileBuffer and mmap use, just use string reads and writes - - readSize = min(length, self.CHUNK_SIZE) - - self.f.seek(self.start) - b = self.f.read(readSize) - bytesRead = len(b) - if not bytesRead: - raise RuntimeError("Ran out of data reading file %r, expected %d more bytes" % (self.f, length)) - else: - self.length -= bytesRead - self.start += bytesRead - return b - -class PiecesUploaderStream(stream.MemoryStream, UploadStream): - """Modified to identify it for streaming to peers.""" - class PiecesUploader(static.Data): """Modified to identify it for peer requests. - Uses the modified L{PieceUploaderStream} to stream the pieces for throttling. + Uses the modified L{Streams.PieceUploadStream} to stream the pieces for throttling. """ def render(self, req): return http.Response(responsecode.OK, http_headers.Headers({'content-type': self.contentType()}), - stream=PiecesUploaderStream(self.data)) + stream=PiecesUploadStream(self.data)) class FileUploader(static.File): """Modified to make it suitable for peer requests. - Uses the modified L{FileUploaderStream} to stream the file for throttling, + Uses the modified L{Streams.FileUploadStream} to stream the file for throttling, and doesn't do any listing of directory contents. """ @@ -165,7 +126,7 @@ class FileUploader(static.File): response = http.Response() # Use the modified FileStream - response.stream = FileUploaderStream(f, 0, self.fp.getsize()) + response.stream = FileUploadStream(f, 0, self.fp.getsize()) for (header, value) in ( ("content-type", self.contentType()), @@ -180,8 +141,7 @@ class UploadThrottlingProtocol(ThrottlingProtocol): """Protocol for throttling uploads. Determines whether or not to throttle the upload based on the type of stream. - Uploads use L{FileUploaderStream} or L{twisted.web2.stream.MemorySTream}, - apt uses L{CacheManager.ProxyFileStream} or L{twisted.web.stream.FileStream}. + Uploads use instances of L{Streams.UploadStream}. """ stats = None diff --git a/apt_p2p/PeerManager.py b/apt_p2p/PeerManager.py index 9e3f6da..cfc0f78 100644 --- a/apt_p2p/PeerManager.py +++ b/apt_p2p/PeerManager.py @@ -8,219 +8,21 @@ from binascii import b2a_hex, a2b_hex import sha from twisted.internet import reactor, defer -from twisted.python import log, filepath +from twisted.python import log from twisted.trial import unittest from twisted.web2 import stream from twisted.web2.http import Response, splitHostPort from HTTPDownloader import Peer +from Streams import GrowingFileStream, StreamToFile from util import uncompact from Hash import PIECE_SIZE from apt_p2p_Khashmir.bencode import bdecode from apt_p2p_conf import config - class PeerError(Exception): """An error occurred downloading from peers.""" -class GrowingFileStream(stream.FileStream): - """Modified to stream data from a file as it becomes available. - - @ivar CHUNK_SIZE: the maximum size of chunks of data to send at a time - @ivar deferred: waiting for the result of the last read attempt - @ivar available: the number of bytes that are currently available to read - @ivar position: the current position in the file where the next read will begin - @ivar finished: True when no more data will be coming available - """ - - CHUNK_SIZE = 4*1024 - - def __init__(self, f, length = None): - stream.FileStream.__init__(self, f) - self.length = length - self.deferred = None - self.available = 0L - self.position = 0L - self.finished = False - - def updateAvailable(self, newlyAvailable): - """Update the number of bytes that are available. - - Call it with 0 to trigger reading of a fully read file. - - @param newlyAvailable: the number of bytes that just became available - """ - assert not self.finished - self.available += newlyAvailable - - # If a read is pending, let it go - if self.deferred and self.position < self.available: - # Try to read some data from the file - length = self.available - self.position - readSize = min(length, self.CHUNK_SIZE) - self.f.seek(self.position) - b = self.f.read(readSize) - bytesRead = len(b) - - # Check if end of file was reached - if bytesRead: - self.position += bytesRead - deferred = self.deferred - self.deferred = None - deferred.callback(b) - - def allAvailable(self): - """Indicate that no more data will be coming available.""" - self.finished = True - - # If a read is pending, let it go - if self.deferred: - if self.position < self.available: - # Try to read some data from the file - length = self.available - self.position - readSize = min(length, self.CHUNK_SIZE) - self.f.seek(self.position) - b = self.f.read(readSize) - bytesRead = len(b) - - # Check if end of file was reached - if bytesRead: - self.position += bytesRead - deferred = self.deferred - self.deferred = None - deferred.callback(b) - else: - # We're done - self._close() - deferred = self.deferred - self.deferred = None - deferred.callback(None) - else: - # We're done - self._close() - deferred = self.deferred - self.deferred = None - deferred.callback(None) - - def read(self, sendfile=False): - assert not self.deferred, "A previous read is still deferred." - - if self.f is None: - return None - - length = self.available - self.position - readSize = min(length, self.CHUNK_SIZE) - - # If we don't have any available, we're done or deferred - if readSize <= 0: - if self.finished: - self._close() - return None - else: - self.deferred = defer.Deferred() - return self.deferred - - # Try to read some data from the file - self.f.seek(self.position) - b = self.f.read(readSize) - bytesRead = len(b) - if not bytesRead: - # End of file was reached, we're done or deferred - if self.finished: - self._close() - return None - else: - self.deferred = defer.Deferred() - return self.deferred - else: - self.position += bytesRead - return b - - def _close(self): - """Close the temporary file and remove it.""" - self.f.close() - filepath.FilePath(self.f.name).remove() - self.f = None - -class StreamToFile: - """Save a stream to a partial file and hash it. - - @type stream: L{twisted.web2.stream.IByteStream} - @ivar stream: the input stream being read - @type outFile: L{twisted.python.filepath.FilePath} - @ivar outFile: the file being written - @type hasher: hashing object, e.g. C{sha1} - @ivar hasher: the hash object for the data - @type position: C{int} - @ivar position: the current file position to write the next data to - @type length: C{int} - @ivar length: the position in the file to not write beyond - @type doneDefer: L{twisted.internet.defer.Deferred} - @ivar doneDefer: the deferred that will fire when done writing - """ - - def __init__(self, hasher, inputStream, outFile, start = 0, length = None): - """Initializes the file. - - @type hasher: hashing object, e.g. C{sha1} - @param hasher: the hash object for the data - @type inputStream: L{twisted.web2.stream.IByteStream} - @param inputStream: the input stream to read from - @type outFile: L{twisted.python.filepath.FilePath} - @param outFile: the file to write to - @type start: C{int} - @param start: the file position to start writing at - (optional, defaults to the start of the file) - @type length: C{int} - @param length: the maximum amount of data to write to the file - (optional, defaults to not limiting the writing to the file - """ - self.stream = inputStream - self.outFile = outFile - self.hasher = hasher - self.position = start - self.length = None - if length is not None: - self.length = start + length - self.doneDefer = None - - def run(self): - """Start the streaming. - - @rtype: L{twisted.internet.defer.Deferred} - """ - self.doneDefer = stream.readStream(self.stream, self._gotData) - self.doneDefer.addCallbacks(self._done, self._error) - return self.doneDefer - - def _gotData(self, data): - """Process the received data.""" - if self.outFile.closed: - raise PeerError, "outFile was unexpectedly closed" - - if data is None: - raise PeerError, "Data is None?" - - # Make sure we don't go too far - if self.length is not None and self.position + len(data) > self.length: - data = data[:(self.length - self.position)] - - # Write and hash the streamed data - self.outFile.seek(self.position) - self.outFile.write(data) - self.hasher.update(data) - self.position += len(data) - - def _done(self, result): - """Return the result.""" - return self.hasher.digest() - - def _error(self, err): - """Log the error.""" - log.msg('Streaming error') - log.err(err) - return err - class FileDownload: """Manage a download from a list of peers or a mirror. @@ -528,6 +330,11 @@ class FileDownload: #{ Downloading the pieces def getPieces(self): """Download the next pieces from the peers.""" + if self.file.closed: + log.msg('Download has been aborted for %s' % self.path) + self.stream.allAvailable(remove = True) + return + self.sort() piece = self.nextFinish while self.outstanding < 4 and self.peerlist and piece < len(self.completePieces): @@ -554,7 +361,7 @@ class FileDownload: # Check if we're done if self.outstanding <= 0 and self.nextFinish >= len(self.completePieces): log.msg('Download is complete for %s' % self.path) - self.stream.allAvailable() + self.stream.allAvailable(remove = True) def _getPiece(self, response, piece, peer): """Process the retrieved headers from the peer.""" @@ -606,9 +413,9 @@ class FileDownload: self.getPieces() log.err(err) - def _gotPiece(self, response, piece, peer): + def _gotPiece(self, hash, piece, peer): """Process the retrieved piece from the peer.""" - if self.pieces[piece] and response != self.pieces[piece]: + if self.pieces[piece] and hash.digest() != self.pieces[piece]: # Hash doesn't match log.msg('Hash error for piece %d from peer %r' % (piece, peer)) peer.hashError('Piece received from peer does not match expected') @@ -626,7 +433,7 @@ class FileDownload: def _gotError(self, err, piece, peer): """Piece download failed, try again.""" - log.msg('Error streaming piece %d from peer %r: %r' % (piece, peer, response)) + log.msg('Error streaming piece %d from peer %r: %r' % (piece, peer, err)) log.err(err) self.completePieces[piece] = False self.getPieces() diff --git a/apt_p2p/Streams.py b/apt_p2p/Streams.py new file mode 100644 index 0000000..3eebf06 --- /dev/null +++ b/apt_p2p/Streams.py @@ -0,0 +1,382 @@ + +"""Modified streams that are used by Apt-P2P.""" + +from bz2 import BZ2Decompressor +from zlib import decompressobj, MAX_WBITS +from gzip import FCOMMENT, FEXTRA, FHCRC, FNAME, FTEXT +import os + +from twisted.web2 import stream +from twisted.internet import defer +from twisted.python import log, filepath + +class StreamsError(Exception): + """An error occurred in the streaming.""" + +class GrowingFileStream(stream.SimpleStream): + """Modified to stream data from a file as it becomes available. + + @ivar CHUNK_SIZE: the maximum size of chunks of data to send at a time + @ivar deferred: waiting for the result of the last read attempt + @ivar available: the number of bytes that are currently available to read + @ivar position: the current position in the file where the next read will begin + @ivar closed: True if the reader has closed the stream + @ivar finished: True when no more data will be coming available + @ivar remove: whether to remove the file when streaming is complete + """ + + CHUNK_SIZE = 32*1024 + + def __init__(self, f, length = None): + self.f = f + self.length = length + self.deferred = None + self.available = 0L + self.position = 0L + self.closed = False + self.finished = False + self.remove = False + + #{ Stream interface + def read(self, sendfile=False): + assert not self.deferred, "A previous read is still deferred." + + if self.f is None: + return None + + length = self.available - self.position + readSize = min(length, self.CHUNK_SIZE) + + # If we don't have any available, we're done or deferred + if readSize <= 0: + if self.finished: + self._close() + return None + else: + self.deferred = defer.Deferred() + return self.deferred + + # Try to read some data from the file + self.f.seek(self.position) + b = self.f.read(readSize) + bytesRead = len(b) + if not bytesRead: + # End of file was reached, we're done or deferred + if self.finished: + self._close() + return None + else: + self.deferred = defer.Deferred() + return self.deferred + else: + self.position += bytesRead + return b + + def split(self, point): + raise StreamsError, "You can not split a GrowingFileStream" + + def close(self): + self.length = 0 + self.closed = True + self._close() + + #{ Growing functions + def updateAvailable(self, newlyAvailable): + """Update the number of bytes that are available. + + Call it with 0 to trigger reading of a fully read file. + + @param newlyAvailable: the number of bytes that just became available + """ + if not self.finished: + self.available += newlyAvailable + + # If a read is pending, let it go + if self.deferred and self.position < self.available: + # Try to read some data from the file + length = self.available - self.position + readSize = min(length, self.CHUNK_SIZE) + self.f.seek(self.position) + b = self.f.read(readSize) + bytesRead = len(b) + + # Check if end of file was reached + if bytesRead: + self.position += bytesRead + deferred = self.deferred + self.deferred = None + deferred.callback(b) + + def allAvailable(self, remove = False): + """Indicate that no more data will be coming available. + + @param remove: whether to remove the file when streaming is complete + """ + self.finished = True + self.remove = remove + + # If a read is pending, let it go + if self.deferred: + if self.position < self.available: + # Try to read some data from the file + length = self.available - self.position + readSize = min(length, self.CHUNK_SIZE) + self.f.seek(self.position) + b = self.f.read(readSize) + bytesRead = len(b) + + # Check if end of file was reached + if bytesRead: + self.position += bytesRead + deferred = self.deferred + self.deferred = None + deferred.callback(b) + else: + # We're done + self._close() + deferred = self.deferred + self.deferred = None + deferred.callback(None) + else: + # We're done + self._close() + deferred = self.deferred + self.deferred = None + deferred.callback(None) + + if self.closed: + self._close() + + def _close(self): + """Close the temporary file and maybe remove it.""" + if self.f: + self.f.close() + if self.remove: + file = filepath.FilePath(self.f.name) + file.restat(False) + if file.exists(): + file.remove() + self.f = None + +class StreamToFile: + """Save a stream to a partial file and hash it. + + Also optionally decompresses the file while it is being downloaded. + + @type stream: L{twisted.web2.stream.IByteStream} + @ivar stream: the input stream being read + @type outFile: C{file} + @ivar outFile: the open file being written + @type hasher: hashing object, e.g. C{sha1} + @ivar hasher: the hash object for the data + @type gzfile: C{file} + @ivar gzfile: the open file to write decompressed gzip data to + @type gzdec: L{zlib.decompressobj} + @ivar gzdec: the decompressor to use for the compressed gzip data + @type gzheader: C{boolean} + @ivar gzheader: whether the gzip header still needs to be removed from + the zlib compressed data + @type bz2file: C{file} + @ivar bz2file: the open file to write decompressed bz2 data to + @type bz2dec: L{bz2.BZ2Decompressor} + @ivar bz2dec: the decompressor to use for the compressed bz2 data + @type position: C{int} + @ivar position: the current file position to write the next data to + @type length: C{int} + @ivar length: the position in the file to not write beyond + @ivar notify: a method that will be notified of the length of received data + @type doneDefer: L{twisted.internet.defer.Deferred} + @ivar doneDefer: the deferred that will fire when done writing + """ + + def __init__(self, hasher, inputStream, outFile, start = 0, length = None, + notify = None, decompress = None, decFile = None): + """Initializes the files. + + @type hasher: hashing object, e.g. C{sha1} + @param hasher: the hash object for the data + @type inputStream: L{twisted.web2.stream.IByteStream} + @param inputStream: the input stream to read from + @type outFile: C{file} + @param outFile: the open file to write to + @type start: C{int} + @param start: the file position to start writing at + (optional, defaults to the start of the file) + @type length: C{int} + @param length: the maximum amount of data to write to the file + (optional, defaults to not limiting the writing to the file + @param notify: a method that will be notified of the length of + received data (optional) + @type decompress: C{string} + @param decompress: also decompress the file as this type + (currently only '.gz' and '.bz2' are supported) + @type decFile: C{twisted.python.FilePath} + @param decFile: the file to write the decompressed data to + """ + self.stream = inputStream + self.outFile = outFile + self.hasher = hasher + self.gzfile = None + self.bz2file = None + if decompress == ".gz": + self.gzheader = True + self.gzfile = decFile.open('w') + self.gzdec = decompressobj(-MAX_WBITS) + elif decompress == ".bz2": + self.bz2file = decFile.open('w') + self.bz2dec = BZ2Decompressor() + self.position = start + self.length = None + if length is not None: + self.length = start + length + self.notify = notify + self.doneDefer = None + + def run(self): + """Start the streaming. + + @rtype: L{twisted.internet.defer.Deferred} + """ + self.doneDefer = stream.readStream(self.stream, self._gotData) + self.doneDefer.addCallbacks(self._done, self._error) + return self.doneDefer + + def _gotData(self, data): + """Process the received data.""" + if self.outFile.closed: + raise StreamsError, "outFile was unexpectedly closed" + + # Make sure we don't go too far + if self.length is not None and self.position + len(data) > self.length: + data = data[:(self.length - self.position)] + + # Write and hash the streamed data + self.outFile.seek(self.position) + self.outFile.write(data) + self.hasher.update(data) + self.position += len(data) + + if self.gzfile: + # Decompress the zlib portion of the file + if self.gzheader: + # Remove the gzip header junk + self.gzheader = False + new_data = self._remove_gzip_header(data) + dec_data = self.gzdec.decompress(new_data) + else: + dec_data = self.gzdec.decompress(data) + self.gzfile.write(dec_data) + if self.bz2file: + # Decompress the bz2 file + dec_data = self.bz2dec.decompress(data) + self.bz2file.write(dec_data) + + if self.notify: + self.notify(len(data)) + + def _remove_gzip_header(self, data): + """Remove the gzip header from the zlib compressed data.""" + # Read, check & discard the header fields + if data[:2] != '\037\213': + raise IOError, 'Not a gzipped file' + if ord(data[2]) != 8: + raise IOError, 'Unknown compression method' + flag = ord(data[3]) + # modtime = self.fileobj.read(4) + # extraflag = self.fileobj.read(1) + # os = self.fileobj.read(1) + + skip = 10 + if flag & FEXTRA: + # Read & discard the extra field + xlen = ord(data[10]) + xlen = xlen + 256*ord(data[11]) + skip = skip + 2 + xlen + if flag & FNAME: + # Read and discard a null-terminated string containing the filename + while True: + if not data[skip] or data[skip] == '\000': + break + skip += 1 + skip += 1 + if flag & FCOMMENT: + # Read and discard a null-terminated string containing a comment + while True: + if not data[skip] or data[skip] == '\000': + break + skip += 1 + skip += 1 + if flag & FHCRC: + skip += 2 # Read & discard the 16-bit header CRC + + return data[skip:] + + def _close(self): + """Close all the output files.""" + # Can't close the outfile, but we should sync it to disk + if not self.outFile.closed: + self.outFile.flush() + + # Close the decompressed file + if self.gzfile: + # Finish the decompression + data_dec = self.gzdec.flush() + self.gzfile.write(data_dec) + self.gzfile.close() + self.gzfile = None + if self.bz2file: + self.bz2file.close() + self.bz2file = None + + def _done(self, result): + """Return the result.""" + self._close() + return self.hasher + + def _error(self, err): + """Log the error and close everything.""" + log.msg('Streaming error') + log.err(err) + self.stream.close() + self._close() + return err + +class UploadStream: + """Identifier for streams that are uploaded to peers.""" + +class PiecesUploadStream(stream.MemoryStream, UploadStream): + """Modified to identify it for streaming to peers.""" + +class FileUploadStream(stream.FileStream, UploadStream): + """Modified to make it suitable for streaming to peers. + + Streams the file in small chunks to make it easier to throttle the + streaming to peers. + + @ivar CHUNK_SIZE: the size of chunks of data to send at a time + """ + + CHUNK_SIZE = 4*1024 + + def read(self, sendfile=False): + if self.f is None: + return None + + length = self.length + if length == 0: + self.f = None + return None + + # Remove the SendFileBuffer and mmap use, just use string reads and writes + + readSize = min(length, self.CHUNK_SIZE) + + self.f.seek(self.start) + b = self.f.read(readSize) + bytesRead = len(b) + if not bytesRead: + raise RuntimeError("Ran out of data reading file %r, expected %d more bytes" % (self.f, length)) + else: + self.length -= bytesRead + self.start += bytesRead + return b diff --git a/test.py b/test.py index b6a3ec8..b94e012 100755 --- a/test.py +++ b/test.py @@ -339,6 +339,27 @@ tests = {'1': ('Start a single bootstrap and downloader, test updating and downl (2, ['source', 'crash-whitepaper']), ]), + 'c': ('Test downloading from peers and just a mirror.', + {1: {}}, + {1: {}, + 2: {}}, + [(1, ['update']), + (1, ['install', 'aboot-base', 'ada-reference-manual', + 'fop-doc', 'bison-doc', 'crash-whitepaper', + 'apt-howto-common', 'aptitude-doc-en', 'asr-manpages', + 'alcovebook-sgml-doc', 'airstrike-common', + ]), + (2, ['update']), + (2, ['install', 'aboot-base', 'aap-doc', 'ada-reference-manual', + 'aspectj-doc', 'fop-doc', 'asis-doc', + 'bison-doc', 'crash-whitepaper', + 'bash-doc', 'apt-howto-common', 'autotools-dev', + 'aptitude-doc-en', 'asr-manpages', + 'atomix-data', 'alcovebook-sgml-doc', + 'afbackup-common', 'airstrike-common', + ]), + ]), + } assert 'all' not in tests