Multiple peer downloading is mostly working now.
[quix0rs-apt-p2p.git] / apt_p2p / PeerManager.py
1
2 """Manage a set of peers and the requests to them."""
3
4 from random import choice
5 from urlparse import urlparse, urlunparse
6 from urllib import quote_plus
7 from binascii import b2a_hex, a2b_hex
8 import sha
9
10 from twisted.internet import reactor, defer
11 from twisted.python import log
12 from twisted.trial import unittest
13 from twisted.web2 import stream
14 from twisted.web2.http import Response, splitHostPort
15
16 from HTTPDownloader import Peer
17 from util import uncompact
18 from Hash import PIECE_SIZE
19 from apt_p2p_Khashmir.bencode import bdecode
20
21 class GrowingFileStream(stream.FileStream):
22     """Modified to stream data from a file as it becomes available.
23     
24     @ivar CHUNK_SIZE: the maximum size of chunks of data to send at a time
25     @ivar deferred: waiting for the result of the last read attempt
26     @ivar available: the number of bytes that are currently available to read
27     @ivar position: the current position in the file where the next read will begin
28     @ivar finished: True when no more data will be coming available
29     """
30
31     CHUNK_SIZE = 4*1024
32
33     def __init__(self, f, length = None):
34         stream.FileStream.__init__(self, f)
35         self.length = length
36         self.deferred = None
37         self.available = 0L
38         self.position = 0L
39         self.finished = False
40
41     def updateAvailable(self, newlyAvailable):
42         """Update the number of bytes that are available.
43         
44         Call it with 0 to trigger reading of a fully read file.
45         
46         @param newlyAvailable: the number of bytes that just became available
47         """
48         assert not self.finished
49         self.available += newlyAvailable
50         
51         # If a read is pending, let it go
52         if self.deferred and self.position < self.available:
53             # Try to read some data from the file
54             length = self.available - self.position
55             readSize = min(length, self.CHUNK_SIZE)
56             self.f.seek(self.position)
57             b = self.f.read(readSize)
58             bytesRead = len(b)
59             
60             # Check if end of file was reached
61             if bytesRead:
62                 self.position += bytesRead
63                 deferred = self.deferred
64                 self.deferred = None
65                 deferred.callback(b)
66
67     def allAvailable(self):
68         """Indicate that no more data will be coming available."""
69         self.finished = True
70
71         # If a read is pending, let it go
72         if self.deferred:
73             if self.position < self.available:
74                 # Try to read some data from the file
75                 length = self.available - self.position
76                 readSize = min(length, self.CHUNK_SIZE)
77                 self.f.seek(self.position)
78                 b = self.f.read(readSize)
79                 bytesRead = len(b)
80     
81                 # Check if end of file was reached
82                 if bytesRead:
83                     self.position += bytesRead
84                     deferred = self.deferred
85                     self.deferred = None
86                     deferred.callback(b)
87                 else:
88                     # We're done
89                     deferred = self.deferred
90                     self.deferred = None
91                     deferred.callback(None)
92             else:
93                 # We're done
94                 deferred = self.deferred
95                 self.deferred = None
96                 deferred.callback(None)
97         
98     def read(self, sendfile=False):
99         assert not self.deferred, "A previous read is still deferred."
100
101         if self.f is None:
102             return None
103
104         length = self.available - self.position
105         readSize = min(length, self.CHUNK_SIZE)
106
107         # If we don't have any available, we're done or deferred
108         if readSize <= 0:
109             if self.finished:
110                 return None
111             else:
112                 self.deferred = defer.Deferred()
113                 return self.deferred
114
115         # Try to read some data from the file
116         self.f.seek(self.position)
117         b = self.f.read(readSize)
118         bytesRead = len(b)
119         if not bytesRead:
120             # End of file was reached, we're done or deferred
121             if self.finished:
122                 return None
123             else:
124                 self.deferred = defer.Deferred()
125                 return self.deferred
126         else:
127             self.position += bytesRead
128             return b
129
130 class StreamToFile:
131     """Save a stream to a partial file and hash it.
132     
133     @type stream: L{twisted.web2.stream.IByteStream}
134     @ivar stream: the input stream being read
135     @type outFile: L{twisted.python.filepath.FilePath}
136     @ivar outFile: the file being written
137     @type hash: C{sha1}
138     @ivar hash: the hash object for the data
139     @type position: C{int}
140     @ivar position: the current file position to write the next data to
141     @type length: C{int}
142     @ivar length: the position in the file to not write beyond
143     @type doneDefer: L{twisted.internet.defer.Deferred}
144     @ivar doneDefer: the deferred that will fire when done writing
145     """
146     
147     def __init__(self, inputStream, outFile, start = 0, length = None):
148         """Initializes the file.
149         
150         @type inputStream: L{twisted.web2.stream.IByteStream}
151         @param inputStream: the input stream to read from
152         @type outFile: L{twisted.python.filepath.FilePath}
153         @param outFile: the file to write to
154         @type start: C{int}
155         @param start: the file position to start writing at
156             (optional, defaults to the start of the file)
157         @type length: C{int}
158         @param length: the maximum amount of data to write to the file
159             (optional, defaults to not limiting the writing to the file
160         """
161         self.stream = inputStream
162         self.outFile = outFile
163         self.hash = sha.new()
164         self.position = start
165         self.length = None
166         if length is not None:
167             self.length = start + length
168         self.doneDefer = None
169         
170     def run(self):
171         """Start the streaming.
172
173         @rtype: L{twisted.internet.defer.Deferred}
174         """
175         log.msg('Started streaming %r bytes to file at position %d' % (self.length, self.position))
176         self.doneDefer = stream.readStream(self.stream, self._gotData)
177         self.doneDefer.addCallbacks(self._done, self._error)
178         return self.doneDefer
179
180     def _gotData(self, data):
181         """Process the received data."""
182         if self.outFile.closed:
183             raise Exception, "outFile was unexpectedly closed"
184         
185         if data is None:
186             raise Exception, "Data is None?"
187         
188         # Make sure we don't go too far
189         if self.length is not None and self.position + len(data) > self.length:
190             data = data[:(self.length - self.position)]
191         
192         # Write and hash the streamed data
193         self.outFile.seek(self.position)
194         self.outFile.write(data)
195         self.hash.update(data)
196         self.position += len(data)
197         
198     def _done(self, result):
199         """Return the result."""
200         log.msg('Streaming is complete')
201         return self.hash.digest()
202     
203     def _error(self, err):
204         """Log the error."""
205         log.msg('Streaming error')
206         log.err(err)
207         return err
208     
209 class FileDownload:
210     """Manage a download from a list of peers or a mirror.
211     
212     @type manager: L{PeerManager}
213     @ivar manager: the manager to send requests for peers to
214     @type hash: L{Hash.HashObject}
215     @ivar hash: the hash object containing the expected hash for the file
216     @ivar mirror: the URI of the file on the mirror
217     @type compact_peers: C{list} of C{dictionary}
218     @ivar compact_peers: a list of the peer info where the file can be found
219     @type file: C{file}
220     @ivar file: the open file to right the download to
221     @type path: C{string}
222     @ivar path: the path to request from peers to access the file
223     @type pieces: C{list} of C{string} 
224     @ivar pieces: the hashes of the pieces in the file
225     @type started: C{boolean}
226     @ivar started: whether the download has begun yet
227     @type defer: L{twisted.internet.defer.Deferred}
228     @ivar defer: the deferred that will callback with the result of the download
229     @type peers: C{dictionary}
230     @ivar peers: information about each of the peers available to download from
231     @type outstanding: C{int}
232     @ivar outstanding: the number of requests to peers currently outstanding
233     @type peerlist: C{list} of L{HTTPDownloader.Peer}
234     @ivar peerlist: the sorted list of peers for this download
235     @type stream: L{GrowingFileStream}
236     @ivar stream: the stream of resulting data from the download
237     @type nextFinish: C{int}
238     @ivar nextFinish: the next piece that is needed to finish for the stream
239     @type completePieces: C{list} of C{boolean} or L{HTTPDownloader.Peer}
240     @ivar completePieces: one per piece, will be False if no requests are
241         outstanding for the piece, True if the piece has been successfully
242         downloaded, or the Peer that a request for this piece has been sent  
243     """
244     
245     def __init__(self, manager, hash, mirror, compact_peers, file):
246         """Initialize the instance and check for piece hashes.
247         
248         @type manager: L{PeerManager}
249         @param manager: the manager to send requests for peers to
250         @type hash: L{Hash.HashObject}
251         @param hash: the hash object containing the expected hash for the file
252         @param mirror: the URI of the file on the mirror
253         @type compact_peers: C{list} of C{dictionary}
254         @param compact_peers: a list of the peer info where the file can be found
255         @type file: L{twisted.python.filepath.FilePath}
256         @param file: the temporary file to use to store the downloaded file
257         """
258         self.manager = manager
259         self.hash = hash
260         self.mirror = mirror
261         self.compact_peers = compact_peers
262         
263         self.path = '/~/' + quote_plus(hash.expected())
264         self.pieces = None
265         self.started = False
266         
267         file.restat(False)
268         if file.exists():
269             file.remove()
270         self.file = file.open('w+')
271
272     def run(self):
273         """Start the downloading process."""
274         log.msg('Checking for pieces for %s' % self.path)
275         self.defer = defer.Deferred()
276         self.peers = {}
277         no_pieces = 0
278         pieces_string = {0: 0}
279         pieces_hash = {0: 0}
280         pieces_dl_hash = {0: 0}
281
282         for compact_peer in self.compact_peers:
283             # Build a list of all the peers for this download
284             site = uncompact(compact_peer['c'])
285             peer = self.manager.getPeer(site)
286             self.peers.setdefault(site, {})['peer'] = peer
287
288             # Extract any piece information from the peers list
289             if 't' in compact_peer:
290                 self.peers[site]['t'] = compact_peer['t']['t']
291                 pieces_string.setdefault(compact_peer['t']['t'], 0)
292                 pieces_string[compact_peer['t']['t']] += 1
293             elif 'h' in compact_peer:
294                 self.peers[site]['h'] = compact_peer['h']
295                 pieces_hash.setdefault(compact_peer['h'], 0)
296                 pieces_hash[compact_peer['h']] += 1
297             elif 'l' in compact_peer:
298                 self.peers[site]['l'] = compact_peer['l']
299                 pieces_dl_hash.setdefault(compact_peer['l'], 0)
300                 pieces_dl_hash[compact_peer['l']] += 1
301             else:
302                 no_pieces += 1
303         
304         # Select the most popular piece info
305         max_found = max(no_pieces, max(pieces_string.values()),
306                         max(pieces_hash.values()), max(pieces_dl_hash.values()))
307
308         if max_found < len(self.peers):
309             log.msg('Misleading piece information found, using most popular %d of %d peers' % 
310                     (max_found, len(self.peers)))
311
312         if max_found == no_pieces:
313             # The file is not split into pieces
314             log.msg('No pieces were found for the file')
315             self.pieces = []
316             self.startDownload()
317         elif max_found == max(pieces_string.values()):
318             # Small number of pieces in a string
319             for pieces, num in pieces_string.items():
320                 # Find the most popular piece string
321                 if num == max_found:
322                     self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
323                     log.msg('Peer info contained %d piece hashes' % len(self.pieces))
324                     self.startDownload()
325                     break
326         elif max_found == max(pieces_hash.values()):
327             # Medium number of pieces stored in the DHT
328             for pieces, num in pieces_hash.items():
329                 # Find the most popular piece hash to lookup
330                 if num == max_found:
331                     log.msg('Found a hash for pieces to lookup in the DHT: %r' % pieces)
332                     self.getDHTPieces(pieces)
333                     break
334         elif max_found == max(pieces_dl_hash.values()):
335             # Large number of pieces stored in peers
336             for pieces, num in pieces_hash.items():
337                 # Find the most popular piece hash to download
338                 if num == max_found:
339                     log.msg('Found a hash for pieces to lookup in peers: %r' % pieces)
340                     self.getPeerPieces(pieces)
341                     break
342         return self.defer
343
344     #{ Downloading the piece hashes
345     def getDHTPieces(self, key):
346         """Retrieve the piece information from the DHT.
347         
348         @param key: the key to lookup in the DHT
349         """
350         # Remove any peers with the wrong piece hash
351         #for site in self.peers.keys():
352         #    if self.peers[site].get('h', '') != key:
353         #        del self.peers[site]
354
355         # Start the DHT lookup
356         lookupDefer = self.manager.dht.getValue(key)
357         lookupDefer.addCallback(self._getDHTPieces, key)
358         
359     def _getDHTPieces(self, results, key):
360         """Check the retrieved values."""
361         for result in results:
362             # Make sure the hash matches the key
363             result_hash = sha.new(result.get('t', '')).digest()
364             if result_hash == key:
365                 pieces = result['t']
366                 self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
367                 log.msg('Retrieved %d piece hashes from the DHT' % len(self.pieces))
368                 self.startDownload()
369                 return
370             
371         # Continue without the piece hashes
372         log.msg('Could not retrieve the piece hashes from the DHT')
373         self.pieces = []
374         self.startDownload()
375
376     def getPeerPieces(self, key, failedSite = None):
377         """Retrieve the piece information from the peers.
378         
379         @param key: the key to request from the peers
380         """
381         if failedSite is None:
382             log.msg('Starting the lookup of piece hashes in peers')
383             self.outstanding = 0
384             # Remove any peers with the wrong piece hash
385             #for site in self.peers.keys():
386             #    if self.peers[site].get('l', '') != key:
387             #        del self.peers[site]
388         else:
389             log.msg('Piece hash lookup failed for peer %r' % (failedSite, ))
390             self.peers[failedSite]['failed'] = True
391             self.outstanding -= 1
392
393         if self.pieces is None:
394             # Send a request to one or more peers
395             log.msg('Checking for a peer to request piece hashes from')
396             for site in self.peers:
397                 if self.peers[site].get('failed', False) != True:
398                     log.msg('Sending a piece hash request to %r' % (site, ))
399                     path = '/~/' + quote_plus(key)
400                     lookupDefer = self.peers[site]['peer'].get(path)
401                     reactor.callLater(0, lookupDefer.addCallbacks,
402                                       *(self._getPeerPieces, self._gotPeerError),
403                                       **{'callbackArgs': (key, site),
404                                          'errbackArgs': (key, site)})
405                     self.outstanding += 1
406                     if self.outstanding >= 3:
407                         break
408         
409         log.msg('Done sending piece hash requests for now, %d outstanding' % self.outstanding)
410         if self.pieces is None and self.outstanding == 0:
411             # Continue without the piece hashes
412             log.msg('Could not retrieve the piece hashes from the peers')
413             self.pieces = []
414             self.startDownload()
415         
416     def _getPeerPieces(self, response, key, site):
417         """Process the retrieved response from the peer."""
418         log.msg('Got a piece hash response %d from %r' % (response.code, site))
419         if response.code != 200:
420             # Request failed, try a different peer
421             log.msg('Did not like response %d from %r' % (response.code, site))
422             self.getPeerPieces(key, site)
423         else:
424             # Read the response stream to a string
425             self.peers[site]['pieces'] = ''
426             def _gotPeerPiece(data, self = self, site = site):
427                 log.msg('Peer %r got %d bytes of piece hashes' % (site, len(data)))
428                 self.peers[site]['pieces'] += data
429             log.msg('Streaming piece hashes from peer')
430             df = stream.readStream(response.stream, _gotPeerPiece)
431             df.addCallbacks(self._gotPeerPieces, self._gotPeerError,
432                             callbackArgs=(key, site), errbackArgs=(key, site))
433
434     def _gotPeerError(self, err, key, site):
435         """Peer failed, try again."""
436         log.msg('Peer piece hash request failed for %r' % (site, ))
437         log.err(err)
438         self.getPeerPieces(key, site)
439
440     def _gotPeerPieces(self, result, key, site):
441         """Check the retrieved pieces from the peer."""
442         log.msg('Finished streaming piece hashes from peer %r' % (site, ))
443         if self.pieces is not None:
444             # Already done
445             log.msg('Already done')
446             return
447         
448         try:
449             result = bdecode(self.peers[site]['pieces'])
450         except:
451             log.msg('Error bdecoding piece hashes')
452             log.err()
453             self.getPeerPieces(key, site)
454             return
455             
456         result_hash = sha.new(result.get('t', '')).digest()
457         if result_hash == key:
458             pieces = result['t']
459             self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
460             log.msg('Retrieved %d piece hashes from the peer %r' % (len(self.pieces), site))
461             self.startDownload()
462         else:
463             log.msg('Peer returned a piece string that did not match')
464             self.getPeerPieces(key, site)
465
466     #{ Downloading the file
467     def sort(self):
468         """Sort the peers by their rank (highest ranked at the end)."""
469         def sort(a, b):
470             """Sort peers by their rank."""
471             if a.rank > b.rank:
472                 return 1
473             elif a.rank < b.rank:
474                 return -1
475             return 0
476         self.peerlist.sort(sort)
477
478     def startDownload(self):
479         """Start the download from the peers."""
480         # Don't start twice
481         if self.started:
482             return
483         
484         log.msg('Starting to download %s' % self.path)
485         self.started = True
486         assert self.pieces is not None, "You must initialize the piece hashes first"
487         self.peerlist = [self.peers[site]['peer'] for site in self.peers]
488         
489         # Special case if there's only one good peer left
490         if len(self.peerlist) == 1:
491             log.msg('Downloading from peer %r' % (self.peerlist[0], ))
492             self.defer.callback(self.peerlist[0].get(self.path))
493             return
494         
495         # Start sending the return file
496         self.stream = GrowingFileStream(self.file, self.hash.expSize)
497         resp = Response(200, {}, self.stream)
498         self.defer.callback(resp)
499
500         # Begin to download the pieces
501         self.outstanding = 0
502         self.nextFinish = 0
503         if self.pieces:
504             self.completePieces = [False for piece in self.pieces]
505         else:
506             self.completePieces = [False]
507         self.getPieces()
508         
509     #{ Downloading the pieces
510     def getPieces(self):
511         """Download the next pieces from the peers."""
512         log.msg('Checking for more piece requests to send')
513         self.sort()
514         piece = self.nextFinish
515         while self.outstanding < 4 and self.peerlist and piece < len(self.completePieces):
516             log.msg('Checking piece %d' % piece)
517             if self.completePieces[piece] == False:
518                 # Send a request to the highest ranked peer
519                 peer = self.peerlist.pop()
520                 self.completePieces[piece] = peer
521                 log.msg('Sending a request for piece %d to peer %r' % (piece, peer))
522                 
523                 self.outstanding += 1
524                 if self.pieces:
525                     df = peer.getRange(self.path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
526                 else:
527                     df = peer.get(self.path)
528                 reactor.callLater(0, df.addCallbacks,
529                                   *(self._getPiece, self._getError),
530                                   **{'callbackArgs': (piece, peer),
531                                      'errbackArgs': (piece, peer)})
532             piece += 1
533                 
534         log.msg('Finished checking pieces, %d outstanding, next piece %d of %d' % (self.outstanding, self.nextFinish, len(self.completePieces)))
535         # Check if we're done
536         if self.outstanding <= 0 and self.nextFinish >= len(self.completePieces):
537             log.msg('We seem to be done with all pieces')
538             self.stream.allAvailable()
539     
540     def _getPiece(self, response, piece, peer):
541         """Process the retrieved headers from the peer."""
542         log.msg('Got response for piece %d from peer %r' % (piece, peer))
543         if ((len(self.completePieces) > 1 and response.code != 206) or
544             (response.code not in (200, 206))):
545             # Request failed, try a different peer
546             log.msg('Wrong response type %d for piece %d from peer %r' % (response.code, piece, peer))
547             peer.hashError('Peer responded with the wrong type of download: %r' % response.code)
548             self.completePieces[piece] = False
549             if response.stream and response.stream.length:
550                 stream.readAndDiscard(response.stream)
551         else:
552             # Read the response stream to the file
553             log.msg('Streaming piece %d from peer %r' % (piece, peer))
554             if response.code == 206:
555                 df = StreamToFile(response.stream, self.file, piece*PIECE_SIZE, PIECE_SIZE).run()
556             else:
557                 df = StreamToFile(response.stream, self.file).run()
558             df.addCallbacks(self._gotPiece, self._gotError,
559                             callbackArgs=(piece, peer), errbackArgs=(piece, peer))
560
561         self.outstanding -= 1
562         self.peerlist.append(peer)
563         self.getPieces()
564
565     def _getError(self, err, piece, peer):
566         """Peer failed, try again."""
567         log.msg('Got error for piece %d from peer %r' % (piece, peer))
568         self.outstanding -= 1
569         self.peerlist.append(peer)
570         self.completePieces[piece] = False
571         self.getPieces()
572         log.err(err)
573
574     def _gotPiece(self, response, piece, peer):
575         """Process the retrieved piece from the peer."""
576         log.msg('Finished streaming piece %d from peer %r: %r' % (piece, peer, response))
577         if ((self.pieces and response != self.pieces[piece]) or
578             (len(self.pieces) == 0 and response != self.hash.expected())):
579             # Hash doesn't match
580             log.msg('Hash error for piece %d from peer %r' % (piece, peer))
581             peer.hashError('Piece received from peer does not match expected')
582             self.completePieces[piece] = False
583         elif self.pieces:
584             # Successfully completed one of several pieces
585             log.msg('Finished with piece %d from peer %r' % (piece, peer))
586             self.completePieces[piece] = True
587             while (self.nextFinish < len(self.completePieces) and
588                    self.completePieces[self.nextFinish] == True):
589                 self.nextFinish += 1
590                 self.stream.updateAvailable(PIECE_SIZE)
591         else:
592             # Whole download (only one piece) is complete
593             log.msg('Piece %d from peer %r is the last piece' % (piece, peer))
594             self.completePieces[piece] = True
595             self.nextFinish = 1
596             self.stream.updateAvailable(2**30)
597
598         self.getPieces()
599
600     def _gotError(self, err, piece, peer):
601         """Piece download failed, try again."""
602         log.msg('Error streaming piece %d from peer %r: %r' % (piece, peer, response))
603         log.err(err)
604         self.completePieces[piece] = False
605         self.getPieces()
606         
607 class PeerManager:
608     """Manage a set of peers and the requests to them.
609     
610     @type cache_dir: L{twisted.python.filepath.FilePath}
611     @ivar cache_dir: the directory to use for storing all files
612     @type dht: L{interfaces.IDHT}
613     @ivar dht: the DHT instance
614     @type clients: C{dictionary}
615     @ivar clients: the available peers that have been previously contacted
616     """
617
618     def __init__(self, cache_dir, dht):
619         """Initialize the instance."""
620         self.cache_dir = cache_dir
621         self.cache_dir.restat(False)
622         if not self.cache_dir.exists():
623             self.cache_dir.makedirs()
624         self.dht = dht
625         self.clients = {}
626         
627     def get(self, hash, mirror, peers = [], method="GET", modtime=None):
628         """Download from a list of peers or fallback to a mirror.
629         
630         @type hash: L{Hash.HashObject}
631         @param hash: the hash object containing the expected hash for the file
632         @param mirror: the URI of the file on the mirror
633         @type peers: C{list} of C{string}
634         @param peers: a list of the peer info where the file can be found
635             (optional, defaults to downloading from the mirror)
636         @type method: C{string}
637         @param method: the HTTP method to use, 'GET' or 'HEAD'
638             (optional, defaults to 'GET')
639         @type modtime: C{int}
640         @param modtime: the modification time to use for an 'If-Modified-Since'
641             header, as seconds since the epoch
642             (optional, defaults to not sending that header)
643         """
644         if not peers or method != "GET" or modtime is not None:
645             log.msg('Downloading (%s) from mirror %s' % (method, mirror))
646             parsed = urlparse(mirror)
647             assert parsed[0] == "http", "Only HTTP is supported, not '%s'" % parsed[0]
648             site = splitHostPort(parsed[0], parsed[1])
649             path = urlunparse(('', '') + parsed[2:])
650             peer = self.getPeer(site)
651             return peer.get(path, method, modtime)
652         elif len(peers) == 1:
653             site = uncompact(peers[0]['c'])
654             log.msg('Downloading from peer %r' % (site, ))
655             path = '/~/' + quote_plus(hash.expected())
656             peer = self.getPeer(site)
657             return peer.get(path)
658         else:
659             tmpfile = self.cache_dir.child(hash.hexexpected())
660             return FileDownload(self, hash, mirror, peers, tmpfile).run()
661         
662     def getPeer(self, site):
663         """Create a new peer if necessary and return it.
664         
665         @type site: (C{string}, C{int})
666         @param site: the IP address and port of the peer
667         """
668         if site not in self.clients:
669             self.clients[site] = Peer(site[0], site[1])
670         return self.clients[site]
671     
672     def close(self):
673         """Close all the connections to peers."""
674         for site in self.clients:
675             self.clients[site].close()
676         self.clients = {}
677
678 class TestPeerManager(unittest.TestCase):
679     """Unit tests for the PeerManager."""
680     
681     manager = None
682     pending_calls = []
683     
684     def gotResp(self, resp, num, expect):
685         self.failUnless(resp.code >= 200 and resp.code < 300, "Got a non-200 response: %r" % resp.code)
686         if expect is not None:
687             self.failUnless(resp.stream.length == expect, "Length was incorrect, got %r, expected %r" % (resp.stream.length, expect))
688         def print_(n):
689             pass
690         def printdone(n):
691             pass
692         stream.readStream(resp.stream, print_).addCallback(printdone)
693     
694     def test_download(self):
695         """Tests a normal download."""
696         self.manager = PeerManager()
697         self.timeout = 10
698         
699         host = 'www.ietf.org'
700         d = self.manager.get('', 'http://' + host + '/rfc/rfc0013.txt')
701         d.addCallback(self.gotResp, 1, 1070)
702         return d
703         
704     def test_head(self):
705         """Tests a 'HEAD' request."""
706         self.manager = PeerManager()
707         self.timeout = 10
708         
709         host = 'www.ietf.org'
710         d = self.manager.get('', 'http://' + host + '/rfc/rfc0013.txt', method = "HEAD")
711         d.addCallback(self.gotResp, 1, 0)
712         return d
713         
714     def test_multiple_downloads(self):
715         """Tests multiple downloads with queueing and connection closing."""
716         self.manager = PeerManager()
717         self.timeout = 120
718         lastDefer = defer.Deferred()
719         
720         def newRequest(host, path, num, expect, last=False):
721             d = self.manager.get('', 'http://' + host + ':' + str(80) + path)
722             d.addCallback(self.gotResp, num, expect)
723             if last:
724                 d.addBoth(lastDefer.callback)
725                 
726         newRequest('www.ietf.org', "/rfc/rfc0006.txt", 1, 1776)
727         newRequest('www.ietf.org', "/rfc/rfc2362.txt", 2, 159833)
728         newRequest('www.google.ca', "/", 3, None)
729         self.pending_calls.append(reactor.callLater(1, newRequest, 'www.sfu.ca', '/', 4, None))
730         self.pending_calls.append(reactor.callLater(10, newRequest, 'www.ietf.org', '/rfc/rfc0048.txt', 5, 41696))
731         self.pending_calls.append(reactor.callLater(30, newRequest, 'www.ietf.org', '/rfc/rfc0022.txt', 6, 4606))
732         self.pending_calls.append(reactor.callLater(31, newRequest, 'www.sfu.ca', '/studentcentral/index.html', 7, None))
733         self.pending_calls.append(reactor.callLater(32, newRequest, 'www.ietf.org', '/rfc/rfc0014.txt', 8, 27))
734         self.pending_calls.append(reactor.callLater(32, newRequest, 'www.ietf.org', '/rfc/rfc0001.txt', 9, 21088))
735         self.pending_calls.append(reactor.callLater(62, newRequest, 'www.google.ca', '/intl/en/options/', 0, None, True))
736         return lastDefer
737         
738     def tearDown(self):
739         for p in self.pending_calls:
740             if p.active():
741                 p.cancel()
742         self.pending_calls = []
743         if self.manager:
744             self.manager.close()
745             self.manager = None