From: Cameron Dale Date: Fri, 9 May 2008 02:02:38 +0000 (-0700) Subject: Tolerate a limited number of errors from peers before dropping them. X-Git-Url: https://git.mxchange.org/?a=commitdiff_plain;h=f453f48bb025970f13d1ec10c98b95b9e7f338bc;p=quix0rs-apt-p2p.git Tolerate a limited number of errors from peers before dropping them. --- diff --git a/apt_p2p/PeerManager.py b/apt_p2p/PeerManager.py index 0cbee1f..7513dfd 100644 --- a/apt_p2p/PeerManager.py +++ b/apt_p2p/PeerManager.py @@ -305,16 +305,6 @@ class FileDownload: self.started = True assert self.pieces, "You must initialize the piece hashes first" - # Use the mirror if there are few peers - if len(self.peers) < config.getint('DEFAULT', 'MIN_DOWNLOAD_PEERS'): - parsed = urlparse(self.mirror) - if parsed[0] == "http": - site = splitHostPort(parsed[0], parsed[1]) - self.mirror_path = urlunparse(('', '') + parsed[2:]) - peer = self.manager.getPeer(site, mirror = True) - self.peers[site] = {} - self.peers[site]['peer'] = peer - self.sitelist = self.peers.keys() # Special case if there's only one good peer left @@ -327,7 +317,22 @@ class FileDownload: self.outstanding = 0 self.nextFinish = 0 self.completePieces = [False for piece in self.pieces] + self.addedMirror = False + self.addMirror() self.getPieces() + + def addMirror(self): + """Use the mirror if there are few peers.""" + if not self.addedMirror and len(self.sitelist) + self.outstanding < config.getint('DEFAULT', 'MIN_DOWNLOAD_PEERS'): + self.addedMirror = True + parsed = urlparse(self.mirror) + if parsed[0] == "http": + site = splitHostPort(parsed[0], parsed[1]) + self.mirror_path = urlunparse(('', '') + parsed[2:]) + peer = self.manager.getPeer(site, mirror = True) + self.peers[site] = {} + self.peers[site]['peer'] = peer + self.sitelist.append(site) #{ Downloading the pieces def getPieces(self): @@ -364,17 +369,44 @@ class FileDownload: if self.outstanding <= 0 and self.nextFinish >= len(self.completePieces): log.msg('Download is complete for %s' % self.path) self.stream.allAvailable(remove = True) + + # Check if we ran out of peers + if self.outstanding <= 0 and not self.sitelist and False in self.completePieces: + log.msg("Download failed, no peers left to try.") + if self.defer: + # Send a return error + df = self.defer + self.defer = None + resp = Response(500, {}, None) + df.callback(resp) + else: + # Already streaming the response, try and abort + self.stream.allAvailable(remove = True) def _getPiece(self, response, piece, site): """Process the retrieved headers from the peer.""" - if ((len(self.completePieces) > 1 and response.code != 206) or + if response.code == 404: + # Peer no longer has this file, move on + log.msg('Peer sharing piece %d no longer has it: %r' % (piece, self.peers[site]['peer'])) + self.completePieces[piece] = False + if response.stream and response.stream.length: + stream.readAndDiscard(response.stream) + + # Don't add the site back, just move on + site = None + elif ((len(self.completePieces) > 1 and response.code != 206) or (response.code not in (200, 206))): # Request failed, try a different peer log.msg('Wrong response type %d for piece %d from peer %r' % (response.code, piece, self.peers[site]['peer'])) self.peers[site]['peer'].hashError('Peer responded with the wrong type of download: %r' % response.code) self.completePieces[piece] = False + self.peers[site]['errors'] = self.peers[site].get('errors', 0) + 1 if response.stream and response.stream.length: stream.readAndDiscard(response.stream) + + # After 3 errors in a row, drop the peer + if self.peers[site]['errors'] >= 3: + site = None else: if self.defer: # Start sending the return file @@ -403,14 +435,21 @@ class FileDownload: 'errbackArgs': (piece, site)}) self.outstanding -= 1 - self.sitelist.append(site) + if site: + self.sitelist.append(site) + else: + self.addMirror() self.getPieces() def _getError(self, err, piece, site): """Peer failed, try again.""" log.msg('Got error for piece %d from peer %r' % (piece, self.peers[site]['peer'])) self.outstanding -= 1 - self.sitelist.append(site) + self.peers[site]['errors'] = self.peers[site].get('errors', 0) + 1 + if self.peers[site]['errors'] < 3: + self.sitelist.append(site) + else: + self.addMirror() self.completePieces[piece] = False self.getPieces() log.err(err) @@ -421,11 +460,13 @@ class FileDownload: # Hash doesn't match log.msg('Hash error for piece %d from peer %r' % (piece, self.peers[site]['peer'])) self.peers[site]['peer'].hashError('Piece received from peer does not match expected') + self.peers[site]['errors'] = self.peers[site].get('errors', 0) + 1 self.completePieces[piece] = False else: # Successfully completed one of several pieces log.msg('Finished with piece %d from peer %r' % (piece, self.peers[site]['peer'])) self.completePieces[piece] = True + self.peers[site]['errors'] = 0 while (self.nextFinish < len(self.completePieces) and self.completePieces[self.nextFinish] == True): self.nextFinish += 1 @@ -437,6 +478,7 @@ class FileDownload: """Piece download failed, try again.""" log.msg('Error streaming piece %d from peer %r: %r' % (piece, self.peers[site]['peer'], err)) log.err(err) + self.peers[site]['errors'] = self.peers[site].get('errors', 0) + 1 self.completePieces[piece] = False self.getPieces()