self.started = True
assert self.pieces, "You must initialize the piece hashes first"
- # Use the mirror if there are few peers
- if len(self.peers) < config.getint('DEFAULT', 'MIN_DOWNLOAD_PEERS'):
- parsed = urlparse(self.mirror)
- if parsed[0] == "http":
- site = splitHostPort(parsed[0], parsed[1])
- self.mirror_path = urlunparse(('', '') + parsed[2:])
- peer = self.manager.getPeer(site, mirror = True)
- self.peers[site] = {}
- self.peers[site]['peer'] = peer
-
self.sitelist = self.peers.keys()
# Special case if there's only one good peer left
self.outstanding = 0
self.nextFinish = 0
self.completePieces = [False for piece in self.pieces]
+ self.addedMirror = False
+ self.addMirror()
self.getPieces()
+
+ def addMirror(self):
+ """Use the mirror if there are few peers."""
+ if not self.addedMirror and len(self.sitelist) + self.outstanding < config.getint('DEFAULT', 'MIN_DOWNLOAD_PEERS'):
+ self.addedMirror = True
+ parsed = urlparse(self.mirror)
+ if parsed[0] == "http":
+ site = splitHostPort(parsed[0], parsed[1])
+ self.mirror_path = urlunparse(('', '') + parsed[2:])
+ peer = self.manager.getPeer(site, mirror = True)
+ self.peers[site] = {}
+ self.peers[site]['peer'] = peer
+ self.sitelist.append(site)
#{ Downloading the pieces
def getPieces(self):
if self.outstanding <= 0 and self.nextFinish >= len(self.completePieces):
log.msg('Download is complete for %s' % self.path)
self.stream.allAvailable(remove = True)
+
+ # Check if we ran out of peers
+ if self.outstanding <= 0 and not self.sitelist and False in self.completePieces:
+ log.msg("Download failed, no peers left to try.")
+ if self.defer:
+ # Send a return error
+ df = self.defer
+ self.defer = None
+ resp = Response(500, {}, None)
+ df.callback(resp)
+ else:
+ # Already streaming the response, try and abort
+ self.stream.allAvailable(remove = True)
def _getPiece(self, response, piece, site):
"""Process the retrieved headers from the peer."""
- if ((len(self.completePieces) > 1 and response.code != 206) or
+ if response.code == 404:
+ # Peer no longer has this file, move on
+ log.msg('Peer sharing piece %d no longer has it: %r' % (piece, self.peers[site]['peer']))
+ self.completePieces[piece] = False
+ if response.stream and response.stream.length:
+ stream.readAndDiscard(response.stream)
+
+ # Don't add the site back, just move on
+ site = None
+ elif ((len(self.completePieces) > 1 and response.code != 206) or
(response.code not in (200, 206))):
# Request failed, try a different peer
log.msg('Wrong response type %d for piece %d from peer %r' % (response.code, piece, self.peers[site]['peer']))
self.peers[site]['peer'].hashError('Peer responded with the wrong type of download: %r' % response.code)
self.completePieces[piece] = False
+ self.peers[site]['errors'] = self.peers[site].get('errors', 0) + 1
if response.stream and response.stream.length:
stream.readAndDiscard(response.stream)
+
+ # After 3 errors in a row, drop the peer
+ if self.peers[site]['errors'] >= 3:
+ site = None
else:
if self.defer:
# Start sending the return file
'errbackArgs': (piece, site)})
self.outstanding -= 1
- self.sitelist.append(site)
+ if site:
+ self.sitelist.append(site)
+ else:
+ self.addMirror()
self.getPieces()
def _getError(self, err, piece, site):
"""Peer failed, try again."""
log.msg('Got error for piece %d from peer %r' % (piece, self.peers[site]['peer']))
self.outstanding -= 1
- self.sitelist.append(site)
+ self.peers[site]['errors'] = self.peers[site].get('errors', 0) + 1
+ if self.peers[site]['errors'] < 3:
+ self.sitelist.append(site)
+ else:
+ self.addMirror()
self.completePieces[piece] = False
self.getPieces()
log.err(err)
# Hash doesn't match
log.msg('Hash error for piece %d from peer %r' % (piece, self.peers[site]['peer']))
self.peers[site]['peer'].hashError('Piece received from peer does not match expected')
+ self.peers[site]['errors'] = self.peers[site].get('errors', 0) + 1
self.completePieces[piece] = False
else:
# Successfully completed one of several pieces
log.msg('Finished with piece %d from peer %r' % (piece, self.peers[site]['peer']))
self.completePieces[piece] = True
+ self.peers[site]['errors'] = 0
while (self.nextFinish < len(self.completePieces) and
self.completePieces[self.nextFinish] == True):
self.nextFinish += 1
"""Piece download failed, try again."""
log.msg('Error streaming piece %d from peer %r: %r' % (piece, self.peers[site]['peer'], err))
log.err(err)
+ self.peers[site]['errors'] = self.peers[site].get('errors', 0) + 1
self.completePieces[piece] = False
self.getPieces()