Final version of INFOCOM paper.
[quix0rs-apt-p2p.git] / apt_p2p / PeerManager.py
1
2 """Manage a set of peers and the requests to them."""
3
4 from random import choice
5 from urlparse import urlparse, urlunparse
6 from urllib import quote_plus
7 from binascii import b2a_hex, a2b_hex
8 import sha
9
10 from twisted.internet import reactor, defer
11 from twisted.python import log
12 from twisted.trial import unittest
13 from twisted.web2 import stream
14 from twisted.web2.http import Response, splitHostPort
15
16 from HTTPDownloader import Peer
17 from Streams import GrowingFileStream, StreamToFile
18 from util import uncompact
19 from Hash import PIECE_SIZE
20 from apt_p2p_Khashmir.bencode import bdecode
21 from apt_p2p_conf import config
22
23 class PeerError(Exception):
24     """An error occurred downloading from peers."""
25     
26 class FileDownload:
27     """Manage a download from a list of peers or a mirror.
28     
29     @type manager: L{PeerManager}
30     @ivar manager: the manager to send requests for peers to
31     @type hash: L{Hash.HashObject}
32     @ivar hash: the hash object containing the expected hash for the file
33     @ivar mirror: the URI of the file on the mirror
34     @type compact_peers: C{list} of C{dictionary}
35     @ivar compact_peers: a list of the peer info where the file can be found
36     @type file: C{file}
37     @ivar file: the open file to right the download to
38     @type path: C{string}
39     @ivar path: the path to request from peers to access the file
40     @type pieces: C{list} of C{string} 
41     @ivar pieces: the hashes of the pieces in the file
42     @type started: C{boolean}
43     @ivar started: whether the download has begun yet
44     @type defer: L{twisted.internet.defer.Deferred}
45     @ivar defer: the deferred that will callback with the result of the download
46     @type peers: C{dictionary}
47     @ivar peers: information about each of the peers available to download from
48     @type outstanding: C{int}
49     @ivar outstanding: the number of requests to peers currently outstanding
50     @type peerlist: C{list} of L{HTTPDownloader.Peer}
51     @ivar peerlist: the sorted list of peers for this download
52     @type stream: L{GrowingFileStream}
53     @ivar stream: the stream of resulting data from the download
54     @type nextFinish: C{int}
55     @ivar nextFinish: the next piece that is needed to finish for the stream
56     @type completePieces: C{list} of C{boolean} or L{HTTPDownloader.Peer}
57     @ivar completePieces: one per piece, will be False if no requests are
58         outstanding for the piece, True if the piece has been successfully
59         downloaded, or the Peer that a request for this piece has been sent  
60     """
61     
62     def __init__(self, manager, hash, mirror, compact_peers, file):
63         """Initialize the instance and check for piece hashes.
64         
65         @type manager: L{PeerManager}
66         @param manager: the manager to send requests for peers to
67         @type hash: L{Hash.HashObject}
68         @param hash: the hash object containing the expected hash for the file
69         @param mirror: the URI of the file on the mirror
70         @type compact_peers: C{list} of C{dictionary}
71         @param compact_peers: a list of the peer info where the file can be found
72         @type file: L{twisted.python.filepath.FilePath}
73         @param file: the temporary file to use to store the downloaded file
74         """
75         self.manager = manager
76         self.hash = hash
77         self.mirror = mirror
78         self.compact_peers = compact_peers
79         
80         self.path = '/~/' + quote_plus(hash.expected())
81         self.defer = None
82         self.mirror_path = None
83         self.pieces = None
84         self.started = False
85         
86         file.restat(False)
87         if file.exists():
88             file.remove()
89         self.file = file.open('w+')
90
91     def run(self):
92         """Start the downloading process."""
93         log.msg('Checking for pieces for %s' % self.path)
94         self.defer = defer.Deferred()
95         self.peers = {}
96         no_pieces = 0
97         pieces_string = {0: 0}
98         pieces_hash = {0: 0}
99         pieces_dl_hash = {0: 0}
100
101         for compact_peer in self.compact_peers:
102             # Build a list of all the peers for this download
103             site = uncompact(compact_peer['c'])
104             peer = self.manager.getPeer(site)
105             self.peers.setdefault(site, {})['peer'] = peer
106
107             # Extract any piece information from the peers list
108             if 't' in compact_peer:
109                 self.peers[site]['t'] = compact_peer['t']['t']
110                 pieces_string.setdefault(compact_peer['t']['t'], 0)
111                 pieces_string[compact_peer['t']['t']] += 1
112             elif 'h' in compact_peer:
113                 self.peers[site]['h'] = compact_peer['h']
114                 pieces_hash.setdefault(compact_peer['h'], 0)
115                 pieces_hash[compact_peer['h']] += 1
116             elif 'l' in compact_peer:
117                 self.peers[site]['l'] = compact_peer['l']
118                 pieces_dl_hash.setdefault(compact_peer['l'], 0)
119                 pieces_dl_hash[compact_peer['l']] += 1
120             else:
121                 no_pieces += 1
122         
123         # Select the most popular piece info
124         max_found = max(no_pieces, max(pieces_string.values()),
125                         max(pieces_hash.values()), max(pieces_dl_hash.values()))
126
127         if max_found < len(self.peers):
128             log.msg('Misleading piece information found, using most popular %d of %d peers' % 
129                     (max_found, len(self.peers)))
130
131         if max_found == no_pieces:
132             # The file is not split into pieces
133             log.msg('No pieces were found for the file')
134             self.pieces = [self.hash.expected()]
135             self.startDownload()
136         elif max_found == max(pieces_string.values()):
137             # Small number of pieces in a string
138             for pieces, num in pieces_string.items():
139                 # Find the most popular piece string
140                 if num == max_found:
141                     self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
142                     log.msg('Peer info contained %d piece hashes' % len(self.pieces))
143                     self.startDownload()
144                     break
145         elif max_found == max(pieces_hash.values()):
146             # Medium number of pieces stored in the DHT
147             for pieces, num in pieces_hash.items():
148                 # Find the most popular piece hash to lookup
149                 if num == max_found:
150                     log.msg('Found a hash for pieces to lookup in the DHT: %r' % pieces)
151                     self.getDHTPieces(pieces)
152                     break
153         elif max_found == max(pieces_dl_hash.values()):
154             # Large number of pieces stored in peers
155             for pieces, num in pieces_dl_hash.items():
156                 # Find the most popular piece hash to download
157                 if num == max_found:
158                     log.msg('Found a hash for pieces to lookup in peers: %r' % pieces)
159                     self.getPeerPieces(pieces)
160                     break
161         return self.defer
162
163     #{ Downloading the piece hashes
164     def getDHTPieces(self, key):
165         """Retrieve the piece information from the DHT.
166         
167         @param key: the key to lookup in the DHT
168         """
169         # Remove any peers with the wrong piece hash
170         #for site in self.peers.keys():
171         #    if self.peers[site].get('h', '') != key:
172         #        del self.peers[site]
173
174         # Start the DHT lookup
175         lookupDefer = self.manager.dht.get(key)
176         lookupDefer.addBoth(self._getDHTPieces, key)
177         
178     def _getDHTPieces(self, results, key):
179         """Check the retrieved values."""
180         if isinstance(results, list):
181             for result in results:
182                 # Make sure the hash matches the key
183                 result_hash = sha.new(result.get('t', '')).digest()
184                 if result_hash == key:
185                     pieces = result['t']
186                     self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
187                     log.msg('Retrieved %d piece hashes from the DHT' % len(self.pieces))
188                     self.startDownload()
189                     return
190                 
191             log.msg('Could not retrieve the piece hashes from the DHT')
192         else:
193             log.msg('Looking up piece hashes in the DHT resulted in an error: %r' % (result, ))
194             
195         # Continue without the piece hashes
196         self.pieces = [None for x in xrange(0, self.hash.expSize, PIECE_SIZE)]
197         self.startDownload()
198
199     def getPeerPieces(self, key, failedSite = None):
200         """Retrieve the piece information from the peers.
201         
202         @param key: the key to request from the peers
203         """
204         if failedSite is None:
205             log.msg('Starting the lookup of piece hashes in peers')
206             self.outstanding = 0
207             # Remove any peers with the wrong piece hash
208             #for site in self.peers.keys():
209             #    if self.peers[site].get('l', '') != key:
210             #        del self.peers[site]
211         else:
212             log.msg('Piece hash lookup failed for peer %r' % (failedSite, ))
213             self.peers[failedSite]['failed'] = True
214             self.outstanding -= 1
215
216         if self.pieces is None:
217             # Send a request to one or more peers
218             for site in self.peers:
219                 if self.peers[site].get('failed', False) != True:
220                     log.msg('Sending a piece hash request to %r' % (site, ))
221                     path = '/~/' + quote_plus(key)
222                     lookupDefer = self.peers[site]['peer'].get(path)
223                     reactor.callLater(0, lookupDefer.addCallbacks,
224                                       *(self._getPeerPieces, self._gotPeerError),
225                                       **{'callbackArgs': (key, site),
226                                          'errbackArgs': (key, site)})
227                     self.outstanding += 1
228                     if self.outstanding >= 4:
229                         break
230         
231         if self.pieces is None and self.outstanding <= 0:
232             # Continue without the piece hashes
233             log.msg('Could not retrieve the piece hashes from the peers')
234             self.pieces = [None for x in xrange(0, self.hash.expSize, PIECE_SIZE)]
235             self.startDownload()
236         
237     def _getPeerPieces(self, response, key, site):
238         """Process the retrieved response from the peer."""
239         log.msg('Got a piece hash response %d from %r' % (response.code, site))
240         if response.code != 200:
241             # Request failed, try a different peer
242             self.getPeerPieces(key, site)
243         else:
244             # Read the response stream to a string
245             self.peers[site]['pieces'] = ''
246             def _gotPeerPiece(data, self = self, site = site):
247                 log.msg('Peer %r got %d bytes of piece hashes' % (site, len(data)))
248                 self.peers[site]['pieces'] += data
249             log.msg('Streaming piece hashes from peer')
250             df = stream.readStream(response.stream, _gotPeerPiece)
251             df.addCallbacks(self._gotPeerPieces, self._gotPeerError,
252                             callbackArgs=(key, site), errbackArgs=(key, site))
253
254     def _gotPeerError(self, err, key, site):
255         """Peer failed, try again."""
256         log.msg('Peer piece hash request failed for %r' % (site, ))
257         log.err(err)
258         self.getPeerPieces(key, site)
259
260     def _gotPeerPieces(self, result, key, site):
261         """Check the retrieved pieces from the peer."""
262         log.msg('Finished streaming piece hashes from peer %r' % (site, ))
263         if self.pieces is not None:
264             # Already done
265             log.msg('Already done')
266             return
267         
268         try:
269             result = bdecode(self.peers[site]['pieces'])
270         except:
271             log.msg('Error bdecoding piece hashes')
272             log.err()
273             self.getPeerPieces(key, site)
274             return
275             
276         result_hash = sha.new(result.get('t', '')).digest()
277         if result_hash == key:
278             pieces = result['t']
279             self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
280             log.msg('Retrieved %d piece hashes from the peer %r' % (len(self.pieces), site))
281             self.startDownload()
282         else:
283             log.msg('Peer returned a piece string that did not match')
284             self.getPeerPieces(key, site)
285
286     #{ Downloading the file
287     def sort(self):
288         """Sort the peers by their rank (highest ranked at the end)."""
289         def sort(a, b):
290             """Sort peers by their rank."""
291             if self.peers[a]['peer'].rank > self.peers[b]['peer'].rank:
292                 return 1
293             elif self.peers[a]['peer'].rank < self.peers[b]['peer'].rank:
294                 return -1
295             return 0
296         self.sitelist.sort(sort)
297
298     def startDownload(self):
299         """Start the download from the peers."""
300         # Don't start twice
301         if self.started:
302             return
303         
304         log.msg('Starting to download %s' % self.path)
305         self.started = True
306         assert self.pieces, "You must initialize the piece hashes first"
307         
308         self.sitelist = self.peers.keys()
309         
310         # Special case if there's only one good peer left
311 #        if len(self.sitelist) == 1:
312 #            log.msg('Downloading from peer %r' % (self.peers[self.sitelist[0]]['peer'], ))
313 #            self.defer.callback(self.peers[self.sitelist[0]]['peer'].get(self.path))
314 #            return
315         
316         # Begin to download the pieces
317         self.outstanding = 0
318         self.nextFinish = 0
319         self.completePieces = [False for piece in self.pieces]
320         self.addedMirror = False
321         self.addMirror()
322         self.getPieces()
323
324     def addMirror(self):
325         """Use the mirror if there are few peers."""
326         if not self.addedMirror and len(self.sitelist) + self.outstanding < config.getint('DEFAULT', 'MIN_DOWNLOAD_PEERS'):
327             self.addedMirror = True
328             parsed = urlparse(self.mirror)
329             if parsed[0] == "http":
330                 site = splitHostPort(parsed[0], parsed[1])
331                 self.mirror_path = urlunparse(('', '') + parsed[2:])
332                 peer = self.manager.getPeer(site, mirror = True)
333                 self.peers[site] = {}
334                 self.peers[site]['peer'] = peer
335                 self.sitelist.append(site)
336         
337     #{ Downloading the pieces
338     def getPieces(self):
339         """Download the next pieces from the peers."""
340         if self.file.closed:
341             log.msg('Download has been aborted for %s' % self.path)
342             self.stream.allAvailable(remove = True)
343             return
344             
345         self.sort()
346         piece = self.nextFinish
347         while self.outstanding < 4 and self.sitelist and piece < len(self.completePieces):
348             if self.completePieces[piece] == False:
349                 # Send a request to the highest ranked peer
350                 site = self.sitelist.pop()
351                 self.completePieces[piece] = site
352                 log.msg('Sending a request for piece %d to peer %r' % (piece, self.peers[site]['peer']))
353                 
354                 self.outstanding += 1
355                 path = self.path
356                 if self.peers[site]['peer'].mirror:
357                     path = self.mirror_path
358                 if len(self.completePieces) > 1:
359                     df = self.peers[site]['peer'].getRange(path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
360                 else:
361                     df = self.peers[site]['peer'].get(path)
362                 reactor.callLater(0, df.addCallbacks,
363                                   *(self._getPiece, self._getError),
364                                   **{'callbackArgs': (piece, site),
365                                      'errbackArgs': (piece, site)})
366             piece += 1
367                 
368         # Check if we're done
369         if self.outstanding <= 0 and self.nextFinish >= len(self.completePieces):
370             log.msg('Download is complete for %s' % self.path)
371             self.stream.allAvailable(remove = True)
372             
373         # Check if we ran out of peers
374         if self.outstanding <= 0 and not self.sitelist and False in self.completePieces:
375             log.msg("Download failed, no peers left to try.")
376             if self.defer:
377                 # Send a return error
378                 df = self.defer
379                 self.defer = None
380                 resp = Response(500, {}, None)
381                 df.callback(resp)
382             else:
383                 # Already streaming the response, try and abort
384                 self.stream.allAvailable(remove = True)
385     
386     def _getPiece(self, response, piece, site):
387         """Process the retrieved headers from the peer."""
388         if response.code == 404:
389             # Peer no longer has this file, move on
390             log.msg('Peer sharing piece %d no longer has it: %r' % (piece, self.peers[site]['peer']))
391             self.completePieces[piece] = False
392             if response.stream and response.stream.length:
393                 stream.readAndDiscard(response.stream)
394             
395             # Don't add the site back, just move on
396             site = None
397         elif ((len(self.completePieces) > 1 and response.code != 206) or
398             (response.code not in (200, 206))):
399             # Request failed, try a different peer
400             log.msg('Wrong response type %d for piece %d from peer %r' % (response.code, piece, self.peers[site]['peer']))
401             self.peers[site]['peer'].hashError('Peer responded with the wrong type of download: %r' % response.code)
402             self.completePieces[piece] = False
403             self.peers[site]['errors'] = self.peers[site].get('errors', 0) + 1
404             if response.stream and response.stream.length:
405                 stream.readAndDiscard(response.stream)
406
407             # After 3 errors in a row, drop the peer
408             if self.peers[site]['errors'] >= 3:
409                 site = None
410         else:
411             if self.defer:
412                 # Start sending the return file
413                 df = self.defer
414                 self.defer = None
415                 self.stream = GrowingFileStream(self.file, self.hash.expSize)
416
417                 # Get the headers from the peer's response
418                 headers = {}
419                 if response.headers.hasHeader('last-modified'):
420                     headers['last-modified'] = response.headers.getHeader('last-modified')
421                 resp = Response(200, headers, self.stream)
422                 df.callback(resp)
423
424             # Read the response stream to the file
425             log.msg('Streaming piece %d from peer %r' % (piece, self.peers[site]['peer']))
426             if response.code == 206:
427                 df = StreamToFile(self.hash.newPieceHasher(), response.stream,
428                                   self.file, piece*PIECE_SIZE, PIECE_SIZE).run()
429             else:
430                 df = StreamToFile(self.hash.newHasher(), response.stream,
431                                   self.file).run()
432             reactor.callLater(0, df.addCallbacks,
433                               *(self._gotPiece, self._gotError),
434                               **{'callbackArgs': (piece, site),
435                                  'errbackArgs': (piece, site)})
436
437         self.outstanding -= 1
438         if site:
439             self.sitelist.append(site)
440         else:
441             self.addMirror()
442         self.getPieces()
443
444     def _getError(self, err, piece, site):
445         """Peer failed, try again."""
446         log.msg('Got error for piece %d from peer %r' % (piece, self.peers[site]['peer']))
447         self.outstanding -= 1
448         self.peers[site]['errors'] = self.peers[site].get('errors', 0) + 1
449         if self.peers[site]['errors'] < 3:
450             self.sitelist.append(site)
451         else:
452             self.addMirror()
453         self.completePieces[piece] = False
454         self.getPieces()
455         log.err(err)
456
457     def _gotPiece(self, hash, piece, site):
458         """Process the retrieved piece from the peer."""
459         if self.pieces[piece] and hash.digest() != self.pieces[piece]:
460             # Hash doesn't match
461             log.msg('Hash error for piece %d from peer %r' % (piece, self.peers[site]['peer']))
462             self.peers[site]['peer'].hashError('Piece received from peer does not match expected')
463             self.peers[site]['errors'] = self.peers[site].get('errors', 0) + 1
464             self.completePieces[piece] = False
465         else:
466             # Successfully completed one of several pieces
467             log.msg('Finished with piece %d from peer %r' % (piece, self.peers[site]['peer']))
468             self.completePieces[piece] = True
469             self.peers[site]['errors'] = 0
470             while (self.nextFinish < len(self.completePieces) and
471                    self.completePieces[self.nextFinish] == True):
472                 self.nextFinish += 1
473                 self.stream.updateAvailable(PIECE_SIZE)
474
475         self.getPieces()
476
477     def _gotError(self, err, piece, site):
478         """Piece download failed, try again."""
479         log.msg('Error streaming piece %d from peer %r: %r' % (piece, self.peers[site]['peer'], err))
480         log.err(err)
481         self.peers[site]['errors'] = self.peers[site].get('errors', 0) + 1
482         self.completePieces[piece] = False
483         self.getPieces()
484         
485 class PeerManager:
486     """Manage a set of peers and the requests to them.
487     
488     @type cache_dir: L{twisted.python.filepath.FilePath}
489     @ivar cache_dir: the directory to use for storing all files
490     @type dht: L{DHTManager.DHT}
491     @ivar dht: the DHT instance
492     @type stats: L{stats.StatsLogger}
493     @ivar stats: the statistics logger to record sent data to
494     @type clients: C{dictionary}
495     @ivar clients: the available peers that have been previously contacted
496     """
497
498     def __init__(self, cache_dir, dht, stats):
499         """Initialize the instance."""
500         self.cache_dir = cache_dir
501         self.cache_dir.restat(False)
502         if not self.cache_dir.exists():
503             self.cache_dir.makedirs()
504         self.dht = dht
505         self.stats = stats
506         self.clients = {}
507         
508     def get(self, hash, mirror, peers = [], method="GET", modtime=None):
509         """Download from a list of peers or fallback to a mirror.
510         
511         @type hash: L{Hash.HashObject}
512         @param hash: the hash object containing the expected hash for the file
513         @param mirror: the URI of the file on the mirror
514         @type peers: C{list} of C{string}
515         @param peers: a list of the peer info where the file can be found
516             (optional, defaults to downloading from the mirror)
517         @type method: C{string}
518         @param method: the HTTP method to use, 'GET' or 'HEAD'
519             (optional, defaults to 'GET')
520         @type modtime: C{int}
521         @param modtime: the modification time to use for an 'If-Modified-Since'
522             header, as seconds since the epoch
523             (optional, defaults to not sending that header)
524         """
525         if not peers or method != "GET" or modtime is not None:
526             log.msg('Downloading (%s) from mirror %s' % (method, mirror))
527             parsed = urlparse(mirror)
528             assert parsed[0] == "http", "Only HTTP is supported, not '%s'" % parsed[0]
529             site = splitHostPort(parsed[0], parsed[1])
530             path = urlunparse(('', '') + parsed[2:])
531             peer = self.getPeer(site, mirror = True)
532             return peer.get(path, method, modtime)
533 #        elif len(peers) == 1:
534 #            site = uncompact(peers[0]['c'])
535 #            log.msg('Downloading from peer %r' % (site, ))
536 #            path = '/~/' + quote_plus(hash.expected())
537 #            peer = self.getPeer(site)
538 #            return peer.get(path)
539         else:
540             tmpfile = self.cache_dir.child(hash.hexexpected())
541             return FileDownload(self, hash, mirror, peers, tmpfile).run()
542         
543     def getPeer(self, site, mirror = False):
544         """Create a new peer if necessary and return it.
545         
546         @type site: (C{string}, C{int})
547         @param site: the IP address and port of the peer
548         @param mirror: whether the peer is actually a mirror
549             (optional, defaults to False)
550         """
551         if site not in self.clients:
552             self.clients[site] = Peer(site[0], site[1], self.stats)
553             if mirror:
554                 self.clients[site].mirror = True
555         return self.clients[site]
556     
557     def close(self):
558         """Close all the connections to peers."""
559         for site in self.clients:
560             self.clients[site].close()
561         self.clients = {}
562
563 class TestPeerManager(unittest.TestCase):
564     """Unit tests for the PeerManager."""
565     
566     manager = None
567     pending_calls = []
568     
569     def tearDown(self):
570         for p in self.pending_calls:
571             if p.active():
572                 p.cancel()
573         self.pending_calls = []
574         if self.manager:
575             self.manager.close()
576             self.manager = None