+ def startDownload(self):
+ """Start the download from the peers."""
+ # Don't start twice
+ if self.started:
+ return
+
+ log.msg('Starting to download %s' % self.path)
+ self.started = True
+ assert self.pieces is not None, "You must initialize the piece hashes first"
+ self.peerlist = [self.peers[site]['peer'] for site in self.peers]
+
+ # Special case if there's only one good peer left
+ if len(self.peerlist) == 1:
+ log.msg('Downloading from peer %r' % (self.peerlist[0], ))
+ self.defer.callback(self.peerlist[0].get(self.path))
+ return
+
+ # Start sending the return file
+ self.stream = GrowingFileStream(self.file, self.hash.expSize)
+ resp = Response(200, {}, self.stream)
+ self.defer.callback(resp)
+
+ # Begin to download the pieces
+ self.outstanding = 0
+ self.nextFinish = 0
+ if self.pieces:
+ self.completePieces = [False for piece in self.pieces]
+ else:
+ self.completePieces = [False]
+ self.getPieces()
+
+ #{ Downloading the pieces
+ def getPieces(self):
+ """Download the next pieces from the peers."""
+ log.msg('Checking for more piece requests to send')
+ self.sort()
+ piece = self.nextFinish
+ while self.outstanding < 4 and self.peerlist and piece < len(self.completePieces):
+ log.msg('Checking piece %d' % piece)
+ if self.completePieces[piece] == False:
+ # Send a request to the highest ranked peer
+ peer = self.peerlist.pop()
+ self.completePieces[piece] = peer
+ log.msg('Sending a request for piece %d to peer %r' % (piece, peer))
+
+ self.outstanding += 1
+ if self.pieces:
+ df = peer.getRange(self.path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
+ else:
+ df = peer.get(self.path)
+ reactor.callLater(0, df.addCallbacks,
+ *(self._getPiece, self._getError),
+ **{'callbackArgs': (piece, peer),
+ 'errbackArgs': (piece, peer)})
+ piece += 1
+
+ log.msg('Finished checking pieces, %d outstanding, next piece %d of %d' % (self.outstanding, self.nextFinish, len(self.completePieces)))
+ # Check if we're done
+ if self.outstanding <= 0 and self.nextFinish >= len(self.completePieces):
+ log.msg('We seem to be done with all pieces')
+ self.stream.allAvailable()
+
+ def _getPiece(self, response, piece, peer):
+ """Process the retrieved headers from the peer."""
+ log.msg('Got response for piece %d from peer %r' % (piece, peer))
+ if ((len(self.completePieces) > 1 and response.code != 206) or
+ (response.code not in (200, 206))):
+ # Request failed, try a different peer
+ log.msg('Wrong response type %d for piece %d from peer %r' % (response.code, piece, peer))
+ peer.hashError('Peer responded with the wrong type of download: %r' % response.code)
+ self.completePieces[piece] = False
+ if response.stream and response.stream.length:
+ stream.readAndDiscard(response.stream)
+ else:
+ # Read the response stream to the file
+ log.msg('Streaming piece %d from peer %r' % (piece, peer))
+ if response.code == 206:
+ df = StreamToFile(response.stream, self.file, piece*PIECE_SIZE, PIECE_SIZE).run()
+ else:
+ df = StreamToFile(response.stream, self.file).run()
+ df.addCallbacks(self._gotPiece, self._gotError,
+ callbackArgs=(piece, peer), errbackArgs=(piece, peer))
+
+ self.outstanding -= 1
+ self.peerlist.append(peer)
+ self.getPieces()
+
+ def _getError(self, err, piece, peer):
+ """Peer failed, try again."""
+ log.msg('Got error for piece %d from peer %r' % (piece, peer))
+ self.outstanding -= 1
+ self.peerlist.append(peer)
+ self.completePieces[piece] = False
+ self.getPieces()
+ log.err(err)
+
+ def _gotPiece(self, response, piece, peer):
+ """Process the retrieved piece from the peer."""
+ log.msg('Finished streaming piece %d from peer %r: %r' % (piece, peer, response))
+ if ((self.pieces and response != self.pieces[piece]) or
+ (len(self.pieces) == 0 and response != self.hash.expected())):
+ # Hash doesn't match
+ log.msg('Hash error for piece %d from peer %r' % (piece, peer))
+ peer.hashError('Piece received from peer does not match expected')
+ self.completePieces[piece] = False
+ elif self.pieces:
+ # Successfully completed one of several pieces
+ log.msg('Finished with piece %d from peer %r' % (piece, peer))
+ self.completePieces[piece] = True
+ while (self.nextFinish < len(self.completePieces) and
+ self.completePieces[self.nextFinish] == True):
+ self.nextFinish += 1
+ self.stream.updateAvailable(PIECE_SIZE)
+ else:
+ # Whole download (only one piece) is complete
+ log.msg('Piece %d from peer %r is the last piece' % (piece, peer))
+ self.completePieces[piece] = True
+ self.nextFinish = 1
+ self.stream.updateAvailable(2**30)
+
+ self.getPieces()
+
+ def _gotError(self, err, piece, peer):
+ """Piece download failed, try again."""
+ log.msg('Error streaming piece %d from peer %r: %r' % (piece, peer, response))
+ log.err(err)
+ self.completePieces[piece] = False
+ self.getPieces()
+