2 """Manage a set of peers and the requests to them."""
4 from random import choice
5 from urlparse import urlparse, urlunparse
6 from urllib import quote_plus
7 from binascii import b2a_hex, a2b_hex
10 from twisted.internet import reactor, defer
11 from twisted.python import log
12 from twisted.trial import unittest
13 from twisted.web2 import stream
14 from twisted.web2.http import Response, splitHostPort
16 from HTTPDownloader import Peer
17 from Streams import GrowingFileStream, StreamToFile
18 from util import uncompact
19 from Hash import PIECE_SIZE
20 from apt_p2p_Khashmir.bencode import bdecode
21 from apt_p2p_conf import config
# Exception type for failures while downloading from peers. Nothing in the
# lines visible in this excerpt raises it.
23 class PeerError(Exception):
24 """An error occurred downloading from peers."""
# FileDownload (created by PeerManager.get): collects the piece hashes for one
# file, then fetches the pieces from peers and, when few peers remain, a mirror.
# NOTE(review): this excerpt lacks the class header line and the closing lines
# of this docstring (the embedded numbering skips 25-26 and 60-61).
27 """Manage a download from a list of peers or a mirror.
29 @type manager: L{PeerManager}
30 @ivar manager: the manager to send requests for peers to
31 @type hash: L{Hash.HashObject}
32 @ivar hash: the hash object containing the expected hash for the file
33 @ivar mirror: the URI of the file on the mirror
34 @type compact_peers: C{list} of C{dictionary}
35 @ivar compact_peers: a list of the peer info where the file can be found
37 @ivar file: the open file to write the download to
39 @ivar path: the path to request from peers to access the file
40 @type pieces: C{list} of C{string}
41 @ivar pieces: the hashes of the pieces in the file (entries are None when no piece hashes could be retrieved)
42 @type started: C{boolean}
43 @ivar started: whether the download has begun yet
44 @type defer: L{twisted.internet.defer.Deferred}
45 @ivar defer: the deferred that will callback with the result of the download
46 @type peers: C{dictionary}
47 @ivar peers: information about each of the peers available to download from
48 @type outstanding: C{int}
49 @ivar outstanding: the number of requests to peers currently outstanding
50 @type sitelist: C{list} of (C{string}, C{int})
51 @ivar sitelist: the sites (keys of L{peers}) available for the next request, sorted by rank with the highest ranked last
52 @type stream: L{GrowingFileStream}
53 @ivar stream: the stream of resulting data from the download
54 @type nextFinish: C{int}
55 @ivar nextFinish: the next piece that is needed to finish for the stream
56 @type completePieces: C{list} of C{boolean} or (C{string}, C{int})
57 @ivar completePieces: one per piece, will be False if no requests are
58 outstanding for the piece, True if the piece has been successfully
59 downloaded, or the site that a request for this piece has been sent
# Record the download parameters and open the temporary output file.
62 def __init__(self, manager, hash, mirror, compact_peers, file):
63 """Initialize the instance and check for piece hashes.
65 @type manager: L{PeerManager}
66 @param manager: the manager to send requests for peers to
67 @type hash: L{Hash.HashObject}
68 @param hash: the hash object containing the expected hash for the file
69 @param mirror: the URI of the file on the mirror
70 @type compact_peers: C{list} of C{dictionary}
71 @param compact_peers: a list of the peer info where the file can be found
72 @type file: L{twisted.python.filepath.FilePath}
73 @param file: the temporary file to use to store the downloaded file
75 self.manager = manager
78 self.compact_peers = compact_peers
# Path requested from peers: '/~/' followed by the URL-quoted expected hash.
80 self.path = '/~/' + quote_plus(hash.expected())
# Set to the mirror's request path once the mirror is added as a source.
82 self.mirror_path = None
# Opened read/write ('w+'): pieces are written here, and the same file object
# is later handed to GrowingFileStream to serve the response.
89 self.file = file.open('w+')
# Entry point of the download (PeerManager.get returns FileDownload(...).run()):
# registers every peer, then picks the most widely reported source of piece
# hashes and starts retrieving them.
92 """Start the downloading process."""
93 log.msg('Checking for pieces for %s' % self.path)
# Deferred for the overall result; presumably fired with the Response built
# when streaming starts (or the 500 error response) — confirm.
94 self.defer = defer.Deferred()
# Vote counters: piece-info value -> number of peers reporting it. The
# {0: 0} seed keeps max(...values()) below from failing on an empty dict.
# NOTE(review): pieces_hash and no_pieces are used below, but their
# definitions are not part of this excerpt.
97 pieces_string = {0: 0}
99 pieces_dl_hash = {0: 0}
101 for compact_peer in self.compact_peers:
102 # Build a list of all the peers for this download
103 site = uncompact(compact_peer['c'])
104 peer = self.manager.getPeer(site)
105 self.peers.setdefault(site, {})['peer'] = peer
107 # Extract any piece information from the peers list
# 't' = piece hashes inline, 'h' = key for hashes stored in the DHT,
# 'l' = key for hashes downloadable from peers.
108 if 't' in compact_peer:
109 self.peers[site]['t'] = compact_peer['t']['t']
110 pieces_string.setdefault(compact_peer['t']['t'], 0)
111 pieces_string[compact_peer['t']['t']] += 1
112 elif 'h' in compact_peer:
113 self.peers[site]['h'] = compact_peer['h']
114 pieces_hash.setdefault(compact_peer['h'], 0)
115 pieces_hash[compact_peer['h']] += 1
116 elif 'l' in compact_peer:
117 self.peers[site]['l'] = compact_peer['l']
118 pieces_dl_hash.setdefault(compact_peer['l'], 0)
119 pieces_dl_hash[compact_peer['l']] += 1
123 # Select the most popular piece info
124 max_found = max(no_pieces, max(pieces_string.values()),
125 max(pieces_hash.values()), max(pieces_dl_hash.values()))
127 if max_found < len(self.peers):
128 log.msg('Misleading piece information found, using most popular %d of %d peers' %
129 (max_found, len(self.peers)))
131 if max_found == no_pieces:
132 # The file is not split into pieces
133 log.msg('No pieces were found for the file')
134 self.pieces = [self.hash.expected()]
136 elif max_found == max(pieces_string.values()):
137 # Small number of pieces in a string
# The loops below scan for the entry whose count matches max_found; the
# selecting condition is on lines omitted from this excerpt.
138 for pieces, num in pieces_string.items():
139 # Find the most popular piece string
# Split the concatenated hashes into 20-byte SHA-1 digests.
141 self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
142 log.msg('Peer info contained %d piece hashes' % len(self.pieces))
145 elif max_found == max(pieces_hash.values()):
146 # Medium number of pieces stored in the DHT
147 for pieces, num in pieces_hash.items():
148 # Find the most popular piece hash to lookup
150 log.msg('Found a hash for pieces to lookup in the DHT: %r' % pieces)
151 self.getDHTPieces(pieces)
153 elif max_found == max(pieces_dl_hash.values()):
154 # Large number of pieces stored in peers
155 for pieces, num in pieces_dl_hash.items():
156 # Find the most popular piece hash to download
158 log.msg('Found a hash for pieces to lookup in peers: %r' % pieces)
159 self.getPeerPieces(pieces)
163 #{ Downloading the piece hashes
164 def getDHTPieces(self, key):
165 """Retrieve the piece information from the DHT.
167 @param key: the key to lookup in the DHT
169 # Remove any peers with the wrong piece hash
170 #for site in self.peers.keys():
171 # if self.peers[site].get('h', '') != key:
172 # del self.peers[site]
# NOTE(review): the filtering above is disabled, so peers that advertised a
# different piece hash remain in self.peers.
174 # Start the DHT lookup
# addBoth routes both a successful result and a failure to _getDHTPieces,
# which decides how the download continues.
175 lookupDefer = self.manager.dht.get(key)
176 lookupDefer.addBoth(self._getDHTPieces, key)
# Callback/errback for the DHT lookup started in getDHTPieces: accepts a
# stored value whose 't' entry hashes to key; otherwise continues with
# None placeholders instead of piece hashes.
178 def _getDHTPieces(self, results, key):
179 """Check the retrieved values."""
180 if isinstance(results, list):
181 for result in results:
182 # Make sure the hash matches the key
# NOTE(review): `sha` is not among the imports visible in this excerpt; it is
# presumably imported on an omitted line. The `sha` module is deprecated in
# favour of hashlib.sha1.
183 result_hash = sha.new(result.get('t', '')).digest()
184 if result_hash == key:
# `pieces` is bound on a line omitted from this excerpt (presumably the
# matching 't' value); it is split into 20-byte digests.
186 self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
187 log.msg('Retrieved %d piece hashes from the DHT' % len(self.pieces))
191 log.msg('Could not retrieve the piece hashes from the DHT')
# NOTE(review): `result` is only bound by the for-loop over a list. When the
# lookup delivers a non-list (e.g. a Failure via addBoth), this line raises
# UnboundLocalError; it presumably should format `results`.
193 log.msg('Looking up piece hashes in the DHT resulted in an error: %r' % (result, ))
195 # Continue without the piece hashes
# One None placeholder per PIECE_SIZE chunk of the expected file size.
196 self.pieces = [None for x in xrange(0, self.hash.expSize, PIECE_SIZE)]
# Request the piece hashes from peers that have not failed. When called with
# failedSite, first mark that site as failed and release its outstanding slot.
# Falls back to None placeholders once nothing is outstanding and no hashes
# have arrived.
199 def getPeerPieces(self, key, failedSite = None):
200 """Retrieve the piece information from the peers.
202 @param key: the key to request from the peers
204 if failedSite is None:
205 log.msg('Starting the lookup of piece hashes in peers')
207 # Remove any peers with the wrong piece hash
208 #for site in self.peers.keys():
209 # if self.peers[site].get('l', '') != key:
210 # del self.peers[site]
212 log.msg('Piece hash lookup failed for peer %r' % (failedSite, ))
213 self.peers[failedSite]['failed'] = True
214 self.outstanding -= 1
216 if self.pieces is None:
217 # Send a request to one or more peers
218 for site in self.peers:
# NOTE(review): only the 'failed' flag is checked, so sites that already have
# a request in flight are not excluded; confirm a retry cannot send
# duplicate requests.
219 if self.peers[site].get('failed', False) != True:
220 log.msg('Sending a piece hash request to %r' % (site, ))
221 path = '/~/' + quote_plus(key)
222 lookupDefer = self.peers[site]['peer'].get(path)
# Callbacks are attached on a later reactor iteration via callLater(0, ...),
# presumably so this loop is not re-entered if the deferred already fired.
223 reactor.callLater(0, lookupDefer.addCallbacks,
224 *(self._getPeerPieces, self._gotPeerError),
225 **{'callbackArgs': (key, site),
226 'errbackArgs': (key, site)})
227 self.outstanding += 1
# Caps concurrent piece-hash requests at 4 (the statement acting on this
# condition is omitted from this excerpt).
228 if self.outstanding >= 4:
231 if self.pieces is None and self.outstanding <= 0:
232 # Continue without the piece hashes
233 log.msg('Could not retrieve the piece hashes from the peers')
234 self.pieces = [None for x in xrange(0, self.hash.expSize, PIECE_SIZE)]
# Callback for a piece-hash request's response. A non-200 response triggers a
# retry via getPeerPieces; otherwise the body is buffered into
# self.peers[site]['pieces'] and checked by _gotPeerPieces.
237 def _getPeerPieces(self, response, key, site):
238 """Process the retrieved response from the peer."""
239 log.msg('Got a piece hash response %d from %r' % (response.code, site))
240 if response.code != 200:
241 # Request failed, try a different peer
242 self.getPeerPieces(key, site)
244 # Read the response stream to a string
245 self.peers[site]['pieces'] = ''
# self and site are bound as default arguments so this nested callback keeps
# the values current at definition time.
246 def _gotPeerPiece(data, self = self, site = site):
247 log.msg('Peer %r got %d bytes of piece hashes' % (site, len(data)))
248 self.peers[site]['pieces'] += data
249 log.msg('Streaming piece hashes from peer')
250 df = stream.readStream(response.stream, _gotPeerPiece)
251 df.addCallbacks(self._gotPeerPieces, self._gotPeerError,
252 callbackArgs=(key, site), errbackArgs=(key, site))
# Errback for a piece-hash request or its body stream: retry through
# getPeerPieces, which marks this site as failed.
254 def _gotPeerError(self, err, key, site):
255 """Peer failed, try again."""
256 log.msg('Peer piece hash request failed for %r' % (site, ))
258 self.getPeerPieces(key, site)
# Called once a peer's piece-hash body is fully buffered. The body is
# bdecoded and accepted only if the SHA-1 of its 't' value equals key;
# otherwise another peer is tried.
260 def _gotPeerPieces(self, result, key, site):
261 """Check the retrieved pieces from the peer."""
262 log.msg('Finished streaming piece hashes from peer %r' % (site, ))
# Another peer may already have supplied the piece hashes.
263 if self.pieces is not None:
265 log.msg('Already done')
# NOTE(review): the buffer is peer-supplied (untrusted) data. Decode errors
# are presumably caught on omitted lines, given the log message below.
269 result = bdecode(self.peers[site]['pieces'])
271 log.msg('Error bdecoding piece hashes')
273 self.getPeerPieces(key, site)
276 result_hash = sha.new(result.get('t', '')).digest()
277 if result_hash == key:
279 self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
280 log.msg('Retrieved %d piece hashes from the peer %r' % (len(self.pieces), site))
283 log.msg('Peer returned a piece string that did not match')
284 self.getPeerPieces(key, site)
286 #{ Downloading the file
# Orders self.sitelist in place by each site's Peer rank. Per the docstring,
# the highest ranked site ends up last, so sitelist.pop() yields it.
# NOTE(review): uses a Python 2 cmp-style comparator; both def lines and the
# comparator's return statements are omitted from this excerpt.
288 """Sort the peers by their rank (highest ranked at the end)."""
290 """Sort peers by their rank."""
291 if self.peers[a]['peer'].rank > self.peers[b]['peer'].rank:
293 elif self.peers[a]['peer'].rank < self.peers[b]['peer'].rank:
296 self.sitelist.sort(sort)
# Begin fetching pieces once self.pieces has been set (by run, or by the
# DHT / peer piece-hash lookups).
298 def startDownload(self):
299 """Start the download from the peers."""
304 log.msg('Starting to download %s' % self.path)
# NOTE(review): assert is stripped under `python -O`; an explicit check would
# keep this guard in optimized runs.
306 assert self.pieces, "You must initialize the piece hashes first"
# Every known peer site is a candidate source.
308 self.sitelist = self.peers.keys()
310 # Special case if there's only one good peer left
311 # if len(self.sitelist) == 1:
312 # log.msg('Downloading from peer %r' % (self.peers[self.sitelist[0]]['peer'], ))
313 # self.defer.callback(self.peers[self.sitelist[0]]['peer'].get(self.path))
316 # Begin to download the pieces
# One state entry per piece; False means not yet requested.
319 self.completePieces = [False for piece in self.pieces]
320 self.addedMirror = False
# Adds the mirror as one more download source, at most once, when available
# sites plus outstanding requests fall below the MIN_DOWNLOAD_PEERS config
# value. (The def line is omitted from this excerpt.)
325 """Use the mirror if there are few peers."""
326 if not self.addedMirror and len(self.sitelist) + self.outstanding < config.getint('DEFAULT', 'MIN_DOWNLOAD_PEERS'):
# Set before the scheme check, so a non-HTTP mirror is silently never used.
327 self.addedMirror = True
328 parsed = urlparse(self.mirror)
329 if parsed[0] == "http":
330 site = splitHostPort(parsed[0], parsed[1])
# Path (plus params/query/fragment) of the mirror URI, used when requesting
# pieces from the mirror.
331 self.mirror_path = urlunparse(('', '') + parsed[2:])
332 peer = self.manager.getPeer(site, mirror = True)
333 self.peers[site] = {}
334 self.peers[site]['peer'] = peer
# Appended last, so the mirror is the next site returned by sitelist.pop().
335 self.sitelist.append(site)
337 #{ Downloading the pieces
# Scheduling step. It requests not-yet-requested pieces starting at
# nextFinish, keeping at most 4 requests outstanding. It then checks whether
# the download has completed or has run out of sites. (The def line is
# omitted from this excerpt.)
339 """Download the next pieces from the peers."""
341 log.msg('Download has been aborted for %s' % self.path)
342 self.stream.allAvailable(remove = True)
346 piece = self.nextFinish
347 while self.outstanding < 4 and self.sitelist and piece < len(self.completePieces):
348 if self.completePieces[piece] == False:
349 # Send a request to the highest ranked peer
350 site = self.sitelist.pop()
# Record which site this piece was requested from.
351 self.completePieces[piece] = site
352 log.msg('Sending a request for piece %d to peer %r' % (piece, self.peers[site]['peer']))
354 self.outstanding += 1
# For non-mirror peers `path` is set on an omitted line (presumably self.path).
356 if self.peers[site]['peer'].mirror:
357 path = self.mirror_path
# Multi-piece files use byte-range requests: piece N covers bytes
# N*PIECE_SIZE through (N+1)*PIECE_SIZE - 1.
358 if len(self.completePieces) > 1:
359 df = self.peers[site]['peer'].getRange(path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
361 df = self.peers[site]['peer'].get(path)
362 reactor.callLater(0, df.addCallbacks,
363 *(self._getPiece, self._getError),
364 **{'callbackArgs': (piece, site),
365 'errbackArgs': (piece, site)})
368 # Check if we're done
369 if self.outstanding <= 0 and self.nextFinish >= len(self.completePieces):
370 log.msg('Download is complete for %s' % self.path)
371 self.stream.allAvailable(remove = True)
373 # Check if we ran out of peers
374 if self.outstanding <= 0 and not self.sitelist and False in self.completePieces:
375 log.msg("Download failed, no peers left to try.")
377 # Send a return error
380 resp = Response(500, {}, None)
383 # Already streaming the response, try and abort
384 self.stream.allAvailable(remove = True)
# Callback for a piece request's response headers. It classifies the
# response, starts streaming the body into the file at the piece's offset,
# and returns the site to rotation.
386 def _getPiece(self, response, piece, site):
387 """Process the retrieved headers from the peer."""
388 if response.code == 404:
389 # Peer no longer has this file, move on
390 log.msg('Peer sharing piece %d no longer has it: %r' % (piece, self.peers[site]['peer']))
391 self.completePieces[piece] = False
# Drain any body (presumably so the connection stays usable).
392 if response.stream and response.stream.length:
393 stream.readAndDiscard(response.stream)
395 # Don't add the site back, just move on
# Multi-piece downloads require 206 Partial Content; a single-piece download
# accepts 200 or 206.
397 elif ((len(self.completePieces) > 1 and response.code != 206) or
398 (response.code not in (200, 206))):
399 # Request failed, try a different peer
400 log.msg('Wrong response type %d for piece %d from peer %r' % (response.code, piece, self.peers[site]['peer']))
# NOTE(review): hashError is also used to report a wrong response code.
401 self.peers[site]['peer'].hashError('Peer responded with the wrong type of download: %r' % response.code)
402 self.completePieces[piece] = False
403 self.peers[site]['errors'] = self.peers[site].get('errors', 0) + 1
404 if response.stream and response.stream.length:
405 stream.readAndDiscard(response.stream)
407 # After 3 errors in a row, drop the peer
408 if self.peers[site]['errors'] >= 3:
412 # Start sending the return file
# The response body is served from the temp file as pieces complete.
415 self.stream = GrowingFileStream(self.file, self.hash.expSize)
417 # Get the headers from the peer's response
# NOTE(review): `headers` is initialised on a line omitted from this excerpt.
419 if response.headers.hasHeader('last-modified'):
420 headers['last-modified'] = response.headers.getHeader('last-modified')
421 resp = Response(200, headers, self.stream)
424 # Read the response stream to the file
425 log.msg('Streaming piece %d from peer %r' % (piece, self.peers[site]['peer']))
# 206: write this piece at offset piece*PIECE_SIZE and hash it with a piece
# hasher. Otherwise a newHasher() hasher is used (the remaining arguments are
# on an omitted line).
426 if response.code == 206:
427 df = StreamToFile(self.hash.newPieceHasher(), response.stream,
428 self.file, piece*PIECE_SIZE, PIECE_SIZE).run()
430 df = StreamToFile(self.hash.newHasher(), response.stream,
432 reactor.callLater(0, df.addCallbacks,
433 *(self._gotPiece, self._gotError),
434 **{'callbackArgs': (piece, site),
435 'errbackArgs': (piece, site)})
# Header processing is finished, so this request no longer counts as
# outstanding; the body keeps streaming via the callbacks registered above.
437 self.outstanding -= 1
439 self.sitelist.append(site)
# Errback for a piece request. It counts the error against the site, returns
# the site to rotation unless it has reached 3 errors, and marks the piece as
# not requested.
444 def _getError(self, err, piece, site):
445 """Peer failed, try again."""
446 log.msg('Got error for piece %d from peer %r' % (piece, self.peers[site]['peer']))
447 self.outstanding -= 1
448 self.peers[site]['errors'] = self.peers[site].get('errors', 0) + 1
449 if self.peers[site]['errors'] < 3:
450 self.sitelist.append(site)
453 self.completePieces[piece] = False
# Callback once a piece's body has been written. It verifies the piece's hash
# when one is known, marks the piece complete, and advances the stream over
# contiguous finished pieces. `hash` is presumably the hasher given to
# StreamToFile in _getPiece.
457 def _gotPiece(self, hash, piece, site):
458 """Process the retrieved piece from the peer."""
# Pieces whose expected hash is None (no piece hashes found) are not checked.
459 if self.pieces[piece] and hash.digest() != self.pieces[piece]:
461 log.msg('Hash error for piece %d from peer %r' % (piece, self.peers[site]['peer']))
462 self.peers[site]['peer'].hashError('Piece received from peer does not match expected')
463 self.peers[site]['errors'] = self.peers[site].get('errors', 0) + 1
464 self.completePieces[piece] = False
466 # Successfully completed one of several pieces
467 log.msg('Finished with piece %d from peer %r' % (piece, self.peers[site]['peer']))
468 self.completePieces[piece] = True
# Reset the count used by the "3 errors in a row" rule in _getPiece.
469 self.peers[site]['errors'] = 0
# completePieces can also hold site tuples, hence the explicit == True test.
# NOTE(review): every completed piece exposes PIECE_SIZE more bytes, including
# a possibly shorter last piece; presumably GrowingFileStream clamps to the
# expected size — confirm.
470 while (self.nextFinish < len(self.completePieces) and
471 self.completePieces[self.nextFinish] == True):
473 self.stream.updateAvailable(PIECE_SIZE)
# Errback for a failed body stream. It counts the error against the site and
# marks the piece as not requested, so the scheduler can request it again.
477 def _gotError(self, err, piece, site):
478 """Piece download failed, try again."""
479 log.msg('Error streaming piece %d from peer %r: %r' % (piece, self.peers[site]['peer'], err))
481 self.peers[site]['errors'] = self.peers[site].get('errors', 0) + 1
482 self.completePieces[piece] = False
# PeerManager: owns the per-site Peer objects and starts downloads, either
# directly from a mirror or through FileDownload. (The class header line is
# omitted from this excerpt.)
486 """Manage a set of peers and the requests to them.
488 @type cache_dir: L{twisted.python.filepath.FilePath}
489 @ivar cache_dir: the directory to use for storing all files
490 @type dht: L{DHTManager.DHT}
491 @ivar dht: the DHT instance
492 @type stats: L{stats.StatsLogger}
493 @ivar stats: the statistics logger to record sent data to
494 @type clients: C{dictionary}
495 @ivar clients: the available peers that have been previously contacted, keyed by (IP address, port) site tuple
# Store the cache directory, creating it if it does not exist yet.
# (dht, stats and clients are presumably assigned on omitted lines.)
498 def __init__(self, cache_dir, dht, stats):
499 """Initialize the instance."""
500 self.cache_dir = cache_dir
# Refresh cached stat info without raising if the path is missing
# (FilePath.restat with reraise=False).
501 self.cache_dir.restat(False)
502 if not self.cache_dir.exists():
503 self.cache_dir.makedirs()
508 def get(self, hash, mirror, peers = [], method="GET", modtime=None):
509 """Download from a list of peers or fallback to a mirror.
511 @type hash: L{Hash.HashObject}
512 @param hash: the hash object containing the expected hash for the file
513 @param mirror: the URI of the file on the mirror
514 @type peers: C{list} of C{dictionary}
515 @param peers: a list of the peer info where the file can be found
516 (optional, defaults to downloading from the mirror)
517 @type method: C{string}
518 @param method: the HTTP method to use, 'GET' or 'HEAD'
519 (optional, defaults to 'GET')
520 @type modtime: C{int}
521 @param modtime: the modification time to use for an 'If-Modified-Since'
522 header, as seconds since the epoch
523 (optional, defaults to not sending that header)
# Go straight to the mirror when there are no peers, for non-GET methods, or
# for conditional (If-Modified-Since) requests.
# NOTE(review): `peers = []` is a shared mutable default. It is safe here
# because it is never mutated and an empty list always takes this branch.
525 if not peers or method != "GET" or modtime is not None:
526 log.msg('Downloading (%s) from mirror %s' % (method, mirror))
527 parsed = urlparse(mirror)
# NOTE(review): assert is stripped under `python -O`; a non-HTTP mirror would
# then reach splitHostPort unchecked.
528 assert parsed[0] == "http", "Only HTTP is supported, not '%s'" % parsed[0]
529 site = splitHostPort(parsed[0], parsed[1])
530 path = urlunparse(('', '') + parsed[2:])
531 peer = self.getPeer(site, mirror = True)
532 return peer.get(path, method, modtime)
533 # elif len(peers) == 1:
534 # site = uncompact(peers[0]['c'])
535 # log.msg('Downloading from peer %r' % (site, ))
536 # path = '/~/' + quote_plus(hash.expected())
537 # peer = self.getPeer(site)
538 # return peer.get(path)
# Otherwise download through FileDownload into a cache file named after the
# hex form of the expected hash.
540 tmpfile = self.cache_dir.child(hash.hexexpected())
541 return FileDownload(self, hash, mirror, peers, tmpfile).run()
543 def getPeer(self, site, mirror = False):
544 """Create a new peer if necessary and return it.
546 @type site: (C{string}, C{int})
547 @param site: the IP address and port of the peer
548 @param mirror: whether the peer is actually a mirror
549 (optional, defaults to False)
# Peers are cached per site, so repeated lookups for the same site return the
# same Peer object.
551 if site not in self.clients:
552 self.clients[site] = Peer(site[0], site[1], self.stats)
# The condition guarding this line (presumably `if mirror:`) is omitted from
# this excerpt.
554 self.clients[site].mirror = True
555 return self.clients[site]
# Calls close() on every Peer in self.clients. (The def line is omitted from
# this excerpt.)
558 """Close all the connections to peers."""
559 for site in self.clients:
560 self.clients[site].close()
563 class TestPeerManager(unittest.TestCase):
564 """Unit tests for the PeerManager."""
570 for p in self.pending_calls:
573 self.pending_calls = []