class ProxyFileStream(stream.SimpleStream):
    """Saves a stream to a file while providing a new stream.

    Reads from an input stream, tees every chunk to an output file (and a
    running hash), and hands the same chunk on to whoever is reading this
    stream. Also optionally decompresses the file while it is being
    downloaded.

    @type stream: L{twisted.web2.stream.IByteStream}
    @ivar stream: the input stream being read
    @type outFile: L{twisted.python.filepath.FilePath}
    @ivar outFile: the file being written
    @type hash: L{Hash.HashObject}
    @ivar hash: the hash object for the file
    @type gzfile: C{file}
    @ivar gzfile: the open file to write decompressed gzip data to
    @type gzdec: L{zlib.decompressobj}
    @ivar gzdec: the decompressor to use for the compressed gzip data
    @type gzheader: C{boolean}
    @ivar gzheader: whether the gzip header still needs to be removed from
        the zlib compressed data
    @type bz2file: C{file}
    @ivar bz2file: the open file to write decompressed bz2 data to
    @type bz2dec: L{bz2.BZ2Decompressor}
    @ivar bz2dec: the decompressor to use for the compressed bz2 data
    @type length: C{int}
    @ivar length: the length of the original (compressed) file
    @type doneDefer: L{twisted.internet.defer.Deferred}
    @ivar doneDefer: the deferred that will fire when done streaming

    @group Stream implementation: read, close

    """

    def __init__(self, stream, outFile, hash, decompress = None, decFile = None):
        """Initializes the proxy.

        @type stream: L{twisted.web2.stream.IByteStream}
        @param stream: the input stream to read from
        @type outFile: L{twisted.python.filepath.FilePath}
        @param outFile: the file to write to
        @type hash: L{Hash.HashObject}
        @param hash: the hash object to use for the file
        @type decompress: C{string}
        @param decompress: also decompress the file as this type
            (currently only '.gz' and '.bz2' are supported)
        @type decFile: C{twisted.python.FilePath}
        @param decFile: the file to write the decompressed data to
        """
        self.stream = stream
        self.outFile = outFile.open('w')
        self.hash = hash
        # Start a fresh digest; it accumulates as chunks are streamed through.
        self.hash.new()
        self.gzfile = None
        self.bz2file = None
        if decompress == ".gz":
            # The gzip header must be stripped before the raw deflate data
            # can be fed to zlib (negative wbits = no zlib header/trailer).
            self.gzheader = True
            self.gzfile = decFile.open('w')
            self.gzdec = decompressobj(-MAX_WBITS)
        elif decompress == ".bz2":
            self.bz2file = decFile.open('w')
            self.bz2dec = BZ2Decompressor()
        self.length = self.stream.length
        self.doneDefer = defer.Deferred()

    def _done(self):
        """Close all the output files and finalize the hash."""
        if not self.outFile.closed:
            self.outFile.close()
            self.hash.digest()
            if self.gzfile:
                # Finish the decompression with any buffered tail data
                data_dec = self.gzdec.flush()
                self.gzfile.write(data_dec)
                self.gzfile.close()
                self.gzfile = None
            if self.bz2file:
                self.bz2file.close()
                self.bz2file = None

    def _error(self, err):
        """Close all the output files, return the error.

        @param err: the failure to pass to L{doneDefer}
        """
        if not self.outFile.closed:
            self._done()
            self.stream.close()
            self.doneDefer.errback(err)

    def read(self):
        """Read some data from the stream.

        @return: the data read (possibly a deferred), or C{None} once the
            output file has been closed
        """
        if self.outFile.closed:
            return None

        # Read data from the stream, deal with the possible deferred
        data = self.stream.read()
        if isinstance(data, defer.Deferred):
            # _write both tees the chunk and returns it to the caller
            data.addCallbacks(self._write, self._error)
            return data

        self._write(data)
        return data

    def _write(self, data):
        """Write the stream data to the file and return it for others to use.

        Also optionally decompresses it.

        @param data: a chunk of streamed data, or C{None} at end of stream
        @return: C{data} unchanged, so it can flow on to the real consumer
        """
        if data is None:
            # End of stream: finish up and fire the completion deferred.
            if not self.outFile.closed:
                self._done()
                self.doneDefer.callback(self.hash)
            return data

        # Write and hash the streamed data
        self.outFile.write(data)
        self.hash.update(data)

        if self.gzfile:
            # Decompress the zlib portion of the file
            if self.gzheader:
                # Remove the gzip header junk
                # NOTE(review): assumes the whole gzip header fits in the
                # first chunk read from the stream -- confirm upstream
                # chunk sizes guarantee this.
                self.gzheader = False
                new_data = self._remove_gzip_header(data)
                dec_data = self.gzdec.decompress(new_data)
            else:
                dec_data = self.gzdec.decompress(data)
            self.gzfile.write(dec_data)
        if self.bz2file:
            # Decompress the bz2 file
            dec_data = self.bz2dec.decompress(data)
            self.bz2file.write(dec_data)

        return data

    def _remove_gzip_header(self, data):
        """Remove the gzip header from the zlib compressed data.

        Follows the RFC 1952 layout: 10 fixed bytes, then optional extra
        field, file name, comment, and header CRC depending on FLG bits.

        @param data: the first chunk of the gzip file
        @return: C{data} with the gzip header stripped off
        @raise IOError: if the data does not start with a valid gzip header
        """
        # Read, check & discard the header fields
        if data[:2] != '\037\213':
            raise IOError('Not a gzipped file')
        if ord(data[2]) != 8:
            # 8 == deflate, the only compression method gzip defines
            raise IOError('Unknown compression method')
        flag = ord(data[3])
        # modtime = self.fileobj.read(4)
        # extraflag = self.fileobj.read(1)
        # os = self.fileobj.read(1)

        skip = 10
        if flag & FEXTRA:
            # Read & discard the extra field (2-byte little-endian length)
            xlen = ord(data[10])
            xlen = xlen + 256*ord(data[11])
            skip = skip + 2 + xlen
        if flag & FNAME:
            # Read and discard a null-terminated string containing the filename
            while True:
                if not data[skip] or data[skip] == '\000':
                    break
                skip += 1
            skip += 1
        if flag & FCOMMENT:
            # Read and discard a null-terminated string containing a comment
            while True:
                if not data[skip] or data[skip] == '\000':
                    break
                skip += 1
            skip += 1
        if flag & FHCRC:
            skip += 2 # Read & discard the 16-bit header CRC

        return data[skip:]

    def close(self):
        """Clean everything up and return None to future reads."""
        if self.hash.size < self.length:
            # Only log when the close really is premature; previously this
            # message was emitted on every close, including normal completion.
            log.msg('ProxyFileStream was prematurely closed after only %d/%d bytes' % (self.hash.size, self.length))
            # NOTE(review): _error also closes the input stream, so the
            # close() below is a second stream.close() on this path -- verify
            # the stream implementation tolerates that.
            self._error(CacheError('Prematurely closed, all data was not written'))
        elif not self.outFile.closed:
            self._done()
            self.doneDefer.callback(self.hash)
        self.length = 0
        self.stream.close()
-