@ivar position: the current file position to write the next data to
@type length: C{int}
@ivar length: the position in the file to not write beyond
+ @type inform: C{method}
+ @ivar inform: a function to call with the length of data received
@type doneDefer: L{twisted.internet.defer.Deferred}
@ivar doneDefer: the deferred that will fire when done writing
"""
- def __init__(self, inputStream, outFile, start = 0, length = None):
+ def __init__(self, inputStream, outFile, start = 0, length = None, inform = None):
"""Initializes the file.
@type inputStream: L{twisted.web2.stream.IByteStream}
@type length: C{int}
@param length: the maximum amount of data to write to the file
(optional, defaults to not limiting the writing to the file
+ @type inform: C{method}
+ @param inform: a function to call with the length of data received
+ (optional, defaults to calling nothing)
"""
self.stream = inputStream
self.outFile = outFile
self.length = None
if length is not None:
self.length = start + length
+ self.inform = inform
self.doneDefer = None
def run(self):
self.outFile.write(data)
self.hash.update(data)
self.position += len(data)
+ self.inform(len(data))
def _done(self, result):
"""Return the result."""
else:
# Read the response stream to the file
log.msg('Streaming piece %d from peer %r' % (piece, peer))
+ def statUpdate(bytes, stats = self.manager.stats, mirror = peer.mirror):
+ stats.receivedBytes(bytes, mirror)
if response.code == 206:
- df = StreamToFile(response.stream, self.file, piece*PIECE_SIZE, PIECE_SIZE).run()
+ df = StreamToFile(response.stream, self.file, piece*PIECE_SIZE,
+ PIECE_SIZE, statUpdate).run()
else:
- df = StreamToFile(response.stream, self.file).run()
- df.addCallbacks(self._gotPiece, self._gotError,
- callbackArgs=(piece, peer), errbackArgs=(piece, peer))
+ df = StreamToFile(response.stream, self.file,
+ inform = statUpdate).run()
+ reactor.callLater(0, df.addCallbacks,
+ *(self._gotPiece, self._gotError),
+ **{'callbackArgs': (piece, peer),
+ 'errbackArgs': (piece, peer)})
self.outstanding -= 1
self.peerlist.append(peer)
@ivar cache_dir: the directory to use for storing all files
@type dht: L{interfaces.IDHT}
@ivar dht: the DHT instance
+ @type stats: L{stats.StatsLogger}
+ @ivar stats: the statistics logger to record sent data to
@type clients: C{dictionary}
@ivar clients: the available peers that have been previously contacted
"""
- def __init__(self, cache_dir, dht):
+ def __init__(self, cache_dir, dht, stats):
"""Initialize the instance."""
self.cache_dir = cache_dir
self.cache_dir.restat(False)
if not self.cache_dir.exists():
self.cache_dir.makedirs()
self.dht = dht
+ self.stats = stats
self.clients = {}
def get(self, hash, mirror, peers = [], method="GET", modtime=None):