2 """Modified streams that are used by Apt-P2P."""
4 from bz2 import BZ2Decompressor
5 from zlib import decompressobj, MAX_WBITS
6 from gzip import FCOMMENT, FEXTRA, FHCRC, FNAME, FTEXT
9 from twisted.web2 import stream
10 from twisted.internet import defer
11 from twisted.python import log, filepath
class StreamsError(Exception):
    """Raised when any error occurs during streaming."""
class GrowingFileStream(stream.SimpleStream):
    """Modified to stream data from a file as it becomes available.

    @ivar CHUNK_SIZE: the maximum size of chunks of data to send at a time
    @ivar deferred: waiting for the result of the last read attempt
    @ivar available: the number of bytes that are currently available to read
    @ivar position: the current position in the file where the next read will begin
    @ivar closed: True if the reader has closed the stream
    @ivar finished: True when no more data will be coming available
    @ivar remove: whether to remove the file when streaming is complete
    """

    CHUNK_SIZE = 32*1024

    def __init__(self, f, length = None):
        """Initialize the growing file stream.

        @param f: the file to stream from as it grows
        @param length: the ultimate length of the file
            (optional, defaults to being unknown)
        """
        self.f = f
        self.length = length
        self.deferred = None
        self.available = 0
        self.position = 0
        self.closed = False
        self.finished = False
        self.remove = False

    #{ Stream interface
    def read(self, sendfile=False):
        """Read the next chunk of available data, deferring if there is none."""
        assert not self.deferred, "A previous read is still deferred."

        if self.f is None:
            return None

        length = self.available - self.position
        readSize = min(length, self.CHUNK_SIZE)

        # If we don't have any available, we're done or deferred
        if readSize <= 0:
            if self.finished:
                self._close()
                return None
            else:
                self.deferred = defer.Deferred()
                return self.deferred

        # Try to read some data from the file
        self.f.seek(self.position)
        b = self.f.read(readSize)
        bytesRead = len(b)
        if not bytesRead:
            # End of file was reached, we're done or deferred
            if self.finished:
                self._close()
                return None
            else:
                self.deferred = defer.Deferred()
                return self.deferred
        else:
            self.position += bytesRead
            return b

    def split(self, point):
        # Modernized from the old 'raise X, msg' statement form (call form
        # is valid in both Python 2 and 3)
        raise StreamsError("You can not split a GrowingFileStream")

    def close(self):
        """The reader is finished with the stream, shut it down."""
        self.length = 0
        self.closed = True
        self._close()

    #{ Growing file interface
    def updateAvailable(self, newlyAvailable):
        """Update the number of bytes that are available.

        Call it with 0 to trigger reading of a fully read file.

        @param newlyAvailable: the number of bytes that just became available
        """
        self.available += newlyAvailable

        # If a read is pending, let it go
        if self.deferred and self.position < self.available:
            # Try to read some data from the file
            length = self.available - self.position
            readSize = min(length, self.CHUNK_SIZE)
            self.f.seek(self.position)
            b = self.f.read(readSize)
            bytesRead = len(b)

            # Check if end of file was reached
            if bytesRead:
                self.position += bytesRead
                deferred = self.deferred
                self.deferred = None
                deferred.callback(b)

    def allAvailable(self, remove = False):
        """Indicate that no more data will be coming available.

        @param remove: whether to remove the file when streaming is complete
        """
        self.finished = True
        self.remove = remove

        # If a read is pending, let it go
        if self.deferred:
            if self.position < self.available:
                # Try to read some data from the file
                length = self.available - self.position
                readSize = min(length, self.CHUNK_SIZE)
                self.f.seek(self.position)
                b = self.f.read(readSize)
                bytesRead = len(b)

                # Check if end of file was reached
                if bytesRead:
                    self.position += bytesRead
                    deferred = self.deferred
                    self.deferred = None
                    deferred.callback(b)
                else:
                    # We're done
                    self._close()
                    deferred = self.deferred
                    self.deferred = None
                    deferred.callback(None)
            else:
                # We're done
                self._close()
                deferred = self.deferred
                self.deferred = None
                deferred.callback(None)

    def _close(self):
        """Close the temporary file and maybe remove it."""
        if self.f:
            self.f.close()
            if self.remove:
                # Renamed local from 'file' to avoid shadowing the builtin.
                # NOTE(review): surrounding lines were lost in the paste;
                # reconstructed close-then-remove sequence — confirm against VCS.
                path = filepath.FilePath(self.f.name)
                path.remove()
            self.f = None
