+class GrowingFileStream(stream.FileStream):
+ """Modified to stream data from a file as it becomes available.
+
+ @ivar CHUNK_SIZE: the maximum size of chunks of data to send at a time
+ @ivar deferred: waiting for the result of the last read attempt
+ @ivar available: the number of bytes that are currently available to read
+ @ivar position: the current position in the file where the next read will begin
+ @ivar finished: True when no more data will be coming available
+ """
+
+ CHUNK_SIZE = 4*1024
+
+ def __init__(self, f):
+ stream.FileStream.__init__(self, f)
+ self.length = None
+ self.deferred = None
+ self.available = 0L
+ self.position = 0L
+ self.finished = False
+
+ def updateAvaliable(self, newlyAvailable):
+ """Update the number of bytes that are available.
+
+ Call it with 0 to trigger reading of a fully read file.
+
+ @param newlyAvailable: the number of bytes that just became available
+ """
+ assert not self.finished
+ self.available += newlyAvailable
+
+ # If a read is pending, let it go
+ if self.deferred and self.position < self.available:
+ # Try to read some data from the file
+ length = self.available - self.position
+ readSize = min(length, self.CHUNK_SIZE)
+ self.f.seek(self.position)
+ b = self.f.read(readSize)
+ bytesRead = len(b)
+
+ # Check if end of file was reached
+ if bytesRead:
+ self.position += bytesRead
+ deferred = self.deferred
+ self.deferred = None
+ deferred.callback(b)
+
+ def allAvailable(self):
+ """Indicate that no more data is coming available."""
+ self.finished = True
+
+ # If a read is pending, let it go
+ if self.deferred:
+ if self.position < self.available:
+ # Try to read some data from the file
+ length = self.available - self.position
+ readSize = min(length, self.CHUNK_SIZE)
+ self.f.seek(self.position)
+ b = self.f.read(readSize)
+ bytesRead = len(b)
+
+ # Check if end of file was reached
+ if bytesRead:
+ self.position += bytesRead
+ deferred = self.deferred
+ self.deferred = None
+ deferred.callback(b)
+ else:
+ # We're done
+ deferred.callback(None)
+ else:
+ # We're done
+ deferred.callback(None)
+
+ def read(self, sendfile=False):
+ assert not self.deferred, "A previous read is still deferred."
+
+ if self.f is None:
+ return None
+
+ length = self.available - self.position
+ readSize = min(length, self.CHUNK_SIZE)
+
+ # If we don't have any available, we're done or deferred
+ if readSize <= 0:
+ if self.finished:
+ return None
+ else:
+ self.deferred = defer.Deferred()
+ return self.deferred
+
+ # Try to read some data from the file
+ self.f.seek(self.position)
+ b = self.f.read(readSize)
+ bytesRead = len(b)
+ if not bytesRead:
+ # End of file was reached, we're done or deferred
+ if self.finished:
+ return None
+ else:
+ self.deferred = defer.Deferred()
+ return self.deferred
+ else:
+ self.position += bytesRead
+ return b
+
+class StreamToFile(defer.Deferred):
+ """Saves a stream to a file.
+
+ @type stream: L{twisted.web2.stream.IByteStream}
+ @ivar stream: the input stream being read
+ @type outFile: L{twisted.python.filepath.FilePath}
+ @ivar outFile: the file being written
+ @type hash: L{Hash.HashObject}
+ @ivar hash: the hash object for the file
+ @type length: C{int}
+ @ivar length: the length of the original (compressed) file
+ @type doneDefer: L{twisted.internet.defer.Deferred}
+ @ivar doneDefer: the deferred that will fire when done streaming
+ """
+
+ def __init__(self, inputStream, outFile, hash, start, length):
+ """Initializes the file.
+
+ @type inputStream: L{twisted.web2.stream.IByteStream}
+ @param inputStream: the input stream to read from
+ @type outFile: L{twisted.python.filepath.FilePath}
+ @param outFile: the file to write to
+ @type hash: L{Hash.HashObject}
+ @param hash: the hash object to use for the file
+ """
+ self.stream = inputStream
+ self.outFile = outFile.open('w')
+ self.hash = hash
+ self.hash.new()
+ self.length = self.stream.length
+
+ def run(self):
+ """Start the streaming."""
+ self.doneDefer = stream.readStream(self.stream, _gotData)
+ self.doneDefer.addCallbacks(self._done, self._error)
+ return self.doneDefer
+
+ def _done(self):
+ """Close all the output files, return the result."""
+ if not self.outFile.closed:
+ self.outFile.close()
+ self.hash.digest()
+ self.doneDefer.callback(self.hash)
+
+ def _gotData(self, data):
+ self.peers[site]['pieces'] += data
+
+ def read(self):
+ """Read some data from the stream."""
+ if self.outFile.closed:
+ return None
+
+ # Read data from the stream, deal with the possible deferred
+ data = self.stream.read()
+ if isinstance(data, defer.Deferred):
+ data.addCallbacks(self._write, self._done)
+ return data
+
+ self._write(data)
+ return data
+
+ def _write(self, data):
+ """Write the stream data to the file and return it for others to use.
+
+ Also optionally decompresses it.
+ """
+ if data is None:
+ self._done()
+ return data
+
+ # Write and hash the streamed data
+ self.outFile.write(data)
+ self.hash.update(data)
+
+ return data
+
+ def close(self):
+ """Clean everything up and return None to future reads."""
+ self.length = 0
+ self._done()
+ self.stream.close()
+
+
+class FileDownload: