2 """Manage a set of peers and the requests to them."""
4 from random import choice
5 from urlparse import urlparse, urlunparse
6 from urllib import quote_plus
7 from binascii import b2a_hex, a2b_hex
10 from twisted.internet import reactor, defer
11 from twisted.python import log
12 from twisted.trial import unittest
13 from twisted.web2 import stream
14 from twisted.web2.http import Response, splitHostPort
16 from HTTPDownloader import Peer
17 from Streams import GrowingFileStream, StreamToFile
18 from util import uncompact
19 from Hash import PIECE_SIZE
20 from apt_p2p_Khashmir.bencode import bdecode
21 from apt_p2p_conf import config
class PeerError(Exception):
    """Exception type signalling that downloading from the peers went wrong."""
    pass
27 """Manage a download from a list of peers or a mirror.
29 @type manager: L{PeerManager}
30 @ivar manager: the manager to send requests for peers to
31 @type hash: L{Hash.HashObject}
32 @ivar hash: the hash object containing the expected hash for the file
33 @ivar mirror: the URI of the file on the mirror
34 @type compact_peers: C{list} of C{dictionary}
35 @ivar compact_peers: a list of the peer info where the file can be found
37 @ivar file: the open file to write the download to
39 @ivar path: the path to request from peers to access the file
40 @type pieces: C{list} of C{string} or C{None}
41 @ivar pieces: the hashes of the pieces in the file; a C{None} entry means
no hash is known for that piece, so it is not verified after download
42 @type started: C{boolean}
43 @ivar started: whether the download has begun yet
44 @type defer: L{twisted.internet.defer.Deferred}
45 @ivar defer: the deferred that will callback with the result of the download
46 @type peers: C{dictionary}
47 @ivar peers: information about each of the peers available to download from,
keyed by site; each value holds the C{'peer'} object plus any piece
information (C{'t'}, C{'h'}, C{'l'}) and request state for that peer
48 @type outstanding: C{int}
49 @ivar outstanding: the number of requests to peers currently outstanding
50 @type sitelist: C{list} of site keys of L{peers}
51 @ivar sitelist: the sites to download from, sorted by peer rank (highest
ranked at the end, which is where sites are taken from first)
52 @type stream: L{GrowingFileStream}
53 @ivar stream: the stream of resulting data from the download
54 @type nextFinish: C{int}
55 @ivar nextFinish: the next piece that is needed to finish for the stream
56 @type completePieces: C{list} of C{boolean} or site key
57 @ivar completePieces: one per piece, will be False if no requests are
58 outstanding for the piece, True if the piece has been successfully
59 downloaded, or the site (a key of L{peers}) that a request for this piece has been sent
# NOTE(review): several original lines of __init__ (e.g. 76-77, 81, 83-88) are
# missing from this listing. Later methods read self.hash and self.mirror, so
# the dropped lines presumably assign them. Confirm against the full file.
62 def __init__(self, manager, hash, mirror, compact_peers, file):
63 """Initialize the instance; the piece hashes are looked up later, by C{run}.
65 @type manager: L{PeerManager}
66 @param manager: the manager to send requests for peers to
67 @type hash: L{Hash.HashObject}
68 @param hash: the hash object containing the expected hash for the file
69 @param mirror: the URI of the file on the mirror
70 @type compact_peers: C{list} of C{dictionary}
71 @param compact_peers: a list of the peer info where the file can be found
72 @type file: L{twisted.python.filepath.FilePath}
73 @param file: the temporary file to use to store the downloaded file
75 self.manager = manager
78 self.compact_peers = compact_peers
# Path requested from peers: '/~/' followed by the URL-quoted expected hash.
80 self.path = '/~/' + quote_plus(hash.expected())
# Only filled in when the mirror is added as a download source (startDownload).
82 self.mirror_path = None
# The temporary file is opened with mode 'w+' (read/write) to receive the data.
89 self.file = file.open('w+')
# FileDownload.run -- NOTE(review): the `def run(self):` line and several other
# lines (e.g. the init of self.peers, no_pieces and pieces_hash, the `else:`
# branch counting peers without piece info, and the final return) are missing
# from this listing.
# Builds self.peers from compact_peers (keyed by the uncompact()ed site), tallies
# which form of piece information each peer advertised -- 't' (an inline piece
# string), 'h' (a key looked up in the DHT) or 'l' (a key requested from peers)
# -- and then uses the most popular form to obtain self.pieces.
92 """Start the downloading process."""
93 log.msg('Checking for pieces for %s' % self.path)
94 self.defer = defer.Deferred()
# Each tally is seeded with a dummy {0: 0} entry so that max(...values())
# below never operates on an empty sequence.
97 pieces_string = {0: 0}
99 pieces_dl_hash = {0: 0}
101 for compact_peer in self.compact_peers:
102 # Build a list of all the peers for this download
103 site = uncompact(compact_peer['c'])
104 peer = self.manager.getPeer(site)
105 self.peers.setdefault(site, {})['peer'] = peer
107 # Extract any piece information from the peers list
108 if 't' in compact_peer:
109 self.peers[site]['t'] = compact_peer['t']['t']
110 pieces_string.setdefault(compact_peer['t']['t'], 0)
111 pieces_string[compact_peer['t']['t']] += 1
112 elif 'h' in compact_peer:
113 self.peers[site]['h'] = compact_peer['h']
114 pieces_hash.setdefault(compact_peer['h'], 0)
115 pieces_hash[compact_peer['h']] += 1
116 elif 'l' in compact_peer:
117 self.peers[site]['l'] = compact_peer['l']
118 pieces_dl_hash.setdefault(compact_peer['l'], 0)
119 pieces_dl_hash[compact_peer['l']] += 1
123 # Select the most popular piece info
# Ties are resolved by the order of the if/elif chain below: no pieces first,
# then the inline string, then the DHT key, then the peer-download key.
124 max_found = max(no_pieces, max(pieces_string.values()),
125 max(pieces_hash.values()), max(pieces_dl_hash.values()))
127 if max_found < len(self.peers):
128 log.msg('Misleading piece information found, using most popular %d of %d peers' %
129 (max_found, len(self.peers)))
131 if max_found == no_pieces:
132 # The file is not split into pieces
133 log.msg('No pieces were found for the file')
# A single "piece" whose expected hash is the hash of the whole file.
134 self.pieces = [self.hash.expected()]
136 elif max_found == max(pieces_string.values()):
137 # Small number of pieces in a string
138 for pieces, num in pieces_string.items():
139 # Find the most popular piece string
# NOTE(review): the selection test (original line 140) is missing here;
# presumably it matches the entry whose count equals max_found.
# Split the concatenated hashes into 20-byte entries (presumably SHA-1
# digests, since sha is used to verify them elsewhere -- confirm).
141 self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
142 log.msg('Peer info contained %d piece hashes' % len(self.pieces))
145 elif max_found == max(pieces_hash.values()):
146 # Medium number of pieces stored in the DHT
147 for pieces, num in pieces_hash.items():
148 # Find the most popular piece hash to lookup
150 log.msg('Found a hash for pieces to lookup in the DHT: %r' % pieces)
151 self.getDHTPieces(pieces)
153 elif max_found == max(pieces_dl_hash.values()):
154 # Large number of pieces stored in peers
155 for pieces, num in pieces_dl_hash.items():
156 # Find the most popular piece hash to download
158 log.msg('Found a hash for pieces to lookup in peers: %r' % pieces)
159 self.getPeerPieces(pieces)
163 #{ Downloading the piece hashes
def getDHTPieces(self, key):
    """Start an asynchronous DHT lookup for this download's piece hashes.

    Whatever the lookup produces -- a result or a failure -- is passed on to
    C{self._getDHTPieces} together with C{key}.

    @param key: the key to lookup in the DHT
    """
    # Peers advertising a different piece-hash key are not removed here; the
    # filtering that would do so was disabled in this code.
    self.manager.dht.get(key).addBoth(self._getDHTPieces, key)