class StreamToFile:
    """Save a stream to a partial file and hash it.

    Also optionally decompresses the file while it is being downloaded.

    @type stream: L{twisted.web2.stream.IByteStream}
    @ivar stream: the input stream being read
    @type outFile: C{file}
    @ivar outFile: the open file being written
    @type hasher: hashing object, e.g. C{sha1}
    @ivar hasher: the hash object for the data
    @type gzfile: C{file}
    @ivar gzfile: the open file to write decompressed gzip data to
    @type gzdec: L{zlib.decompressobj}
    @ivar gzdec: the decompressor to use for the compressed gzip data
    @type gzheader: C{boolean}
    @ivar gzheader: whether the gzip header still needs to be removed from
        the zlib compressed data
    @type bz2file: C{file}
    @ivar bz2file: the open file to write decompressed bz2 data to
    @type bz2dec: L{bz2.BZ2Decompressor}
    @ivar bz2dec: the decompressor to use for the compressed bz2 data
    @type position: C{int}
    @ivar position: the current file position to write the next data to
    @type length: C{int}
    @ivar length: the position in the file to not write beyond
    @ivar notify: a method that will be notified of the length of received data
    @type doneDefer: L{twisted.internet.defer.Deferred}
    @ivar doneDefer: the deferred that will fire when done writing
    """

    def __init__(self, hasher, inputStream, outFile, start = 0, length = None,
                 notify = None, decompress = None, decFile = None):
        """Initializes the files.

        @type hasher: hashing object, e.g. C{sha1}
        @param hasher: the hash object for the data
        @type inputStream: L{twisted.web2.stream.IByteStream}
        @param inputStream: the input stream to read from
        @type outFile: C{file}
        @param outFile: the open file to write to
        @type start: C{int}
        @param start: the file position to start writing at
            (optional, defaults to the start of the file)
        @type length: C{int}
        @param length: the maximum amount of data to write to the file
            (optional, defaults to not limiting the writing to the file)
        @param notify: a method that will be notified of the length of
            received data (optional)
        @type decompress: C{string}
        @param decompress: also decompress the file as this type
            (currently only '.gz' and '.bz2' are supported)
        @type decFile: C{twisted.python.FilePath}
        @param decFile: the file to write the decompressed data to
        """
        self.hasher = hasher
        self.stream = inputStream
        self.outFile = outFile
        self.gzfile = None
        self.bz2file = None
        if decompress == ".gz":
            # The gzip header must be stripped before zlib can decompress
            self.gzheader = True
            self.gzfile = decFile.open('w')
            # Negative window bits tells zlib the data is raw deflate (no header)
            self.gzdec = decompressobj(-MAX_WBITS)
        elif decompress == ".bz2":
            self.bz2file = decFile.open('w')
            self.bz2dec = BZ2Decompressor()
        self.position = start
        self.length = None
        if length is not None:
            self.length = start + length
        self.notify = notify
        self.doneDefer = None

    def run(self):
        """Start the streaming.

        @rtype: L{twisted.internet.defer.Deferred}
        @return: a deferred that will fire when done writing
        """
        self.doneDefer = stream.readStream(self.stream, self._gotData)
        self.doneDefer.addCallbacks(self._done, self._error)
        return self.doneDefer

    def _gotData(self, data):
        """Process the received data."""
        if self.outFile.closed:
            # Modernized from the old 'raise X, msg' statement form
            raise StreamsError("outFile was unexpectedly closed")

        # Make sure we don't go too far
        if self.length is not None and self.position + len(data) > self.length:
            data = data[:(self.length - self.position)]

        # Write and hash the streamed data
        self.outFile.seek(self.position)
        self.outFile.write(data)
        self.hasher.update(data)
        self.position += len(data)

        if self.gzfile:
            # Decompress the zlib portion of the file
            if self.gzheader:
                # Remove the gzip header junk
                self.gzheader = False
                new_data = self._remove_gzip_header(data)
                dec_data = self.gzdec.decompress(new_data)
            else:
                dec_data = self.gzdec.decompress(data)
            self.gzfile.write(dec_data)
        if self.bz2file:
            # Decompress the bz2 file
            dec_data = self.bz2dec.decompress(data)
            self.bz2file.write(dec_data)

        # Notify the watcher of the newly received data length
        if self.notify:
            self.notify(len(data))

    def _remove_gzip_header(self, data):
        """Remove the gzip header from the zlib compressed data.

        @param data: the first chunk of the stream, starting with the header
        @return: the data with the RFC 1952 gzip header stripped
        @raise IOError: if the data is not gzip-formatted
        """
        # Read, check & discard the header fields
        if data[:2] != '\037\213':
            raise IOError('Not a gzipped file')
        if ord(data[2]) != 8:
            raise IOError('Unknown compression method')
        flag = ord(data[3])
        # modtime = self.fileobj.read(4)
        # extraflag = self.fileobj.read(1)
        # os = self.fileobj.read(1)

        skip = 10
        if flag & FEXTRA:
            # Read & discard the extra field (2-byte little-endian length)
            xlen = ord(data[10])
            xlen = xlen + 256*ord(data[11])
            skip = skip + 2 + xlen
        if flag & FNAME:
            # Read and discard a null-terminated string containing the filename
            while True:
                if not data[skip] or data[skip] == '\000':
                    break
                skip += 1
            skip += 1
        if flag & FCOMMENT:
            # Read and discard a null-terminated string containing a comment
            while True:
                if not data[skip] or data[skip] == '\000':
                    break
                skip += 1
            skip += 1
        if flag & FHCRC:
            skip += 2 # Read & discard the 16-bit header CRC
        return data[skip:]

    def _close(self):
        """Close all the output files."""
        # Can't close the outfile, but we should sync it to disk
        if not self.outFile.closed:
            self.outFile.flush()

        # Close the decompressed file
        if self.gzfile:
            # Finish the decompression
            data_dec = self.gzdec.flush()
            self.gzfile.write(data_dec)
            self.gzfile.close()
            self.gzfile = None
        if self.bz2file:
            self.bz2file.close()
            self.bz2file = None

    def _done(self, result):
        """Return the result."""
        self._close()
        return self.hasher

    def _error(self, err):
        """Log the error and close everything."""
        log.msg('Streaming error')
        log.err(err)
        self.stream.close()
        self._close()
        return err
# NOTE(review): the 'class' header line was lost in the mangled paste;
# restored here so the docstring is attached to its class again.
class UploadStream:
    """Identifier for streams that are uploaded to peers."""
class PiecesUploadStream(stream.MemoryStream, UploadStream):
    """In-memory pieces stream, marked (via UploadStream) as an upload to peers."""
class FileUploadStream(stream.FileStream, UploadStream):
    """Modified to make it suitable for streaming to peers.

    Streams the file in small chunks to make it easier to throttle the
    streaming to peers.

    @ivar CHUNK_SIZE: the size of chunks of data to send at a time
    """

    CHUNK_SIZE = 4*1024

    def read(self, sendfile=False):
        """Return the next small chunk of the file, or None when exhausted."""
        if self.f is None:
            return None

        remaining = self.length
        if remaining == 0:
            self.f = None
            return None

        # Remove the SendFileBuffer and mmap use, just use string reads and writes
        readSize = min(remaining, self.CHUNK_SIZE)

        self.f.seek(self.start)
        chunk = self.f.read(readSize)
        count = len(chunk)
        if not count:
            raise RuntimeError("Ran out of data reading file %r, expected %d more bytes" % (self.f, remaining))
        else:
            self.length -= count
            self.start += count
            return chunk