Tolerate a limited number of errors from peers before dropping them.
authorCameron Dale <camrdale@gmail.com>
Fri, 9 May 2008 02:02:38 +0000 (19:02 -0700)
committerCameron Dale <camrdale@gmail.com>
Fri, 9 May 2008 02:02:38 +0000 (19:02 -0700)
apt_p2p/PeerManager.py

index 0cbee1f..7513dfd 100644 (file)
@@ -305,16 +305,6 @@ class FileDownload:
         self.started = True
         assert self.pieces, "You must initialize the piece hashes first"
         
-        # Use the mirror if there are few peers
-        if len(self.peers) < config.getint('DEFAULT', 'MIN_DOWNLOAD_PEERS'):
-            parsed = urlparse(self.mirror)
-            if parsed[0] == "http":
-                site = splitHostPort(parsed[0], parsed[1])
-                self.mirror_path = urlunparse(('', '') + parsed[2:])
-                peer = self.manager.getPeer(site, mirror = True)
-                self.peers[site] = {}
-                self.peers[site]['peer'] = peer
-        
         self.sitelist = self.peers.keys()
         
         # Special case if there's only one good peer left
@@ -327,7 +317,22 @@ class FileDownload:
         self.outstanding = 0
         self.nextFinish = 0
         self.completePieces = [False for piece in self.pieces]
+        self.addedMirror = False
+        self.addMirror()
         self.getPieces()
+
+    def addMirror(self):
+        """Use the mirror if there are few peers."""
+        if not self.addedMirror and len(self.sitelist) + self.outstanding < config.getint('DEFAULT', 'MIN_DOWNLOAD_PEERS'):
+            self.addedMirror = True
+            parsed = urlparse(self.mirror)
+            if parsed[0] == "http":
+                site = splitHostPort(parsed[0], parsed[1])
+                self.mirror_path = urlunparse(('', '') + parsed[2:])
+                peer = self.manager.getPeer(site, mirror = True)
+                self.peers[site] = {}
+                self.peers[site]['peer'] = peer
+                self.sitelist.append(site)
         
     #{ Downloading the pieces
     def getPieces(self):
@@ -364,17 +369,44 @@ class FileDownload:
         if self.outstanding <= 0 and self.nextFinish >= len(self.completePieces):
             log.msg('Download is complete for %s' % self.path)
             self.stream.allAvailable(remove = True)
+            
+        # Check if we ran out of peers
+        if self.outstanding <= 0 and not self.sitelist and False in self.completePieces:
+            log.msg("Download failed, no peers left to try.")
+            if self.defer:
+                # Send a return error
+                df = self.defer
+                self.defer = None
+                resp = Response(500, {}, None)
+                df.callback(resp)
+            else:
+                # Already streaming the response, try and abort
+                self.stream.allAvailable(remove = True)
     
     def _getPiece(self, response, piece, site):
         """Process the retrieved headers from the peer."""
-        if ((len(self.completePieces) > 1 and response.code != 206) or
+        if response.code == 404:
+            # Peer no longer has this file, move on
+            log.msg('Peer sharing piece %d no longer has it: %r' % (piece, self.peers[site]['peer']))
+            self.completePieces[piece] = False
+            if response.stream and response.stream.length:
+                stream.readAndDiscard(response.stream)
+            
+            # Don't add the site back, just move on
+            site = None
+        elif ((len(self.completePieces) > 1 and response.code != 206) or
             (response.code not in (200, 206))):
             # Request failed, try a different peer
             log.msg('Wrong response type %d for piece %d from peer %r' % (response.code, piece, self.peers[site]['peer']))
             self.peers[site]['peer'].hashError('Peer responded with the wrong type of download: %r' % response.code)
             self.completePieces[piece] = False
+            self.peers[site]['errors'] = self.peers[site].get('errors', 0) + 1
             if response.stream and response.stream.length:
                 stream.readAndDiscard(response.stream)
+
+            # After 3 errors in a row, drop the peer
+            if self.peers[site]['errors'] >= 3:
+                site = None
         else:
             if self.defer:
                 # Start sending the return file
@@ -403,14 +435,21 @@ class FileDownload:
                                  'errbackArgs': (piece, site)})
 
         self.outstanding -= 1
-        self.sitelist.append(site)
+        if site:
+            self.sitelist.append(site)
+        else:
+            self.addMirror()
         self.getPieces()
 
     def _getError(self, err, piece, site):
         """Peer failed, try again."""
         log.msg('Got error for piece %d from peer %r' % (piece, self.peers[site]['peer']))
         self.outstanding -= 1
-        self.sitelist.append(site)
+        self.peers[site]['errors'] = self.peers[site].get('errors', 0) + 1
+        if self.peers[site]['errors'] < 3:
+            self.sitelist.append(site)
+        else:
+            self.addMirror()
         self.completePieces[piece] = False
         self.getPieces()
         log.err(err)
@@ -421,11 +460,13 @@ class FileDownload:
             # Hash doesn't match
             log.msg('Hash error for piece %d from peer %r' % (piece, self.peers[site]['peer']))
             self.peers[site]['peer'].hashError('Piece received from peer does not match expected')
+            self.peers[site]['errors'] = self.peers[site].get('errors', 0) + 1
             self.completePieces[piece] = False
         else:
             # Successfully completed one of several pieces
             log.msg('Finished with piece %d from peer %r' % (piece, self.peers[site]['peer']))
             self.completePieces[piece] = True
+            self.peers[site]['errors'] = 0
             while (self.nextFinish < len(self.completePieces) and
                    self.completePieces[self.nextFinish] == True):
                 self.nextFinish += 1
@@ -437,6 +478,7 @@ class FileDownload:
         """Piece download failed, try again."""
         log.msg('Error streaming piece %d from peer %r: %r' % (piece, self.peers[site]['peer'], err))
         log.err(err)
+        self.peers[site]['errors'] = self.peers[site].get('errors', 0) + 1
         self.completePieces[piece] = False
         self.getPieces()