@var DECOMPRESS_FILES: a list of file names that need to be decompressed
-from bz2 import BZ2Decompressor
-from zlib import decompressobj, MAX_WBITS
from urlparse import urlparse
import os
from twisted.python.filepath import FilePath
from twisted.internet import defer, reactor
from twisted.trial import unittest
-from twisted.web2 import stream
from twisted.web2.http import splitHostPort
+from Streams import GrowingFileStream, StreamToFile
from Hash import HashObject
from apt_p2p_conf import config
class CacheError(Exception):
"""Error occurred downloading a file to the cache."""
-class ProxyFileStream(stream.SimpleStream):
- """Saves a stream to a file while providing a new stream.
- Also optionally decompresses the file while it is being downloaded.
- @type stream: L{twisted.web2.stream.IByteStream}
- @ivar stream: the input stream being read
- @type outFile: L{twisted.python.filepath.FilePath}
- @ivar outFile: the file being written
- @type hash: L{Hash.HashObject}
- @ivar hash: the hash object for the file
- @type gzfile: C{file}
- @ivar gzfile: the open file to write decompressed gzip data to
- @type gzdec: L{zlib.decompressobj}
- @ivar gzdec: the decompressor to use for the compressed gzip data
- @type gzheader: C{boolean}
- @ivar gzheader: whether the gzip header still needs to be removed from
- the zlib compressed data
- @type bz2file: C{file}
- @ivar bz2file: the open file to write decompressed bz2 data to
- @type bz2dec: L{bz2.BZ2Decompressor}
- @ivar bz2dec: the decompressor to use for the compressed bz2 data
- @type length: C{int}
- @ivar length: the length of the original (compressed) file
- @type doneDefer: L{twisted.internet.defer.Deferred}
- @ivar doneDefer: the deferred that will fire when done streaming
- @group Stream implementation: read, close
- """
- def __init__(self, stream, outFile, hash, decompress = None, decFile = None):
- """Initializes the proxy.
- @type stream: L{twisted.web2.stream.IByteStream}
- @param stream: the input stream to read from
- @type outFile: L{twisted.python.filepath.FilePath}
- @param outFile: the file to write to
- @type hash: L{Hash.HashObject}
- @param hash: the hash object to use for the file
- @type decompress: C{string}
- @param decompress: also decompress the file as this type
- (currently only '.gz' and '.bz2' are supported)
- @type decFile: C{twisted.python.FilePath}
- @param decFile: the file to write the decompressed data to
- """
- self.stream = stream
- self.outFile = outFile.open('w')
- self.hash = hash
- self.hash.new()
- self.gzfile = None
- self.bz2file = None
- if decompress == ".gz":
- self.gzheader = True
- self.gzfile = decFile.open('w')
- self.gzdec = decompressobj(-MAX_WBITS)
- elif decompress == ".bz2":
- self.bz2file = decFile.open('w')
- self.bz2dec = BZ2Decompressor()
- self.length = self.stream.length
- self.doneDefer = defer.Deferred()
- def _done(self):
- """Close all the output files, return the result."""
- if not self.outFile.closed:
- self.outFile.close()
- self.hash.digest()
- if self.gzfile:
- # Finish the decompression
- data_dec = self.gzdec.flush()
- self.gzfile.write(data_dec)
- self.gzfile.close()
- self.gzfile = None
- if self.bz2file:
- self.bz2file.close()
- self.bz2file = None
- def _error(self, err):
- """Close all the output files, return the error."""
- if not self.outFile.closed:
- self._done()
- self.stream.close()
- self.doneDefer.errback(err)
- def read(self):
- """Read some data from the stream."""
- if self.outFile.closed:
- return None
- # Read data from the stream, deal with the possible deferred
- data = self.stream.read()
- if isinstance(data, defer.Deferred):
- data.addCallbacks(self._write, self._error)
- return data
- self._write(data)
- return data
- def _write(self, data):
- """Write the stream data to the file and return it for others to use.
- Also optionally decompresses it.
- """
- if data is None:
- if not self.outFile.closed:
- self._done()
- self.doneDefer.callback(self.hash)
- return data
- # Write and hash the streamed data
- self.outFile.write(data)
- self.hash.update(data)
- if self.gzfile:
- # Decompress the zlib portion of the file
- if self.gzheader:
- # Remove the gzip header junk
- self.gzheader = False
- new_data = self._remove_gzip_header(data)
- dec_data = self.gzdec.decompress(new_data)
- else:
- dec_data = self.gzdec.decompress(data)
- self.gzfile.write(dec_data)
- if self.bz2file:
- # Decompress the bz2 file
- dec_data = self.bz2dec.decompress(data)
- self.bz2file.write(dec_data)
- return data
- def _remove_gzip_header(self, data):
- """Remove the gzip header from the zlib compressed data."""
- # Read, check & discard the header fields
- if data[:2] != '\037\213':
- raise IOError, 'Not a gzipped file'
- if ord(data[2]) != 8:
- raise IOError, 'Unknown compression method'
- flag = ord(data[3])
- # modtime = self.fileobj.read(4)
- # extraflag = self.fileobj.read(1)
- # os = self.fileobj.read(1)
- skip = 10
- if flag & FEXTRA:
- # Read & discard the extra field
- xlen = ord(data[10])
- xlen = xlen + 256*ord(data[11])
- skip = skip + 2 + xlen
- if flag & FNAME:
- # Read and discard a null-terminated string containing the filename
- while True:
- if not data[skip] or data[skip] == '\000':
- break
- skip += 1
- skip += 1
- if flag & FCOMMENT:
- # Read and discard a null-terminated string containing a comment
- while True:
- if not data[skip] or data[skip] == '\000':
- break
- skip += 1
- skip += 1
- if flag & FHCRC:
- skip += 2 # Read & discard the 16-bit header CRC
- return data[skip:]
- def close(self):
- """Clean everything up and return None to future reads."""
- log.msg('ProxyFileStream was prematurely closed after only %d/%d bytes' % (self.hash.size, self.length))
- if self.hash.size < self.length:
- self._error(CacheError('Prematurely closed, all data was not written'))
- elif not self.outFile.closed:
- self._done()
- self.doneDefer.callback(self.hash)
- self.length = 0
- self.stream.close()
class CacheManager:
"""Manages all downloaded files and requests for cached objects.
# Create the new stream from the old one.
orig_stream = response.stream
- response.stream = ProxyFileStream(orig_stream, destFile, hash, ext, decFile)
- response.stream.doneDefer.addCallback(self._save_complete, url, destFile,
- response.headers.getHeader('Last-Modified'),
- decFile)
- response.stream.doneDefer.addErrback(self._save_error, url, destFile, decFile)
+ f = destFile.open('w+')
+ new_stream = GrowingFileStream(f, orig_stream.length)
+ hash.new()
+ df = StreamToFile(hash, orig_stream, f, notify = new_stream.updateAvailable,
+ decompress = ext, decFile = decFile).run()
+ df.addCallback(self._save_complete, url, destFile, new_stream,
+ response.headers.getHeader('Last-Modified'), decFile)
+ df.addErrback(self._save_error, url, destFile, new_stream, decFile)
+ response.stream = new_stream
# Return the modified response with the new stream
return response
- def _save_complete(self, hash, url, destFile, modtime = None, decFile = None):
+ def _save_complete(self, hash, url, destFile, destStream = None,
+ modtime = None, decFile = None):
"""Update the modification time and inform the main program.
@type hash: L{Hash.HashObject}
@param url: the URI of the actual mirror request
@type destFile: C{twisted.python.FilePath}
@param destFile: the file where the download was written to
+ @type destStream: L{Streams.GrowingFileStream}
+ @param destStream: the stream to notify that all data is available
@type modtime: C{int}
@param modtime: the modified time of the cached file (seconds since epoch)
(optional, defaults to not setting the modification time of the file)
result = hash.verify()
if result or result is None:
+ if destStream:
+ destStream.allAvailable()
if modtime:
os.utime(destFile.path, (modtime, modtime))
decHash = HashObject()
ext_len = len(destFile.path) - len(decFile.path)
df = decHash.hashInThread(decFile)
- df.addCallback(self._save_complete, url[:-ext_len], decFile, modtime)
+ df.addCallback(self._save_complete, url[:-ext_len], decFile, modtime = modtime)
df.addErrback(self._save_error, url[:-ext_len], decFile)
log.msg("Hashes don't match %s != %s: %s" % (hash.hexexpected(), hash.hexdigest(), url))
- destFile.remove()
+ if destStream:
+ destStream.allAvailable(remove = True)
if decFile:
- def _save_error(self, failure, url, destFile, decFile = None):
+ def _save_error(self, failure, url, destFile, destStream = None, decFile = None):
"""Remove the destination files."""
log.msg('Error occurred downloading %s' % url)
- destFile.restat(False)
- if destFile.exists():
- log.msg('Removing the incomplete file: %s' % destFile.path)
- destFile.remove()
+ if destStream:
+ destStream.allAvailable(remove = True)
+ else:
+ destFile.restat(False)
+ if destFile.exists():
+ log.msg('Removing the incomplete file: %s' % destFile.path)
+ destFile.remove()
if decFile:
if decFile.exists():
from twisted.python.filepath import FilePath
from policies import ThrottlingFactory, ThrottlingProtocol, ProtocolWrapper
+from Streams import UploadStream, FileUploadStream, PiecesUploadStream
from apt_p2p_conf import config
from apt_p2p_Khashmir.bencode import bencode
return self.__class__(path, self.manager, self.defaultType, self.ignoredExts,
self.processors, self.indexNames[:])
-class UploadStream:
- """Identifier for streams that are uploaded to peers."""
-class FileUploaderStream(stream.FileStream, UploadStream):
- """Modified to make it suitable for streaming to peers.
- Streams the file in small chunks to make it easier to throttle the
- streaming to peers.
- @ivar CHUNK_SIZE: the size of chunks of data to send at a time
- """
- CHUNK_SIZE = 4*1024
- def read(self, sendfile=False):
- if self.f is None:
- return None
- length = self.length
- if length == 0:
- self.f = None
- return None
- # Remove the SendFileBuffer and mmap use, just use string reads and writes
- readSize = min(length, self.CHUNK_SIZE)
- self.f.seek(self.start)
- b = self.f.read(readSize)
- bytesRead = len(b)
- if not bytesRead:
- raise RuntimeError("Ran out of data reading file %r, expected %d more bytes" % (self.f, length))
- else:
- self.length -= bytesRead
- self.start += bytesRead
- return b
-class PiecesUploaderStream(stream.MemoryStream, UploadStream):
- """Modified to identify it for streaming to peers."""
class PiecesUploader(static.Data):
"""Modified to identify it for peer requests.
- Uses the modified L{PieceUploaderStream} to stream the pieces for throttling.
+ Uses the modified L{Streams.PieceUploadStream} to stream the pieces for throttling.
def render(self, req):
return http.Response(responsecode.OK,
http_headers.Headers({'content-type': self.contentType()}),
- stream=PiecesUploaderStream(self.data))
+ stream=PiecesUploadStream(self.data))
class FileUploader(static.File):
"""Modified to make it suitable for peer requests.
- Uses the modified L{FileUploaderStream} to stream the file for throttling,
+ Uses the modified L{Streams.FileUploadStream} to stream the file for throttling,
and doesn't do any listing of directory contents.
response = http.Response()
# Use the modified FileStream
- response.stream = FileUploaderStream(f, 0, self.fp.getsize())
+ response.stream = FileUploadStream(f, 0, self.fp.getsize())
for (header, value) in (
("content-type", self.contentType()),
"""Protocol for throttling uploads.
Determines whether or not to throttle the upload based on the type of stream.
- Uploads use L{FileUploaderStream} or L{twisted.web2.stream.MemorySTream},
- apt uses L{CacheManager.ProxyFileStream} or L{twisted.web.stream.FileStream}.
+ Uploads use instances of L{Streams.UploadStream}.
stats = None
import sha
from twisted.internet import reactor, defer
-from twisted.python import log, filepath
+from twisted.python import log
from twisted.trial import unittest
from twisted.web2 import stream
from twisted.web2.http import Response, splitHostPort
from HTTPDownloader import Peer
+from Streams import GrowingFileStream, StreamToFile
from util import uncompact
from Hash import PIECE_SIZE
from apt_p2p_Khashmir.bencode import bdecode
from apt_p2p_conf import config
class PeerError(Exception):
"""An error occurred downloading from peers."""
-class GrowingFileStream(stream.FileStream):
- """Modified to stream data from a file as it becomes available.
- @ivar CHUNK_SIZE: the maximum size of chunks of data to send at a time
- @ivar deferred: waiting for the result of the last read attempt
- @ivar available: the number of bytes that are currently available to read
- @ivar position: the current position in the file where the next read will begin
- @ivar finished: True when no more data will be coming available
- """
- CHUNK_SIZE = 4*1024
- def __init__(self, f, length = None):
- stream.FileStream.__init__(self, f)
- self.length = length
- self.deferred = None
- self.available = 0L
- self.position = 0L
- self.finished = False
- def updateAvailable(self, newlyAvailable):
- """Update the number of bytes that are available.
- Call it with 0 to trigger reading of a fully read file.
- @param newlyAvailable: the number of bytes that just became available
- """
- assert not self.finished
- self.available += newlyAvailable
- # If a read is pending, let it go
- if self.deferred and self.position < self.available:
- # Try to read some data from the file
- length = self.available - self.position
- readSize = min(length, self.CHUNK_SIZE)
- self.f.seek(self.position)
- b = self.f.read(readSize)
- bytesRead = len(b)
- # Check if end of file was reached
- if bytesRead:
- self.position += bytesRead
- deferred = self.deferred
- self.deferred = None
- deferred.callback(b)
- def allAvailable(self):
- """Indicate that no more data will be coming available."""
- self.finished = True
- # If a read is pending, let it go
- if self.deferred:
- if self.position < self.available:
- # Try to read some data from the file
- length = self.available - self.position
- readSize = min(length, self.CHUNK_SIZE)
- self.f.seek(self.position)
- b = self.f.read(readSize)
- bytesRead = len(b)
- # Check if end of file was reached
- if bytesRead:
- self.position += bytesRead
- deferred = self.deferred
- self.deferred = None
- deferred.callback(b)
- else:
- # We're done
- self._close()
- deferred = self.deferred
- self.deferred = None
- deferred.callback(None)
- else:
- # We're done
- self._close()
- deferred = self.deferred
- self.deferred = None
- deferred.callback(None)
- def read(self, sendfile=False):
- assert not self.deferred, "A previous read is still deferred."
- if self.f is None:
- return None
- length = self.available - self.position
- readSize = min(length, self.CHUNK_SIZE)
- # If we don't have any available, we're done or deferred
- if readSize <= 0:
- if self.finished:
- self._close()
- return None
- else:
- self.deferred = defer.Deferred()
- return self.deferred
- # Try to read some data from the file
- self.f.seek(self.position)
- b = self.f.read(readSize)
- bytesRead = len(b)
- if not bytesRead:
- # End of file was reached, we're done or deferred
- if self.finished:
- self._close()
- return None
- else:
- self.deferred = defer.Deferred()
- return self.deferred
- else:
- self.position += bytesRead
- return b
- def _close(self):
- """Close the temporary file and remove it."""
- self.f.close()
- filepath.FilePath(self.f.name).remove()
- self.f = None
-class StreamToFile:
- """Save a stream to a partial file and hash it.
- @type stream: L{twisted.web2.stream.IByteStream}
- @ivar stream: the input stream being read
- @type outFile: L{twisted.python.filepath.FilePath}
- @ivar outFile: the file being written
- @type hasher: hashing object, e.g. C{sha1}
- @ivar hasher: the hash object for the data
- @type position: C{int}
- @ivar position: the current file position to write the next data to
- @type length: C{int}
- @ivar length: the position in the file to not write beyond
- @type doneDefer: L{twisted.internet.defer.Deferred}
- @ivar doneDefer: the deferred that will fire when done writing
- """
- def __init__(self, hasher, inputStream, outFile, start = 0, length = None):
- """Initializes the file.
- @type hasher: hashing object, e.g. C{sha1}
- @param hasher: the hash object for the data
- @type inputStream: L{twisted.web2.stream.IByteStream}
- @param inputStream: the input stream to read from
- @type outFile: L{twisted.python.filepath.FilePath}
- @param outFile: the file to write to
- @type start: C{int}
- @param start: the file position to start writing at
- (optional, defaults to the start of the file)
- @type length: C{int}
- @param length: the maximum amount of data to write to the file
- (optional, defaults to not limiting the writing to the file
- """
- self.stream = inputStream
- self.outFile = outFile
- self.hasher = hasher
- self.position = start
- self.length = None
- if length is not None:
- self.length = start + length
- self.doneDefer = None
- def run(self):
- """Start the streaming.
- @rtype: L{twisted.internet.defer.Deferred}
- """
- self.doneDefer = stream.readStream(self.stream, self._gotData)
- self.doneDefer.addCallbacks(self._done, self._error)
- return self.doneDefer
- def _gotData(self, data):
- """Process the received data."""
- if self.outFile.closed:
- raise PeerError, "outFile was unexpectedly closed"
- if data is None:
- raise PeerError, "Data is None?"
- # Make sure we don't go too far
- if self.length is not None and self.position + len(data) > self.length:
- data = data[:(self.length - self.position)]
- # Write and hash the streamed data
- self.outFile.seek(self.position)
- self.outFile.write(data)
- self.hasher.update(data)
- self.position += len(data)
- def _done(self, result):
- """Return the result."""
- return self.hasher.digest()
- def _error(self, err):
- """Log the error."""
- log.msg('Streaming error')
- log.err(err)
- return err
class FileDownload:
"""Manage a download from a list of peers or a mirror.
#{ Downloading the pieces
def getPieces(self):
"""Download the next pieces from the peers."""
+ if self.file.closed:
+ log.msg('Download has been aborted for %s' % self.path)
+ self.stream.allAvailable(remove = True)
+ return
piece = self.nextFinish
while self.outstanding < 4 and self.peerlist and piece < len(self.completePieces):
# Check if we're done
if self.outstanding <= 0 and self.nextFinish >= len(self.completePieces):
log.msg('Download is complete for %s' % self.path)
- self.stream.allAvailable()
+ self.stream.allAvailable(remove = True)
def _getPiece(self, response, piece, peer):
"""Process the retrieved headers from the peer."""
- def _gotPiece(self, response, piece, peer):
+ def _gotPiece(self, hash, piece, peer):
"""Process the retrieved piece from the peer."""
- if self.pieces[piece] and response != self.pieces[piece]:
+ if self.pieces[piece] and hash.digest() != self.pieces[piece]:
# Hash doesn't match
log.msg('Hash error for piece %d from peer %r' % (piece, peer))
peer.hashError('Piece received from peer does not match expected')
def _gotError(self, err, piece, peer):
"""Piece download failed, try again."""
- log.msg('Error streaming piece %d from peer %r: %r' % (piece, peer, response))
+ log.msg('Error streaming piece %d from peer %r: %r' % (piece, peer, err))
self.completePieces[piece] = False
--- /dev/null
+"""Modified streams that are used by Apt-P2P."""
+from bz2 import BZ2Decompressor
+from zlib import decompressobj, MAX_WBITS
+import os
+from twisted.web2 import stream
+from twisted.internet import defer
+from twisted.python import log, filepath
+class StreamsError(Exception):
+ """An error occurred in the streaming."""
+class GrowingFileStream(stream.SimpleStream):
+ """Modified to stream data from a file as it becomes available.
+ @ivar CHUNK_SIZE: the maximum size of chunks of data to send at a time
+ @ivar deferred: waiting for the result of the last read attempt
+ @ivar available: the number of bytes that are currently available to read
+ @ivar position: the current position in the file where the next read will begin
+ @ivar closed: True if the reader has closed the stream
+ @ivar finished: True when no more data will be coming available
+ @ivar remove: whether to remove the file when streaming is complete
+ """
+ CHUNK_SIZE = 32*1024
+ def __init__(self, f, length = None):
+ self.f = f
+ self.length = length
+ self.deferred = None
+ self.available = 0L
+ self.position = 0L
+ self.closed = False
+ self.finished = False
+ self.remove = False
+ #{ Stream interface
+ def read(self, sendfile=False):
+ assert not self.deferred, "A previous read is still deferred."
+ if self.f is None:
+ return None
+ length = self.available - self.position
+ readSize = min(length, self.CHUNK_SIZE)
+ # If we don't have any available, we're done or deferred
+ if readSize <= 0:
+ if self.finished:
+ self._close()
+ return None
+ else:
+ self.deferred = defer.Deferred()
+ return self.deferred
+ # Try to read some data from the file
+ self.f.seek(self.position)
+ b = self.f.read(readSize)
+ bytesRead = len(b)
+ if not bytesRead:
+ # End of file was reached, we're done or deferred
+ if self.finished:
+ self._close()
+ return None
+ else:
+ self.deferred = defer.Deferred()
+ return self.deferred
+ else:
+ self.position += bytesRead
+ return b
+ def split(self, point):
+ raise StreamsError, "You can not split a GrowingFileStream"
+ def close(self):
+ self.length = 0
+ self.closed = True
+ self._close()
+ #{ Growing functions
+ def updateAvailable(self, newlyAvailable):
+ """Update the number of bytes that are available.
+ Call it with 0 to trigger reading of a fully read file.
+ @param newlyAvailable: the number of bytes that just became available
+ """
+ if not self.finished:
+ self.available += newlyAvailable
+ # If a read is pending, let it go
+ if self.deferred and self.position < self.available:
+ # Try to read some data from the file
+ length = self.available - self.position
+ readSize = min(length, self.CHUNK_SIZE)
+ self.f.seek(self.position)
+ b = self.f.read(readSize)
+ bytesRead = len(b)
+ # Check if end of file was reached
+ if bytesRead:
+ self.position += bytesRead
+ deferred = self.deferred
+ self.deferred = None
+ deferred.callback(b)
+ def allAvailable(self, remove = False):
+ """Indicate that no more data will be coming available.
+ @param remove: whether to remove the file when streaming is complete
+ """
+ self.finished = True
+ self.remove = remove
+ # If a read is pending, let it go
+ if self.deferred:
+ if self.position < self.available:
+ # Try to read some data from the file
+ length = self.available - self.position
+ readSize = min(length, self.CHUNK_SIZE)
+ self.f.seek(self.position)
+ b = self.f.read(readSize)
+ bytesRead = len(b)
+ # Check if end of file was reached
+ if bytesRead:
+ self.position += bytesRead
+ deferred = self.deferred
+ self.deferred = None
+ deferred.callback(b)
+ else:
+ # We're done
+ self._close()
+ deferred = self.deferred
+ self.deferred = None
+ deferred.callback(None)
+ else:
+ # We're done
+ self._close()
+ deferred = self.deferred
+ self.deferred = None
+ deferred.callback(None)
+ if self.closed:
+ self._close()
+ def _close(self):
+ """Close the temporary file and maybe remove it."""
+ if self.f:
+ self.f.close()
+ if self.remove:
+ file = filepath.FilePath(self.f.name)
+ file.restat(False)
+ if file.exists():
+ file.remove()
+ self.f = None
+class StreamToFile:
+ """Save a stream to a partial file and hash it.
+ Also optionally decompresses the file while it is being downloaded.
+ @type stream: L{twisted.web2.stream.IByteStream}
+ @ivar stream: the input stream being read
+ @type outFile: C{file}
+ @ivar outFile: the open file being written
+ @type hasher: hashing object, e.g. C{sha1}
+ @ivar hasher: the hash object for the data
+ @type gzfile: C{file}
+ @ivar gzfile: the open file to write decompressed gzip data to
+ @type gzdec: L{zlib.decompressobj}
+ @ivar gzdec: the decompressor to use for the compressed gzip data
+ @type gzheader: C{boolean}
+ @ivar gzheader: whether the gzip header still needs to be removed from
+ the zlib compressed data
+ @type bz2file: C{file}
+ @ivar bz2file: the open file to write decompressed bz2 data to
+ @type bz2dec: L{bz2.BZ2Decompressor}
+ @ivar bz2dec: the decompressor to use for the compressed bz2 data
+ @type position: C{int}
+ @ivar position: the current file position to write the next data to
+ @type length: C{int}
+ @ivar length: the position in the file to not write beyond
+ @ivar notify: a method that will be notified of the length of received data
+ @type doneDefer: L{twisted.internet.defer.Deferred}
+ @ivar doneDefer: the deferred that will fire when done writing
+ """
+ def __init__(self, hasher, inputStream, outFile, start = 0, length = None,
+ notify = None, decompress = None, decFile = None):
+ """Initializes the files.
+ @type hasher: hashing object, e.g. C{sha1}
+ @param hasher: the hash object for the data
+ @type inputStream: L{twisted.web2.stream.IByteStream}
+ @param inputStream: the input stream to read from
+ @type outFile: C{file}
+ @param outFile: the open file to write to
+ @type start: C{int}
+ @param start: the file position to start writing at
+ (optional, defaults to the start of the file)
+ @type length: C{int}
+ @param length: the maximum amount of data to write to the file
+ (optional, defaults to not limiting the writing to the file
+ @param notify: a method that will be notified of the length of
+ received data (optional)
+ @type decompress: C{string}
+ @param decompress: also decompress the file as this type
+ (currently only '.gz' and '.bz2' are supported)
+ @type decFile: C{twisted.python.FilePath}
+ @param decFile: the file to write the decompressed data to
+ """
+ self.stream = inputStream
+ self.outFile = outFile
+ self.hasher = hasher
+ self.gzfile = None
+ self.bz2file = None
+ if decompress == ".gz":
+ self.gzheader = True
+ self.gzfile = decFile.open('w')
+ self.gzdec = decompressobj(-MAX_WBITS)
+ elif decompress == ".bz2":
+ self.bz2file = decFile.open('w')
+ self.bz2dec = BZ2Decompressor()
+ self.position = start
+ self.length = None
+ if length is not None:
+ self.length = start + length
+ self.notify = notify
+ self.doneDefer = None
+ def run(self):
+ """Start the streaming.
+ @rtype: L{twisted.internet.defer.Deferred}
+ """
+ self.doneDefer = stream.readStream(self.stream, self._gotData)
+ self.doneDefer.addCallbacks(self._done, self._error)
+ return self.doneDefer
+ def _gotData(self, data):
+ """Process the received data."""
+ if self.outFile.closed:
+ raise StreamsError, "outFile was unexpectedly closed"
+ # Make sure we don't go too far
+ if self.length is not None and self.position + len(data) > self.length:
+ data = data[:(self.length - self.position)]
+ # Write and hash the streamed data
+ self.outFile.seek(self.position)
+ self.outFile.write(data)
+ self.hasher.update(data)
+ self.position += len(data)
+ if self.gzfile:
+ # Decompress the zlib portion of the file
+ if self.gzheader:
+ # Remove the gzip header junk
+ self.gzheader = False
+ new_data = self._remove_gzip_header(data)
+ dec_data = self.gzdec.decompress(new_data)
+ else:
+ dec_data = self.gzdec.decompress(data)
+ self.gzfile.write(dec_data)
+ if self.bz2file:
+ # Decompress the bz2 file
+ dec_data = self.bz2dec.decompress(data)
+ self.bz2file.write(dec_data)
+ if self.notify:
+ self.notify(len(data))
+ def _remove_gzip_header(self, data):
+ """Remove the gzip header from the zlib compressed data."""
+ # Read, check & discard the header fields
+ if data[:2] != '\037\213':
+ raise IOError, 'Not a gzipped file'
+ if ord(data[2]) != 8:
+ raise IOError, 'Unknown compression method'
+ flag = ord(data[3])
+ # modtime = self.fileobj.read(4)
+ # extraflag = self.fileobj.read(1)
+ # os = self.fileobj.read(1)
+ skip = 10
+ if flag & FEXTRA:
+ # Read & discard the extra field
+ xlen = ord(data[10])
+ xlen = xlen + 256*ord(data[11])
+ skip = skip + 2 + xlen
+ if flag & FNAME:
+ # Read and discard a null-terminated string containing the filename
+ while True:
+ if not data[skip] or data[skip] == '\000':
+ break
+ skip += 1
+ skip += 1
+ if flag & FCOMMENT:
+ # Read and discard a null-terminated string containing a comment
+ while True:
+ if not data[skip] or data[skip] == '\000':
+ break
+ skip += 1
+ skip += 1
+ if flag & FHCRC:
+ skip += 2 # Read & discard the 16-bit header CRC
+ return data[skip:]
+ def _close(self):
+ """Close all the output files."""
+ # Can't close the outfile, but we should sync it to disk
+ if not self.outFile.closed:
+ self.outFile.flush()
+ # Close the decompressed file
+ if self.gzfile:
+ # Finish the decompression
+ data_dec = self.gzdec.flush()
+ self.gzfile.write(data_dec)
+ self.gzfile.close()
+ self.gzfile = None
+ if self.bz2file:
+ self.bz2file.close()
+ self.bz2file = None
+ def _done(self, result):
+ """Return the result."""
+ self._close()
+ return self.hasher
+ def _error(self, err):
+ """Log the error and close everything."""
+ log.msg('Streaming error')
+ log.err(err)
+ self.stream.close()
+ self._close()
+ return err
+class UploadStream:
+ """Identifier for streams that are uploaded to peers."""
+class PiecesUploadStream(stream.MemoryStream, UploadStream):
+ """Modified to identify it for streaming to peers."""
+class FileUploadStream(stream.FileStream, UploadStream):
+ """Modified to make it suitable for streaming to peers.
+ Streams the file in small chunks to make it easier to throttle the
+ streaming to peers.
+ @ivar CHUNK_SIZE: the size of chunks of data to send at a time
+ """
+ CHUNK_SIZE = 4*1024
+ def read(self, sendfile=False):
+ if self.f is None:
+ return None
+ length = self.length
+ if length == 0:
+ self.f = None
+ return None
+ # Remove the SendFileBuffer and mmap use, just use string reads and writes
+ readSize = min(length, self.CHUNK_SIZE)
+ self.f.seek(self.start)
+ b = self.f.read(readSize)
+ bytesRead = len(b)
+ if not bytesRead:
+ raise RuntimeError("Ran out of data reading file %r, expected %d more bytes" % (self.f, length))
+ else:
+ self.length -= bytesRead
+ self.start += bytesRead
+ return b
(2, ['source', 'crash-whitepaper']),
+ 'c': ('Test downloading from peers and just a mirror.',
+ {1: {}},
+ {1: {},
+ 2: {}},
+ [(1, ['update']),
+ (1, ['install', 'aboot-base', 'ada-reference-manual',
+ 'fop-doc', 'bison-doc', 'crash-whitepaper',
+ 'apt-howto-common', 'aptitude-doc-en', 'asr-manpages',
+ 'alcovebook-sgml-doc', 'airstrike-common',
+ ]),
+ (2, ['update']),
+ (2, ['install', 'aboot-base', 'aap-doc', 'ada-reference-manual',
+ 'aspectj-doc', 'fop-doc', 'asis-doc',
+ 'bison-doc', 'crash-whitepaper',
+ 'bash-doc', 'apt-howto-common', 'autotools-dev',
+ 'aptitude-doc-en', 'asr-manpages',
+ 'atomix-data', 'alcovebook-sgml-doc',
+ 'afbackup-common', 'airstrike-common',
+ ]),
+ ]),
assert 'all' not in tests