Try to rejoin DHT periodically after failures using exponential backoff.
[quix0rs-apt-p2p.git] / apt_p2p / PeerManager.py
1
2 """Manage a set of peers and the requests to them."""
3
4 from random import choice
5 from urlparse import urlparse, urlunparse
6 from urllib import quote_plus
7 from binascii import b2a_hex, a2b_hex
8 import sha
9
10 from twisted.internet import reactor, defer
11 from twisted.python import log
12 from twisted.trial import unittest
13 from twisted.web2 import stream
14 from twisted.web2.http import Response, splitHostPort
15
16 from HTTPDownloader import Peer
17 from util import uncompact
18 from Hash import PIECE_SIZE
19 from apt_p2p_Khashmir.bencode import bdecode
20 from apt_p2p_conf import config
21
22 class GrowingFileStream(stream.FileStream):
23     """Modified to stream data from a file as it becomes available.
24     
25     @ivar CHUNK_SIZE: the maximum size of chunks of data to send at a time
26     @ivar deferred: waiting for the result of the last read attempt
27     @ivar available: the number of bytes that are currently available to read
28     @ivar position: the current position in the file where the next read will begin
29     @ivar finished: True when no more data will be coming available
30     """
31
32     CHUNK_SIZE = 4*1024
33
34     def __init__(self, f, length = None):
35         stream.FileStream.__init__(self, f)
36         self.length = length
37         self.deferred = None
38         self.available = 0L
39         self.position = 0L
40         self.finished = False
41
42     def updateAvailable(self, newlyAvailable):
43         """Update the number of bytes that are available.
44         
45         Call it with 0 to trigger reading of a fully read file.
46         
47         @param newlyAvailable: the number of bytes that just became available
48         """
49         assert not self.finished
50         self.available += newlyAvailable
51         
52         # If a read is pending, let it go
53         if self.deferred and self.position < self.available:
54             # Try to read some data from the file
55             length = self.available - self.position
56             readSize = min(length, self.CHUNK_SIZE)
57             self.f.seek(self.position)
58             b = self.f.read(readSize)
59             bytesRead = len(b)
60             
61             # Check if end of file was reached
62             if bytesRead:
63                 self.position += bytesRead
64                 deferred = self.deferred
65                 self.deferred = None
66                 deferred.callback(b)
67
68     def allAvailable(self):
69         """Indicate that no more data will be coming available."""
70         self.finished = True
71
72         # If a read is pending, let it go
73         if self.deferred:
74             if self.position < self.available:
75                 # Try to read some data from the file
76                 length = self.available - self.position
77                 readSize = min(length, self.CHUNK_SIZE)
78                 self.f.seek(self.position)
79                 b = self.f.read(readSize)
80                 bytesRead = len(b)
81     
82                 # Check if end of file was reached
83                 if bytesRead:
84                     self.position += bytesRead
85                     deferred = self.deferred
86                     self.deferred = None
87                     deferred.callback(b)
88                 else:
89                     # We're done
90                     deferred = self.deferred
91                     self.deferred = None
92                     deferred.callback(None)
93             else:
94                 # We're done
95                 deferred = self.deferred
96                 self.deferred = None
97                 deferred.callback(None)
98         
99     def read(self, sendfile=False):
100         assert not self.deferred, "A previous read is still deferred."
101
102         if self.f is None:
103             return None
104
105         length = self.available - self.position
106         readSize = min(length, self.CHUNK_SIZE)
107
108         # If we don't have any available, we're done or deferred
109         if readSize <= 0:
110             if self.finished:
111                 return None
112             else:
113                 self.deferred = defer.Deferred()
114                 return self.deferred
115
116         # Try to read some data from the file
117         self.f.seek(self.position)
118         b = self.f.read(readSize)
119         bytesRead = len(b)
120         if not bytesRead:
121             # End of file was reached, we're done or deferred
122             if self.finished:
123                 return None
124             else:
125                 self.deferred = defer.Deferred()
126                 return self.deferred
127         else:
128             self.position += bytesRead
129             return b
130
131 class StreamToFile:
132     """Save a stream to a partial file and hash it.
133     
134     @type stream: L{twisted.web2.stream.IByteStream}
135     @ivar stream: the input stream being read
136     @type outFile: L{twisted.python.filepath.FilePath}
137     @ivar outFile: the file being written
138     @type hash: C{sha1}
139     @ivar hash: the hash object for the data
140     @type position: C{int}
141     @ivar position: the current file position to write the next data to
142     @type length: C{int}
143     @ivar length: the position in the file to not write beyond
144     @type doneDefer: L{twisted.internet.defer.Deferred}
145     @ivar doneDefer: the deferred that will fire when done writing
146     """
147     
148     def __init__(self, inputStream, outFile, start = 0, length = None):
149         """Initializes the file.
150         
151         @type inputStream: L{twisted.web2.stream.IByteStream}
152         @param inputStream: the input stream to read from
153         @type outFile: L{twisted.python.filepath.FilePath}
154         @param outFile: the file to write to
155         @type start: C{int}
156         @param start: the file position to start writing at
157             (optional, defaults to the start of the file)
158         @type length: C{int}
159         @param length: the maximum amount of data to write to the file
160             (optional, defaults to not limiting the writing to the file
161         """
162         self.stream = inputStream
163         self.outFile = outFile
164         self.hash = sha.new()
165         self.position = start
166         self.length = None
167         if length is not None:
168             self.length = start + length
169         self.doneDefer = None
170         
171     def run(self):
172         """Start the streaming.
173
174         @rtype: L{twisted.internet.defer.Deferred}
175         """
176         log.msg('Started streaming %r bytes to file at position %d' % (self.length, self.position))
177         self.doneDefer = stream.readStream(self.stream, self._gotData)
178         self.doneDefer.addCallbacks(self._done, self._error)
179         return self.doneDefer
180
181     def _gotData(self, data):
182         """Process the received data."""
183         if self.outFile.closed:
184             raise Exception, "outFile was unexpectedly closed"
185         
186         if data is None:
187             raise Exception, "Data is None?"
188         
189         # Make sure we don't go too far
190         if self.length is not None and self.position + len(data) > self.length:
191             data = data[:(self.length - self.position)]
192         
193         # Write and hash the streamed data
194         self.outFile.seek(self.position)
195         self.outFile.write(data)
196         self.hash.update(data)
197         self.position += len(data)
198         
199     def _done(self, result):
200         """Return the result."""
201         log.msg('Streaming is complete')
202         return self.hash.digest()
203     
204     def _error(self, err):
205         """Log the error."""
206         log.msg('Streaming error')
207         log.err(err)
208         return err
209     
210 class FileDownload:
211     """Manage a download from a list of peers or a mirror.
212     
213     @type manager: L{PeerManager}
214     @ivar manager: the manager to send requests for peers to
215     @type hash: L{Hash.HashObject}
216     @ivar hash: the hash object containing the expected hash for the file
217     @ivar mirror: the URI of the file on the mirror
218     @type compact_peers: C{list} of C{dictionary}
219     @ivar compact_peers: a list of the peer info where the file can be found
220     @type file: C{file}
221     @ivar file: the open file to right the download to
222     @type path: C{string}
223     @ivar path: the path to request from peers to access the file
224     @type pieces: C{list} of C{string} 
225     @ivar pieces: the hashes of the pieces in the file
226     @type started: C{boolean}
227     @ivar started: whether the download has begun yet
228     @type defer: L{twisted.internet.defer.Deferred}
229     @ivar defer: the deferred that will callback with the result of the download
230     @type peers: C{dictionary}
231     @ivar peers: information about each of the peers available to download from
232     @type outstanding: C{int}
233     @ivar outstanding: the number of requests to peers currently outstanding
234     @type peerlist: C{list} of L{HTTPDownloader.Peer}
235     @ivar peerlist: the sorted list of peers for this download
236     @type stream: L{GrowingFileStream}
237     @ivar stream: the stream of resulting data from the download
238     @type nextFinish: C{int}
239     @ivar nextFinish: the next piece that is needed to finish for the stream
240     @type completePieces: C{list} of C{boolean} or L{HTTPDownloader.Peer}
241     @ivar completePieces: one per piece, will be False if no requests are
242         outstanding for the piece, True if the piece has been successfully
243         downloaded, or the Peer that a request for this piece has been sent  
244     """
245     
246     def __init__(self, manager, hash, mirror, compact_peers, file):
247         """Initialize the instance and check for piece hashes.
248         
249         @type manager: L{PeerManager}
250         @param manager: the manager to send requests for peers to
251         @type hash: L{Hash.HashObject}
252         @param hash: the hash object containing the expected hash for the file
253         @param mirror: the URI of the file on the mirror
254         @type compact_peers: C{list} of C{dictionary}
255         @param compact_peers: a list of the peer info where the file can be found
256         @type file: L{twisted.python.filepath.FilePath}
257         @param file: the temporary file to use to store the downloaded file
258         """
259         self.manager = manager
260         self.hash = hash
261         self.mirror = mirror
262         self.compact_peers = compact_peers
263         
264         self.path = '/~/' + quote_plus(hash.expected())
265         self.mirror_path = None
266         self.pieces = None
267         self.started = False
268         
269         file.restat(False)
270         if file.exists():
271             file.remove()
272         self.file = file.open('w+')
273
274     def run(self):
275         """Start the downloading process."""
276         log.msg('Checking for pieces for %s' % self.path)
277         self.defer = defer.Deferred()
278         self.peers = {}
279         no_pieces = 0
280         pieces_string = {0: 0}
281         pieces_hash = {0: 0}
282         pieces_dl_hash = {0: 0}
283
284         for compact_peer in self.compact_peers:
285             # Build a list of all the peers for this download
286             site = uncompact(compact_peer['c'])
287             peer = self.manager.getPeer(site)
288             self.peers.setdefault(site, {})['peer'] = peer
289
290             # Extract any piece information from the peers list
291             if 't' in compact_peer:
292                 self.peers[site]['t'] = compact_peer['t']['t']
293                 pieces_string.setdefault(compact_peer['t']['t'], 0)
294                 pieces_string[compact_peer['t']['t']] += 1
295             elif 'h' in compact_peer:
296                 self.peers[site]['h'] = compact_peer['h']
297                 pieces_hash.setdefault(compact_peer['h'], 0)
298                 pieces_hash[compact_peer['h']] += 1
299             elif 'l' in compact_peer:
300                 self.peers[site]['l'] = compact_peer['l']
301                 pieces_dl_hash.setdefault(compact_peer['l'], 0)
302                 pieces_dl_hash[compact_peer['l']] += 1
303             else:
304                 no_pieces += 1
305         
306         # Select the most popular piece info
307         max_found = max(no_pieces, max(pieces_string.values()),
308                         max(pieces_hash.values()), max(pieces_dl_hash.values()))
309
310         if max_found < len(self.peers):
311             log.msg('Misleading piece information found, using most popular %d of %d peers' % 
312                     (max_found, len(self.peers)))
313
314         if max_found == no_pieces:
315             # The file is not split into pieces
316             log.msg('No pieces were found for the file')
317             self.pieces = [self.hash.expected()]
318             self.startDownload()
319         elif max_found == max(pieces_string.values()):
320             # Small number of pieces in a string
321             for pieces, num in pieces_string.items():
322                 # Find the most popular piece string
323                 if num == max_found:
324                     self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
325                     log.msg('Peer info contained %d piece hashes' % len(self.pieces))
326                     self.startDownload()
327                     break
328         elif max_found == max(pieces_hash.values()):
329             # Medium number of pieces stored in the DHT
330             for pieces, num in pieces_hash.items():
331                 # Find the most popular piece hash to lookup
332                 if num == max_found:
333                     log.msg('Found a hash for pieces to lookup in the DHT: %r' % pieces)
334                     self.getDHTPieces(pieces)
335                     break
336         elif max_found == max(pieces_dl_hash.values()):
337             # Large number of pieces stored in peers
338             for pieces, num in pieces_dl_hash.items():
339                 # Find the most popular piece hash to download
340                 if num == max_found:
341                     log.msg('Found a hash for pieces to lookup in peers: %r' % pieces)
342                     self.getPeerPieces(pieces)
343                     break
344         return self.defer
345
346     #{ Downloading the piece hashes
347     def getDHTPieces(self, key):
348         """Retrieve the piece information from the DHT.
349         
350         @param key: the key to lookup in the DHT
351         """
352         # Remove any peers with the wrong piece hash
353         #for site in self.peers.keys():
354         #    if self.peers[site].get('h', '') != key:
355         #        del self.peers[site]
356
357         # Start the DHT lookup
358         lookupDefer = self.manager.dht.getValue(key)
359         lookupDefer.addBoth(self._getDHTPieces, key)
360         
361     def _getDHTPieces(self, results, key):
362         """Check the retrieved values."""
363         if isinstance(results, list):
364             for result in results:
365                 # Make sure the hash matches the key
366                 result_hash = sha.new(result.get('t', '')).digest()
367                 if result_hash == key:
368                     pieces = result['t']
369                     self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
370                     log.msg('Retrieved %d piece hashes from the DHT' % len(self.pieces))
371                     self.startDownload()
372                     return
373                 
374             log.msg('Could not retrieve the piece hashes from the DHT')
375         else:
376             log.msg('Looking up piece hashes in the DHT resulted in an error: %r' % (result, ))
377             
378         # Continue without the piece hashes
379         self.pieces = [None for x in xrange(0, self.hash.expSize, PIECE_SIZE)]
380         self.startDownload()
381
382     def getPeerPieces(self, key, failedSite = None):
383         """Retrieve the piece information from the peers.
384         
385         @param key: the key to request from the peers
386         """
387         if failedSite is None:
388             log.msg('Starting the lookup of piece hashes in peers')
389             self.outstanding = 0
390             # Remove any peers with the wrong piece hash
391             #for site in self.peers.keys():
392             #    if self.peers[site].get('l', '') != key:
393             #        del self.peers[site]
394         else:
395             log.msg('Piece hash lookup failed for peer %r' % (failedSite, ))
396             self.peers[failedSite]['failed'] = True
397             self.outstanding -= 1
398
399         if self.pieces is None:
400             # Send a request to one or more peers
401             log.msg('Checking for a peer to request piece hashes from')
402             for site in self.peers:
403                 if self.peers[site].get('failed', False) != True:
404                     log.msg('Sending a piece hash request to %r' % (site, ))
405                     path = '/~/' + quote_plus(key)
406                     lookupDefer = self.peers[site]['peer'].get(path)
407                     reactor.callLater(0, lookupDefer.addCallbacks,
408                                       *(self._getPeerPieces, self._gotPeerError),
409                                       **{'callbackArgs': (key, site),
410                                          'errbackArgs': (key, site)})
411                     self.outstanding += 1
412                     if self.outstanding >= 4:
413                         break
414         
415         log.msg('Done sending piece hash requests for now, %d outstanding' % self.outstanding)
416         if self.pieces is None and self.outstanding <= 0:
417             # Continue without the piece hashes
418             log.msg('Could not retrieve the piece hashes from the peers')
419             self.pieces = [None for x in xrange(0, self.hash.expSize, PIECE_SIZE)]
420             self.startDownload()
421         
422     def _getPeerPieces(self, response, key, site):
423         """Process the retrieved response from the peer."""
424         log.msg('Got a piece hash response %d from %r' % (response.code, site))
425         if response.code != 200:
426             # Request failed, try a different peer
427             log.msg('Did not like response %d from %r' % (response.code, site))
428             self.getPeerPieces(key, site)
429         else:
430             # Read the response stream to a string
431             self.peers[site]['pieces'] = ''
432             def _gotPeerPiece(data, self = self, site = site):
433                 log.msg('Peer %r got %d bytes of piece hashes' % (site, len(data)))
434                 self.peers[site]['pieces'] += data
435             log.msg('Streaming piece hashes from peer')
436             df = stream.readStream(response.stream, _gotPeerPiece)
437             df.addCallbacks(self._gotPeerPieces, self._gotPeerError,
438                             callbackArgs=(key, site), errbackArgs=(key, site))
439
440     def _gotPeerError(self, err, key, site):
441         """Peer failed, try again."""
442         log.msg('Peer piece hash request failed for %r' % (site, ))
443         log.err(err)
444         self.getPeerPieces(key, site)
445
446     def _gotPeerPieces(self, result, key, site):
447         """Check the retrieved pieces from the peer."""
448         log.msg('Finished streaming piece hashes from peer %r' % (site, ))
449         if self.pieces is not None:
450             # Already done
451             log.msg('Already done')
452             return
453         
454         try:
455             result = bdecode(self.peers[site]['pieces'])
456         except:
457             log.msg('Error bdecoding piece hashes')
458             log.err()
459             self.getPeerPieces(key, site)
460             return
461             
462         result_hash = sha.new(result.get('t', '')).digest()
463         if result_hash == key:
464             pieces = result['t']
465             self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
466             log.msg('Retrieved %d piece hashes from the peer %r' % (len(self.pieces), site))
467             self.startDownload()
468         else:
469             log.msg('Peer returned a piece string that did not match')
470             self.getPeerPieces(key, site)
471
472     #{ Downloading the file
473     def sort(self):
474         """Sort the peers by their rank (highest ranked at the end)."""
475         def sort(a, b):
476             """Sort peers by their rank."""
477             if a.rank > b.rank:
478                 return 1
479             elif a.rank < b.rank:
480                 return -1
481             return 0
482         self.peerlist.sort(sort)
483
484     def startDownload(self):
485         """Start the download from the peers."""
486         # Don't start twice
487         if self.started:
488             return
489         
490         log.msg('Starting to download %s' % self.path)
491         self.started = True
492         assert self.pieces, "You must initialize the piece hashes first"
493         self.peerlist = [self.peers[site]['peer'] for site in self.peers]
494         
495         # Use the mirror if there are few peers
496         if len(self.peerlist) < config.getint('DEFAULT', 'MIN_DOWNLOAD_PEERS'):
497             parsed = urlparse(self.mirror)
498             if parsed[0] == "http":
499                 site = splitHostPort(parsed[0], parsed[1])
500                 self.mirror_path = urlunparse(('', '') + parsed[2:])
501                 peer = self.manager.getPeer(site)
502                 peer.mirror = True
503                 self.peerlist.append(peer)
504         
505         # Special case if there's only one good peer left
506 #        if len(self.peerlist) == 1:
507 #            log.msg('Downloading from peer %r' % (self.peerlist[0], ))
508 #            self.defer.callback(self.peerlist[0].get(self.path))
509 #            return
510         
511         # Start sending the return file
512         self.stream = GrowingFileStream(self.file, self.hash.expSize)
513         resp = Response(200, {}, self.stream)
514         self.defer.callback(resp)
515
516         # Begin to download the pieces
517         self.outstanding = 0
518         self.nextFinish = 0
519         self.completePieces = [False for piece in self.pieces]
520         self.getPieces()
521         
522     #{ Downloading the pieces
523     def getPieces(self):
524         """Download the next pieces from the peers."""
525         log.msg('Checking for more piece requests to send')
526         self.sort()
527         piece = self.nextFinish
528         while self.outstanding < 4 and self.peerlist and piece < len(self.completePieces):
529             log.msg('Checking piece %d' % piece)
530             if self.completePieces[piece] == False:
531                 # Send a request to the highest ranked peer
532                 peer = self.peerlist.pop()
533                 self.completePieces[piece] = peer
534                 log.msg('Sending a request for piece %d to peer %r' % (piece, peer))
535                 
536                 self.outstanding += 1
537                 if peer.mirror:
538                     df = peer.getRange(self.mirror_path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
539                 else:
540                     df = peer.getRange(self.path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
541                 reactor.callLater(0, df.addCallbacks,
542                                   *(self._getPiece, self._getError),
543                                   **{'callbackArgs': (piece, peer),
544                                      'errbackArgs': (piece, peer)})
545             piece += 1
546                 
547         log.msg('Finished checking pieces, %d outstanding, next piece %d of %d' % (self.outstanding, self.nextFinish, len(self.completePieces)))
548         # Check if we're done
549         if self.outstanding <= 0 and self.nextFinish >= len(self.completePieces):
550             log.msg('We seem to be done with all pieces')
551             self.stream.allAvailable()
552     
553     def _getPiece(self, response, piece, peer):
554         """Process the retrieved headers from the peer."""
555         log.msg('Got response for piece %d from peer %r' % (piece, peer))
556         if ((len(self.completePieces) > 1 and response.code != 206) or
557             (response.code not in (200, 206))):
558             # Request failed, try a different peer
559             log.msg('Wrong response type %d for piece %d from peer %r' % (response.code, piece, peer))
560             peer.hashError('Peer responded with the wrong type of download: %r' % response.code)
561             self.completePieces[piece] = False
562             if response.stream and response.stream.length:
563                 stream.readAndDiscard(response.stream)
564         else:
565             # Read the response stream to the file
566             log.msg('Streaming piece %d from peer %r' % (piece, peer))
567             if response.code == 206:
568                 df = StreamToFile(response.stream, self.file, piece*PIECE_SIZE, PIECE_SIZE).run()
569             else:
570                 df = StreamToFile(response.stream, self.file).run()
571             df.addCallbacks(self._gotPiece, self._gotError,
572                             callbackArgs=(piece, peer), errbackArgs=(piece, peer))
573
574         self.outstanding -= 1
575         self.peerlist.append(peer)
576         self.getPieces()
577
578     def _getError(self, err, piece, peer):
579         """Peer failed, try again."""
580         log.msg('Got error for piece %d from peer %r' % (piece, peer))
581         self.outstanding -= 1
582         self.peerlist.append(peer)
583         self.completePieces[piece] = False
584         self.getPieces()
585         log.err(err)
586
587     def _gotPiece(self, response, piece, peer):
588         """Process the retrieved piece from the peer."""
589         log.msg('Finished streaming piece %d from peer %r: %r' % (piece, peer, response))
590         if self.pieces[piece] and response != self.pieces[piece]:
591             # Hash doesn't match
592             log.msg('Hash error for piece %d from peer %r' % (piece, peer))
593             peer.hashError('Piece received from peer does not match expected')
594             self.completePieces[piece] = False
595         else:
596             # Successfully completed one of several pieces
597             log.msg('Finished with piece %d from peer %r' % (piece, peer))
598             self.completePieces[piece] = True
599             while (self.nextFinish < len(self.completePieces) and
600                    self.completePieces[self.nextFinish] == True):
601                 self.nextFinish += 1
602                 self.stream.updateAvailable(PIECE_SIZE)
603
604         self.getPieces()
605
606     def _gotError(self, err, piece, peer):
607         """Piece download failed, try again."""
608         log.msg('Error streaming piece %d from peer %r: %r' % (piece, peer, response))
609         log.err(err)
610         self.completePieces[piece] = False
611         self.getPieces()
612         
613 class PeerManager:
614     """Manage a set of peers and the requests to them.
615     
616     @type cache_dir: L{twisted.python.filepath.FilePath}
617     @ivar cache_dir: the directory to use for storing all files
618     @type dht: L{interfaces.IDHT}
619     @ivar dht: the DHT instance
620     @type clients: C{dictionary}
621     @ivar clients: the available peers that have been previously contacted
622     """
623
624     def __init__(self, cache_dir, dht):
625         """Initialize the instance."""
626         self.cache_dir = cache_dir
627         self.cache_dir.restat(False)
628         if not self.cache_dir.exists():
629             self.cache_dir.makedirs()
630         self.dht = dht
631         self.clients = {}
632         
633     def get(self, hash, mirror, peers = [], method="GET", modtime=None):
634         """Download from a list of peers or fallback to a mirror.
635         
636         @type hash: L{Hash.HashObject}
637         @param hash: the hash object containing the expected hash for the file
638         @param mirror: the URI of the file on the mirror
639         @type peers: C{list} of C{string}
640         @param peers: a list of the peer info where the file can be found
641             (optional, defaults to downloading from the mirror)
642         @type method: C{string}
643         @param method: the HTTP method to use, 'GET' or 'HEAD'
644             (optional, defaults to 'GET')
645         @type modtime: C{int}
646         @param modtime: the modification time to use for an 'If-Modified-Since'
647             header, as seconds since the epoch
648             (optional, defaults to not sending that header)
649         """
650         if not peers or method != "GET" or modtime is not None:
651             log.msg('Downloading (%s) from mirror %s' % (method, mirror))
652             parsed = urlparse(mirror)
653             assert parsed[0] == "http", "Only HTTP is supported, not '%s'" % parsed[0]
654             site = splitHostPort(parsed[0], parsed[1])
655             path = urlunparse(('', '') + parsed[2:])
656             peer = self.getPeer(site)
657             peer.mirror = True
658             return peer.get(path, method, modtime)
659 #        elif len(peers) == 1:
660 #            site = uncompact(peers[0]['c'])
661 #            log.msg('Downloading from peer %r' % (site, ))
662 #            path = '/~/' + quote_plus(hash.expected())
663 #            peer = self.getPeer(site)
664 #            return peer.get(path)
665         else:
666             tmpfile = self.cache_dir.child(hash.hexexpected())
667             return FileDownload(self, hash, mirror, peers, tmpfile).run()
668         
669     def getPeer(self, site):
670         """Create a new peer if necessary and return it.
671         
672         @type site: (C{string}, C{int})
673         @param site: the IP address and port of the peer
674         """
675         if site not in self.clients:
676             self.clients[site] = Peer(site[0], site[1])
677         return self.clients[site]
678     
679     def close(self):
680         """Close all the connections to peers."""
681         for site in self.clients:
682             self.clients[site].close()
683         self.clients = {}
684
685 class TestPeerManager(unittest.TestCase):
686     """Unit tests for the PeerManager."""
687     
688     manager = None
689     pending_calls = []
690     
691     def tearDown(self):
692         for p in self.pending_calls:
693             if p.active():
694                 p.cancel()
695         self.pending_calls = []
696         if self.manager:
697             self.manager.close()
698             self.manager = None