Update the version numbers and changelog for new release.
[quix0rs-apt-p2p.git] / apt_p2p / PeerManager.py
1
2 """Manage a set of peers and the requests to them."""
3
4 from random import choice
5 from urlparse import urlparse, urlunparse
6 from urllib import quote_plus
7 from binascii import b2a_hex, a2b_hex
8 import sha
9
10 from twisted.internet import reactor, defer
11 from twisted.python import log
12 from twisted.trial import unittest
13 from twisted.web2 import stream
14 from twisted.web2.http import Response, splitHostPort
15
16 from HTTPDownloader import Peer
17 from Streams import GrowingFileStream, StreamToFile
18 from util import uncompact
19 from Hash import PIECE_SIZE
20 from apt_p2p_Khashmir.bencode import bdecode
21 from apt_p2p_conf import config
22
23 class PeerError(Exception):
24     """An error occurred downloading from peers."""
25     
26 class FileDownload:
27     """Manage a download from a list of peers or a mirror.
28     
29     @type manager: L{PeerManager}
30     @ivar manager: the manager to send requests for peers to
31     @type hash: L{Hash.HashObject}
32     @ivar hash: the hash object containing the expected hash for the file
33     @ivar mirror: the URI of the file on the mirror
34     @type compact_peers: C{list} of C{dictionary}
35     @ivar compact_peers: a list of the peer info where the file can be found
36     @type file: C{file}
37     @ivar file: the open file to right the download to
38     @type path: C{string}
39     @ivar path: the path to request from peers to access the file
40     @type pieces: C{list} of C{string} 
41     @ivar pieces: the hashes of the pieces in the file
42     @type started: C{boolean}
43     @ivar started: whether the download has begun yet
44     @type defer: L{twisted.internet.defer.Deferred}
45     @ivar defer: the deferred that will callback with the result of the download
46     @type peers: C{dictionary}
47     @ivar peers: information about each of the peers available to download from
48     @type outstanding: C{int}
49     @ivar outstanding: the number of requests to peers currently outstanding
50     @type peerlist: C{list} of L{HTTPDownloader.Peer}
51     @ivar peerlist: the sorted list of peers for this download
52     @type stream: L{GrowingFileStream}
53     @ivar stream: the stream of resulting data from the download
54     @type nextFinish: C{int}
55     @ivar nextFinish: the next piece that is needed to finish for the stream
56     @type completePieces: C{list} of C{boolean} or L{HTTPDownloader.Peer}
57     @ivar completePieces: one per piece, will be False if no requests are
58         outstanding for the piece, True if the piece has been successfully
59         downloaded, or the Peer that a request for this piece has been sent  
60     """
61     
62     def __init__(self, manager, hash, mirror, compact_peers, file):
63         """Initialize the instance and check for piece hashes.
64         
65         @type manager: L{PeerManager}
66         @param manager: the manager to send requests for peers to
67         @type hash: L{Hash.HashObject}
68         @param hash: the hash object containing the expected hash for the file
69         @param mirror: the URI of the file on the mirror
70         @type compact_peers: C{list} of C{dictionary}
71         @param compact_peers: a list of the peer info where the file can be found
72         @type file: L{twisted.python.filepath.FilePath}
73         @param file: the temporary file to use to store the downloaded file
74         """
75         self.manager = manager
76         self.hash = hash
77         self.mirror = mirror
78         self.compact_peers = compact_peers
79         
80         self.path = '/~/' + quote_plus(hash.expected())
81         self.defer = None
82         self.mirror_path = None
83         self.pieces = None
84         self.started = False
85         
86         file.restat(False)
87         if file.exists():
88             file.remove()
89         self.file = file.open('w+')
90
91     def run(self):
92         """Start the downloading process."""
93         log.msg('Checking for pieces for %s' % self.path)
94         self.defer = defer.Deferred()
95         self.peers = {}
96         no_pieces = 0
97         pieces_string = {0: 0}
98         pieces_hash = {0: 0}
99         pieces_dl_hash = {0: 0}
100
101         for compact_peer in self.compact_peers:
102             # Build a list of all the peers for this download
103             site = uncompact(compact_peer['c'])
104             peer = self.manager.getPeer(site)
105             self.peers.setdefault(site, {})['peer'] = peer
106
107             # Extract any piece information from the peers list
108             if 't' in compact_peer:
109                 self.peers[site]['t'] = compact_peer['t']['t']
110                 pieces_string.setdefault(compact_peer['t']['t'], 0)
111                 pieces_string[compact_peer['t']['t']] += 1
112             elif 'h' in compact_peer:
113                 self.peers[site]['h'] = compact_peer['h']
114                 pieces_hash.setdefault(compact_peer['h'], 0)
115                 pieces_hash[compact_peer['h']] += 1
116             elif 'l' in compact_peer:
117                 self.peers[site]['l'] = compact_peer['l']
118                 pieces_dl_hash.setdefault(compact_peer['l'], 0)
119                 pieces_dl_hash[compact_peer['l']] += 1
120             else:
121                 no_pieces += 1
122         
123         # Select the most popular piece info
124         max_found = max(no_pieces, max(pieces_string.values()),
125                         max(pieces_hash.values()), max(pieces_dl_hash.values()))
126
127         if max_found < len(self.peers):
128             log.msg('Misleading piece information found, using most popular %d of %d peers' % 
129                     (max_found, len(self.peers)))
130
131         if max_found == no_pieces:
132             # The file is not split into pieces
133             log.msg('No pieces were found for the file')
134             self.pieces = [self.hash.expected()]
135             self.startDownload()
136         elif max_found == max(pieces_string.values()):
137             # Small number of pieces in a string
138             for pieces, num in pieces_string.items():
139                 # Find the most popular piece string
140                 if num == max_found:
141                     self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
142                     log.msg('Peer info contained %d piece hashes' % len(self.pieces))
143                     self.startDownload()
144                     break
145         elif max_found == max(pieces_hash.values()):
146             # Medium number of pieces stored in the DHT
147             for pieces, num in pieces_hash.items():
148                 # Find the most popular piece hash to lookup
149                 if num == max_found:
150                     log.msg('Found a hash for pieces to lookup in the DHT: %r' % pieces)
151                     self.getDHTPieces(pieces)
152                     break
153         elif max_found == max(pieces_dl_hash.values()):
154             # Large number of pieces stored in peers
155             for pieces, num in pieces_dl_hash.items():
156                 # Find the most popular piece hash to download
157                 if num == max_found:
158                     log.msg('Found a hash for pieces to lookup in peers: %r' % pieces)
159                     self.getPeerPieces(pieces)
160                     break
161         return self.defer
162
163     #{ Downloading the piece hashes
164     def getDHTPieces(self, key):
165         """Retrieve the piece information from the DHT.
166         
167         @param key: the key to lookup in the DHT
168         """
169         # Remove any peers with the wrong piece hash
170         #for site in self.peers.keys():
171         #    if self.peers[site].get('h', '') != key:
172         #        del self.peers[site]
173
174         # Start the DHT lookup
175         lookupDefer = self.manager.dht.get(key)
176         lookupDefer.addBoth(self._getDHTPieces, key)
177         
178     def _getDHTPieces(self, results, key):
179         """Check the retrieved values."""
180         if isinstance(results, list):
181             for result in results:
182                 # Make sure the hash matches the key
183                 result_hash = sha.new(result.get('t', '')).digest()
184                 if result_hash == key:
185                     pieces = result['t']
186                     self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
187                     log.msg('Retrieved %d piece hashes from the DHT' % len(self.pieces))
188                     self.startDownload()
189                     return
190                 
191             log.msg('Could not retrieve the piece hashes from the DHT')
192         else:
193             log.msg('Looking up piece hashes in the DHT resulted in an error: %r' % (result, ))
194             
195         # Continue without the piece hashes
196         self.pieces = [None for x in xrange(0, self.hash.expSize, PIECE_SIZE)]
197         self.startDownload()
198
199     def getPeerPieces(self, key, failedSite = None):
200         """Retrieve the piece information from the peers.
201         
202         @param key: the key to request from the peers
203         """
204         if failedSite is None:
205             log.msg('Starting the lookup of piece hashes in peers')
206             self.outstanding = 0
207             # Remove any peers with the wrong piece hash
208             #for site in self.peers.keys():
209             #    if self.peers[site].get('l', '') != key:
210             #        del self.peers[site]
211         else:
212             log.msg('Piece hash lookup failed for peer %r' % (failedSite, ))
213             self.peers[failedSite]['failed'] = True
214             self.outstanding -= 1
215
216         if self.pieces is None:
217             # Send a request to one or more peers
218             for site in self.peers:
219                 if self.peers[site].get('failed', False) != True:
220                     log.msg('Sending a piece hash request to %r' % (site, ))
221                     path = '/~/' + quote_plus(key)
222                     lookupDefer = self.peers[site]['peer'].get(path)
223                     reactor.callLater(0, lookupDefer.addCallbacks,
224                                       *(self._getPeerPieces, self._gotPeerError),
225                                       **{'callbackArgs': (key, site),
226                                          'errbackArgs': (key, site)})
227                     self.outstanding += 1
228                     if self.outstanding >= 4:
229                         break
230         
231         if self.pieces is None and self.outstanding <= 0:
232             # Continue without the piece hashes
233             log.msg('Could not retrieve the piece hashes from the peers')
234             self.pieces = [None for x in xrange(0, self.hash.expSize, PIECE_SIZE)]
235             self.startDownload()
236         
237     def _getPeerPieces(self, response, key, site):
238         """Process the retrieved response from the peer."""
239         log.msg('Got a piece hash response %d from %r' % (response.code, site))
240         if response.code != 200:
241             # Request failed, try a different peer
242             self.getPeerPieces(key, site)
243         else:
244             # Read the response stream to a string
245             self.peers[site]['pieces'] = ''
246             def _gotPeerPiece(data, self = self, site = site):
247                 log.msg('Peer %r got %d bytes of piece hashes' % (site, len(data)))
248                 self.peers[site]['pieces'] += data
249             log.msg('Streaming piece hashes from peer')
250             df = stream.readStream(response.stream, _gotPeerPiece)
251             df.addCallbacks(self._gotPeerPieces, self._gotPeerError,
252                             callbackArgs=(key, site), errbackArgs=(key, site))
253
254     def _gotPeerError(self, err, key, site):
255         """Peer failed, try again."""
256         log.msg('Peer piece hash request failed for %r' % (site, ))
257         log.err(err)
258         self.getPeerPieces(key, site)
259
260     def _gotPeerPieces(self, result, key, site):
261         """Check the retrieved pieces from the peer."""
262         log.msg('Finished streaming piece hashes from peer %r' % (site, ))
263         if self.pieces is not None:
264             # Already done
265             log.msg('Already done')
266             return
267         
268         try:
269             result = bdecode(self.peers[site]['pieces'])
270         except:
271             log.msg('Error bdecoding piece hashes')
272             log.err()
273             self.getPeerPieces(key, site)
274             return
275             
276         result_hash = sha.new(result.get('t', '')).digest()
277         if result_hash == key:
278             pieces = result['t']
279             self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
280             log.msg('Retrieved %d piece hashes from the peer %r' % (len(self.pieces), site))
281             self.startDownload()
282         else:
283             log.msg('Peer returned a piece string that did not match')
284             self.getPeerPieces(key, site)
285
286     #{ Downloading the file
287     def sort(self):
288         """Sort the peers by their rank (highest ranked at the end)."""
289         def sort(a, b):
290             """Sort peers by their rank."""
291             if a.rank > b.rank:
292                 return 1
293             elif a.rank < b.rank:
294                 return -1
295             return 0
296         self.peerlist.sort(sort)
297
298     def startDownload(self):
299         """Start the download from the peers."""
300         # Don't start twice
301         if self.started:
302             return
303         
304         log.msg('Starting to download %s' % self.path)
305         self.started = True
306         assert self.pieces, "You must initialize the piece hashes first"
307         self.peerlist = [self.peers[site]['peer'] for site in self.peers]
308         
309         # Use the mirror if there are few peers
310         if len(self.peerlist) < config.getint('DEFAULT', 'MIN_DOWNLOAD_PEERS'):
311             parsed = urlparse(self.mirror)
312             if parsed[0] == "http":
313                 site = splitHostPort(parsed[0], parsed[1])
314                 self.mirror_path = urlunparse(('', '') + parsed[2:])
315                 peer = self.manager.getPeer(site, mirror = True)
316                 self.peerlist.append(peer)
317         
318         # Special case if there's only one good peer left
319 #        if len(self.peerlist) == 1:
320 #            log.msg('Downloading from peer %r' % (self.peerlist[0], ))
321 #            self.defer.callback(self.peerlist[0].get(self.path))
322 #            return
323         
324         # Begin to download the pieces
325         self.outstanding = 0
326         self.nextFinish = 0
327         self.completePieces = [False for piece in self.pieces]
328         self.getPieces()
329         
330     #{ Downloading the pieces
331     def getPieces(self):
332         """Download the next pieces from the peers."""
333         if self.file.closed:
334             log.msg('Download has been aborted for %s' % self.path)
335             self.stream.allAvailable(remove = True)
336             return
337             
338         self.sort()
339         piece = self.nextFinish
340         while self.outstanding < 4 and self.peerlist and piece < len(self.completePieces):
341             if self.completePieces[piece] == False:
342                 # Send a request to the highest ranked peer
343                 peer = self.peerlist.pop()
344                 self.completePieces[piece] = peer
345                 log.msg('Sending a request for piece %d to peer %r' % (piece, peer))
346                 
347                 self.outstanding += 1
348                 path = self.path
349                 if peer.mirror:
350                     path = self.mirror_path
351                 if len(self.completePieces) > 1:
352                     df = peer.getRange(path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
353                 else:
354                     df = peer.get(path)
355                 reactor.callLater(0, df.addCallbacks,
356                                   *(self._getPiece, self._getError),
357                                   **{'callbackArgs': (piece, peer),
358                                      'errbackArgs': (piece, peer)})
359             piece += 1
360                 
361         # Check if we're done
362         if self.outstanding <= 0 and self.nextFinish >= len(self.completePieces):
363             log.msg('Download is complete for %s' % self.path)
364             self.stream.allAvailable(remove = True)
365     
366     def _getPiece(self, response, piece, peer):
367         """Process the retrieved headers from the peer."""
368         if ((len(self.completePieces) > 1 and response.code != 206) or
369             (response.code not in (200, 206))):
370             # Request failed, try a different peer
371             log.msg('Wrong response type %d for piece %d from peer %r' % (response.code, piece, peer))
372             peer.hashError('Peer responded with the wrong type of download: %r' % response.code)
373             self.completePieces[piece] = False
374             if response.stream and response.stream.length:
375                 stream.readAndDiscard(response.stream)
376         else:
377             if self.defer:
378                 # Start sending the return file
379                 df = self.defer
380                 self.defer = None
381                 self.stream = GrowingFileStream(self.file, self.hash.expSize)
382
383                 # Get the headers from the peer's response
384                 headers = {}
385                 if response.headers.hasHeader('last-modified'):
386                     headers['last-modified'] = response.headers.getHeader('last-modified')
387                 resp = Response(200, headers, self.stream)
388                 df.callback(resp)
389
390             # Read the response stream to the file
391             log.msg('Streaming piece %d from peer %r' % (piece, peer))
392             if response.code == 206:
393                 df = StreamToFile(self.hash.newPieceHasher(), response.stream,
394                                   self.file, piece*PIECE_SIZE, PIECE_SIZE).run()
395             else:
396                 df = StreamToFile(self.hash.newHasher(), response.stream,
397                                   self.file).run()
398             reactor.callLater(0, df.addCallbacks,
399                               *(self._gotPiece, self._gotError),
400                               **{'callbackArgs': (piece, peer),
401                                  'errbackArgs': (piece, peer)})
402
403         self.outstanding -= 1
404         self.peerlist.append(peer)
405         self.getPieces()
406
407     def _getError(self, err, piece, peer):
408         """Peer failed, try again."""
409         log.msg('Got error for piece %d from peer %r' % (piece, peer))
410         self.outstanding -= 1
411         self.peerlist.append(peer)
412         self.completePieces[piece] = False
413         self.getPieces()
414         log.err(err)
415
416     def _gotPiece(self, hash, piece, peer):
417         """Process the retrieved piece from the peer."""
418         if self.pieces[piece] and hash.digest() != self.pieces[piece]:
419             # Hash doesn't match
420             log.msg('Hash error for piece %d from peer %r' % (piece, peer))
421             peer.hashError('Piece received from peer does not match expected')
422             self.completePieces[piece] = False
423         else:
424             # Successfully completed one of several pieces
425             log.msg('Finished with piece %d from peer %r' % (piece, peer))
426             self.completePieces[piece] = True
427             while (self.nextFinish < len(self.completePieces) and
428                    self.completePieces[self.nextFinish] == True):
429                 self.nextFinish += 1
430                 self.stream.updateAvailable(PIECE_SIZE)
431
432         self.getPieces()
433
434     def _gotError(self, err, piece, peer):
435         """Piece download failed, try again."""
436         log.msg('Error streaming piece %d from peer %r: %r' % (piece, peer, err))
437         log.err(err)
438         self.completePieces[piece] = False
439         self.getPieces()
440         
441 class PeerManager:
442     """Manage a set of peers and the requests to them.
443     
444     @type cache_dir: L{twisted.python.filepath.FilePath}
445     @ivar cache_dir: the directory to use for storing all files
446     @type dht: L{DHTManager.DHT}
447     @ivar dht: the DHT instance
448     @type stats: L{stats.StatsLogger}
449     @ivar stats: the statistics logger to record sent data to
450     @type clients: C{dictionary}
451     @ivar clients: the available peers that have been previously contacted
452     """
453
454     def __init__(self, cache_dir, dht, stats):
455         """Initialize the instance."""
456         self.cache_dir = cache_dir
457         self.cache_dir.restat(False)
458         if not self.cache_dir.exists():
459             self.cache_dir.makedirs()
460         self.dht = dht
461         self.stats = stats
462         self.clients = {}
463         
464     def get(self, hash, mirror, peers = [], method="GET", modtime=None):
465         """Download from a list of peers or fallback to a mirror.
466         
467         @type hash: L{Hash.HashObject}
468         @param hash: the hash object containing the expected hash for the file
469         @param mirror: the URI of the file on the mirror
470         @type peers: C{list} of C{string}
471         @param peers: a list of the peer info where the file can be found
472             (optional, defaults to downloading from the mirror)
473         @type method: C{string}
474         @param method: the HTTP method to use, 'GET' or 'HEAD'
475             (optional, defaults to 'GET')
476         @type modtime: C{int}
477         @param modtime: the modification time to use for an 'If-Modified-Since'
478             header, as seconds since the epoch
479             (optional, defaults to not sending that header)
480         """
481         if not peers or method != "GET" or modtime is not None:
482             log.msg('Downloading (%s) from mirror %s' % (method, mirror))
483             parsed = urlparse(mirror)
484             assert parsed[0] == "http", "Only HTTP is supported, not '%s'" % parsed[0]
485             site = splitHostPort(parsed[0], parsed[1])
486             path = urlunparse(('', '') + parsed[2:])
487             peer = self.getPeer(site, mirror = True)
488             return peer.get(path, method, modtime)
489 #        elif len(peers) == 1:
490 #            site = uncompact(peers[0]['c'])
491 #            log.msg('Downloading from peer %r' % (site, ))
492 #            path = '/~/' + quote_plus(hash.expected())
493 #            peer = self.getPeer(site)
494 #            return peer.get(path)
495         else:
496             tmpfile = self.cache_dir.child(hash.hexexpected())
497             return FileDownload(self, hash, mirror, peers, tmpfile).run()
498         
499     def getPeer(self, site, mirror = False):
500         """Create a new peer if necessary and return it.
501         
502         @type site: (C{string}, C{int})
503         @param site: the IP address and port of the peer
504         @param mirror: whether the peer is actually a mirror
505             (optional, defaults to False)
506         """
507         if site not in self.clients:
508             self.clients[site] = Peer(site[0], site[1], self.stats)
509             if mirror:
510                 self.clients[site].mirror = True
511         return self.clients[site]
512     
513     def close(self):
514         """Close all the connections to peers."""
515         for site in self.clients:
516             self.clients[site].close()
517         self.clients = {}
518
519 class TestPeerManager(unittest.TestCase):
520     """Unit tests for the PeerManager."""
521     
522     manager = None
523     pending_calls = []
524     
525     def tearDown(self):
526         for p in self.pending_calls:
527             if p.active():
528                 p.cancel()
529         self.pending_calls = []
530         if self.manager:
531             self.manager.close()
532             self.manager = None