PeerManager keeps a list of site names, peers are kept in a dictionary.
[quix0rs-apt-p2p.git] / apt_p2p / PeerManager.py
1
2 """Manage a set of peers and the requests to them."""
3
4 from random import choice
5 from urlparse import urlparse, urlunparse
6 from urllib import quote_plus
7 from binascii import b2a_hex, a2b_hex
8 import sha
9
10 from twisted.internet import reactor, defer
11 from twisted.python import log
12 from twisted.trial import unittest
13 from twisted.web2 import stream
14 from twisted.web2.http import Response, splitHostPort
15
16 from HTTPDownloader import Peer
17 from Streams import GrowingFileStream, StreamToFile
18 from util import uncompact
19 from Hash import PIECE_SIZE
20 from apt_p2p_Khashmir.bencode import bdecode
21 from apt_p2p_conf import config
22
23 class PeerError(Exception):
24     """An error occurred downloading from peers."""
25     
26 class FileDownload:
27     """Manage a download from a list of peers or a mirror.
28     
29     @type manager: L{PeerManager}
30     @ivar manager: the manager to send requests for peers to
31     @type hash: L{Hash.HashObject}
32     @ivar hash: the hash object containing the expected hash for the file
33     @ivar mirror: the URI of the file on the mirror
34     @type compact_peers: C{list} of C{dictionary}
35     @ivar compact_peers: a list of the peer info where the file can be found
36     @type file: C{file}
37     @ivar file: the open file to right the download to
38     @type path: C{string}
39     @ivar path: the path to request from peers to access the file
40     @type pieces: C{list} of C{string} 
41     @ivar pieces: the hashes of the pieces in the file
42     @type started: C{boolean}
43     @ivar started: whether the download has begun yet
44     @type defer: L{twisted.internet.defer.Deferred}
45     @ivar defer: the deferred that will callback with the result of the download
46     @type peers: C{dictionary}
47     @ivar peers: information about each of the peers available to download from
48     @type outstanding: C{int}
49     @ivar outstanding: the number of requests to peers currently outstanding
50     @type peerlist: C{list} of L{HTTPDownloader.Peer}
51     @ivar peerlist: the sorted list of peers for this download
52     @type stream: L{GrowingFileStream}
53     @ivar stream: the stream of resulting data from the download
54     @type nextFinish: C{int}
55     @ivar nextFinish: the next piece that is needed to finish for the stream
56     @type completePieces: C{list} of C{boolean} or L{HTTPDownloader.Peer}
57     @ivar completePieces: one per piece, will be False if no requests are
58         outstanding for the piece, True if the piece has been successfully
59         downloaded, or the Peer that a request for this piece has been sent  
60     """
61     
62     def __init__(self, manager, hash, mirror, compact_peers, file):
63         """Initialize the instance and check for piece hashes.
64         
65         @type manager: L{PeerManager}
66         @param manager: the manager to send requests for peers to
67         @type hash: L{Hash.HashObject}
68         @param hash: the hash object containing the expected hash for the file
69         @param mirror: the URI of the file on the mirror
70         @type compact_peers: C{list} of C{dictionary}
71         @param compact_peers: a list of the peer info where the file can be found
72         @type file: L{twisted.python.filepath.FilePath}
73         @param file: the temporary file to use to store the downloaded file
74         """
75         self.manager = manager
76         self.hash = hash
77         self.mirror = mirror
78         self.compact_peers = compact_peers
79         
80         self.path = '/~/' + quote_plus(hash.expected())
81         self.defer = None
82         self.mirror_path = None
83         self.pieces = None
84         self.started = False
85         
86         file.restat(False)
87         if file.exists():
88             file.remove()
89         self.file = file.open('w+')
90
91     def run(self):
92         """Start the downloading process."""
93         log.msg('Checking for pieces for %s' % self.path)
94         self.defer = defer.Deferred()
95         self.peers = {}
96         no_pieces = 0
97         pieces_string = {0: 0}
98         pieces_hash = {0: 0}
99         pieces_dl_hash = {0: 0}
100
101         for compact_peer in self.compact_peers:
102             # Build a list of all the peers for this download
103             site = uncompact(compact_peer['c'])
104             peer = self.manager.getPeer(site)
105             self.peers.setdefault(site, {})['peer'] = peer
106
107             # Extract any piece information from the peers list
108             if 't' in compact_peer:
109                 self.peers[site]['t'] = compact_peer['t']['t']
110                 pieces_string.setdefault(compact_peer['t']['t'], 0)
111                 pieces_string[compact_peer['t']['t']] += 1
112             elif 'h' in compact_peer:
113                 self.peers[site]['h'] = compact_peer['h']
114                 pieces_hash.setdefault(compact_peer['h'], 0)
115                 pieces_hash[compact_peer['h']] += 1
116             elif 'l' in compact_peer:
117                 self.peers[site]['l'] = compact_peer['l']
118                 pieces_dl_hash.setdefault(compact_peer['l'], 0)
119                 pieces_dl_hash[compact_peer['l']] += 1
120             else:
121                 no_pieces += 1
122         
123         # Select the most popular piece info
124         max_found = max(no_pieces, max(pieces_string.values()),
125                         max(pieces_hash.values()), max(pieces_dl_hash.values()))
126
127         if max_found < len(self.peers):
128             log.msg('Misleading piece information found, using most popular %d of %d peers' % 
129                     (max_found, len(self.peers)))
130
131         if max_found == no_pieces:
132             # The file is not split into pieces
133             log.msg('No pieces were found for the file')
134             self.pieces = [self.hash.expected()]
135             self.startDownload()
136         elif max_found == max(pieces_string.values()):
137             # Small number of pieces in a string
138             for pieces, num in pieces_string.items():
139                 # Find the most popular piece string
140                 if num == max_found:
141                     self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
142                     log.msg('Peer info contained %d piece hashes' % len(self.pieces))
143                     self.startDownload()
144                     break
145         elif max_found == max(pieces_hash.values()):
146             # Medium number of pieces stored in the DHT
147             for pieces, num in pieces_hash.items():
148                 # Find the most popular piece hash to lookup
149                 if num == max_found:
150                     log.msg('Found a hash for pieces to lookup in the DHT: %r' % pieces)
151                     self.getDHTPieces(pieces)
152                     break
153         elif max_found == max(pieces_dl_hash.values()):
154             # Large number of pieces stored in peers
155             for pieces, num in pieces_dl_hash.items():
156                 # Find the most popular piece hash to download
157                 if num == max_found:
158                     log.msg('Found a hash for pieces to lookup in peers: %r' % pieces)
159                     self.getPeerPieces(pieces)
160                     break
161         return self.defer
162
163     #{ Downloading the piece hashes
164     def getDHTPieces(self, key):
165         """Retrieve the piece information from the DHT.
166         
167         @param key: the key to lookup in the DHT
168         """
169         # Remove any peers with the wrong piece hash
170         #for site in self.peers.keys():
171         #    if self.peers[site].get('h', '') != key:
172         #        del self.peers[site]
173
174         # Start the DHT lookup
175         lookupDefer = self.manager.dht.get(key)
176         lookupDefer.addBoth(self._getDHTPieces, key)
177         
178     def _getDHTPieces(self, results, key):
179         """Check the retrieved values."""
180         if isinstance(results, list):
181             for result in results:
182                 # Make sure the hash matches the key
183                 result_hash = sha.new(result.get('t', '')).digest()
184                 if result_hash == key:
185                     pieces = result['t']
186                     self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
187                     log.msg('Retrieved %d piece hashes from the DHT' % len(self.pieces))
188                     self.startDownload()
189                     return
190                 
191             log.msg('Could not retrieve the piece hashes from the DHT')
192         else:
193             log.msg('Looking up piece hashes in the DHT resulted in an error: %r' % (result, ))
194             
195         # Continue without the piece hashes
196         self.pieces = [None for x in xrange(0, self.hash.expSize, PIECE_SIZE)]
197         self.startDownload()
198
199     def getPeerPieces(self, key, failedSite = None):
200         """Retrieve the piece information from the peers.
201         
202         @param key: the key to request from the peers
203         """
204         if failedSite is None:
205             log.msg('Starting the lookup of piece hashes in peers')
206             self.outstanding = 0
207             # Remove any peers with the wrong piece hash
208             #for site in self.peers.keys():
209             #    if self.peers[site].get('l', '') != key:
210             #        del self.peers[site]
211         else:
212             log.msg('Piece hash lookup failed for peer %r' % (failedSite, ))
213             self.peers[failedSite]['failed'] = True
214             self.outstanding -= 1
215
216         if self.pieces is None:
217             # Send a request to one or more peers
218             for site in self.peers:
219                 if self.peers[site].get('failed', False) != True:
220                     log.msg('Sending a piece hash request to %r' % (site, ))
221                     path = '/~/' + quote_plus(key)
222                     lookupDefer = self.peers[site]['peer'].get(path)
223                     reactor.callLater(0, lookupDefer.addCallbacks,
224                                       *(self._getPeerPieces, self._gotPeerError),
225                                       **{'callbackArgs': (key, site),
226                                          'errbackArgs': (key, site)})
227                     self.outstanding += 1
228                     if self.outstanding >= 4:
229                         break
230         
231         if self.pieces is None and self.outstanding <= 0:
232             # Continue without the piece hashes
233             log.msg('Could not retrieve the piece hashes from the peers')
234             self.pieces = [None for x in xrange(0, self.hash.expSize, PIECE_SIZE)]
235             self.startDownload()
236         
237     def _getPeerPieces(self, response, key, site):
238         """Process the retrieved response from the peer."""
239         log.msg('Got a piece hash response %d from %r' % (response.code, site))
240         if response.code != 200:
241             # Request failed, try a different peer
242             self.getPeerPieces(key, site)
243         else:
244             # Read the response stream to a string
245             self.peers[site]['pieces'] = ''
246             def _gotPeerPiece(data, self = self, site = site):
247                 log.msg('Peer %r got %d bytes of piece hashes' % (site, len(data)))
248                 self.peers[site]['pieces'] += data
249             log.msg('Streaming piece hashes from peer')
250             df = stream.readStream(response.stream, _gotPeerPiece)
251             df.addCallbacks(self._gotPeerPieces, self._gotPeerError,
252                             callbackArgs=(key, site), errbackArgs=(key, site))
253
254     def _gotPeerError(self, err, key, site):
255         """Peer failed, try again."""
256         log.msg('Peer piece hash request failed for %r' % (site, ))
257         log.err(err)
258         self.getPeerPieces(key, site)
259
260     def _gotPeerPieces(self, result, key, site):
261         """Check the retrieved pieces from the peer."""
262         log.msg('Finished streaming piece hashes from peer %r' % (site, ))
263         if self.pieces is not None:
264             # Already done
265             log.msg('Already done')
266             return
267         
268         try:
269             result = bdecode(self.peers[site]['pieces'])
270         except:
271             log.msg('Error bdecoding piece hashes')
272             log.err()
273             self.getPeerPieces(key, site)
274             return
275             
276         result_hash = sha.new(result.get('t', '')).digest()
277         if result_hash == key:
278             pieces = result['t']
279             self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
280             log.msg('Retrieved %d piece hashes from the peer %r' % (len(self.pieces), site))
281             self.startDownload()
282         else:
283             log.msg('Peer returned a piece string that did not match')
284             self.getPeerPieces(key, site)
285
286     #{ Downloading the file
287     def sort(self):
288         """Sort the peers by their rank (highest ranked at the end)."""
289         def sort(a, b):
290             """Sort peers by their rank."""
291             if self.peers[a]['peer'].rank > self.peers[b]['peer'].rank:
292                 return 1
293             elif self.peers[a]['peer'].rank < self.peers[b]['peer'].rank:
294                 return -1
295             return 0
296         self.sitelist.sort(sort)
297
298     def startDownload(self):
299         """Start the download from the peers."""
300         # Don't start twice
301         if self.started:
302             return
303         
304         log.msg('Starting to download %s' % self.path)
305         self.started = True
306         assert self.pieces, "You must initialize the piece hashes first"
307         
308         # Use the mirror if there are few peers
309         if len(self.peers) < config.getint('DEFAULT', 'MIN_DOWNLOAD_PEERS'):
310             parsed = urlparse(self.mirror)
311             if parsed[0] == "http":
312                 site = splitHostPort(parsed[0], parsed[1])
313                 self.mirror_path = urlunparse(('', '') + parsed[2:])
314                 peer = self.manager.getPeer(site, mirror = True)
315                 self.peers[site] = {}
316                 self.peers[site]['peer'] = peer
317         
318         self.sitelist = self.peers.keys()
319         
320         # Special case if there's only one good peer left
321 #        if len(self.sitelist) == 1:
322 #            log.msg('Downloading from peer %r' % (self.peers[self.sitelist[0]]['peer'], ))
323 #            self.defer.callback(self.peers[self.sitelist[0]]['peer'].get(self.path))
324 #            return
325         
326         # Begin to download the pieces
327         self.outstanding = 0
328         self.nextFinish = 0
329         self.completePieces = [False for piece in self.pieces]
330         self.getPieces()
331         
332     #{ Downloading the pieces
333     def getPieces(self):
334         """Download the next pieces from the peers."""
335         if self.file.closed:
336             log.msg('Download has been aborted for %s' % self.path)
337             self.stream.allAvailable(remove = True)
338             return
339             
340         self.sort()
341         piece = self.nextFinish
342         while self.outstanding < 4 and self.sitelist and piece < len(self.completePieces):
343             if self.completePieces[piece] == False:
344                 # Send a request to the highest ranked peer
345                 site = self.sitelist.pop()
346                 self.completePieces[piece] = site
347                 log.msg('Sending a request for piece %d to peer %r' % (piece, self.peers[site]['peer']))
348                 
349                 self.outstanding += 1
350                 path = self.path
351                 if self.peers[site]['peer'].mirror:
352                     path = self.mirror_path
353                 if len(self.completePieces) > 1:
354                     df = self.peers[site]['peer'].getRange(path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
355                 else:
356                     df = self.peers[site]['peer'].get(path)
357                 reactor.callLater(0, df.addCallbacks,
358                                   *(self._getPiece, self._getError),
359                                   **{'callbackArgs': (piece, site),
360                                      'errbackArgs': (piece, site)})
361             piece += 1
362                 
363         # Check if we're done
364         if self.outstanding <= 0 and self.nextFinish >= len(self.completePieces):
365             log.msg('Download is complete for %s' % self.path)
366             self.stream.allAvailable(remove = True)
367     
368     def _getPiece(self, response, piece, site):
369         """Process the retrieved headers from the peer."""
370         if ((len(self.completePieces) > 1 and response.code != 206) or
371             (response.code not in (200, 206))):
372             # Request failed, try a different peer
373             log.msg('Wrong response type %d for piece %d from peer %r' % (response.code, piece, self.peers[site]['peer']))
374             self.peers[site]['peer'].hashError('Peer responded with the wrong type of download: %r' % response.code)
375             self.completePieces[piece] = False
376             if response.stream and response.stream.length:
377                 stream.readAndDiscard(response.stream)
378         else:
379             if self.defer:
380                 # Start sending the return file
381                 df = self.defer
382                 self.defer = None
383                 self.stream = GrowingFileStream(self.file, self.hash.expSize)
384
385                 # Get the headers from the peer's response
386                 headers = {}
387                 if response.headers.hasHeader('last-modified'):
388                     headers['last-modified'] = response.headers.getHeader('last-modified')
389                 resp = Response(200, headers, self.stream)
390                 df.callback(resp)
391
392             # Read the response stream to the file
393             log.msg('Streaming piece %d from peer %r' % (piece, self.peers[site]['peer']))
394             if response.code == 206:
395                 df = StreamToFile(self.hash.newPieceHasher(), response.stream,
396                                   self.file, piece*PIECE_SIZE, PIECE_SIZE).run()
397             else:
398                 df = StreamToFile(self.hash.newHasher(), response.stream,
399                                   self.file).run()
400             reactor.callLater(0, df.addCallbacks,
401                               *(self._gotPiece, self._gotError),
402                               **{'callbackArgs': (piece, site),
403                                  'errbackArgs': (piece, site)})
404
405         self.outstanding -= 1
406         self.sitelist.append(site)
407         self.getPieces()
408
409     def _getError(self, err, piece, site):
410         """Peer failed, try again."""
411         log.msg('Got error for piece %d from peer %r' % (piece, self.peers[site]['peer']))
412         self.outstanding -= 1
413         self.sitelist.append(site)
414         self.completePieces[piece] = False
415         self.getPieces()
416         log.err(err)
417
418     def _gotPiece(self, hash, piece, site):
419         """Process the retrieved piece from the peer."""
420         if self.pieces[piece] and hash.digest() != self.pieces[piece]:
421             # Hash doesn't match
422             log.msg('Hash error for piece %d from peer %r' % (piece, self.peers[site]['peer']))
423             self.peers[site]['peer'].hashError('Piece received from peer does not match expected')
424             self.completePieces[piece] = False
425         else:
426             # Successfully completed one of several pieces
427             log.msg('Finished with piece %d from peer %r' % (piece, self.peers[site]['peer']))
428             self.completePieces[piece] = True
429             while (self.nextFinish < len(self.completePieces) and
430                    self.completePieces[self.nextFinish] == True):
431                 self.nextFinish += 1
432                 self.stream.updateAvailable(PIECE_SIZE)
433
434         self.getPieces()
435
436     def _gotError(self, err, piece, site):
437         """Piece download failed, try again."""
438         log.msg('Error streaming piece %d from peer %r: %r' % (piece, self.peers[site]['peer'], err))
439         log.err(err)
440         self.completePieces[piece] = False
441         self.getPieces()
442         
443 class PeerManager:
444     """Manage a set of peers and the requests to them.
445     
446     @type cache_dir: L{twisted.python.filepath.FilePath}
447     @ivar cache_dir: the directory to use for storing all files
448     @type dht: L{DHTManager.DHT}
449     @ivar dht: the DHT instance
450     @type stats: L{stats.StatsLogger}
451     @ivar stats: the statistics logger to record sent data to
452     @type clients: C{dictionary}
453     @ivar clients: the available peers that have been previously contacted
454     """
455
456     def __init__(self, cache_dir, dht, stats):
457         """Initialize the instance."""
458         self.cache_dir = cache_dir
459         self.cache_dir.restat(False)
460         if not self.cache_dir.exists():
461             self.cache_dir.makedirs()
462         self.dht = dht
463         self.stats = stats
464         self.clients = {}
465         
466     def get(self, hash, mirror, peers = [], method="GET", modtime=None):
467         """Download from a list of peers or fallback to a mirror.
468         
469         @type hash: L{Hash.HashObject}
470         @param hash: the hash object containing the expected hash for the file
471         @param mirror: the URI of the file on the mirror
472         @type peers: C{list} of C{string}
473         @param peers: a list of the peer info where the file can be found
474             (optional, defaults to downloading from the mirror)
475         @type method: C{string}
476         @param method: the HTTP method to use, 'GET' or 'HEAD'
477             (optional, defaults to 'GET')
478         @type modtime: C{int}
479         @param modtime: the modification time to use for an 'If-Modified-Since'
480             header, as seconds since the epoch
481             (optional, defaults to not sending that header)
482         """
483         if not peers or method != "GET" or modtime is not None:
484             log.msg('Downloading (%s) from mirror %s' % (method, mirror))
485             parsed = urlparse(mirror)
486             assert parsed[0] == "http", "Only HTTP is supported, not '%s'" % parsed[0]
487             site = splitHostPort(parsed[0], parsed[1])
488             path = urlunparse(('', '') + parsed[2:])
489             peer = self.getPeer(site, mirror = True)
490             return peer.get(path, method, modtime)
491 #        elif len(peers) == 1:
492 #            site = uncompact(peers[0]['c'])
493 #            log.msg('Downloading from peer %r' % (site, ))
494 #            path = '/~/' + quote_plus(hash.expected())
495 #            peer = self.getPeer(site)
496 #            return peer.get(path)
497         else:
498             tmpfile = self.cache_dir.child(hash.hexexpected())
499             return FileDownload(self, hash, mirror, peers, tmpfile).run()
500         
501     def getPeer(self, site, mirror = False):
502         """Create a new peer if necessary and return it.
503         
504         @type site: (C{string}, C{int})
505         @param site: the IP address and port of the peer
506         @param mirror: whether the peer is actually a mirror
507             (optional, defaults to False)
508         """
509         if site not in self.clients:
510             self.clients[site] = Peer(site[0], site[1], self.stats)
511             if mirror:
512                 self.clients[site].mirror = True
513         return self.clients[site]
514     
515     def close(self):
516         """Close all the connections to peers."""
517         for site in self.clients:
518             self.clients[site].close()
519         self.clients = {}
520
521 class TestPeerManager(unittest.TestCase):
522     """Unit tests for the PeerManager."""
523     
524     manager = None
525     pending_calls = []
526     
527     def tearDown(self):
528         for p in self.pending_calls:
529             if p.active():
530                 p.cancel()
531         self.pending_calls = []
532         if self.manager:
533             self.manager.close()
534             self.manager = None