From 151de5a26973474d2b1a8fc3f071615c09e9a62d Mon Sep 17 00:00:00 2001 From: Cameron Dale Date: Thu, 8 May 2008 17:28:07 -0700 Subject: [PATCH] PeerManager keeps a list of site names, peers are kept in a dictionary. --- apt_p2p/PeerManager.py | 70 ++++++++++++++++++++++-------------------- 1 file changed, 36 insertions(+), 34 deletions(-) diff --git a/apt_p2p/PeerManager.py b/apt_p2p/PeerManager.py index cfc0f78..0cbee1f 100644 --- a/apt_p2p/PeerManager.py +++ b/apt_p2p/PeerManager.py @@ -288,12 +288,12 @@ class FileDownload: """Sort the peers by their rank (highest ranked at the end).""" def sort(a, b): """Sort peers by their rank.""" - if a.rank > b.rank: + if self.peers[a]['peer'].rank > self.peers[b]['peer'].rank: return 1 - elif a.rank < b.rank: + elif self.peers[a]['peer'].rank < self.peers[b]['peer'].rank: return -1 return 0 - self.peerlist.sort(sort) + self.sitelist.sort(sort) def startDownload(self): """Start the download from the peers.""" @@ -304,21 +304,23 @@ class FileDownload: log.msg('Starting to download %s' % self.path) self.started = True assert self.pieces, "You must initialize the piece hashes first" - self.peerlist = [self.peers[site]['peer'] for site in self.peers] # Use the mirror if there are few peers - if len(self.peerlist) < config.getint('DEFAULT', 'MIN_DOWNLOAD_PEERS'): + if len(self.peers) < config.getint('DEFAULT', 'MIN_DOWNLOAD_PEERS'): parsed = urlparse(self.mirror) if parsed[0] == "http": site = splitHostPort(parsed[0], parsed[1]) self.mirror_path = urlunparse(('', '') + parsed[2:]) peer = self.manager.getPeer(site, mirror = True) - self.peerlist.append(peer) + self.peers[site] = {} + self.peers[site]['peer'] = peer + + self.sitelist = self.peers.keys() # Special case if there's only one good peer left -# if len(self.peerlist) == 1: -# log.msg('Downloading from peer %r' % (self.peerlist[0], )) -# self.defer.callback(self.peerlist[0].get(self.path)) +# if len(self.sitelist) == 1: +# log.msg('Downloading from peer %r' % (self.peers[self.sitelist[0]]['peer'], )) +# self.defer.callback(self.peers[self.sitelist[0]]['peer'].get(self.path)) # return # Begin to download the pieces @@ -337,25 +339,25 @@ class FileDownload: self.sort() piece = self.nextFinish - while self.outstanding < 4 and self.peerlist and piece < len(self.completePieces): + while self.outstanding < 4 and self.sitelist and piece < len(self.completePieces): if self.completePieces[piece] == False: # Send a request to the highest ranked peer - peer = self.peerlist.pop() - self.completePieces[piece] = peer - log.msg('Sending a request for piece %d to peer %r' % (piece, peer)) + site = self.sitelist.pop() + self.completePieces[piece] = site + log.msg('Sending a request for piece %d to peer %r' % (piece, self.peers[site]['peer'])) self.outstanding += 1 path = self.path - if peer.mirror: + if self.peers[site]['peer'].mirror: path = self.mirror_path if len(self.completePieces) > 1: - df = peer.getRange(path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1) + df = self.peers[site]['peer'].getRange(path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1) else: - df = peer.get(path) + df = self.peers[site]['peer'].get(path) reactor.callLater(0, df.addCallbacks, *(self._getPiece, self._getError), - **{'callbackArgs': (piece, peer), - 'errbackArgs': (piece, peer)}) + **{'callbackArgs': (piece, site), + 'errbackArgs': (piece, site)}) piece += 1 # Check if we're done @@ -363,13 +365,13 @@ class FileDownload: log.msg('Download is complete for %s' % self.path) self.stream.allAvailable(remove = True) - def _getPiece(self, response, piece, peer): + def _getPiece(self, response, piece, site): """Process the retrieved headers from the peer.""" if ((len(self.completePieces) > 1 and response.code != 206) or (response.code not in (200, 206))): # Request failed, try a different peer - log.msg('Wrong response type %d for piece %d from peer %r' % (response.code, piece, peer)) - peer.hashError('Peer responded with the wrong type of download: %r' % response.code) + log.msg('Wrong response type %d for piece %d from peer %r' % (response.code, piece, self.peers[site]['peer'])) + self.peers[site]['peer'].hashError('Peer responded with the wrong type of download: %r' % response.code) self.completePieces[piece] = False if response.stream and response.stream.length: stream.readAndDiscard(response.stream) @@ -388,7 +390,7 @@ class FileDownload: df.callback(resp) # Read the response stream to the file - log.msg('Streaming piece %d from peer %r' % (piece, peer)) + log.msg('Streaming piece %d from peer %r' % (piece, self.peers[site]['peer'])) if response.code == 206: df = StreamToFile(self.hash.newPieceHasher(), response.stream, self.file, piece*PIECE_SIZE, PIECE_SIZE).run() @@ -397,32 +399,32 @@ class FileDownload: self.file).run() reactor.callLater(0, df.addCallbacks, *(self._gotPiece, self._gotError), - **{'callbackArgs': (piece, peer), - 'errbackArgs': (piece, peer)}) + **{'callbackArgs': (piece, site), + 'errbackArgs': (piece, site)}) self.outstanding -= 1 - self.peerlist.append(peer) + self.sitelist.append(site) self.getPieces() - def _getError(self, err, piece, peer): + def _getError(self, err, piece, site): """Peer failed, try again.""" - log.msg('Got error for piece %d from peer %r' % (piece, peer)) + log.msg('Got error for piece %d from peer %r' % (piece, self.peers[site]['peer'])) self.outstanding -= 1 - self.peerlist.append(peer) + self.sitelist.append(site) self.completePieces[piece] = False self.getPieces() log.err(err) - def _gotPiece(self, hash, piece, peer): + def _gotPiece(self, hash, piece, site): """Process the retrieved piece from the peer.""" if self.pieces[piece] and hash.digest() != self.pieces[piece]: # Hash doesn't match - log.msg('Hash error for piece %d from peer %r' % (piece, peer)) - peer.hashError('Piece received from peer does not match expected') + log.msg('Hash error for piece %d from peer %r' % (piece, self.peers[site]['peer'])) + self.peers[site]['peer'].hashError('Piece received from peer does not match expected') self.completePieces[piece] = False else: # Successfully completed one of several pieces - log.msg('Finished with piece %d from peer %r' % (piece, peer)) + log.msg('Finished with piece %d from peer %r' % (piece, self.peers[site]['peer'])) self.completePieces[piece] = True while (self.nextFinish < len(self.completePieces) and self.completePieces[self.nextFinish] == True): @@ -431,9 +433,9 @@ class FileDownload: self.getPieces() - def _gotError(self, err, piece, peer): + def _gotError(self, err, piece, site): """Piece download failed, try again.""" - log.msg('Error streaming piece %d from peer %r: %r' % (piece, peer, err)) + log.msg('Error streaming piece %d from peer %r: %r' % (piece, self.peers[site]['peer'], err)) log.err(err) self.completePieces[piece] = False self.getPieces() -- 2.39.5