- @ivar CHUNK_SIZE: the maximum size of chunks of data to send at a time
- @ivar deferred: waiting for the result of the last read attempt
- @ivar available: the number of bytes that are currently available to read
- @ivar position: the current position in the file where the next read will begin
- @ivar finished: True when no more data will be coming available
- """
-
- CHUNK_SIZE = 4*1024
-
- def __init__(self, f):
- stream.FileStream.__init__(self, f)
- self.length = None
- self.deferred = None
- self.available = 0L
- self.position = 0L
- self.finished = False
-
- def updateAvaliable(self, newlyAvailable):
- """Update the number of bytes that are available.
-
- Call it with 0 to trigger reading of a fully read file.
-
- @param newlyAvailable: the number of bytes that just became available
- """
- assert not self.finished
- self.available += newlyAvailable
-
- # If a read is pending, let it go
- if self.deferred and self.position < self.available:
- # Try to read some data from the file
- length = self.available - self.position
- readSize = min(length, self.CHUNK_SIZE)
- self.f.seek(self.position)
- b = self.f.read(readSize)
- bytesRead = len(b)
-
- # Check if end of file was reached
- if bytesRead:
- self.position += bytesRead
- deferred = self.deferred
- self.deferred = None
- deferred.callback(b)
-
- def allAvailable(self):
- """Indicate that no more data is coming available."""
- self.finished = True
-
- # If a read is pending, let it go
- if self.deferred:
- if self.position < self.available:
- # Try to read some data from the file
- length = self.available - self.position
- readSize = min(length, self.CHUNK_SIZE)
- self.f.seek(self.position)
- b = self.f.read(readSize)
- bytesRead = len(b)
-
- # Check if end of file was reached
- if bytesRead:
- self.position += bytesRead
- deferred = self.deferred
- self.deferred = None
- deferred.callback(b)
- else:
- # We're done
- deferred.callback(None)
- else:
- # We're done
- deferred.callback(None)
-
- def read(self, sendfile=False):
- assert not self.deferred, "A previous read is still deferred."
-
- if self.f is None:
- return None
-
- length = self.available - self.position
- readSize = min(length, self.CHUNK_SIZE)
-
- # If we don't have any available, we're done or deferred
- if readSize <= 0:
- if self.finished:
- return None
- else:
- self.deferred = defer.Deferred()
- return self.deferred
-
- # Try to read some data from the file
- self.f.seek(self.position)
- b = self.f.read(readSize)
- bytesRead = len(b)
- if not bytesRead:
- # End of file was reached, we're done or deferred
- if self.finished:
- return None
- else:
- self.deferred = defer.Deferred()
- return self.deferred
- else:
- self.position += bytesRead
- return b
-
-class StreamToFile(defer.Deferred):
- """Saves a stream to a file.
-
- @type stream: L{twisted.web2.stream.IByteStream}
- @ivar stream: the input stream being read
- @type outFile: L{twisted.python.filepath.FilePath}
- @ivar outFile: the file being written
- @type hash: L{Hash.HashObject}
- @ivar hash: the hash object for the file
- @type length: C{int}
- @ivar length: the length of the original (compressed) file
- @type doneDefer: L{twisted.internet.defer.Deferred}
- @ivar doneDefer: the deferred that will fire when done streaming
- """
-
- def __init__(self, inputStream, outFile, hash, start, length):
- """Initializes the file.
-
- @type inputStream: L{twisted.web2.stream.IByteStream}
- @param inputStream: the input stream to read from
- @type outFile: L{twisted.python.filepath.FilePath}
- @param outFile: the file to write to
- @type hash: L{Hash.HashObject}
- @param hash: the hash object to use for the file
- """
- self.stream = inputStream
- self.outFile = outFile.open('w')
- self.hash = hash
- self.hash.new()
- self.length = self.stream.length
-
- def run(self):
- """Start the streaming."""
- self.doneDefer = stream.readStream(self.stream, _gotData)
- self.doneDefer.addCallbacks(self._done, self._error)
- return self.doneDefer
-
- def _done(self):
- """Close all the output files, return the result."""
- if not self.outFile.closed:
- self.outFile.close()
- self.hash.digest()
- self.doneDefer.callback(self.hash)
-
- def _gotData(self, data):
- self.peers[site]['pieces'] += data
-
- def read(self):
- """Read some data from the stream."""
- if self.outFile.closed:
- return None
-
- # Read data from the stream, deal with the possible deferred
- data = self.stream.read()
- if isinstance(data, defer.Deferred):
- data.addCallbacks(self._write, self._done)
- return data
-
- self._write(data)
- return data
-
- def _write(self, data):
- """Write the stream data to the file and return it for others to use.
-
- Also optionally decompresses it.
- """
- if data is None:
- self._done()
- return data
-
- # Write and hash the streamed data
- self.outFile.write(data)
- self.hash.update(data)
-
- return data
-
- def close(self):
- """Clean everything up and return None to future reads."""
- self.length = 0
- self._done()
- self.stream.close()
-
-