from apt_p2p_Khashmir.bencode import bdecode
from apt_p2p_conf import config
+
+class PeerError(Exception):
+ """An error occurred downloading from peers."""
+
class GrowingFileStream(stream.FileStream):
"""Modified to stream data from a file as it becomes available.
@ivar position: the current file position to write the next data to
@type length: C{int}
@ivar length: the position in the file to not write beyond
- @type inform: C{method}
- @ivar inform: a function to call with the length of data received
@type doneDefer: L{twisted.internet.defer.Deferred}
@ivar doneDefer: the deferred that will fire when done writing
"""
- def __init__(self, inputStream, outFile, start = 0, length = None, inform = None):
+ def __init__(self, inputStream, outFile, start = 0, length = None):
"""Initializes the file.
@type inputStream: L{twisted.web2.stream.IByteStream}
@type length: C{int}
@param length: the maximum amount of data to write to the file
 (optional, defaults to not limiting the writing to the file)
- @type inform: C{method}
- @param inform: a function to call with the length of data received
- (optional, defaults to calling nothing)
"""
self.stream = inputStream
self.outFile = outFile
self.length = None
if length is not None:
self.length = start + length
- self.inform = inform
self.doneDefer = None
def run(self):
def _gotData(self, data):
"""Process the received data."""
if self.outFile.closed:
- raise Exception, "outFile was unexpectedly closed"
+ raise PeerError, "outFile was unexpectedly closed"
if data is None:
- raise Exception, "Data is None?"
+ raise PeerError, "Data is None?"
# Make sure we don't go too far
if self.length is not None and self.position + len(data) > self.length:
self.outFile.write(data)
self.hash.update(data)
self.position += len(data)
- self.inform(len(data))
def _done(self, result):
"""Return the result."""
if self.pieces is None:
# Send a request to one or more peers
- log.msg('Checking for a peer to request piece hashes from')
for site in self.peers:
if self.peers[site].get('failed', False) != True:
log.msg('Sending a piece hash request to %r' % (site, ))
if self.outstanding >= 4:
break
- log.msg('Done sending piece hash requests for now, %d outstanding' % self.outstanding)
if self.pieces is None and self.outstanding <= 0:
# Continue without the piece hashes
log.msg('Could not retrieve the piece hashes from the peers')
log.msg('Got a piece hash response %d from %r' % (response.code, site))
if response.code != 200:
# Request failed, try a different peer
- log.msg('Did not like response %d from %r' % (response.code, site))
self.getPeerPieces(key, site)
else:
# Read the response stream to a string
if parsed[0] == "http":
site = splitHostPort(parsed[0], parsed[1])
self.mirror_path = urlunparse(('', '') + parsed[2:])
- peer = self.manager.getPeer(site)
- peer.mirror = True
+ peer = self.manager.getPeer(site, mirror = True)
self.peerlist.append(peer)
# Special case if there's only one good peer left
#{ Downloading the pieces
def getPieces(self):
"""Download the next pieces from the peers."""
- log.msg('Checking for more piece requests to send')
self.sort()
piece = self.nextFinish
while self.outstanding < 4 and self.peerlist and piece < len(self.completePieces):
- log.msg('Checking piece %d' % piece)
if self.completePieces[piece] == False:
# Send a request to the highest ranked peer
peer = self.peerlist.pop()
'errbackArgs': (piece, peer)})
piece += 1
- log.msg('Finished checking pieces, %d outstanding, next piece %d of %d' % (self.outstanding, self.nextFinish, len(self.completePieces)))
# Check if we're done
if self.outstanding <= 0 and self.nextFinish >= len(self.completePieces):
log.msg('We seem to be done with all pieces')
else:
# Read the response stream to the file
log.msg('Streaming piece %d from peer %r' % (piece, peer))
- def statUpdate(bytes, stats = self.manager.stats, mirror = peer.mirror):
- stats.receivedBytes(bytes, mirror)
if response.code == 206:
df = StreamToFile(response.stream, self.file, piece*PIECE_SIZE,
- PIECE_SIZE, statUpdate).run()
+ PIECE_SIZE).run()
else:
- df = StreamToFile(response.stream, self.file,
- inform = statUpdate).run()
+ df = StreamToFile(response.stream, self.file).run()
reactor.callLater(0, df.addCallbacks,
*(self._gotPiece, self._gotError),
**{'callbackArgs': (piece, peer),
assert parsed[0] == "http", "Only HTTP is supported, not '%s'" % parsed[0]
site = splitHostPort(parsed[0], parsed[1])
path = urlunparse(('', '') + parsed[2:])
- peer = self.getPeer(site)
- peer.mirror = True
+ peer = self.getPeer(site, mirror = True)
return peer.get(path, method, modtime)
# elif len(peers) == 1:
# site = uncompact(peers[0]['c'])
tmpfile = self.cache_dir.child(hash.hexexpected())
return FileDownload(self, hash, mirror, peers, tmpfile).run()
- def getPeer(self, site):
+ def getPeer(self, site, mirror = False):
"""Create a new peer if necessary and return it.
@type site: (C{string}, C{int})
@param site: the IP address and port of the peer
+ @param mirror: whether the peer is actually a mirror
+ (optional, defaults to False)
"""
if site not in self.clients:
- self.clients[site] = Peer(site[0], site[1])
+ self.clients[site] = Peer(site[0], site[1], self.stats)
+ if mirror:
+ self.clients[site].mirror = True
return self.clients[site]
def close(self):