2 """Manage a set of peers and the requests to them."""
4 from random import choice
5 from urlparse import urlparse, urlunparse
6 from urllib import quote_plus
7 from binascii import b2a_hex, a2b_hex
10 from twisted.internet import reactor, defer
11 from twisted.python import log
12 from twisted.trial import unittest
13 from twisted.web2 import stream
14 from twisted.web2.http import Response, splitHostPort
16 from HTTPDownloader import Peer
17 from Streams import GrowingFileStream, StreamToFile
18 from util import uncompact
19 from Hash import PIECE_SIZE
20 from apt_p2p_Khashmir.bencode import bdecode
21 from apt_p2p_conf import config
class PeerError(Exception):
    """Raised when downloading a file from peers fails."""
27 """Manage a download from a list of peers or a mirror.
29 @type manager: L{PeerManager}
30 @ivar manager: the manager to send requests for peers to
31 @type hash: L{Hash.HashObject}
32 @ivar hash: the hash object containing the expected hash for the file
33 @ivar mirror: the URI of the file on the mirror
34 @type compact_peers: C{list} of C{dictionary}
35 @ivar compact_peers: a list of the peer info where the file can be found
37 @ivar file: the open file to write the download to
39 @ivar path: the path to request from peers to access the file
40 @type pieces: C{list} of C{string}
41 @ivar pieces: the hashes of the pieces in the file
42 @type started: C{boolean}
43 @ivar started: whether the download has begun yet
44 @type defer: L{twisted.internet.defer.Deferred}
45 @ivar defer: the deferred that will callback with the result of the download
46 @type peers: C{dictionary}
47 @ivar peers: information about each of the peers available to download from
48 @type outstanding: C{int}
49 @ivar outstanding: the number of requests to peers currently outstanding
50 @type peerlist: C{list} of L{HTTPDownloader.Peer}
51 @ivar peerlist: the sorted list of peers for this download
52 @type stream: L{GrowingFileStream}
53 @ivar stream: the stream of resulting data from the download
54 @type nextFinish: C{int}
55 @ivar nextFinish: the next piece that is needed to finish for the stream
56 @type completePieces: C{list} of C{boolean} or L{HTTPDownloader.Peer}
57 @ivar completePieces: one per piece, will be False if no requests are
58 outstanding for the piece, True if the piece has been successfully
59 downloaded, or the Peer that a request for this piece has been sent
62 def __init__(self, manager, hash, mirror, compact_peers, file):
63 """Initialize the instance and check for piece hashes.
65 @type manager: L{PeerManager}
66 @param manager: the manager to send requests for peers to
67 @type hash: L{Hash.HashObject}
68 @param hash: the hash object containing the expected hash for the file
69 @param mirror: the URI of the file on the mirror
70 @type compact_peers: C{list} of C{dictionary}
71 @param compact_peers: a list of the peer info where the file can be found
72 @type file: L{twisted.python.filepath.FilePath}
73 @param file: the temporary file to use to store the downloaded file
75 self.manager = manager
78 self.compact_peers = compact_peers
80 self.path = '/~/' + quote_plus(hash.expected())
82 self.mirror_path = None
89 self.file = file.open('w+')
92 """Start the downloading process."""
93 log.msg('Checking for pieces for %s' % self.path)
94 self.defer = defer.Deferred()
97 pieces_string = {0: 0}
99 pieces_dl_hash = {0: 0}
101 for compact_peer in self.compact_peers:
102 # Build a list of all the peers for this download
103 site = uncompact(compact_peer['c'])
104 peer = self.manager.getPeer(site)
105 self.peers.setdefault(site, {})['peer'] = peer
107 # Extract any piece information from the peers list
108 if 't' in compact_peer:
109 self.peers[site]['t'] = compact_peer['t']['t']
110 pieces_string.setdefault(compact_peer['t']['t'], 0)
111 pieces_string[compact_peer['t']['t']] += 1
112 elif 'h' in compact_peer:
113 self.peers[site]['h'] = compact_peer['h']
114 pieces_hash.setdefault(compact_peer['h'], 0)
115 pieces_hash[compact_peer['h']] += 1
116 elif 'l' in compact_peer:
117 self.peers[site]['l'] = compact_peer['l']
118 pieces_dl_hash.setdefault(compact_peer['l'], 0)
119 pieces_dl_hash[compact_peer['l']] += 1
123 # Select the most popular piece info
124 max_found = max(no_pieces, max(pieces_string.values()),
125 max(pieces_hash.values()), max(pieces_dl_hash.values()))
127 if max_found < len(self.peers):
128 log.msg('Misleading piece information found, using most popular %d of %d peers' %
129 (max_found, len(self.peers)))
131 if max_found == no_pieces:
132 # The file is not split into pieces
133 log.msg('No pieces were found for the file')
134 self.pieces = [self.hash.expected()]
136 elif max_found == max(pieces_string.values()):
137 # Small number of pieces in a string
138 for pieces, num in pieces_string.items():
139 # Find the most popular piece string
141 self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
142 log.msg('Peer info contained %d piece hashes' % len(self.pieces))
145 elif max_found == max(pieces_hash.values()):
146 # Medium number of pieces stored in the DHT
147 for pieces, num in pieces_hash.items():
148 # Find the most popular piece hash to lookup
150 log.msg('Found a hash for pieces to lookup in the DHT: %r' % pieces)
151 self.getDHTPieces(pieces)
153 elif max_found == max(pieces_dl_hash.values()):
154 # Large number of pieces stored in peers
155 for pieces, num in pieces_dl_hash.items():
156 # Find the most popular piece hash to download
158 log.msg('Found a hash for pieces to lookup in peers: %r' % pieces)
159 self.getPeerPieces(pieces)
163 #{ Downloading the piece hashes
164 def getDHTPieces(self, key):
165 """Retrieve the piece information from the DHT.
167 @param key: the key to lookup in the DHT
169 # Remove any peers with the wrong piece hash
170 #for site in self.peers.keys():
171 # if self.peers[site].get('h', '') != key:
172 # del self.peers[site]
174 # Start the DHT lookup
175 lookupDefer = self.manager.dht.get(key)
176 lookupDefer.addBoth(self._getDHTPieces, key)
178 def _getDHTPieces(self, results, key):
179 """Check the retrieved values."""
180 if isinstance(results, list):
181 for result in results:
182 # Make sure the hash matches the key
183 result_hash = sha.new(result.get('t', '')).digest()
184 if result_hash == key:
186 self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
187 log.msg('Retrieved %d piece hashes from the DHT' % len(self.pieces))
191 log.msg('Could not retrieve the piece hashes from the DHT')
193 log.msg('Looking up piece hashes in the DHT resulted in an error: %r' % (result, ))
195 # Continue without the piece hashes
196 self.pieces = [None for x in xrange(0, self.hash.expSize, PIECE_SIZE)]
199 def getPeerPieces(self, key, failedSite = None):
200 """Retrieve the piece information from the peers.
202 @param key: the key to request from the peers
204 if failedSite is None:
205 log.msg('Starting the lookup of piece hashes in peers')
207 # Remove any peers with the wrong piece hash
208 #for site in self.peers.keys():
209 # if self.peers[site].get('l', '') != key:
210 # del self.peers[site]
212 log.msg('Piece hash lookup failed for peer %r' % (failedSite, ))
213 self.peers[failedSite]['failed'] = True
214 self.outstanding -= 1
216 if self.pieces is None:
217 # Send a request to one or more peers
218 for site in self.peers:
219 if self.peers[site].get('failed', False) != True:
220 log.msg('Sending a piece hash request to %r' % (site, ))
221 path = '/~/' + quote_plus(key)
222 lookupDefer = self.peers[site]['peer'].get(path)
223 reactor.callLater(0, lookupDefer.addCallbacks,
224 *(self._getPeerPieces, self._gotPeerError),
225 **{'callbackArgs': (key, site),
226 'errbackArgs': (key, site)})
227 self.outstanding += 1
228 if self.outstanding >= 4:
231 if self.pieces is None and self.outstanding <= 0:
232 # Continue without the piece hashes
233 log.msg('Could not retrieve the piece hashes from the peers')
234 self.pieces = [None for x in xrange(0, self.hash.expSize, PIECE_SIZE)]
237 def _getPeerPieces(self, response, key, site):
238 """Process the retrieved response from the peer."""
239 log.msg('Got a piece hash response %d from %r' % (response.code, site))
240 if response.code != 200:
241 # Request failed, try a different peer
242 self.getPeerPieces(key, site)
244 # Read the response stream to a string
245 self.peers[site]['pieces'] = ''
246 def _gotPeerPiece(data, self = self, site = site):
247 log.msg('Peer %r got %d bytes of piece hashes' % (site, len(data)))
248 self.peers[site]['pieces'] += data
249 log.msg('Streaming piece hashes from peer')
250 df = stream.readStream(response.stream, _gotPeerPiece)
251 df.addCallbacks(self._gotPeerPieces, self._gotPeerError,
252 callbackArgs=(key, site), errbackArgs=(key, site))
254 def _gotPeerError(self, err, key, site):
255 """Peer failed, try again."""
256 log.msg('Peer piece hash request failed for %r' % (site, ))
258 self.getPeerPieces(key, site)
260 def _gotPeerPieces(self, result, key, site):
261 """Check the retrieved pieces from the peer."""
262 log.msg('Finished streaming piece hashes from peer %r' % (site, ))
263 if self.pieces is not None:
265 log.msg('Already done')
269 result = bdecode(self.peers[site]['pieces'])
271 log.msg('Error bdecoding piece hashes')
273 self.getPeerPieces(key, site)
276 result_hash = sha.new(result.get('t', '')).digest()
277 if result_hash == key:
279 self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
280 log.msg('Retrieved %d piece hashes from the peer %r' % (len(self.pieces), site))
283 log.msg('Peer returned a piece string that did not match')
284 self.getPeerPieces(key, site)
286 #{ Downloading the file
288 """Sort the peers by their rank (highest ranked at the end)."""
290 """Sort peers by their rank."""
293 elif a.rank < b.rank:
296 self.peerlist.sort(sort)
298 def startDownload(self):
299 """Start the download from the peers."""
304 log.msg('Starting to download %s' % self.path)
306 assert self.pieces, "You must initialize the piece hashes first"
307 self.peerlist = [self.peers[site]['peer'] for site in self.peers]
309 # Use the mirror if there are few peers
310 if len(self.peerlist) < config.getint('DEFAULT', 'MIN_DOWNLOAD_PEERS'):
311 parsed = urlparse(self.mirror)
312 if parsed[0] == "http":
313 site = splitHostPort(parsed[0], parsed[1])
314 self.mirror_path = urlunparse(('', '') + parsed[2:])
315 peer = self.manager.getPeer(site, mirror = True)
316 self.peerlist.append(peer)
318 # Special case if there's only one good peer left
319 # if len(self.peerlist) == 1:
320 # log.msg('Downloading from peer %r' % (self.peerlist[0], ))
321 # self.defer.callback(self.peerlist[0].get(self.path))
324 # Begin to download the pieces
327 self.completePieces = [False for piece in self.pieces]
330 #{ Downloading the pieces
332 """Download the next pieces from the peers."""
334 log.msg('Download has been aborted for %s' % self.path)
335 self.stream.allAvailable(remove = True)
339 piece = self.nextFinish
340 while self.outstanding < 4 and self.peerlist and piece < len(self.completePieces):
341 if self.completePieces[piece] == False:
342 # Send a request to the highest ranked peer
343 peer = self.peerlist.pop()
344 self.completePieces[piece] = peer
345 log.msg('Sending a request for piece %d to peer %r' % (piece, peer))
347 self.outstanding += 1
350 path = self.mirror_path
351 if len(self.completePieces) > 1:
352 df = peer.getRange(path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
355 reactor.callLater(0, df.addCallbacks,
356 *(self._getPiece, self._getError),
357 **{'callbackArgs': (piece, peer),
358 'errbackArgs': (piece, peer)})
361 # Check if we're done
362 if self.outstanding <= 0 and self.nextFinish >= len(self.completePieces):
363 log.msg('Download is complete for %s' % self.path)
364 self.stream.allAvailable(remove = True)
366 def _getPiece(self, response, piece, peer):
367 """Process the retrieved headers from the peer."""
368 if ((len(self.completePieces) > 1 and response.code != 206) or
369 (response.code not in (200, 206))):
370 # Request failed, try a different peer
371 log.msg('Wrong response type %d for piece %d from peer %r' % (response.code, piece, peer))
372 peer.hashError('Peer responded with the wrong type of download: %r' % response.code)
373 self.completePieces[piece] = False
374 if response.stream and response.stream.length:
375 stream.readAndDiscard(response.stream)
378 # Start sending the return file
381 self.stream = GrowingFileStream(self.file, self.hash.expSize)
383 # Get the headers from the peer's response
385 if response.headers.hasHeader('last-modified'):
386 headers['last-modified'] = response.headers.getHeader('last-modified')
387 resp = Response(200, headers, self.stream)
390 # Read the response stream to the file
391 log.msg('Streaming piece %d from peer %r' % (piece, peer))
392 if response.code == 206:
393 df = StreamToFile(self.hash.newPieceHasher(), response.stream,
394 self.file, piece*PIECE_SIZE, PIECE_SIZE).run()
396 df = StreamToFile(self.hash.newHasher(), response.stream,
398 reactor.callLater(0, df.addCallbacks,
399 *(self._gotPiece, self._gotError),
400 **{'callbackArgs': (piece, peer),
401 'errbackArgs': (piece, peer)})
403 self.outstanding -= 1
404 self.peerlist.append(peer)
407 def _getError(self, err, piece, peer):
408 """Peer failed, try again."""
409 log.msg('Got error for piece %d from peer %r' % (piece, peer))
410 self.outstanding -= 1
411 self.peerlist.append(peer)
412 self.completePieces[piece] = False
416 def _gotPiece(self, hash, piece, peer):
417 """Process the retrieved piece from the peer."""
418 if self.pieces[piece] and hash.digest() != self.pieces[piece]:
420 log.msg('Hash error for piece %d from peer %r' % (piece, peer))
421 peer.hashError('Piece received from peer does not match expected')
422 self.completePieces[piece] = False
424 # Successfully completed one of several pieces
425 log.msg('Finished with piece %d from peer %r' % (piece, peer))
426 self.completePieces[piece] = True
427 while (self.nextFinish < len(self.completePieces) and
428 self.completePieces[self.nextFinish] == True):
430 self.stream.updateAvailable(PIECE_SIZE)
434 def _gotError(self, err, piece, peer):
435 """Piece download failed, try again."""
436 log.msg('Error streaming piece %d from peer %r: %r' % (piece, peer, err))
438 self.completePieces[piece] = False
442 """Manage a set of peers and the requests to them.
444 @type cache_dir: L{twisted.python.filepath.FilePath}
445 @ivar cache_dir: the directory to use for storing all files
446 @type dht: L{DHTManager.DHT}
447 @ivar dht: the DHT instance
448 @type stats: L{stats.StatsLogger}
449 @ivar stats: the statistics logger to record sent data to
450 @type clients: C{dictionary}
451 @ivar clients: the available peers that have been previously contacted
454 def __init__(self, cache_dir, dht, stats):
455 """Initialize the instance."""
456 self.cache_dir = cache_dir
457 self.cache_dir.restat(False)
458 if not self.cache_dir.exists():
459 self.cache_dir.makedirs()
464 def get(self, hash, mirror, peers = [], method="GET", modtime=None):
465 """Download from a list of peers or fallback to a mirror.
467 @type hash: L{Hash.HashObject}
468 @param hash: the hash object containing the expected hash for the file
469 @param mirror: the URI of the file on the mirror
470 @type peers: C{list} of C{string}
471 @param peers: a list of the peer info where the file can be found
472 (optional, defaults to downloading from the mirror)
473 @type method: C{string}
474 @param method: the HTTP method to use, 'GET' or 'HEAD'
475 (optional, defaults to 'GET')
476 @type modtime: C{int}
477 @param modtime: the modification time to use for an 'If-Modified-Since'
478 header, as seconds since the epoch
479 (optional, defaults to not sending that header)
481 if not peers or method != "GET" or modtime is not None:
482 log.msg('Downloading (%s) from mirror %s' % (method, mirror))
483 parsed = urlparse(mirror)
484 assert parsed[0] == "http", "Only HTTP is supported, not '%s'" % parsed[0]
485 site = splitHostPort(parsed[0], parsed[1])
486 path = urlunparse(('', '') + parsed[2:])
487 peer = self.getPeer(site, mirror = True)
488 return peer.get(path, method, modtime)
489 # elif len(peers) == 1:
490 # site = uncompact(peers[0]['c'])
491 # log.msg('Downloading from peer %r' % (site, ))
492 # path = '/~/' + quote_plus(hash.expected())
493 # peer = self.getPeer(site)
494 # return peer.get(path)
496 tmpfile = self.cache_dir.child(hash.hexexpected())
497 return FileDownload(self, hash, mirror, peers, tmpfile).run()
499 def getPeer(self, site, mirror = False):
500 """Create a new peer if necessary and return it.
502 @type site: (C{string}, C{int})
503 @param site: the IP address and port of the peer
504 @param mirror: whether the peer is actually a mirror
505 (optional, defaults to False)
507 if site not in self.clients:
508 self.clients[site] = Peer(site[0], site[1], self.stats)
510 self.clients[site].mirror = True
511 return self.clients[site]
514 """Close all the connections to peers."""
515 for site in self.clients:
516 self.clients[site].close()
519 class TestPeerManager(unittest.TestCase):
520 """Unit tests for the PeerManager."""
526 for p in self.pending_calls:
529 self.pending_calls = []