Move all streams to new Streams module and replace ProxyFileStream with GrowingFileStream
author    Cameron Dale <camrdale@gmail.com>
          Fri, 25 Apr 2008 05:18:29 +0000 (22:18 -0700)
committer Cameron Dale <camrdale@gmail.com>
          Fri, 25 Apr 2008 05:18:29 +0000 (22:18 -0700)
Instead of ProxyFileStream, now use a combination of GrowingFileStream
and StreamToFile, like the PeerManager does, to download and upload. It's
mostly working; the only remaining problems are some errors when aborting
apt-get in test 'a'.
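
In outline, the new arrangement (condensed verbatim from the CacheManager.py
diff below) shares one open file between a writer and a reader: StreamToFile
copies the upstream response into the file, notifying the GrowingFileStream
as bytes arrive, while the GrowingFileStream serves the same file to the
original requester, deferring any read that gets ahead of the written data.

    f = destFile.open('w+')            # one file, shared by writer and reader
    new_stream = GrowingFileStream(f, orig_stream.length)
    hash.new()
    df = StreamToFile(hash, orig_stream, f, notify = new_stream.updateAvailable,
                      decompress = ext, decFile = decFile).run()
    df.addCallback(self._save_complete, url, destFile, new_stream,
                   response.headers.getHeader('Last-Modified'), decFile)
    df.addErrback(self._save_error, url, destFile, new_stream, decFile)
    response.stream = new_stream       # the client reads from the growing file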

Also added a new test 'c' for downloading out of order, which is why
ProxyFileStream cannot be used: it can only relay data sequentially as it
arrives, while peer downloads write pieces to the file out of order.

apt_p2p/CacheManager.py
apt_p2p/HTTPServer.py
apt_p2p/PeerManager.py
apt_p2p/Streams.py [new file with mode: 0644]
test.py

diff --git a/apt_p2p/CacheManager.py b/apt_p2p/CacheManager.py
index b991093..42f89d9 100644
--- a/apt_p2p/CacheManager.py
+++ b/apt_p2p/CacheManager.py
@@ -5,9 +5,6 @@
 @var DECOMPRESS_FILES: a list of file names that need to be decompressed
 """
 
-from bz2 import BZ2Decompressor
-from zlib import decompressobj, MAX_WBITS
-from gzip import FCOMMENT, FEXTRA, FHCRC, FNAME, FTEXT
 from urlparse import urlparse
 import os
 
@@ -15,9 +12,9 @@ from twisted.python import log
 from twisted.python.filepath import FilePath
 from twisted.internet import defer, reactor
 from twisted.trial import unittest
-from twisted.web2 import stream
 from twisted.web2.http import splitHostPort
 
+from Streams import GrowingFileStream, StreamToFile
 from Hash import HashObject
 from apt_p2p_conf import config
 
@@ -27,184 +24,6 @@ DECOMPRESS_FILES = ['release', 'sources', 'packages']
 class CacheError(Exception):
     """Error occurred downloading a file to the cache."""
 
-class ProxyFileStream(stream.SimpleStream):
-    """Saves a stream to a file while providing a new stream.
-    
-    Also optionally decompresses the file while it is being downloaded.
-    
-    @type stream: L{twisted.web2.stream.IByteStream}
-    @ivar stream: the input stream being read
-    @type outFile: L{twisted.python.filepath.FilePath}
-    @ivar outFile: the file being written
-    @type hash: L{Hash.HashObject}
-    @ivar hash: the hash object for the file
-    @type gzfile: C{file}
-    @ivar gzfile: the open file to write decompressed gzip data to
-    @type gzdec: L{zlib.decompressobj}
-    @ivar gzdec: the decompressor to use for the compressed gzip data
-    @type gzheader: C{boolean}
-    @ivar gzheader: whether the gzip header still needs to be removed from
-        the zlib compressed data
-    @type bz2file: C{file}
-    @ivar bz2file: the open file to write decompressed bz2 data to
-    @type bz2dec: L{bz2.BZ2Decompressor}
-    @ivar bz2dec: the decompressor to use for the compressed bz2 data
-    @type length: C{int}
-    @ivar length: the length of the original (compressed) file
-    @type doneDefer: L{twisted.internet.defer.Deferred}
-    @ivar doneDefer: the deferred that will fire when done streaming
-    
-    @group Stream implementation: read, close
-    
-    """
-    
-    def __init__(self, stream, outFile, hash, decompress = None, decFile = None):
-        """Initializes the proxy.
-        
-        @type stream: L{twisted.web2.stream.IByteStream}
-        @param stream: the input stream to read from
-        @type outFile: L{twisted.python.filepath.FilePath}
-        @param outFile: the file to write to
-        @type hash: L{Hash.HashObject}
-        @param hash: the hash object to use for the file
-        @type decompress: C{string}
-        @param decompress: also decompress the file as this type
-            (currently only '.gz' and '.bz2' are supported)
-        @type decFile: C{twisted.python.FilePath}
-        @param decFile: the file to write the decompressed data to
-        """
-        self.stream = stream
-        self.outFile = outFile.open('w')
-        self.hash = hash
-        self.hash.new()
-        self.gzfile = None
-        self.bz2file = None
-        if decompress == ".gz":
-            self.gzheader = True
-            self.gzfile = decFile.open('w')
-            self.gzdec = decompressobj(-MAX_WBITS)
-        elif decompress == ".bz2":
-            self.bz2file = decFile.open('w')
-            self.bz2dec = BZ2Decompressor()
-        self.length = self.stream.length
-        self.doneDefer = defer.Deferred()
-
-    def _done(self):
-        """Close all the output files, return the result."""
-        if not self.outFile.closed:
-            self.outFile.close()
-            self.hash.digest()
-            if self.gzfile:
-                # Finish the decompression
-                data_dec = self.gzdec.flush()
-                self.gzfile.write(data_dec)
-                self.gzfile.close()
-                self.gzfile = None
-            if self.bz2file:
-                self.bz2file.close()
-                self.bz2file = None
-    
-    def _error(self, err):
-        """Close all the output files, return the error."""
-        if not self.outFile.closed:
-            self._done()
-            self.stream.close()
-            self.doneDefer.errback(err)
-
-    def read(self):
-        """Read some data from the stream."""
-        if self.outFile.closed:
-            return None
-        
-        # Read data from the stream, deal with the possible deferred
-        data = self.stream.read()
-        if isinstance(data, defer.Deferred):
-            data.addCallbacks(self._write, self._error)
-            return data
-        
-        self._write(data)
-        return data
-    
-    def _write(self, data):
-        """Write the stream data to the file and return it for others to use.
-        
-        Also optionally decompresses it.
-        """
-        if data is None:
-            if not self.outFile.closed:
-                self._done()
-                self.doneDefer.callback(self.hash)
-            return data
-        
-        # Write and hash the streamed data
-        self.outFile.write(data)
-        self.hash.update(data)
-        
-        if self.gzfile:
-            # Decompress the zlib portion of the file
-            if self.gzheader:
-                # Remove the gzip header junk
-                self.gzheader = False
-                new_data = self._remove_gzip_header(data)
-                dec_data = self.gzdec.decompress(new_data)
-            else:
-                dec_data = self.gzdec.decompress(data)
-            self.gzfile.write(dec_data)
-        if self.bz2file:
-            # Decompress the bz2 file
-            dec_data = self.bz2dec.decompress(data)
-            self.bz2file.write(dec_data)
-
-        return data
-    
-    def _remove_gzip_header(self, data):
-        """Remove the gzip header from the zlib compressed data."""
-        # Read, check & discard the header fields
-        if data[:2] != '\037\213':
-            raise IOError, 'Not a gzipped file'
-        if ord(data[2]) != 8:
-            raise IOError, 'Unknown compression method'
-        flag = ord(data[3])
-        # modtime = self.fileobj.read(4)
-        # extraflag = self.fileobj.read(1)
-        # os = self.fileobj.read(1)
-
-        skip = 10
-        if flag & FEXTRA:
-            # Read & discard the extra field
-            xlen = ord(data[10])
-            xlen = xlen + 256*ord(data[11])
-            skip = skip + 2 + xlen
-        if flag & FNAME:
-            # Read and discard a null-terminated string containing the filename
-            while True:
-                if not data[skip] or data[skip] == '\000':
-                    break
-                skip += 1
-            skip += 1
-        if flag & FCOMMENT:
-            # Read and discard a null-terminated string containing a comment
-            while True:
-                if not data[skip] or data[skip] == '\000':
-                    break
-                skip += 1
-            skip += 1
-        if flag & FHCRC:
-            skip += 2     # Read & discard the 16-bit header CRC
-
-        return data[skip:]
-
-    def close(self):
-        """Clean everything up and return None to future reads."""
-        log.msg('ProxyFileStream was prematurely closed after only %d/%d bytes' % (self.hash.size, self.length))
-        if self.hash.size < self.length:
-            self._error(CacheError('Prematurely closed, all data was not written'))
-        elif not self.outFile.closed:
-            self._done()
-            self.doneDefer.callback(self.hash)
-        self.length = 0
-        self.stream.close()
-
 class CacheManager:
     """Manages all downloaded files and requests for cached objects.
     
@@ -377,16 +196,21 @@ class CacheManager:
             
         # Create the new stream from the old one.
         orig_stream = response.stream
-        response.stream = ProxyFileStream(orig_stream, destFile, hash, ext, decFile)
-        response.stream.doneDefer.addCallback(self._save_complete, url, destFile,
-                                              response.headers.getHeader('Last-Modified'),
-                                              decFile)
-        response.stream.doneDefer.addErrback(self._save_error, url, destFile, decFile)
+        f = destFile.open('w+')
+        new_stream = GrowingFileStream(f, orig_stream.length)
+        hash.new()
+        df = StreamToFile(hash, orig_stream, f, notify = new_stream.updateAvailable,
+                          decompress = ext, decFile = decFile).run()
+        df.addCallback(self._save_complete, url, destFile, new_stream,
+                       response.headers.getHeader('Last-Modified'), decFile)
+        df.addErrback(self._save_error, url, destFile, new_stream, decFile)
+        response.stream = new_stream
 
         # Return the modified response with the new stream
         return response
 
-    def _save_complete(self, hash, url, destFile, modtime = None, decFile = None):
+    def _save_complete(self, hash, url, destFile, destStream = None,
+                       modtime = None, decFile = None):
         """Update the modification time and inform the main program.
         
         @type hash: L{Hash.HashObject}
@@ -394,6 +218,8 @@ class CacheManager:
         @param url: the URI of the actual mirror request
         @type destFile: C{twisted.python.FilePath}
         @param destFile: the file where the download was written to
+        @type destStream: L{Streams.GrowingFileStream}
+        @param destStream: the stream to notify that all data is available
         @type modtime: C{int}
         @param modtime: the modified time of the cached file (seconds since epoch)
             (optional, defaults to not setting the modification time of the file)
@@ -403,6 +229,8 @@ class CacheManager:
         """
         result = hash.verify()
         if result or result is None:
+            if destStream:
+                destStream.allAvailable()
             if modtime:
                 os.utime(destFile.path, (modtime, modtime))
             
@@ -424,22 +252,26 @@ class CacheManager:
                 decHash = HashObject()
                 ext_len = len(destFile.path) - len(decFile.path)
                 df = decHash.hashInThread(decFile)
-                df.addCallback(self._save_complete, url[:-ext_len], decFile, modtime)
+                df.addCallback(self._save_complete, url[:-ext_len], decFile, modtime = modtime)
                 df.addErrback(self._save_error, url[:-ext_len], decFile)
         else:
             log.msg("Hashes don't match %s != %s: %s" % (hash.hexexpected(), hash.hexdigest(), url))
-            destFile.remove()
+            if destStream:
+                destStream.allAvailable(remove = True)
             if decFile:
                 decFile.remove()
 
-    def _save_error(self, failure, url, destFile, decFile = None):
+    def _save_error(self, failure, url, destFile, destStream = None, decFile = None):
         """Remove the destination files."""
         log.msg('Error occurred downloading %s' % url)
         log.err(failure)
-        destFile.restat(False)
-        if destFile.exists():
-            log.msg('Removing the incomplete file: %s' % destFile.path)
-            destFile.remove()
+        if destStream:
+            destStream.allAvailable(remove = True)
+        else:
+            destFile.restat(False)
+            if destFile.exists():
+                log.msg('Removing the incomplete file: %s' % destFile.path)
+                destFile.remove()
         if decFile:
             decFile.restat(False)
             if decFile.exists():
diff --git a/apt_p2p/HTTPServer.py b/apt_p2p/HTTPServer.py
index 0c17d3f..652198f 100644
--- a/apt_p2p/HTTPServer.py
+++ b/apt_p2p/HTTPServer.py
@@ -13,6 +13,7 @@ from twisted.trial import unittest
 from twisted.python.filepath import FilePath
 
 from policies import ThrottlingFactory, ThrottlingProtocol, ProtocolWrapper
+from Streams import UploadStream, FileUploadStream, PiecesUploadStream
 from apt_p2p_conf import config
 from apt_p2p_Khashmir.bencode import bencode
 
@@ -86,61 +87,21 @@ class FileDownloader(static.File):
         return self.__class__(path, self.manager, self.defaultType, self.ignoredExts,
                               self.processors, self.indexNames[:])
         
-class UploadStream:
-    """Identifier for streams that are uploaded to peers."""
-    
-class FileUploaderStream(stream.FileStream, UploadStream):
-    """Modified to make it suitable for streaming to peers.
-    
-    Streams the file in small chunks to make it easier to throttle the
-    streaming to peers.
-    
-    @ivar CHUNK_SIZE: the size of chunks of data to send at a time
-    """
-
-    CHUNK_SIZE = 4*1024
-    
-    def read(self, sendfile=False):
-        if self.f is None:
-            return None
-
-        length = self.length
-        if length == 0:
-            self.f = None
-            return None
-        
-        # Remove the SendFileBuffer and mmap use, just use string reads and writes
-
-        readSize = min(length, self.CHUNK_SIZE)
-
-        self.f.seek(self.start)
-        b = self.f.read(readSize)
-        bytesRead = len(b)
-        if not bytesRead:
-            raise RuntimeError("Ran out of data reading file %r, expected %d more bytes" % (self.f, length))
-        else:
-            self.length -= bytesRead
-            self.start += bytesRead
-            return b
-
-class PiecesUploaderStream(stream.MemoryStream, UploadStream):
-    """Modified to identify it for streaming to peers."""
-
 class PiecesUploader(static.Data):
     """Modified to identify it for peer requests.
     
-    Uses the modified L{PieceUploaderStream} to stream the pieces for throttling.
+    Uses the modified L{Streams.PiecesUploadStream} to stream the pieces for throttling.
     """
 
     def render(self, req):
         return http.Response(responsecode.OK,
                              http_headers.Headers({'content-type': self.contentType()}),
-                             stream=PiecesUploaderStream(self.data))
+                             stream=PiecesUploadStream(self.data))
         
 class FileUploader(static.File):
     """Modified to make it suitable for peer requests.
     
-    Uses the modified L{FileUploaderStream} to stream the file for throttling,
+    Uses the modified L{Streams.FileUploadStream} to stream the file for throttling,
     and doesn't do any listing of directory contents.
     """
 
@@ -165,7 +126,7 @@ class FileUploader(static.File):
 
         response = http.Response()
         # Use the modified FileStream
-        response.stream = FileUploaderStream(f, 0, self.fp.getsize())
+        response.stream = FileUploadStream(f, 0, self.fp.getsize())
 
         for (header, value) in (
             ("content-type", self.contentType()),
@@ -180,8 +141,7 @@ class UploadThrottlingProtocol(ThrottlingProtocol):
     """Protocol for throttling uploads.
     
     Determines whether or not to throttle the upload based on the type of stream.
-    Uploads use L{FileUploaderStream} or L{twisted.web2.stream.MemorySTream},
-    apt uses L{CacheManager.ProxyFileStream} or L{twisted.web.stream.FileStream}.
+    Uploads use instances of L{Streams.UploadStream}.
     """
     
     stats = None
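
The docstring above is now the whole policy: a response gets throttled exactly
when its stream is an instance of Streams.UploadStream. A minimal sketch of
that check (the helper name shouldThrottle and where it would be called from
are illustrative assumptions, not part of this diff):

    from Streams import UploadStream

    def shouldThrottle(response_stream):
        # Peer uploads use UploadStream subclasses (FileUploadStream,
        # PiecesUploadStream), so only those are rate-limited; apt's own
        # responses (e.g. a GrowingFileStream) pass through unthrottled.
        return isinstance(response_stream, UploadStream)
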
diff --git a/apt_p2p/PeerManager.py b/apt_p2p/PeerManager.py
index 9e3f6da..cfc0f78 100644
--- a/apt_p2p/PeerManager.py
+++ b/apt_p2p/PeerManager.py
@@ -8,219 +8,21 @@ from binascii import b2a_hex, a2b_hex
 import sha
 
 from twisted.internet import reactor, defer
-from twisted.python import log, filepath
+from twisted.python import log
 from twisted.trial import unittest
 from twisted.web2 import stream
 from twisted.web2.http import Response, splitHostPort
 
 from HTTPDownloader import Peer
+from Streams import GrowingFileStream, StreamToFile
 from util import uncompact
 from Hash import PIECE_SIZE
 from apt_p2p_Khashmir.bencode import bdecode
 from apt_p2p_conf import config
 
-
 class PeerError(Exception):
     """An error occurred downloading from peers."""
     
-class GrowingFileStream(stream.FileStream):
-    """Modified to stream data from a file as it becomes available.
-    
-    @ivar CHUNK_SIZE: the maximum size of chunks of data to send at a time
-    @ivar deferred: waiting for the result of the last read attempt
-    @ivar available: the number of bytes that are currently available to read
-    @ivar position: the current position in the file where the next read will begin
-    @ivar finished: True when no more data will be coming available
-    """
-
-    CHUNK_SIZE = 4*1024
-
-    def __init__(self, f, length = None):
-        stream.FileStream.__init__(self, f)
-        self.length = length
-        self.deferred = None
-        self.available = 0L
-        self.position = 0L
-        self.finished = False
-
-    def updateAvailable(self, newlyAvailable):
-        """Update the number of bytes that are available.
-        
-        Call it with 0 to trigger reading of a fully read file.
-        
-        @param newlyAvailable: the number of bytes that just became available
-        """
-        assert not self.finished
-        self.available += newlyAvailable
-        
-        # If a read is pending, let it go
-        if self.deferred and self.position < self.available:
-            # Try to read some data from the file
-            length = self.available - self.position
-            readSize = min(length, self.CHUNK_SIZE)
-            self.f.seek(self.position)
-            b = self.f.read(readSize)
-            bytesRead = len(b)
-            
-            # Check if end of file was reached
-            if bytesRead:
-                self.position += bytesRead
-                deferred = self.deferred
-                self.deferred = None
-                deferred.callback(b)
-
-    def allAvailable(self):
-        """Indicate that no more data will be coming available."""
-        self.finished = True
-
-        # If a read is pending, let it go
-        if self.deferred:
-            if self.position < self.available:
-                # Try to read some data from the file
-                length = self.available - self.position
-                readSize = min(length, self.CHUNK_SIZE)
-                self.f.seek(self.position)
-                b = self.f.read(readSize)
-                bytesRead = len(b)
-    
-                # Check if end of file was reached
-                if bytesRead:
-                    self.position += bytesRead
-                    deferred = self.deferred
-                    self.deferred = None
-                    deferred.callback(b)
-                else:
-                    # We're done
-                    self._close()
-                    deferred = self.deferred
-                    self.deferred = None
-                    deferred.callback(None)
-            else:
-                # We're done
-                self._close()
-                deferred = self.deferred
-                self.deferred = None
-                deferred.callback(None)
-        
-    def read(self, sendfile=False):
-        assert not self.deferred, "A previous read is still deferred."
-
-        if self.f is None:
-            return None
-
-        length = self.available - self.position
-        readSize = min(length, self.CHUNK_SIZE)
-
-        # If we don't have any available, we're done or deferred
-        if readSize <= 0:
-            if self.finished:
-                self._close()
-                return None
-            else:
-                self.deferred = defer.Deferred()
-                return self.deferred
-
-        # Try to read some data from the file
-        self.f.seek(self.position)
-        b = self.f.read(readSize)
-        bytesRead = len(b)
-        if not bytesRead:
-            # End of file was reached, we're done or deferred
-            if self.finished:
-                self._close()
-                return None
-            else:
-                self.deferred = defer.Deferred()
-                return self.deferred
-        else:
-            self.position += bytesRead
-            return b
-
-    def _close(self):
-        """Close the temporary file and remove it."""
-        self.f.close()
-        filepath.FilePath(self.f.name).remove()
-        self.f = None
-        
-class StreamToFile:
-    """Save a stream to a partial file and hash it.
-    
-    @type stream: L{twisted.web2.stream.IByteStream}
-    @ivar stream: the input stream being read
-    @type outFile: L{twisted.python.filepath.FilePath}
-    @ivar outFile: the file being written
-    @type hasher: hashing object, e.g. C{sha1}
-    @ivar hasher: the hash object for the data
-    @type position: C{int}
-    @ivar position: the current file position to write the next data to
-    @type length: C{int}
-    @ivar length: the position in the file to not write beyond
-    @type doneDefer: L{twisted.internet.defer.Deferred}
-    @ivar doneDefer: the deferred that will fire when done writing
-    """
-    
-    def __init__(self, hasher, inputStream, outFile, start = 0, length = None):
-        """Initializes the file.
-        
-        @type hasher: hashing object, e.g. C{sha1}
-        @param hasher: the hash object for the data
-        @type inputStream: L{twisted.web2.stream.IByteStream}
-        @param inputStream: the input stream to read from
-        @type outFile: L{twisted.python.filepath.FilePath}
-        @param outFile: the file to write to
-        @type start: C{int}
-        @param start: the file position to start writing at
-            (optional, defaults to the start of the file)
-        @type length: C{int}
-        @param length: the maximum amount of data to write to the file
-            (optional, defaults to not limiting the writing to the file
-        """
-        self.stream = inputStream
-        self.outFile = outFile
-        self.hasher = hasher
-        self.position = start
-        self.length = None
-        if length is not None:
-            self.length = start + length
-        self.doneDefer = None
-        
-    def run(self):
-        """Start the streaming.
-
-        @rtype: L{twisted.internet.defer.Deferred}
-        """
-        self.doneDefer = stream.readStream(self.stream, self._gotData)
-        self.doneDefer.addCallbacks(self._done, self._error)
-        return self.doneDefer
-
-    def _gotData(self, data):
-        """Process the received data."""
-        if self.outFile.closed:
-            raise PeerError, "outFile was unexpectedly closed"
-        
-        if data is None:
-            raise PeerError, "Data is None?"
-        
-        # Make sure we don't go too far
-        if self.length is not None and self.position + len(data) > self.length:
-            data = data[:(self.length - self.position)]
-        
-        # Write and hash the streamed data
-        self.outFile.seek(self.position)
-        self.outFile.write(data)
-        self.hasher.update(data)
-        self.position += len(data)
-        
-    def _done(self, result):
-        """Return the result."""
-        return self.hasher.digest()
-    
-    def _error(self, err):
-        """Log the error."""
-        log.msg('Streaming error')
-        log.err(err)
-        return err
-    
 class FileDownload:
     """Manage a download from a list of peers or a mirror.
     
@@ -528,6 +330,11 @@ class FileDownload:
     #{ Downloading the pieces
     def getPieces(self):
         """Download the next pieces from the peers."""
+        if self.file.closed:
+            log.msg('Download has been aborted for %s' % self.path)
+            self.stream.allAvailable(remove = True)
+            return
+            
         self.sort()
         piece = self.nextFinish
         while self.outstanding < 4 and self.peerlist and piece < len(self.completePieces):
@@ -554,7 +361,7 @@ class FileDownload:
         # Check if we're done
         if self.outstanding <= 0 and self.nextFinish >= len(self.completePieces):
             log.msg('Download is complete for %s' % self.path)
-            self.stream.allAvailable()
+            self.stream.allAvailable(remove = True)
     
     def _getPiece(self, response, piece, peer):
         """Process the retrieved headers from the peer."""
@@ -606,9 +413,9 @@ class FileDownload:
         self.getPieces()
         log.err(err)
 
-    def _gotPiece(self, response, piece, peer):
+    def _gotPiece(self, hash, piece, peer):
         """Process the retrieved piece from the peer."""
-        if self.pieces[piece] and response != self.pieces[piece]:
+        if self.pieces[piece] and hash.digest() != self.pieces[piece]:
             # Hash doesn't match
             log.msg('Hash error for piece %d from peer %r' % (piece, peer))
             peer.hashError('Piece received from peer does not match expected')
@@ -626,7 +433,7 @@ class FileDownload:
 
     def _gotError(self, err, piece, peer):
         """Piece download failed, try again."""
-        log.msg('Error streaming piece %d from peer %r: %r' % (piece, peer, response))
+        log.msg('Error streaming piece %d from peer %r: %r' % (piece, peer, err))
         log.err(err)
         self.completePieces[piece] = False
         self.getPieces()
diff --git a/apt_p2p/Streams.py b/apt_p2p/Streams.py
new file mode 100644
index 0000000..3eebf06
--- /dev/null
+++ b/apt_p2p/Streams.py
@@ -0,0 +1,382 @@
+
+"""Modified streams that are used by Apt-P2P."""
+
+from bz2 import BZ2Decompressor
+from zlib import decompressobj, MAX_WBITS
+from gzip import FCOMMENT, FEXTRA, FHCRC, FNAME, FTEXT
+import os
+
+from twisted.web2 import stream
+from twisted.internet import defer
+from twisted.python import log, filepath
+
+class StreamsError(Exception):
+    """An error occurred in the streaming."""
+
+class GrowingFileStream(stream.SimpleStream):
+    """Modified to stream data from a file as it becomes available.
+    
+    @ivar CHUNK_SIZE: the maximum size of chunks of data to send at a time
+    @ivar deferred: waiting for the result of the last read attempt
+    @ivar available: the number of bytes that are currently available to read
+    @ivar position: the current position in the file where the next read will begin
+    @ivar closed: True if the reader has closed the stream
+    @ivar finished: True when no more data will be coming available
+    @ivar remove: whether to remove the file when streaming is complete
+    """
+
+    CHUNK_SIZE = 32*1024
+
+    def __init__(self, f, length = None):
+        self.f = f
+        self.length = length
+        self.deferred = None
+        self.available = 0L
+        self.position = 0L
+        self.closed = False
+        self.finished = False
+        self.remove = False
+
+    #{ Stream interface
+    def read(self, sendfile=False):
+        assert not self.deferred, "A previous read is still deferred."
+
+        if self.f is None:
+            return None
+
+        length = self.available - self.position
+        readSize = min(length, self.CHUNK_SIZE)
+
+        # If we don't have any available, we're done or deferred
+        if readSize <= 0:
+            if self.finished:
+                self._close()
+                return None
+            else:
+                self.deferred = defer.Deferred()
+                return self.deferred
+
+        # Try to read some data from the file
+        self.f.seek(self.position)
+        b = self.f.read(readSize)
+        bytesRead = len(b)
+        if not bytesRead:
+            # End of file was reached, we're done or deferred
+            if self.finished:
+                self._close()
+                return None
+            else:
+                self.deferred = defer.Deferred()
+                return self.deferred
+        else:
+            self.position += bytesRead
+            return b
+    
+    def split(self, point):
+        raise StreamsError, "You can not split a GrowingFileStream"
+    
+    def close(self):
+        self.length = 0
+        self.closed = True
+        self._close()
+
+    #{ Growing functions
+    def updateAvailable(self, newlyAvailable):
+        """Update the number of bytes that are available.
+        
+        Call it with 0 to trigger reading of a fully read file.
+        
+        @param newlyAvailable: the number of bytes that just became available
+        """
+        if not self.finished:
+            self.available += newlyAvailable
+        
+        # If a read is pending, let it go
+        if self.deferred and self.position < self.available:
+            # Try to read some data from the file
+            length = self.available - self.position
+            readSize = min(length, self.CHUNK_SIZE)
+            self.f.seek(self.position)
+            b = self.f.read(readSize)
+            bytesRead = len(b)
+            
+            # Check if end of file was reached
+            if bytesRead:
+                self.position += bytesRead
+                deferred = self.deferred
+                self.deferred = None
+                deferred.callback(b)
+
+    def allAvailable(self, remove = False):
+        """Indicate that no more data will be coming available.
+        
+        @param remove: whether to remove the file when streaming is complete
+        """
+        self.finished = True
+        self.remove = remove
+
+        # If a read is pending, let it go
+        if self.deferred:
+            if self.position < self.available:
+                # Try to read some data from the file
+                length = self.available - self.position
+                readSize = min(length, self.CHUNK_SIZE)
+                self.f.seek(self.position)
+                b = self.f.read(readSize)
+                bytesRead = len(b)
+    
+                # Check if end of file was reached
+                if bytesRead:
+                    self.position += bytesRead
+                    deferred = self.deferred
+                    self.deferred = None
+                    deferred.callback(b)
+                else:
+                    # We're done
+                    self._close()
+                    deferred = self.deferred
+                    self.deferred = None
+                    deferred.callback(None)
+            else:
+                # We're done
+                self._close()
+                deferred = self.deferred
+                self.deferred = None
+                deferred.callback(None)
+                
+        if self.closed:
+            self._close()
+        
+    def _close(self):
+        """Close the temporary file and maybe remove it."""
+        if self.f:
+            self.f.close()
+            if self.remove:
+                file = filepath.FilePath(self.f.name)
+                file.restat(False)
+                if file.exists():
+                    file.remove()
+            self.f = None
+        
+class StreamToFile:
+    """Save a stream to a partial file and hash it.
+    
+    Also optionally decompresses the file while it is being downloaded.
+
+    @type stream: L{twisted.web2.stream.IByteStream}
+    @ivar stream: the input stream being read
+    @type outFile: C{file}
+    @ivar outFile: the open file being written
+    @type hasher: hashing object, e.g. C{sha1}
+    @ivar hasher: the hash object for the data
+    @type gzfile: C{file}
+    @ivar gzfile: the open file to write decompressed gzip data to
+    @type gzdec: L{zlib.decompressobj}
+    @ivar gzdec: the decompressor to use for the compressed gzip data
+    @type gzheader: C{boolean}
+    @ivar gzheader: whether the gzip header still needs to be removed from
+        the zlib compressed data
+    @type bz2file: C{file}
+    @ivar bz2file: the open file to write decompressed bz2 data to
+    @type bz2dec: L{bz2.BZ2Decompressor}
+    @ivar bz2dec: the decompressor to use for the compressed bz2 data
+    @type position: C{int}
+    @ivar position: the current file position to write the next data to
+    @type length: C{int}
+    @ivar length: the position in the file to not write beyond
+    @ivar notify: a method that will be notified of the length of received data
+    @type doneDefer: L{twisted.internet.defer.Deferred}
+    @ivar doneDefer: the deferred that will fire when done writing
+    """
+    
+    def __init__(self, hasher, inputStream, outFile, start = 0, length = None,
+                 notify = None, decompress = None, decFile = None):
+        """Initializes the files.
+        
+        @type hasher: hashing object, e.g. C{sha1}
+        @param hasher: the hash object for the data
+        @type inputStream: L{twisted.web2.stream.IByteStream}
+        @param inputStream: the input stream to read from
+        @type outFile: C{file}
+        @param outFile: the open file to write to
+        @type start: C{int}
+        @param start: the file position to start writing at
+            (optional, defaults to the start of the file)
+        @type length: C{int}
+        @param length: the maximum amount of data to write to the file
+            (optional, defaults to not limiting the writing to the file)
+        @param notify: a method that will be notified of the length of
+            received data (optional)
+        @type decompress: C{string}
+        @param decompress: also decompress the file as this type
+            (currently only '.gz' and '.bz2' are supported)
+        @type decFile: C{twisted.python.FilePath}
+        @param decFile: the file to write the decompressed data to
+        """
+        self.stream = inputStream
+        self.outFile = outFile
+        self.hasher = hasher
+        self.gzfile = None
+        self.bz2file = None
+        if decompress == ".gz":
+            self.gzheader = True
+            self.gzfile = decFile.open('w')
+            self.gzdec = decompressobj(-MAX_WBITS)
+        elif decompress == ".bz2":
+            self.bz2file = decFile.open('w')
+            self.bz2dec = BZ2Decompressor()
+        self.position = start
+        self.length = None
+        if length is not None:
+            self.length = start + length
+        self.notify = notify
+        self.doneDefer = None
+        
+    def run(self):
+        """Start the streaming.
+
+        @rtype: L{twisted.internet.defer.Deferred}
+        """
+        self.doneDefer = stream.readStream(self.stream, self._gotData)
+        self.doneDefer.addCallbacks(self._done, self._error)
+        return self.doneDefer
+
+    def _gotData(self, data):
+        """Process the received data."""
+        if self.outFile.closed:
+            raise StreamsError, "outFile was unexpectedly closed"
+        
+        # Make sure we don't go too far
+        if self.length is not None and self.position + len(data) > self.length:
+            data = data[:(self.length - self.position)]
+        
+        # Write and hash the streamed data
+        self.outFile.seek(self.position)
+        self.outFile.write(data)
+        self.hasher.update(data)
+        self.position += len(data)
+        
+        if self.gzfile:
+            # Decompress the zlib portion of the file
+            if self.gzheader:
+                # Remove the gzip header junk
+                self.gzheader = False
+                new_data = self._remove_gzip_header(data)
+                dec_data = self.gzdec.decompress(new_data)
+            else:
+                dec_data = self.gzdec.decompress(data)
+            self.gzfile.write(dec_data)
+        if self.bz2file:
+            # Decompress the bz2 file
+            dec_data = self.bz2dec.decompress(data)
+            self.bz2file.write(dec_data)
+            
+        if self.notify:
+            self.notify(len(data))
+
+    def _remove_gzip_header(self, data):
+        """Remove the gzip header from the zlib compressed data."""
+        # Read, check & discard the header fields
+        if data[:2] != '\037\213':
+            raise IOError, 'Not a gzipped file'
+        if ord(data[2]) != 8:
+            raise IOError, 'Unknown compression method'
+        flag = ord(data[3])
+        # modtime = self.fileobj.read(4)
+        # extraflag = self.fileobj.read(1)
+        # os = self.fileobj.read(1)
+
+        skip = 10
+        if flag & FEXTRA:
+            # Read & discard the extra field
+            xlen = ord(data[10])
+            xlen = xlen + 256*ord(data[11])
+            skip = skip + 2 + xlen
+        if flag & FNAME:
+            # Read and discard a null-terminated string containing the filename
+            while True:
+                if not data[skip] or data[skip] == '\000':
+                    break
+                skip += 1
+            skip += 1
+        if flag & FCOMMENT:
+            # Read and discard a null-terminated string containing a comment
+            while True:
+                if not data[skip] or data[skip] == '\000':
+                    break
+                skip += 1
+            skip += 1
+        if flag & FHCRC:
+            skip += 2     # Read & discard the 16-bit header CRC
+
+        return data[skip:]
+
+    def _close(self):
+        """Close all the output files."""
+        # Can't close the outfile, but we should sync it to disk
+        if not self.outFile.closed:
+            self.outFile.flush()
+        
+        # Close the decompressed file
+        if self.gzfile:
+            # Finish the decompression 
+            data_dec = self.gzdec.flush()
+            self.gzfile.write(data_dec)
+            self.gzfile.close()
+            self.gzfile = None
+        if self.bz2file:
+            self.bz2file.close()
+            self.bz2file = None
+    
+    def _done(self, result):
+        """Return the result."""
+        self._close()
+        return self.hasher
+    
+    def _error(self, err):
+        """Log the error and close everything."""
+        log.msg('Streaming error')
+        log.err(err)
+        self.stream.close()
+        self._close()
+        return err
+    
+class UploadStream:
+    """Identifier for streams that are uploaded to peers."""
+    
+class PiecesUploadStream(stream.MemoryStream, UploadStream):
+    """Modified to identify it for streaming to peers."""
+
+class FileUploadStream(stream.FileStream, UploadStream):
+    """Modified to make it suitable for streaming to peers.
+    
+    Streams the file in small chunks to make it easier to throttle the
+    streaming to peers.
+    
+    @ivar CHUNK_SIZE: the size of chunks of data to send at a time
+    """
+
+    CHUNK_SIZE = 4*1024
+    
+    def read(self, sendfile=False):
+        if self.f is None:
+            return None
+
+        length = self.length
+        if length == 0:
+            self.f = None
+            return None
+        
+        # Remove the SendFileBuffer and mmap use, just use string reads and writes
+
+        readSize = min(length, self.CHUNK_SIZE)
+
+        self.f.seek(self.start)
+        b = self.f.read(readSize)
+        bytesRead = len(b)
+        if not bytesRead:
+            raise RuntimeError("Ran out of data reading file %r, expected %d more bytes" % (self.f, length))
+        else:
+            self.length -= bytesRead
+            self.start += bytesRead
+            return b
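
To make the new module's read protocol concrete, here is a small usage sketch
(illustrative only; the temporary-file setup and the synchronous calls are
assumptions for the example, not code from this commit). A read that gets
ahead of the data returns a Deferred, updateAvailable() fires it as soon as
more bytes land, and after allAvailable() the final read returns None:

    import os, tempfile
    from Streams import GrowingFileStream

    fd, path = tempfile.mkstemp()
    os.close(fd)
    f = open(path, 'w+b')                 # shared by the writer and the stream
    s = GrowingFileStream(f, length = 10)

    f.write('hello')
    f.flush()
    s.updateAvailable(5)                  # 5 bytes are now readable
    assert s.read() == 'hello'

    d = s.read()                          # reader is ahead: gets a Deferred
    results = []
    d.addCallback(results.append)

    f.seek(5)                             # the writer tracks its own offset
    f.write('world')
    f.flush()
    s.updateAvailable(5)                  # fires the pending Deferred
    assert results == ['world']

    s.allAvailable(remove = True)         # no more data is coming
    assert s.read() is None               # end of stream; the file is removed
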
diff --git a/test.py b/test.py
index b6a3ec8..b94e012 100755
--- a/test.py
+++ b/test.py
@@ -339,6 +339,27 @@ tests = {'1': ('Start a single bootstrap and downloader, test updating and downl
                 (2, ['source', 'crash-whitepaper']),
                 ]),
                 
+        'c': ('Test downloading from peers and just a mirror.',
+             {1: {}},
+             {1: {},
+              2: {}},
+             [(1, ['update']), 
+              (1, ['install', 'aboot-base', 'ada-reference-manual',
+                   'fop-doc', 'bison-doc', 'crash-whitepaper',
+                   'apt-howto-common', 'aptitude-doc-en', 'asr-manpages',
+                   'alcovebook-sgml-doc', 'airstrike-common',
+                   ]),
+              (2, ['update']), 
+              (2, ['install', 'aboot-base', 'aap-doc', 'ada-reference-manual',
+                   'aspectj-doc', 'fop-doc', 'asis-doc',
+                   'bison-doc', 'crash-whitepaper',
+                   'bash-doc', 'apt-howto-common', 'autotools-dev',
+                   'aptitude-doc-en', 'asr-manpages',
+                   'atomix-data', 'alcovebook-sgml-doc',
+                   'afbackup-common', 'airstrike-common',
+                   ]),
+              ]),
+
          }
 
 assert 'all' not in tests