X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=apt_p2p%2FPeerManager.py;h=737669751da78ee9a3242996aed50e41dc5aa18b;hb=297459dc70c104886d3c1794894539aa9ea26c4d;hp=0f64e8f3161f2e99e3cee9f16ec9f37fd2803f92;hpb=695f86cd055c50859f1a9919e96756fb3aa4c429;p=quix0rs-apt-p2p.git diff --git a/apt_p2p/PeerManager.py b/apt_p2p/PeerManager.py index 0f64e8f..7376697 100644 --- a/apt_p2p/PeerManager.py +++ b/apt_p2p/PeerManager.py @@ -15,8 +15,9 @@ from twisted.web2.http import Response, splitHostPort from HTTPDownloader import Peer from util import uncompact -from hash import PIECE_SIZE +from Hash import PIECE_SIZE from apt_p2p_Khashmir.bencode import bdecode +from apt_p2p_conf import config class GrowingFileStream(stream.FileStream): """Modified to stream data from a file as it becomes available. @@ -30,15 +31,15 @@ class GrowingFileStream(stream.FileStream): CHUNK_SIZE = 4*1024 - def __init__(self, f): + def __init__(self, f, length = None): stream.FileStream.__init__(self, f) - self.length = None + self.length = length self.deferred = None self.available = 0L self.position = 0L self.finished = False - def updateAvaliable(self, newlyAvailable): + def updateAvailable(self, newlyAvailable): """Update the number of bytes that are available. Call it with 0 to trigger reading of a fully read file. @@ -86,9 +87,13 @@ class GrowingFileStream(stream.FileStream): deferred.callback(b) else: # We're done + deferred = self.deferred + self.deferred = None deferred.callback(None) else: # We're done + deferred = self.deferred + self.deferred = None deferred.callback(None) def read(self, sendfile=False): @@ -123,7 +128,7 @@ class GrowingFileStream(stream.FileStream): self.position += bytesRead return b -class StreamToFile(defer.Deferred): +class StreamToFile: """Save a stream to a partial file and hash it. @type stream: L{twisted.web2.stream.IByteStream} @@ -168,6 +173,7 @@ class StreamToFile(defer.Deferred): @rtype: L{twisted.internet.defer.Deferred} """ + log.msg('Started streaming %r bytes to file at position %d' % (self.length, self.position)) self.doneDefer = stream.readStream(self.stream, self._gotData) self.doneDefer.addCallbacks(self._done, self._error) return self.doneDefer @@ -192,10 +198,12 @@ class StreamToFile(defer.Deferred): def _done(self, result): """Return the result.""" + log.msg('Streaming is complete') return self.hash.digest() def _error(self, err): """Log the error.""" + log.msg('Streaming error') log.err(err) return err @@ -254,27 +262,29 @@ class FileDownload: self.compact_peers = compact_peers self.path = '/~/' + quote_plus(hash.expected()) + self.mirror_path = None self.pieces = None self.started = False file.restat(False) if file.exists(): file.remove() - self.file = file.open('w') + self.file = file.open('w+') def run(self): """Start the downloading process.""" + log.msg('Checking for pieces for %s' % self.path) self.defer = defer.Deferred() self.peers = {} no_pieces = 0 - pieces_string = {} - pieces_hash = {} - pieces_dl_hash = {} + pieces_string = {0: 0} + pieces_hash = {0: 0} + pieces_dl_hash = {0: 0} for compact_peer in self.compact_peers: # Build a list of all the peers for this download site = uncompact(compact_peer['c']) - peer = manager.getPeer(site) + peer = self.manager.getPeer(site) self.peers.setdefault(site, {})['peer'] = peer # Extract any piece information from the peers list @@ -303,7 +313,8 @@ class FileDownload: if max_found == no_pieces: # The file is not split into pieces - self.pieces = [] + log.msg('No pieces were found for the file') + self.pieces = [self.hash.expected()] self.startDownload() elif max_found == max(pieces_string.values()): # Small number of pieces in a string @@ -319,13 +330,15 @@ class FileDownload: for pieces, num in pieces_hash.items(): # Find the most popular piece hash to lookup if num == max_found: + log.msg('Found a hash for pieces to lookup in the DHT: %r' % pieces) self.getDHTPieces(pieces) break elif max_found == max(pieces_dl_hash.values()): # Large number of pieces stored in peers - for pieces, num in pieces_hash.items(): + for pieces, num in pieces_dl_hash.items(): # Find the most popular piece hash to download if num == max_found: + log.msg('Found a hash for pieces to lookup in peers: %r' % pieces) self.getPeerPieces(pieces) break return self.defer @@ -343,23 +356,27 @@ class FileDownload: # Start the DHT lookup lookupDefer = self.manager.dht.getValue(key) - lookupDefer.addCallback(self._getDHTPieces, key) + lookupDefer.addBoth(self._getDHTPieces, key) def _getDHTPieces(self, results, key): """Check the retrieved values.""" - for result in results: - # Make sure the hash matches the key - result_hash = sha.new(result.get('t', '')).digest() - if result_hash == key: - pieces = result['t'] - self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)] - log.msg('Retrieved %d piece hashes from the DHT' % len(self.pieces)) - self.startDownload() - return + if isinstance(results, list): + for result in results: + # Make sure the hash matches the key + result_hash = sha.new(result.get('t', '')).digest() + if result_hash == key: + pieces = result['t'] + self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)] + log.msg('Retrieved %d piece hashes from the DHT' % len(self.pieces)) + self.startDownload() + return + + log.msg('Could not retrieve the piece hashes from the DHT') + else: + log.msg('Looking up piece hashes in the DHT resulted in an error: %r' % (result, )) # Continue without the piece hashes - log.msg('Could not retrieve the piece hashes from the DHT') - self.pieces = [] + self.pieces = [None for x in xrange(0, self.hash.expSize, PIECE_SIZE)] self.startDownload() def getPeerPieces(self, key, failedSite = None): @@ -368,61 +385,76 @@ class FileDownload: @param key: the key to request from the peers """ if failedSite is None: + log.msg('Starting the lookup of piece hashes in peers') self.outstanding = 0 # Remove any peers with the wrong piece hash #for site in self.peers.keys(): # if self.peers[site].get('l', '') != key: # del self.peers[site] else: + log.msg('Piece hash lookup failed for peer %r' % (failedSite, )) self.peers[failedSite]['failed'] = True self.outstanding -= 1 if self.pieces is None: # Send a request to one or more peers + log.msg('Checking for a peer to request piece hashes from') for site in self.peers: if self.peers[site].get('failed', False) != True: + log.msg('Sending a piece hash request to %r' % (site, )) path = '/~/' + quote_plus(key) lookupDefer = self.peers[site]['peer'].get(path) - lookupDefer.addCallbacks(self._getPeerPieces, self._gotPeerError, - callbackArgs=(key, site), errbackArgs=(key, site)) + reactor.callLater(0, lookupDefer.addCallbacks, + *(self._getPeerPieces, self._gotPeerError), + **{'callbackArgs': (key, site), + 'errbackArgs': (key, site)}) self.outstanding += 1 - if self.outstanding >= 3: + if self.outstanding >= 4: break - if self.pieces is None and self.outstanding == 0: + log.msg('Done sending piece hash requests for now, %d outstanding' % self.outstanding) + if self.pieces is None and self.outstanding <= 0: # Continue without the piece hashes log.msg('Could not retrieve the piece hashes from the peers') - self.pieces = [] + self.pieces = [None for x in xrange(0, self.hash.expSize, PIECE_SIZE)] self.startDownload() def _getPeerPieces(self, response, key, site): """Process the retrieved response from the peer.""" + log.msg('Got a piece hash response %d from %r' % (response.code, site)) if response.code != 200: # Request failed, try a different peer + log.msg('Did not like response %d from %r' % (response.code, site)) self.getPeerPieces(key, site) else: # Read the response stream to a string self.peers[site]['pieces'] = '' def _gotPeerPiece(data, self = self, site = site): + log.msg('Peer %r got %d bytes of piece hashes' % (site, len(data))) self.peers[site]['pieces'] += data + log.msg('Streaming piece hashes from peer') df = stream.readStream(response.stream, _gotPeerPiece) df.addCallbacks(self._gotPeerPieces, self._gotPeerError, callbackArgs=(key, site), errbackArgs=(key, site)) def _gotPeerError(self, err, key, site): """Peer failed, try again.""" + log.msg('Peer piece hash request failed for %r' % (site, )) log.err(err) self.getPeerPieces(key, site) def _gotPeerPieces(self, result, key, site): """Check the retrieved pieces from the peer.""" + log.msg('Finished streaming piece hashes from peer %r' % (site, )) if self.pieces is not None: # Already done + log.msg('Already done') return try: result = bdecode(self.peers[site]['pieces']) except: + log.msg('Error bdecoding piece hashes') log.err() self.getPeerPieces(key, site) return @@ -431,7 +463,7 @@ class FileDownload: if result_hash == key: pieces = result['t'] self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)] - log.msg('Retrieved %d piece hashes from the peer' % len(self.pieces)) + log.msg('Retrieved %d piece hashes from the peer %r' % (len(self.pieces), site)) self.startDownload() else: log.msg('Peer returned a piece string that did not match') @@ -455,73 +487,91 @@ class FileDownload: if self.started: return + log.msg('Starting to download %s' % self.path) self.started = True - assert self.pieces is not None, "You must initialize the piece hashes first" + assert self.pieces, "You must initialize the piece hashes first" self.peerlist = [self.peers[site]['peer'] for site in self.peers] + # Use the mirror if there are few peers + if len(self.peerlist) < config.getint('DEFAULT', 'MIN_DOWNLOAD_PEERS'): + parsed = urlparse(self.mirror) + if parsed[0] == "http": + site = splitHostPort(parsed[0], parsed[1]) + self.mirror_path = urlunparse(('', '') + parsed[2:]) + peer = self.manager.getPeer(site, mirror = True) + self.peerlist.append(peer) + # Special case if there's only one good peer left - if len(self.peerlist) == 1: - log.msg('Downloading from peer %r' % (self.peerlist[0], )) - self.defer.callback(self.peerlist[0].get(self.path)) - return +# if len(self.peerlist) == 1: +# log.msg('Downloading from peer %r' % (self.peerlist[0], )) +# self.defer.callback(self.peerlist[0].get(self.path)) +# return # Start sending the return file - self.stream = GrowingFileStream(self.file) + self.stream = GrowingFileStream(self.file, self.hash.expSize) resp = Response(200, {}, self.stream) self.defer.callback(resp) # Begin to download the pieces self.outstanding = 0 self.nextFinish = 0 - if self.pieces: - self.completePieces = [False for piece in self.pieces] - else: - self.completePieces = [False] + self.completePieces = [False for piece in self.pieces] self.getPieces() #{ Downloading the pieces def getPieces(self): """Download the next pieces from the peers.""" + log.msg('Checking for more piece requests to send') self.sort() piece = self.nextFinish while self.outstanding < 4 and self.peerlist and piece < len(self.completePieces): + log.msg('Checking piece %d' % piece) if self.completePieces[piece] == False: # Send a request to the highest ranked peer peer = self.peerlist.pop() self.completePieces[piece] = peer + log.msg('Sending a request for piece %d to peer %r' % (piece, peer)) self.outstanding += 1 - if self.pieces: - df = peer.getRange(self.path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1) + if peer.mirror: + df = peer.getRange(self.mirror_path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1) else: - df = peer.get(self.path) + df = peer.getRange(self.path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1) reactor.callLater(0, df.addCallbacks, *(self._getPiece, self._getError), **{'callbackArgs': (piece, peer), 'errbackArgs': (piece, peer)}) - piece += 1 + piece += 1 - # Check if we're don + log.msg('Finished checking pieces, %d outstanding, next piece %d of %d' % (self.outstanding, self.nextFinish, len(self.completePieces))) + # Check if we're done if self.outstanding <= 0 and self.nextFinish >= len(self.completePieces): + log.msg('We seem to be done with all pieces') self.stream.allAvailable() def _getPiece(self, response, piece, peer): """Process the retrieved headers from the peer.""" + log.msg('Got response for piece %d from peer %r' % (piece, peer)) if ((len(self.completePieces) > 1 and response.code != 206) or (response.code not in (200, 206))): # Request failed, try a different peer - peer.hashError() + log.msg('Wrong response type %d for piece %d from peer %r' % (response.code, piece, peer)) + peer.hashError('Peer responded with the wrong type of download: %r' % response.code) self.completePieces[piece] = False if response.stream and response.stream.length: stream.readAndDiscard(response.stream) else: # Read the response stream to the file + log.msg('Streaming piece %d from peer %r' % (piece, peer)) if response.code == 206: - df = StreamToFile(response.stream, self.file, piece*PIECE_SIZE, PIECE_SIZE).run() + df = StreamToFile(response.stream, self.file, piece*PIECE_SIZE, + PIECE_SIZE).run() else: df = StreamToFile(response.stream, self.file).run() - df.addCallbacks(self._gotPiece, self._gotError, - callbackArgs=(piece, peer), errbackArgs=(piece, peer)) + reactor.callLater(0, df.addCallbacks, + *(self._gotPiece, self._gotError), + **{'callbackArgs': (piece, peer), + 'errbackArgs': (piece, peer)}) self.outstanding -= 1 self.peerlist.append(peer) @@ -529,6 +579,7 @@ class FileDownload: def _getError(self, err, piece, peer): """Peer failed, try again.""" + log.msg('Got error for piece %d from peer %r' % (piece, peer)) self.outstanding -= 1 self.peerlist.append(peer) self.completePieces[piece] = False @@ -537,27 +588,26 @@ class FileDownload: def _gotPiece(self, response, piece, peer): """Process the retrieved piece from the peer.""" - if ((self.pieces and response != self.pieces[piece]) or - (len(self.pieces) == 0 and response == self.hash.expected())): + log.msg('Finished streaming piece %d from peer %r: %r' % (piece, peer, response)) + if self.pieces[piece] and response != self.pieces[piece]: # Hash doesn't match - peer.hashError() + log.msg('Hash error for piece %d from peer %r' % (piece, peer)) + peer.hashError('Piece received from peer does not match expected') self.completePieces[piece] = False - elif self.pieces: + else: # Successfully completed one of several pieces + log.msg('Finished with piece %d from peer %r' % (piece, peer)) self.completePieces[piece] = True while (self.nextFinish < len(self.completePieces) and self.completePieces[self.nextFinish] == True): self.nextFinish += 1 self.stream.updateAvailable(PIECE_SIZE) - else: - # Whole download (only one piece) is complete - self.completePieces[piece] = True - self.stream.updateAvailable(2**30) self.getPieces() def _gotError(self, err, piece, peer): """Piece download failed, try again.""" + log.msg('Error streaming piece %d from peer %r: %r' % (piece, peer, response)) log.err(err) self.completePieces[piece] = False self.getPieces() @@ -569,17 +619,20 @@ class PeerManager: @ivar cache_dir: the directory to use for storing all files @type dht: L{interfaces.IDHT} @ivar dht: the DHT instance + @type stats: L{stats.StatsLogger} + @ivar stats: the statistics logger to record sent data to @type clients: C{dictionary} @ivar clients: the available peers that have been previously contacted """ - def __init__(self, cache_dir, dht): + def __init__(self, cache_dir, dht, stats): """Initialize the instance.""" self.cache_dir = cache_dir self.cache_dir.restat(False) if not self.cache_dir.exists(): self.cache_dir.makedirs() self.dht = dht + self.stats = stats self.clients = {} def get(self, hash, mirror, peers = [], method="GET", modtime=None): @@ -605,26 +658,30 @@ class PeerManager: assert parsed[0] == "http", "Only HTTP is supported, not '%s'" % parsed[0] site = splitHostPort(parsed[0], parsed[1]) path = urlunparse(('', '') + parsed[2:]) - peer = self.getPeer(site) + peer = self.getPeer(site, mirror = True) return peer.get(path, method, modtime) - elif len(peers) == 1: - site = uncompact(peers[0]['c']) - log.msg('Downloading from peer %r' % (site, )) - path = '/~/' + quote_plus(hash.expected()) - peer = self.getPeer(site) - return peer.get(path) +# elif len(peers) == 1: +# site = uncompact(peers[0]['c']) +# log.msg('Downloading from peer %r' % (site, )) +# path = '/~/' + quote_plus(hash.expected()) +# peer = self.getPeer(site) +# return peer.get(path) else: tmpfile = self.cache_dir.child(hash.hexexpected()) return FileDownload(self, hash, mirror, peers, tmpfile).run() - def getPeer(self, site): + def getPeer(self, site, mirror = False): """Create a new peer if necessary and return it. @type site: (C{string}, C{int}) @param site: the IP address and port of the peer + @param mirror: whether the peer is actually a mirror + (optional, defaults to False) """ if site not in self.clients: - self.clients[site] = Peer(site[0], site[1]) + self.clients[site] = Peer(site[0], site[1], self.stats) + if mirror: + self.clients[site].mirror = True return self.clients[site] def close(self): @@ -639,60 +696,6 @@ class TestPeerManager(unittest.TestCase): manager = None pending_calls = [] - def gotResp(self, resp, num, expect): - self.failUnless(resp.code >= 200 and resp.code < 300, "Got a non-200 response: %r" % resp.code) - if expect is not None: - self.failUnless(resp.stream.length == expect, "Length was incorrect, got %r, expected %r" % (resp.stream.length, expect)) - def print_(n): - pass - def printdone(n): - pass - stream.readStream(resp.stream, print_).addCallback(printdone) - - def test_download(self): - """Tests a normal download.""" - self.manager = PeerManager() - self.timeout = 10 - - host = 'www.ietf.org' - d = self.manager.get('', 'http://' + host + '/rfc/rfc0013.txt') - d.addCallback(self.gotResp, 1, 1070) - return d - - def test_head(self): - """Tests a 'HEAD' request.""" - self.manager = PeerManager() - self.timeout = 10 - - host = 'www.ietf.org' - d = self.manager.get('', 'http://' + host + '/rfc/rfc0013.txt', method = "HEAD") - d.addCallback(self.gotResp, 1, 0) - return d - - def test_multiple_downloads(self): - """Tests multiple downloads with queueing and connection closing.""" - self.manager = PeerManager() - self.timeout = 120 - lastDefer = defer.Deferred() - - def newRequest(host, path, num, expect, last=False): - d = self.manager.get('', 'http://' + host + ':' + str(80) + path) - d.addCallback(self.gotResp, num, expect) - if last: - d.addBoth(lastDefer.callback) - - newRequest('www.ietf.org', "/rfc/rfc0006.txt", 1, 1776) - newRequest('www.ietf.org', "/rfc/rfc2362.txt", 2, 159833) - newRequest('www.google.ca', "/", 3, None) - self.pending_calls.append(reactor.callLater(1, newRequest, 'www.sfu.ca', '/', 4, None)) - self.pending_calls.append(reactor.callLater(10, newRequest, 'www.ietf.org', '/rfc/rfc0048.txt', 5, 41696)) - self.pending_calls.append(reactor.callLater(30, newRequest, 'www.ietf.org', '/rfc/rfc0022.txt', 6, 4606)) - self.pending_calls.append(reactor.callLater(31, newRequest, 'www.sfu.ca', '/studentcentral/index.html', 7, None)) - self.pending_calls.append(reactor.callLater(32, newRequest, 'www.ietf.org', '/rfc/rfc0014.txt', 8, 27)) - self.pending_calls.append(reactor.callLater(32, newRequest, 'www.ietf.org', '/rfc/rfc0001.txt', 9, 21088)) - self.pending_calls.append(reactor.callLater(62, newRequest, 'www.google.ca', '/intl/en/options/', 0, None, True)) - return lastDefer - def tearDown(self): for p in self.pending_calls: if p.active():