178 def _getDHTPieces(self, results, key):
179 """Check the retrieved values."""
180 if isinstance(results, list):
181 for result in results:
182 # Make sure the hash matches the key
183 result_hash = sha.new(result.get('t', '')).digest()
184 if result_hash == key:
186 self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
187 log.msg('Retrieved %d piece hashes from the DHT' % len(self.pieces))
191 log.msg('Could not retrieve the piece hashes from the DHT')
193 log.msg('Looking up piece hashes in the DHT resulted in an error: %r' % (result, ))
195 # Continue without the piece hashes
196 self.pieces = [None for x in xrange(0, self.hash.expSize, PIECE_SIZE)]
# getPeerPieces -- NOTE(review): the `else:` before the failure branch (orig.
# line 211), the statement under `if self.outstanding >= 4:` (orig. line 229,
# presumably `break`) and the lines after the fallback are missing here.
# NOTE(review): on every retry the loop re-sends to each peer not marked
# 'failed', apparently including peers whose earlier request is still
# outstanding, so duplicate requests look possible -- consider tracking which
# peers were already asked.
199 def getPeerPieces(self, key, failedSite = None):
200 """Retrieve the piece information from the peers.
202 @param key: the key to request from the peers
@param failedSite: the site whose piece hash request just failed, or
C{None} when starting the lookup (optional, defaults to None)
204 if failedSite is None:
205 log.msg('Starting the lookup of piece hashes in peers')
207 # Remove any peers with the wrong piece hash
208 #for site in self.peers.keys():
209 # if self.peers[site].get('l', '') != key:
210 # del self.peers[site]
212 log.msg('Piece hash lookup failed for peer %r' % (failedSite, ))
# Never ask this peer again for the piece hashes.
213 self.peers[failedSite]['failed'] = True
214 self.outstanding -= 1
216 if self.pieces is None:
217 # Send a request to one or more peers
218 for site in self.peers:
219 if self.peers[site].get('failed', False) != True:
220 log.msg('Sending a piece hash request to %r' % (site, ))
221 path = '/~/' + quote_plus(key)
222 lookupDefer = self.peers[site]['peer'].get(path)
# Callbacks are attached via reactor.callLater(0, ...), i.e. on a later
# reactor iteration rather than immediately.
223 reactor.callLater(0, lookupDefer.addCallbacks,
224 *(self._getPeerPieces, self._gotPeerError),
225 **{'callbackArgs': (key, site),
226 'errbackArgs': (key, site)})
227 self.outstanding += 1
# Cap of 4 concurrent piece hash requests.
228 if self.outstanding >= 4:
231 if self.pieces is None and self.outstanding <= 0:
232 # Continue without the piece hashes
233 log.msg('Could not retrieve the piece hashes from the peers')
234 self.pieces = [None for x in xrange(0, self.hash.expSize, PIECE_SIZE)]
# _getPeerPieces: callback for a peer's piece hash response (headers received).
# NOTE(review): the line after the retry call (orig. line 243) is missing here;
# presumably it is a `return`, otherwise a non-200 body would be parsed too.
# Unlike _getPiece, the body of a non-200 response is not drained with
# stream.readAndDiscard -- confirm that is intended.
237 def _getPeerPieces(self, response, key, site):
238 """Process the retrieved response from the peer."""
239 log.msg('Got a piece hash response %d from %r' % (response.code, site))
240 if response.code != 200:
241 # Request failed, try a different peer
242 self.getPeerPieces(key, site)
244 # Read the response stream to a string
245 self.peers[site]['pieces'] = ''
# Accumulates each received chunk into self.peers[site]['pieces']; self and
# site are bound as default arguments.
246 def _gotPeerPiece(data, self = self, site = site):
247 log.msg('Peer %r got %d bytes of piece hashes' % (site, len(data)))
248 self.peers[site]['pieces'] += data
249 log.msg('Streaming piece hashes from peer')
250 df = stream.readStream(response.stream, _gotPeerPiece)
251 df.addCallbacks(self._gotPeerPieces, self._gotPeerError,
252 callbackArgs=(key, site), errbackArgs=(key, site))
# _gotPeerError: errback for a piece hash request or its body stream; marks the
# peer as failed and retries via getPeerPieces.
# NOTE(review): orig. line 257 is missing here; in the visible lines `err` is
# never logged -- confirm the missing line records it.
254 def _gotPeerError(self, err, key, site):
255 """Peer failed, try again."""
256 log.msg('Peer piece hash request failed for %r' % (site, ))
258 self.getPeerPieces(key, site)
# _gotPeerPieces: called once a peer's piece hash body has been fully read.
# Decodes it and accepts it only if the hash of its 't' value equals `key`.
# NOTE(review): several lines (the `return` after 'Already done', the
# try/except around bdecode, the `pieces` assignment and what follows a match)
# are missing from this listing.
# NOTE(review): `result.get` assumes bdecode returned a dict; a peer sending a
# bencoded list/int would raise AttributeError outside the (presumed) try.
# NOTE(review): the visible success paths never decrement self.outstanding --
# confirm it is reset before the piece download reuses it.
260 def _gotPeerPieces(self, result, key, site):
261 """Check the retrieved pieces from the peer."""
262 log.msg('Finished streaming piece hashes from peer %r' % (site, ))
# Another peer may already have supplied valid piece hashes.
263 if self.pieces is not None:
265 log.msg('Already done')
# The readStream result passed in is discarded; the accumulated body is decoded.
269 result = bdecode(self.peers[site]['pieces'])
271 log.msg('Error bdecoding piece hashes')
273 self.getPeerPieces(key, site)
276 result_hash = sha.new(result.get('t', '')).digest()
277 if result_hash == key:
# Split the concatenated piece hashes into 20-byte entries.
279 self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
280 log.msg('Retrieved %d piece hashes from the peer %r' % (len(self.pieces), site))
283 log.msg('Peer returned a piece string that did not match')
284 self.getPeerPieces(key, site)
286 #{ Downloading the file
# Rank sort of self.sitelist -- NOTE(review): the outer `def` line (orig. 287),
# the nested `def sort(a, b):` line (orig. 289) and its return statements are
# missing from this listing.
# The nested function is a Python 2 cmp-style comparator on
# self.peers[site]['peer'].rank. If it returns 1/-1/0 as the branches suggest,
# `self.sitelist.sort(key=lambda s: self.peers[s]['peer'].rank)` would be
# equivalent -- confirm before changing.
288 """Sort the peers by their rank (highest ranked at the end)."""
290 """Sort peers by their rank."""
291 if self.peers[a]['peer'].rank > self.peers[b]['peer'].rank:
293 elif self.peers[a]['peer'].rank < self.peers[b]['peer'].rank:
296 self.sitelist.sort(sort)
# startDownload -- NOTE(review): several lines (orig. 300-303, 327-328 and the
# lines after completePieces is built) are missing from this listing.
298 def startDownload(self):
299 """Start the download from the peers."""
304 log.msg('Starting to download %s' % self.path)
306 assert self.pieces, "You must initialize the piece hashes first"
308 # Use the mirror if there are few peers
309 if len(self.peers) < config.getint('DEFAULT', 'MIN_DOWNLOAD_PEERS'):
310 parsed = urlparse(self.mirror)
# Only HTTP mirrors are used; other schemes are silently skipped.
311 if parsed[0] == "http":
312 site = splitHostPort(parsed[0], parsed[1])
# Path (plus params/query/fragment) to request from the mirror.
313 self.mirror_path = urlunparse(('', '') + parsed[2:])
314 peer = self.manager.getPeer(site, mirror = True)
# NOTE(review): this replaces any existing self.peers entry for the same site.
315 self.peers[site] = {}
316 self.peers[site]['peer'] = peer
# In Python 2, keys() returns a new list, which getPieces pops from and
# appends to.
318 self.sitelist = self.peers.keys()
320 # Special case if there's only one good peer left
321 # if len(self.sitelist) == 1:
322 # log.msg('Downloading from peer %r' % (self.peers[self.sitelist[0]]['peer'], ))
323 # self.defer.callback(self.peers[self.sitelist[0]]['peer'].get(self.path))
326 # Begin to download the pieces
329 self.completePieces = [False for piece in self.pieces]
332 #{ Downloading the pieces
# getPieces -- NOTE(review): the `def` line (orig. 333), the abort condition
# (orig. 335), the non-mirror `path` assignment (presumably orig. 350), the
# `else:` before the plain get, and the loop increment are missing here.
334 """Download the next pieces from the peers."""
336 log.msg('Download has been aborted for %s' % self.path)
337 self.stream.allAvailable(remove = True)
# Scan from the first piece the stream still needs; at most 4 requests at once.
341 piece = self.nextFinish
342 while self.outstanding < 4 and self.sitelist and piece < len(self.completePieces):
# `== False` (not `not ...`): entries may be a site tuple while a request is out.
343 if self.completePieces[piece] == False:
344 # Send a request to the highest ranked peer
345 site = self.sitelist.pop()
346 self.completePieces[piece] = site
347 log.msg('Sending a request for piece %d to peer %r' % (piece, self.peers[site]['peer']))
349 self.outstanding += 1
351 if self.peers[site]['peer'].mirror:
352 path = self.mirror_path
353 if len(self.completePieces) > 1:
# HTTP byte ranges are inclusive, hence the trailing - 1.
354 df = self.peers[site]['peer'].getRange(path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
356 df = self.peers[site]['peer'].get(path)
357 reactor.callLater(0, df.addCallbacks,
358 *(self._getPiece, self._getError),
359 **{'callbackArgs': (piece, site),
360 'errbackArgs': (piece, site)})
363 # Check if we're done
364 if self.outstanding <= 0 and self.nextFinish >= len(self.completePieces):
365 log.msg('Download is complete for %s' % self.path)
366 self.stream.allAvailable(remove = True)
# _getPiece: callback when a piece request's response headers arrive.
# NOTE(review): several lines (the `return` after a wrong response, the guard
# around creating self.stream, the `headers` init, the defer callback and the
# `else:` before the whole-file StreamToFile) are missing from this listing.
368 def _getPiece(self, response, piece, site):
369 """Process the retrieved headers from the peer."""
# Multi-piece downloads require a 206 (range) reply; single-piece ones
# accept 200 or 206.
370 if ((len(self.completePieces) > 1 and response.code != 206) or
371 (response.code not in (200, 206))):
372 # Request failed, try a different peer
373 log.msg('Wrong response type %d for piece %d from peer %r' % (response.code, piece, self.peers[site]['peer']))
374 self.peers[site]['peer'].hashError('Peer responded with the wrong type of download: %r' % response.code)
375 self.completePieces[piece] = False
# Drain any unwanted body.
376 if response.stream and response.stream.length:
377 stream.readAndDiscard(response.stream)
380 # Start sending the return file
383 self.stream = GrowingFileStream(self.file, self.hash.expSize)
385 # Get the headers from the peer's response
387 if response.headers.hasHeader('last-modified'):
388 headers['last-modified'] = response.headers.getHeader('last-modified')
389 resp = Response(200, headers, self.stream)
392 # Read the response stream to the file
393 log.msg('Streaming piece %d from peer %r' % (piece, self.peers[site]['peer']))
# Range replies are hashed per piece; whole-file replies use the file hasher.
394 if response.code == 206:
395 df = StreamToFile(self.hash.newPieceHasher(), response.stream,
396 self.file, piece*PIECE_SIZE, PIECE_SIZE).run()
398 df = StreamToFile(self.hash.newHasher(), response.stream,
400 reactor.callLater(0, df.addCallbacks,
401 *(self._gotPiece, self._gotError),
402 **{'callbackArgs': (piece, site),
403 'errbackArgs': (piece, site)})
# The slot and the site are released once the headers have been handled,
# before the body has finished streaming (that is handled by _gotPiece/_gotError).
405 self.outstanding -= 1
406 self.sitelist.append(site)
# _getError: errback for a piece request; frees the slot, returns the site to
# the pool and marks the piece as not requested.
# NOTE(review): lines after orig. 414 are not visible; `err` is not logged in
# the visible lines.
409 def _getError(self, err, piece, site):
410 """Peer failed, try again."""
411 log.msg('Got error for piece %d from peer %r' % (piece, self.peers[site]['peer']))
412 self.outstanding -= 1
413 self.sitelist.append(site)
414 self.completePieces[piece] = False
# _gotPiece: callback once a piece's body has been written; `hash` is the
# hasher that StreamToFile fed (it shadows the builtin).
# NOTE(review): a few lines (e.g. the `else:` and the nextFinish increment
# inside the while loop) are missing from this listing.
418 def _gotPiece(self, hash, piece, site):
419 """Process the retrieved piece from the peer."""
# A None expected hash means the piece cannot be verified and is accepted.
420 if self.pieces[piece] and hash.digest() != self.pieces[piece]:
422 log.msg('Hash error for piece %d from peer %r' % (piece, self.peers[site]['peer']))
423 self.peers[site]['peer'].hashError('Piece received from peer does not match expected')
424 self.completePieces[piece] = False
426 # Successfully completed one of several pieces
427 log.msg('Finished with piece %d from peer %r' % (piece, self.peers[site]['peer']))
428 self.completePieces[piece] = True
# `== True` (not truthiness): in-flight entries hold a (truthy) site key.
429 while (self.nextFinish < len(self.completePieces) and
430 self.completePieces[self.nextFinish] == True):
# NOTE(review): the last piece may be shorter than PIECE_SIZE; presumably
# GrowingFileStream caps availability at the expected size -- confirm.
432 self.stream.updateAvailable(PIECE_SIZE)
# _gotError: errback when streaming a piece's body fails; the piece becomes
# requestable again. NOTE(review): orig. line 439 and the lines after 440 are
# missing from this listing.
436 def _gotError(self, err, piece, site):
437 """Piece download failed, try again."""
438 log.msg('Error streaming piece %d from peer %r: %r' % (piece, self.peers[site]['peer'], err))
440 self.completePieces[piece] = False
444 """Manage a set of peers and the requests to them.
446 @type cache_dir: L{twisted.python.filepath.FilePath}
447 @ivar cache_dir: the directory to use for storing all files
448 @type dht: L{DHTManager.DHT}
449 @ivar dht: the DHT instance
450 @type stats: L{stats.StatsLogger}
451 @ivar stats: the statistics logger to record sent data to
452 @type clients: C{dictionary}
453 @ivar clients: the available peers that have been previously contacted,
keyed by site (C{(host, port)}), with L{HTTPDownloader.Peer} values
# PeerManager.__init__ -- NOTE(review): the remaining lines (orig. 462-465,
# presumably assigning dht, stats and clients) are missing from this listing.
456 def __init__(self, cache_dir, dht, stats):
457 """Initialize the instance."""
458 self.cache_dir = cache_dir
# Refresh the cached stat info before testing for existence.
459 self.cache_dir.restat(False)
460 if not self.cache_dir.exists():
461 self.cache_dir.makedirs()
def get(self, hash, mirror, peers = None, method="GET", modtime=None):
    """Download from a list of peers or fallback to a mirror.

    HEAD requests, conditional requests (C{modtime} given) and requests with
    no known peers go straight to the mirror. Otherwise a L{FileDownload} is
    run, writing to a temporary file in the cache directory that is named
    after the hex form of the expected hash.

    @type hash: L{Hash.HashObject}
    @param hash: the hash object containing the expected hash for the file
    @param mirror: the URI of the file on the mirror
    @type peers: C{list} of C{dictionary}
    @param peers: a list of the peer info where the file can be found
        (optional, defaults to downloading from the mirror)
    @type method: C{string}
    @param method: the HTTP method to use, 'GET' or 'HEAD'
        (optional, defaults to 'GET')
    @type modtime: C{int}
    @param modtime: the modification time to use for an 'If-Modified-Since'
        header, as seconds since the epoch
        (optional, defaults to not sending that header)
    @return: the result of the mirror peer's C{get}, or of L{FileDownload.run}
    """
    # `peers` defaults to None rather than a shared mutable list; None and an
    # empty list both take the mirror path below.
    if not peers or method != "GET" or modtime is not None:
        log.msg('Downloading (%s) from mirror %s' % (method, mirror))
        parsed = urlparse(mirror)
        assert parsed[0] == "http", "Only HTTP is supported, not '%s'" % parsed[0]
        site = splitHostPort(parsed[0], parsed[1])
        # Path (plus params/query/fragment) to request from the mirror host.
        path = urlunparse(('', '') + parsed[2:])
        peer = self.getPeer(site, mirror = True)
        return peer.get(path, method, modtime)

    tmpfile = self.cache_dir.child(hash.hexexpected())
    return FileDownload(self, hash, mirror, peers, tmpfile).run()
