Remove some unnecessary log messages and use better Exceptions.
[quix0rs-apt-p2p.git] / apt_p2p / PeerManager.py
1
2 """Manage a set of peers and the requests to them."""
3
4 from random import choice
5 from urlparse import urlparse, urlunparse
6 from urllib import quote_plus
7 from binascii import b2a_hex, a2b_hex
8 import sha
9
10 from twisted.internet import reactor, defer
11 from twisted.python import log
12 from twisted.trial import unittest
13 from twisted.web2 import stream
14 from twisted.web2.http import Response, splitHostPort
15
16 from HTTPDownloader import Peer
17 from util import uncompact
18 from Hash import PIECE_SIZE
19 from apt_p2p_Khashmir.bencode import bdecode
20 from apt_p2p_conf import config
21
22
23 class PeerError(Exception):
24     """An error occurred downloading from peers."""
25     
26 class GrowingFileStream(stream.FileStream):
27     """Modified to stream data from a file as it becomes available.
28     
29     @ivar CHUNK_SIZE: the maximum size of chunks of data to send at a time
30     @ivar deferred: waiting for the result of the last read attempt
31     @ivar available: the number of bytes that are currently available to read
32     @ivar position: the current position in the file where the next read will begin
33     @ivar finished: True when no more data will be coming available
34     """
35
36     CHUNK_SIZE = 4*1024
37
38     def __init__(self, f, length = None):
39         stream.FileStream.__init__(self, f)
40         self.length = length
41         self.deferred = None
42         self.available = 0L
43         self.position = 0L
44         self.finished = False
45
46     def updateAvailable(self, newlyAvailable):
47         """Update the number of bytes that are available.
48         
49         Call it with 0 to trigger reading of a fully read file.
50         
51         @param newlyAvailable: the number of bytes that just became available
52         """
53         assert not self.finished
54         self.available += newlyAvailable
55         
56         # If a read is pending, let it go
57         if self.deferred and self.position < self.available:
58             # Try to read some data from the file
59             length = self.available - self.position
60             readSize = min(length, self.CHUNK_SIZE)
61             self.f.seek(self.position)
62             b = self.f.read(readSize)
63             bytesRead = len(b)
64             
65             # Check if end of file was reached
66             if bytesRead:
67                 self.position += bytesRead
68                 deferred = self.deferred
69                 self.deferred = None
70                 deferred.callback(b)
71
72     def allAvailable(self):
73         """Indicate that no more data will be coming available."""
74         self.finished = True
75
76         # If a read is pending, let it go
77         if self.deferred:
78             if self.position < self.available:
79                 # Try to read some data from the file
80                 length = self.available - self.position
81                 readSize = min(length, self.CHUNK_SIZE)
82                 self.f.seek(self.position)
83                 b = self.f.read(readSize)
84                 bytesRead = len(b)
85     
86                 # Check if end of file was reached
87                 if bytesRead:
88                     self.position += bytesRead
89                     deferred = self.deferred
90                     self.deferred = None
91                     deferred.callback(b)
92                 else:
93                     # We're done
94                     deferred = self.deferred
95                     self.deferred = None
96                     deferred.callback(None)
97             else:
98                 # We're done
99                 deferred = self.deferred
100                 self.deferred = None
101                 deferred.callback(None)
102         
103     def read(self, sendfile=False):
104         assert not self.deferred, "A previous read is still deferred."
105
106         if self.f is None:
107             return None
108
109         length = self.available - self.position
110         readSize = min(length, self.CHUNK_SIZE)
111
112         # If we don't have any available, we're done or deferred
113         if readSize <= 0:
114             if self.finished:
115                 return None
116             else:
117                 self.deferred = defer.Deferred()
118                 return self.deferred
119
120         # Try to read some data from the file
121         self.f.seek(self.position)
122         b = self.f.read(readSize)
123         bytesRead = len(b)
124         if not bytesRead:
125             # End of file was reached, we're done or deferred
126             if self.finished:
127                 return None
128             else:
129                 self.deferred = defer.Deferred()
130                 return self.deferred
131         else:
132             self.position += bytesRead
133             return b
134
135 class StreamToFile:
136     """Save a stream to a partial file and hash it.
137     
138     @type stream: L{twisted.web2.stream.IByteStream}
139     @ivar stream: the input stream being read
140     @type outFile: L{twisted.python.filepath.FilePath}
141     @ivar outFile: the file being written
142     @type hash: C{sha1}
143     @ivar hash: the hash object for the data
144     @type position: C{int}
145     @ivar position: the current file position to write the next data to
146     @type length: C{int}
147     @ivar length: the position in the file to not write beyond
148     @type doneDefer: L{twisted.internet.defer.Deferred}
149     @ivar doneDefer: the deferred that will fire when done writing
150     """
151     
152     def __init__(self, inputStream, outFile, start = 0, length = None):
153         """Initializes the file.
154         
155         @type inputStream: L{twisted.web2.stream.IByteStream}
156         @param inputStream: the input stream to read from
157         @type outFile: L{twisted.python.filepath.FilePath}
158         @param outFile: the file to write to
159         @type start: C{int}
160         @param start: the file position to start writing at
161             (optional, defaults to the start of the file)
162         @type length: C{int}
163         @param length: the maximum amount of data to write to the file
164             (optional, defaults to not limiting the writing to the file
165         """
166         self.stream = inputStream
167         self.outFile = outFile
168         self.hash = sha.new()
169         self.position = start
170         self.length = None
171         if length is not None:
172             self.length = start + length
173         self.doneDefer = None
174         
175     def run(self):
176         """Start the streaming.
177
178         @rtype: L{twisted.internet.defer.Deferred}
179         """
180         log.msg('Started streaming %r bytes to file at position %d' % (self.length, self.position))
181         self.doneDefer = stream.readStream(self.stream, self._gotData)
182         self.doneDefer.addCallbacks(self._done, self._error)
183         return self.doneDefer
184
185     def _gotData(self, data):
186         """Process the received data."""
187         if self.outFile.closed:
188             raise PeerError, "outFile was unexpectedly closed"
189         
190         if data is None:
191             raise PeerError, "Data is None?"
192         
193         # Make sure we don't go too far
194         if self.length is not None and self.position + len(data) > self.length:
195             data = data[:(self.length - self.position)]
196         
197         # Write and hash the streamed data
198         self.outFile.seek(self.position)
199         self.outFile.write(data)
200         self.hash.update(data)
201         self.position += len(data)
202         
203     def _done(self, result):
204         """Return the result."""
205         log.msg('Streaming is complete')
206         return self.hash.digest()
207     
208     def _error(self, err):
209         """Log the error."""
210         log.msg('Streaming error')
211         log.err(err)
212         return err
213     
214 class FileDownload:
215     """Manage a download from a list of peers or a mirror.
216     
217     @type manager: L{PeerManager}
218     @ivar manager: the manager to send requests for peers to
219     @type hash: L{Hash.HashObject}
220     @ivar hash: the hash object containing the expected hash for the file
221     @ivar mirror: the URI of the file on the mirror
222     @type compact_peers: C{list} of C{dictionary}
223     @ivar compact_peers: a list of the peer info where the file can be found
224     @type file: C{file}
225     @ivar file: the open file to right the download to
226     @type path: C{string}
227     @ivar path: the path to request from peers to access the file
228     @type pieces: C{list} of C{string} 
229     @ivar pieces: the hashes of the pieces in the file
230     @type started: C{boolean}
231     @ivar started: whether the download has begun yet
232     @type defer: L{twisted.internet.defer.Deferred}
233     @ivar defer: the deferred that will callback with the result of the download
234     @type peers: C{dictionary}
235     @ivar peers: information about each of the peers available to download from
236     @type outstanding: C{int}
237     @ivar outstanding: the number of requests to peers currently outstanding
238     @type peerlist: C{list} of L{HTTPDownloader.Peer}
239     @ivar peerlist: the sorted list of peers for this download
240     @type stream: L{GrowingFileStream}
241     @ivar stream: the stream of resulting data from the download
242     @type nextFinish: C{int}
243     @ivar nextFinish: the next piece that is needed to finish for the stream
244     @type completePieces: C{list} of C{boolean} or L{HTTPDownloader.Peer}
245     @ivar completePieces: one per piece, will be False if no requests are
246         outstanding for the piece, True if the piece has been successfully
247         downloaded, or the Peer that a request for this piece has been sent  
248     """
249     
250     def __init__(self, manager, hash, mirror, compact_peers, file):
251         """Initialize the instance and check for piece hashes.
252         
253         @type manager: L{PeerManager}
254         @param manager: the manager to send requests for peers to
255         @type hash: L{Hash.HashObject}
256         @param hash: the hash object containing the expected hash for the file
257         @param mirror: the URI of the file on the mirror
258         @type compact_peers: C{list} of C{dictionary}
259         @param compact_peers: a list of the peer info where the file can be found
260         @type file: L{twisted.python.filepath.FilePath}
261         @param file: the temporary file to use to store the downloaded file
262         """
263         self.manager = manager
264         self.hash = hash
265         self.mirror = mirror
266         self.compact_peers = compact_peers
267         
268         self.path = '/~/' + quote_plus(hash.expected())
269         self.mirror_path = None
270         self.pieces = None
271         self.started = False
272         
273         file.restat(False)
274         if file.exists():
275             file.remove()
276         self.file = file.open('w+')
277
278     def run(self):
279         """Start the downloading process."""
280         log.msg('Checking for pieces for %s' % self.path)
281         self.defer = defer.Deferred()
282         self.peers = {}
283         no_pieces = 0
284         pieces_string = {0: 0}
285         pieces_hash = {0: 0}
286         pieces_dl_hash = {0: 0}
287
288         for compact_peer in self.compact_peers:
289             # Build a list of all the peers for this download
290             site = uncompact(compact_peer['c'])
291             peer = self.manager.getPeer(site)
292             self.peers.setdefault(site, {})['peer'] = peer
293
294             # Extract any piece information from the peers list
295             if 't' in compact_peer:
296                 self.peers[site]['t'] = compact_peer['t']['t']
297                 pieces_string.setdefault(compact_peer['t']['t'], 0)
298                 pieces_string[compact_peer['t']['t']] += 1
299             elif 'h' in compact_peer:
300                 self.peers[site]['h'] = compact_peer['h']
301                 pieces_hash.setdefault(compact_peer['h'], 0)
302                 pieces_hash[compact_peer['h']] += 1
303             elif 'l' in compact_peer:
304                 self.peers[site]['l'] = compact_peer['l']
305                 pieces_dl_hash.setdefault(compact_peer['l'], 0)
306                 pieces_dl_hash[compact_peer['l']] += 1
307             else:
308                 no_pieces += 1
309         
310         # Select the most popular piece info
311         max_found = max(no_pieces, max(pieces_string.values()),
312                         max(pieces_hash.values()), max(pieces_dl_hash.values()))
313
314         if max_found < len(self.peers):
315             log.msg('Misleading piece information found, using most popular %d of %d peers' % 
316                     (max_found, len(self.peers)))
317
318         if max_found == no_pieces:
319             # The file is not split into pieces
320             log.msg('No pieces were found for the file')
321             self.pieces = [self.hash.expected()]
322             self.startDownload()
323         elif max_found == max(pieces_string.values()):
324             # Small number of pieces in a string
325             for pieces, num in pieces_string.items():
326                 # Find the most popular piece string
327                 if num == max_found:
328                     self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
329                     log.msg('Peer info contained %d piece hashes' % len(self.pieces))
330                     self.startDownload()
331                     break
332         elif max_found == max(pieces_hash.values()):
333             # Medium number of pieces stored in the DHT
334             for pieces, num in pieces_hash.items():
335                 # Find the most popular piece hash to lookup
336                 if num == max_found:
337                     log.msg('Found a hash for pieces to lookup in the DHT: %r' % pieces)
338                     self.getDHTPieces(pieces)
339                     break
340         elif max_found == max(pieces_dl_hash.values()):
341             # Large number of pieces stored in peers
342             for pieces, num in pieces_dl_hash.items():
343                 # Find the most popular piece hash to download
344                 if num == max_found:
345                     log.msg('Found a hash for pieces to lookup in peers: %r' % pieces)
346                     self.getPeerPieces(pieces)
347                     break
348         return self.defer
349
350     #{ Downloading the piece hashes
351     def getDHTPieces(self, key):
352         """Retrieve the piece information from the DHT.
353         
354         @param key: the key to lookup in the DHT
355         """
356         # Remove any peers with the wrong piece hash
357         #for site in self.peers.keys():
358         #    if self.peers[site].get('h', '') != key:
359         #        del self.peers[site]
360
361         # Start the DHT lookup
362         lookupDefer = self.manager.dht.getValue(key)
363         lookupDefer.addBoth(self._getDHTPieces, key)
364         
365     def _getDHTPieces(self, results, key):
366         """Check the retrieved values."""
367         if isinstance(results, list):
368             for result in results:
369                 # Make sure the hash matches the key
370                 result_hash = sha.new(result.get('t', '')).digest()
371                 if result_hash == key:
372                     pieces = result['t']
373                     self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
374                     log.msg('Retrieved %d piece hashes from the DHT' % len(self.pieces))
375                     self.startDownload()
376                     return
377                 
378             log.msg('Could not retrieve the piece hashes from the DHT')
379         else:
380             log.msg('Looking up piece hashes in the DHT resulted in an error: %r' % (result, ))
381             
382         # Continue without the piece hashes
383         self.pieces = [None for x in xrange(0, self.hash.expSize, PIECE_SIZE)]
384         self.startDownload()
385
386     def getPeerPieces(self, key, failedSite = None):
387         """Retrieve the piece information from the peers.
388         
389         @param key: the key to request from the peers
390         """
391         if failedSite is None:
392             log.msg('Starting the lookup of piece hashes in peers')
393             self.outstanding = 0
394             # Remove any peers with the wrong piece hash
395             #for site in self.peers.keys():
396             #    if self.peers[site].get('l', '') != key:
397             #        del self.peers[site]
398         else:
399             log.msg('Piece hash lookup failed for peer %r' % (failedSite, ))
400             self.peers[failedSite]['failed'] = True
401             self.outstanding -= 1
402
403         if self.pieces is None:
404             # Send a request to one or more peers
405             for site in self.peers:
406                 if self.peers[site].get('failed', False) != True:
407                     log.msg('Sending a piece hash request to %r' % (site, ))
408                     path = '/~/' + quote_plus(key)
409                     lookupDefer = self.peers[site]['peer'].get(path)
410                     reactor.callLater(0, lookupDefer.addCallbacks,
411                                       *(self._getPeerPieces, self._gotPeerError),
412                                       **{'callbackArgs': (key, site),
413                                          'errbackArgs': (key, site)})
414                     self.outstanding += 1
415                     if self.outstanding >= 4:
416                         break
417         
418         if self.pieces is None and self.outstanding <= 0:
419             # Continue without the piece hashes
420             log.msg('Could not retrieve the piece hashes from the peers')
421             self.pieces = [None for x in xrange(0, self.hash.expSize, PIECE_SIZE)]
422             self.startDownload()
423         
424     def _getPeerPieces(self, response, key, site):
425         """Process the retrieved response from the peer."""
426         log.msg('Got a piece hash response %d from %r' % (response.code, site))
427         if response.code != 200:
428             # Request failed, try a different peer
429             self.getPeerPieces(key, site)
430         else:
431             # Read the response stream to a string
432             self.peers[site]['pieces'] = ''
433             def _gotPeerPiece(data, self = self, site = site):
434                 log.msg('Peer %r got %d bytes of piece hashes' % (site, len(data)))
435                 self.peers[site]['pieces'] += data
436             log.msg('Streaming piece hashes from peer')
437             df = stream.readStream(response.stream, _gotPeerPiece)
438             df.addCallbacks(self._gotPeerPieces, self._gotPeerError,
439                             callbackArgs=(key, site), errbackArgs=(key, site))
440
441     def _gotPeerError(self, err, key, site):
442         """Peer failed, try again."""
443         log.msg('Peer piece hash request failed for %r' % (site, ))
444         log.err(err)
445         self.getPeerPieces(key, site)
446
447     def _gotPeerPieces(self, result, key, site):
448         """Check the retrieved pieces from the peer."""
449         log.msg('Finished streaming piece hashes from peer %r' % (site, ))
450         if self.pieces is not None:
451             # Already done
452             log.msg('Already done')
453             return
454         
455         try:
456             result = bdecode(self.peers[site]['pieces'])
457         except:
458             log.msg('Error bdecoding piece hashes')
459             log.err()
460             self.getPeerPieces(key, site)
461             return
462             
463         result_hash = sha.new(result.get('t', '')).digest()
464         if result_hash == key:
465             pieces = result['t']
466             self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
467             log.msg('Retrieved %d piece hashes from the peer %r' % (len(self.pieces), site))
468             self.startDownload()
469         else:
470             log.msg('Peer returned a piece string that did not match')
471             self.getPeerPieces(key, site)
472
473     #{ Downloading the file
474     def sort(self):
475         """Sort the peers by their rank (highest ranked at the end)."""
476         def sort(a, b):
477             """Sort peers by their rank."""
478             if a.rank > b.rank:
479                 return 1
480             elif a.rank < b.rank:
481                 return -1
482             return 0
483         self.peerlist.sort(sort)
484
485     def startDownload(self):
486         """Start the download from the peers."""
487         # Don't start twice
488         if self.started:
489             return
490         
491         log.msg('Starting to download %s' % self.path)
492         self.started = True
493         assert self.pieces, "You must initialize the piece hashes first"
494         self.peerlist = [self.peers[site]['peer'] for site in self.peers]
495         
496         # Use the mirror if there are few peers
497         if len(self.peerlist) < config.getint('DEFAULT', 'MIN_DOWNLOAD_PEERS'):
498             parsed = urlparse(self.mirror)
499             if parsed[0] == "http":
500                 site = splitHostPort(parsed[0], parsed[1])
501                 self.mirror_path = urlunparse(('', '') + parsed[2:])
502                 peer = self.manager.getPeer(site, mirror = True)
503                 self.peerlist.append(peer)
504         
505         # Special case if there's only one good peer left
506 #        if len(self.peerlist) == 1:
507 #            log.msg('Downloading from peer %r' % (self.peerlist[0], ))
508 #            self.defer.callback(self.peerlist[0].get(self.path))
509 #            return
510         
511         # Start sending the return file
512         self.stream = GrowingFileStream(self.file, self.hash.expSize)
513         resp = Response(200, {}, self.stream)
514         self.defer.callback(resp)
515
516         # Begin to download the pieces
517         self.outstanding = 0
518         self.nextFinish = 0
519         self.completePieces = [False for piece in self.pieces]
520         self.getPieces()
521         
522     #{ Downloading the pieces
523     def getPieces(self):
524         """Download the next pieces from the peers."""
525         self.sort()
526         piece = self.nextFinish
527         while self.outstanding < 4 and self.peerlist and piece < len(self.completePieces):
528             if self.completePieces[piece] == False:
529                 # Send a request to the highest ranked peer
530                 peer = self.peerlist.pop()
531                 self.completePieces[piece] = peer
532                 log.msg('Sending a request for piece %d to peer %r' % (piece, peer))
533                 
534                 self.outstanding += 1
535                 if peer.mirror:
536                     df = peer.getRange(self.mirror_path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
537                 else:
538                     df = peer.getRange(self.path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
539                 reactor.callLater(0, df.addCallbacks,
540                                   *(self._getPiece, self._getError),
541                                   **{'callbackArgs': (piece, peer),
542                                      'errbackArgs': (piece, peer)})
543             piece += 1
544                 
545         # Check if we're done
546         if self.outstanding <= 0 and self.nextFinish >= len(self.completePieces):
547             log.msg('We seem to be done with all pieces')
548             self.stream.allAvailable()
549     
550     def _getPiece(self, response, piece, peer):
551         """Process the retrieved headers from the peer."""
552         log.msg('Got response for piece %d from peer %r' % (piece, peer))
553         if ((len(self.completePieces) > 1 and response.code != 206) or
554             (response.code not in (200, 206))):
555             # Request failed, try a different peer
556             log.msg('Wrong response type %d for piece %d from peer %r' % (response.code, piece, peer))
557             peer.hashError('Peer responded with the wrong type of download: %r' % response.code)
558             self.completePieces[piece] = False
559             if response.stream and response.stream.length:
560                 stream.readAndDiscard(response.stream)
561         else:
562             # Read the response stream to the file
563             log.msg('Streaming piece %d from peer %r' % (piece, peer))
564             if response.code == 206:
565                 df = StreamToFile(response.stream, self.file, piece*PIECE_SIZE,
566                                   PIECE_SIZE).run()
567             else:
568                 df = StreamToFile(response.stream, self.file).run()
569             reactor.callLater(0, df.addCallbacks,
570                               *(self._gotPiece, self._gotError),
571                               **{'callbackArgs': (piece, peer),
572                                  'errbackArgs': (piece, peer)})
573
574         self.outstanding -= 1
575         self.peerlist.append(peer)
576         self.getPieces()
577
578     def _getError(self, err, piece, peer):
579         """Peer failed, try again."""
580         log.msg('Got error for piece %d from peer %r' % (piece, peer))
581         self.outstanding -= 1
582         self.peerlist.append(peer)
583         self.completePieces[piece] = False
584         self.getPieces()
585         log.err(err)
586
587     def _gotPiece(self, response, piece, peer):
588         """Process the retrieved piece from the peer."""
589         log.msg('Finished streaming piece %d from peer %r: %r' % (piece, peer, response))
590         if self.pieces[piece] and response != self.pieces[piece]:
591             # Hash doesn't match
592             log.msg('Hash error for piece %d from peer %r' % (piece, peer))
593             peer.hashError('Piece received from peer does not match expected')
594             self.completePieces[piece] = False
595         else:
596             # Successfully completed one of several pieces
597             log.msg('Finished with piece %d from peer %r' % (piece, peer))
598             self.completePieces[piece] = True
599             while (self.nextFinish < len(self.completePieces) and
600                    self.completePieces[self.nextFinish] == True):
601                 self.nextFinish += 1
602                 self.stream.updateAvailable(PIECE_SIZE)
603
604         self.getPieces()
605
606     def _gotError(self, err, piece, peer):
607         """Piece download failed, try again."""
608         log.msg('Error streaming piece %d from peer %r: %r' % (piece, peer, response))
609         log.err(err)
610         self.completePieces[piece] = False
611         self.getPieces()
612         
613 class PeerManager:
614     """Manage a set of peers and the requests to them.
615     
616     @type cache_dir: L{twisted.python.filepath.FilePath}
617     @ivar cache_dir: the directory to use for storing all files
618     @type dht: L{interfaces.IDHT}
619     @ivar dht: the DHT instance
620     @type stats: L{stats.StatsLogger}
621     @ivar stats: the statistics logger to record sent data to
622     @type clients: C{dictionary}
623     @ivar clients: the available peers that have been previously contacted
624     """
625
626     def __init__(self, cache_dir, dht, stats):
627         """Initialize the instance."""
628         self.cache_dir = cache_dir
629         self.cache_dir.restat(False)
630         if not self.cache_dir.exists():
631             self.cache_dir.makedirs()
632         self.dht = dht
633         self.stats = stats
634         self.clients = {}
635         
636     def get(self, hash, mirror, peers = [], method="GET", modtime=None):
637         """Download from a list of peers or fallback to a mirror.
638         
639         @type hash: L{Hash.HashObject}
640         @param hash: the hash object containing the expected hash for the file
641         @param mirror: the URI of the file on the mirror
642         @type peers: C{list} of C{string}
643         @param peers: a list of the peer info where the file can be found
644             (optional, defaults to downloading from the mirror)
645         @type method: C{string}
646         @param method: the HTTP method to use, 'GET' or 'HEAD'
647             (optional, defaults to 'GET')
648         @type modtime: C{int}
649         @param modtime: the modification time to use for an 'If-Modified-Since'
650             header, as seconds since the epoch
651             (optional, defaults to not sending that header)
652         """
653         if not peers or method != "GET" or modtime is not None:
654             log.msg('Downloading (%s) from mirror %s' % (method, mirror))
655             parsed = urlparse(mirror)
656             assert parsed[0] == "http", "Only HTTP is supported, not '%s'" % parsed[0]
657             site = splitHostPort(parsed[0], parsed[1])
658             path = urlunparse(('', '') + parsed[2:])
659             peer = self.getPeer(site, mirror = True)
660             return peer.get(path, method, modtime)
661 #        elif len(peers) == 1:
662 #            site = uncompact(peers[0]['c'])
663 #            log.msg('Downloading from peer %r' % (site, ))
664 #            path = '/~/' + quote_plus(hash.expected())
665 #            peer = self.getPeer(site)
666 #            return peer.get(path)
667         else:
668             tmpfile = self.cache_dir.child(hash.hexexpected())
669             return FileDownload(self, hash, mirror, peers, tmpfile).run()
670         
671     def getPeer(self, site, mirror = False):
672         """Create a new peer if necessary and return it.
673         
674         @type site: (C{string}, C{int})
675         @param site: the IP address and port of the peer
676         @param mirror: whether the peer is actually a mirror
677             (optional, defaults to False)
678         """
679         if site not in self.clients:
680             self.clients[site] = Peer(site[0], site[1], self.stats)
681             if mirror:
682                 self.clients[site].mirror = True
683         return self.clients[site]
684     
685     def close(self):
686         """Close all the connections to peers."""
687         for site in self.clients:
688             self.clients[site].close()
689         self.clients = {}
690
691 class TestPeerManager(unittest.TestCase):
692     """Unit tests for the PeerManager."""
693     
694     manager = None
695     pending_calls = []
696     
697     def tearDown(self):
698         for p in self.pending_calls:
699             if p.active():
700                 p.cancel()
701         self.pending_calls = []
702         if self.manager:
703             self.manager.close()
704             self.manager = None