Final version of INFOCOM paper.
[quix0rs-apt-p2p.git] / apt_p2p / Streams.py
1
2 """Modified streams that are used by Apt-P2P."""
3
4 from bz2 import BZ2Decompressor
5 from zlib import decompressobj, MAX_WBITS
6 from gzip import FCOMMENT, FEXTRA, FHCRC, FNAME, FTEXT
7 import os
8
9 from twisted.web2 import stream
10 from twisted.internet import defer
11 from twisted.python import log, filepath
12
13 class StreamsError(Exception):
14     """An error occurred in the streaming."""
15
16 class GrowingFileStream(stream.SimpleStream):
17     """Modified to stream data from a file as it becomes available.
18     
19     @ivar CHUNK_SIZE: the maximum size of chunks of data to send at a time
20     @ivar deferred: waiting for the result of the last read attempt
21     @ivar available: the number of bytes that are currently available to read
22     @ivar position: the current position in the file where the next read will begin
23     @ivar closed: True if the reader has closed the stream
24     @ivar finished: True when no more data will be coming available
25     @ivar remove: whether to remove the file when streaming is complete
26     """
27
28     CHUNK_SIZE = 32*1024
29
30     def __init__(self, f, length = None):
31         self.f = f
32         self.length = length
33         self.deferred = None
34         self.available = 0L
35         self.position = 0L
36         self.closed = False
37         self.finished = False
38         self.remove = False
39
40     #{ Stream interface
41     def read(self, sendfile=False):
42         assert not self.deferred, "A previous read is still deferred."
43
44         if self.f is None:
45             return None
46
47         length = self.available - self.position
48         readSize = min(length, self.CHUNK_SIZE)
49
50         # If we don't have any available, we're done or deferred
51         if readSize <= 0:
52             if self.finished:
53                 self._close()
54                 return None
55             else:
56                 self.deferred = defer.Deferred()
57                 return self.deferred
58
59         # Try to read some data from the file
60         self.f.seek(self.position)
61         b = self.f.read(readSize)
62         bytesRead = len(b)
63         if not bytesRead:
64             # End of file was reached, we're done or deferred
65             if self.finished:
66                 self._close()
67                 return None
68             else:
69                 self.deferred = defer.Deferred()
70                 return self.deferred
71         else:
72             self.position += bytesRead
73             return b
74     
75     def split(self, point):
76         raise StreamsError, "You can not split a GrowingFileStream"
77     
78     def close(self):
79         self.length = 0
80         self.closed = True
81         self._close()
82
83     #{ Growing functions
84     def updateAvailable(self, newlyAvailable):
85         """Update the number of bytes that are available.
86         
87         Call it with 0 to trigger reading of a fully read file.
88         
89         @param newlyAvailable: the number of bytes that just became available
90         """
91         if not self.finished:
92             self.available += newlyAvailable
93         
94         # If a read is pending, let it go
95         if self.deferred and self.position < self.available:
96             # Try to read some data from the file
97             length = self.available - self.position
98             readSize = min(length, self.CHUNK_SIZE)
99             self.f.seek(self.position)
100             b = self.f.read(readSize)
101             bytesRead = len(b)
102             
103             # Check if end of file was reached
104             if bytesRead:
105                 self.position += bytesRead
106                 deferred = self.deferred
107                 self.deferred = None
108                 deferred.callback(b)
109
110     def allAvailable(self, remove = False):
111         """Indicate that no more data will be coming available.
112         
113         @param remove: whether to remove the file when streaming is complete
114         """
115         self.finished = True
116         self.remove = remove
117
118         # If a read is pending, let it go
119         if self.deferred:
120             if self.position < self.available:
121                 # Try to read some data from the file
122                 length = self.available - self.position
123                 readSize = min(length, self.CHUNK_SIZE)
124                 self.f.seek(self.position)
125                 b = self.f.read(readSize)
126                 bytesRead = len(b)
127     
128                 # Check if end of file was reached
129                 if bytesRead:
130                     self.position += bytesRead
131                     deferred = self.deferred
132                     self.deferred = None
133                     deferred.callback(b)
134                 else:
135                     # We're done
136                     self._close()
137                     deferred = self.deferred
138                     self.deferred = None
139                     deferred.callback(None)
140             else:
141                 # We're done
142                 self._close()
143                 deferred = self.deferred
144                 self.deferred = None
145                 deferred.callback(None)
146                 
147         if self.closed:
148             self._close()
149         
150     def _close(self):
151         """Close the temporary file and maybe remove it."""
152         if self.f:
153             self.f.close()
154             if self.remove:
155                 file = filepath.FilePath(self.f.name)
156                 file.restat(False)
157                 if file.exists():
158                     file.remove()
159             self.f = None
160         
161 class StreamToFile:
162     """Save a stream to a partial file and hash it.
163     
164     Also optionally decompresses the file while it is being downloaded.
165
166     @type stream: L{twisted.web2.stream.IByteStream}
167     @ivar stream: the input stream being read
168     @type outFile: C{file}
169     @ivar outFile: the open file being written
170     @type hasher: hashing object, e.g. C{sha1}
171     @ivar hasher: the hash object for the data
172     @type gzfile: C{file}
173     @ivar gzfile: the open file to write decompressed gzip data to
174     @type gzdec: L{zlib.decompressobj}
175     @ivar gzdec: the decompressor to use for the compressed gzip data
176     @type gzheader: C{boolean}
177     @ivar gzheader: whether the gzip header still needs to be removed from
178         the zlib compressed data
179     @type bz2file: C{file}
180     @ivar bz2file: the open file to write decompressed bz2 data to
181     @type bz2dec: L{bz2.BZ2Decompressor}
182     @ivar bz2dec: the decompressor to use for the compressed bz2 data
183     @type position: C{int}
184     @ivar position: the current file position to write the next data to
185     @type length: C{int}
186     @ivar length: the position in the file to not write beyond
187     @ivar notify: a method that will be notified of the length of received data
188     @type doneDefer: L{twisted.internet.defer.Deferred}
189     @ivar doneDefer: the deferred that will fire when done writing
190     """
191     
192     def __init__(self, hasher, inputStream, outFile, start = 0, length = None,
193                  notify = None, decompress = None, decFile = None):
194         """Initializes the files.
195         
196         @type hasher: hashing object, e.g. C{sha1}
197         @param hasher: the hash object for the data
198         @type inputStream: L{twisted.web2.stream.IByteStream}
199         @param inputStream: the input stream to read from
200         @type outFile: C{file}
201         @param outFile: the open file to write to
202         @type start: C{int}
203         @param start: the file position to start writing at
204             (optional, defaults to the start of the file)
205         @type length: C{int}
206         @param length: the maximum amount of data to write to the file
207             (optional, defaults to not limiting the writing to the file
208         @param notify: a method that will be notified of the length of
209             received data (optional)
210         @type decompress: C{string}
211         @param decompress: also decompress the file as this type
212             (currently only '.gz' and '.bz2' are supported)
213         @type decFile: C{twisted.python.FilePath}
214         @param decFile: the file to write the decompressed data to
215         """
216         self.stream = inputStream
217         self.outFile = outFile
218         self.hasher = hasher
219         self.gzfile = None
220         self.bz2file = None
221         if decompress == ".gz":
222             self.gzheader = True
223             self.gzfile = decFile.open('w')
224             self.gzdec = decompressobj(-MAX_WBITS)
225         elif decompress == ".bz2":
226             self.bz2file = decFile.open('w')
227             self.bz2dec = BZ2Decompressor()
228         self.position = start
229         self.length = None
230         if length is not None:
231             self.length = start + length
232         self.notify = notify
233         self.doneDefer = None
234         
235     def run(self):
236         """Start the streaming.
237
238         @rtype: L{twisted.internet.defer.Deferred}
239         """
240         self.doneDefer = stream.readStream(self.stream, self._gotData)
241         self.doneDefer.addCallbacks(self._done, self._error)
242         return self.doneDefer
243
244     def _gotData(self, data):
245         """Process the received data."""
246         if self.outFile.closed:
247             raise StreamsError, "outFile was unexpectedly closed"
248         
249         # Make sure we don't go too far
250         if self.length is not None and self.position + len(data) > self.length:
251             data = data[:(self.length - self.position)]
252         
253         # Write and hash the streamed data
254         self.outFile.seek(self.position)
255         self.outFile.write(data)
256         self.hasher.update(data)
257         self.position += len(data)
258         
259         if self.gzfile:
260             # Decompress the zlib portion of the file
261             if self.gzheader:
262                 # Remove the gzip header junk
263                 self.gzheader = False
264                 new_data = self._remove_gzip_header(data)
265                 dec_data = self.gzdec.decompress(new_data)
266             else:
267                 dec_data = self.gzdec.decompress(data)
268             self.gzfile.write(dec_data)
269         if self.bz2file:
270             # Decompress the bz2 file
271             dec_data = self.bz2dec.decompress(data)
272             self.bz2file.write(dec_data)
273             
274         if self.notify:
275             self.notify(len(data))
276
277     def _remove_gzip_header(self, data):
278         """Remove the gzip header from the zlib compressed data."""
279         # Read, check & discard the header fields
280         if data[:2] != '\037\213':
281             raise IOError, 'Not a gzipped file'
282         if ord(data[2]) != 8:
283             raise IOError, 'Unknown compression method'
284         flag = ord(data[3])
285         # modtime = self.fileobj.read(4)
286         # extraflag = self.fileobj.read(1)
287         # os = self.fileobj.read(1)
288
289         skip = 10
290         if flag & FEXTRA:
291             # Read & discard the extra field
292             xlen = ord(data[10])
293             xlen = xlen + 256*ord(data[11])
294             skip = skip + 2 + xlen
295         if flag & FNAME:
296             # Read and discard a null-terminated string containing the filename
297             while True:
298                 if not data[skip] or data[skip] == '\000':
299                     break
300                 skip += 1
301             skip += 1
302         if flag & FCOMMENT:
303             # Read and discard a null-terminated string containing a comment
304             while True:
305                 if not data[skip] or data[skip] == '\000':
306                     break
307                 skip += 1
308             skip += 1
309         if flag & FHCRC:
310             skip += 2     # Read & discard the 16-bit header CRC
311
312         return data[skip:]
313
314     def _close(self):
315         """Close all the output files."""
316         # Can't close the outfile, but we should sync it to disk
317         if not self.outFile.closed:
318             self.outFile.flush()
319         
320         # Close the decompressed file
321         if self.gzfile:
322             # Finish the decompression 
323             data_dec = self.gzdec.flush()
324             self.gzfile.write(data_dec)
325             self.gzfile.close()
326             self.gzfile = None
327         if self.bz2file:
328             self.bz2file.close()
329             self.bz2file = None
330     
331     def _done(self, result):
332         """Return the result."""
333         self._close()
334         return self.hasher
335     
336     def _error(self, err):
337         """Log the error and close everything."""
338         log.msg('Streaming error')
339         log.err(err)
340         self.stream.close()
341         self._close()
342         return err
343     
344 class UploadStream:
345     """Identifier for streams that are uploaded to peers."""
346     
347 class PiecesUploadStream(stream.MemoryStream, UploadStream):
348     """Modified to identify it for streaming to peers."""
349
350 class FileUploadStream(stream.FileStream, UploadStream):
351     """Modified to make it suitable for streaming to peers.
352     
353     Streams the file in small chunks to make it easier to throttle the
354     streaming to peers.
355     
356     @ivar CHUNK_SIZE: the size of chunks of data to send at a time
357     """
358
359     CHUNK_SIZE = 4*1024
360     
361     def read(self, sendfile=False):
362         if self.f is None:
363             return None
364
365         length = self.length
366         if length == 0:
367             self.f = None
368             return None
369         
370         # Remove the SendFileBuffer and mmap use, just use string reads and writes
371
372         readSize = min(length, self.CHUNK_SIZE)
373
374         self.f.seek(self.start)
375         b = self.f.read(readSize)
376         bytesRead = len(b)
377         if not bytesRead:
378             raise RuntimeError("Ran out of data reading file %r, expected %d more bytes" % (self.f, length))
379         else:
380             self.length -= bytesRead
381             self.start += bytesRead
382             return b