"""Sort the peers by their rank (highest ranked at the end)."""
def sort(a, b):
"""Sort peers by their rank."""
- if a.rank > b.rank:
+ if self.peers[a]['peer'].rank > self.peers[b]['peer'].rank:
return 1
- elif a.rank < b.rank:
+ elif self.peers[a]['peer'].rank < self.peers[b]['peer'].rank:
return -1
return 0
- self.peerlist.sort(sort)
+ self.sitelist.sort(sort)
def startDownload(self):
"""Start the download from the peers."""
log.msg('Starting to download %s' % self.path)
self.started = True
assert self.pieces, "You must initialize the piece hashes first"
- self.peerlist = [self.peers[site]['peer'] for site in self.peers]
# Use the mirror if there are few peers
- if len(self.peerlist) < config.getint('DEFAULT', 'MIN_DOWNLOAD_PEERS'):
+ if len(self.peers) < config.getint('DEFAULT', 'MIN_DOWNLOAD_PEERS'):
parsed = urlparse(self.mirror)
if parsed[0] == "http":
site = splitHostPort(parsed[0], parsed[1])
self.mirror_path = urlunparse(('', '') + parsed[2:])
peer = self.manager.getPeer(site, mirror = True)
- self.peerlist.append(peer)
+ self.peers[site] = {}
+ self.peers[site]['peer'] = peer
+
+ self.sitelist = self.peers.keys()
# Special case if there's only one good peer left
-# if len(self.peerlist) == 1:
-# log.msg('Downloading from peer %r' % (self.peerlist[0], ))
-# self.defer.callback(self.peerlist[0].get(self.path))
+# if len(self.sitelist) == 1:
+# log.msg('Downloading from peer %r' % (self.peers[self.sitelist[0]]['peer'], ))
+# self.defer.callback(self.peers[self.sitelist[0]]['peer'].get(self.path))
# return
# Begin to download the pieces
self.sort()
piece = self.nextFinish
- while self.outstanding < 4 and self.peerlist and piece < len(self.completePieces):
+ while self.outstanding < 4 and self.sitelist and piece < len(self.completePieces):
if self.completePieces[piece] == False:
# Send a request to the highest ranked peer
- peer = self.peerlist.pop()
- self.completePieces[piece] = peer
- log.msg('Sending a request for piece %d to peer %r' % (piece, peer))
+ site = self.sitelist.pop()
+ self.completePieces[piece] = site
+ log.msg('Sending a request for piece %d to peer %r' % (piece, self.peers[site]['peer']))
self.outstanding += 1
path = self.path
- if peer.mirror:
+ if self.peers[site]['peer'].mirror:
path = self.mirror_path
if len(self.completePieces) > 1:
- df = peer.getRange(path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
+ df = self.peers[site]['peer'].getRange(path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
else:
- df = peer.get(path)
+ df = self.peers[site]['peer'].get(path)
reactor.callLater(0, df.addCallbacks,
*(self._getPiece, self._getError),
- **{'callbackArgs': (piece, peer),
- 'errbackArgs': (piece, peer)})
+ **{'callbackArgs': (piece, site),
+ 'errbackArgs': (piece, site)})
piece += 1
# Check if we're done
log.msg('Download is complete for %s' % self.path)
self.stream.allAvailable(remove = True)
- def _getPiece(self, response, piece, peer):
+ def _getPiece(self, response, piece, site):
"""Process the retrieved headers from the peer."""
if ((len(self.completePieces) > 1 and response.code != 206) or
(response.code not in (200, 206))):
# Request failed, try a different peer
- log.msg('Wrong response type %d for piece %d from peer %r' % (response.code, piece, peer))
- peer.hashError('Peer responded with the wrong type of download: %r' % response.code)
+ log.msg('Wrong response type %d for piece %d from peer %r' % (response.code, piece, self.peers[site]['peer']))
+ self.peers[site]['peer'].hashError('Peer responded with the wrong type of download: %r' % response.code)
self.completePieces[piece] = False
if response.stream and response.stream.length:
stream.readAndDiscard(response.stream)
df.callback(resp)
# Read the response stream to the file
- log.msg('Streaming piece %d from peer %r' % (piece, peer))
+ log.msg('Streaming piece %d from peer %r' % (piece, self.peers[site]['peer']))
if response.code == 206:
df = StreamToFile(self.hash.newPieceHasher(), response.stream,
self.file, piece*PIECE_SIZE, PIECE_SIZE).run()
self.file).run()
reactor.callLater(0, df.addCallbacks,
*(self._gotPiece, self._gotError),
- **{'callbackArgs': (piece, peer),
- 'errbackArgs': (piece, peer)})
+ **{'callbackArgs': (piece, site),
+ 'errbackArgs': (piece, site)})
self.outstanding -= 1
- self.peerlist.append(peer)
+ self.sitelist.append(site)
self.getPieces()
- def _getError(self, err, piece, peer):
+ def _getError(self, err, piece, site):
"""Peer failed, try again."""
- log.msg('Got error for piece %d from peer %r' % (piece, peer))
+ log.msg('Got error for piece %d from peer %r' % (piece, self.peers[site]['peer']))
self.outstanding -= 1
- self.peerlist.append(peer)
+ self.sitelist.append(site)
self.completePieces[piece] = False
self.getPieces()
log.err(err)
- def _gotPiece(self, hash, piece, peer):
+ def _gotPiece(self, hash, piece, site):
"""Process the retrieved piece from the peer."""
if self.pieces[piece] and hash.digest() != self.pieces[piece]:
# Hash doesn't match
- log.msg('Hash error for piece %d from peer %r' % (piece, peer))
- peer.hashError('Piece received from peer does not match expected')
+ log.msg('Hash error for piece %d from peer %r' % (piece, self.peers[site]['peer']))
+ self.peers[site]['peer'].hashError('Piece received from peer does not match expected')
self.completePieces[piece] = False
else:
# Successfully completed one of several pieces
- log.msg('Finished with piece %d from peer %r' % (piece, peer))
+ log.msg('Finished with piece %d from peer %r' % (piece, self.peers[site]['peer']))
self.completePieces[piece] = True
while (self.nextFinish < len(self.completePieces) and
self.completePieces[self.nextFinish] == True):
self.getPieces()
- def _gotError(self, err, piece, peer):
+ def _gotError(self, err, piece, site):
"""Piece download failed, try again."""
- log.msg('Error streaming piece %d from peer %r: %r' % (piece, peer, err))
+ log.msg('Error streaming piece %d from peer %r: %r' % (piece, self.peers[site]['peer'], err))
log.err(err)
self.completePieces[piece] = False
self.getPieces()