@ivar position: the current file position to write the next data to
@type length: C{int}
@ivar length: the position in the file to not write beyond
- @type inform: C{method}
- @ivar inform: a function to call with the length of data received
@type doneDefer: L{twisted.internet.defer.Deferred}
@ivar doneDefer: the deferred that will fire when done writing
"""
- def __init__(self, inputStream, outFile, start = 0, length = None, inform = None):
+ def __init__(self, inputStream, outFile, start = 0, length = None):
"""Initializes the file.
@type inputStream: L{twisted.web2.stream.IByteStream}
@type length: C{int}
@param length: the maximum amount of data to write to the file
(optional, defaults to not limiting the writing to the file
- @type inform: C{method}
- @param inform: a function to call with the length of data received
- (optional, defaults to calling nothing)
"""
self.stream = inputStream
self.outFile = outFile
self.length = None
if length is not None:
self.length = start + length
- self.inform = inform
self.doneDefer = None
def run(self):
self.outFile.write(data)
self.hash.update(data)
self.position += len(data)
- self.inform(len(data))
def _done(self, result):
"""Return the result."""
if parsed[0] == "http":
site = splitHostPort(parsed[0], parsed[1])
self.mirror_path = urlunparse(('', '') + parsed[2:])
- peer = self.manager.getPeer(site)
- peer.mirror = True
+ peer = self.manager.getPeer(site, mirror = True)
self.peerlist.append(peer)
# Special case if there's only one good peer left
else:
# Read the response stream to the file
log.msg('Streaming piece %d from peer %r' % (piece, peer))
- def statUpdate(bytes, stats = self.manager.stats, mirror = peer.mirror):
- stats.receivedBytes(bytes, mirror)
if response.code == 206:
df = StreamToFile(response.stream, self.file, piece*PIECE_SIZE,
- PIECE_SIZE, statUpdate).run()
+ PIECE_SIZE).run()
else:
- df = StreamToFile(response.stream, self.file,
- inform = statUpdate).run()
+ df = StreamToFile(response.stream, self.file).run()
reactor.callLater(0, df.addCallbacks,
*(self._gotPiece, self._gotError),
**{'callbackArgs': (piece, peer),
assert parsed[0] == "http", "Only HTTP is supported, not '%s'" % parsed[0]
site = splitHostPort(parsed[0], parsed[1])
path = urlunparse(('', '') + parsed[2:])
- peer = self.getPeer(site)
- peer.mirror = True
+ peer = self.getPeer(site, mirror = True)
return peer.get(path, method, modtime)
# elif len(peers) == 1:
# site = uncompact(peers[0]['c'])
tmpfile = self.cache_dir.child(hash.hexexpected())
return FileDownload(self, hash, mirror, peers, tmpfile).run()
- def getPeer(self, site):
+ def getPeer(self, site, mirror = False):
"""Create a new peer if necessary and return it.
@type site: (C{string}, C{int})
@param site: the IP address and port of the peer
+ @param mirror: whether the peer is actually a mirror
+ (optional, defaults to False)
"""
if site not in self.clients:
- self.clients[site] = Peer(site[0], site[1])
+ self.clients[site] = Peer(site[0], site[1], self.stats)
+ if mirror:
+ self.clients[site].mirror = True
return self.clients[site]
def close(self):