# PeerManager.getPeer -- NOTE(review): orig. line 511 is missing from this
# listing; it presumably is `if mirror:` guarding the line that sets
# `.mirror = True`. A peer once flagged as a mirror stays flagged.
501 def getPeer(self, site, mirror = False):
502 """Create a new peer if necessary and return it.
504 @type site: (C{string}, C{int})
505 @param site: the IP address and port of the peer
506 @param mirror: whether the peer is actually a mirror
507 (optional, defaults to False)
@return: the cached (or newly created) peer for C{site}
# Peers are cached in self.clients so connections are reused per site.
509 if site not in self.clients:
510 self.clients[site] = Peer(site[0], site[1], self.stats)
512 self.clients[site].mirror = True
513 return self.clients[site]
# PeerManager.close -- NOTE(review): the `def` line (orig. 515) is missing from
# this listing. Calls close() on every cached peer in self.clients.
516 """Close all the connections to peers."""
517 for site in self.clients:
518 self.clients[site].close()
# TestPeerManager -- NOTE(review): most of this test case (class attributes,
# the method containing the loop over pending_calls, the loop body and the
# tests themselves) is missing from or beyond this listing. The visible loop
# iterates self.pending_calls and then resets it to an empty list.
521 class TestPeerManager(unittest.TestCase):
522 """Unit tests for the PeerManager."""
528 for p in self.pending_calls:
531 self.pending_calls = []