447e69759bcacd16f2d98f235447349479995776
[quix0rs-apt-p2p.git] / apt_p2p / PeerManager.py
1
2 """Manage a set of peers and the requests to them."""
3
4 from random import choice
5 from urlparse import urlparse, urlunparse
6 from urllib import quote_plus
7 from binascii import b2a_hex, a2b_hex
8 import sha
9
10 from twisted.internet import reactor, defer
11 from twisted.python import log, filepath
12 from twisted.trial import unittest
13 from twisted.web2 import stream
14 from twisted.web2.http import Response, splitHostPort
15
16 from HTTPDownloader import Peer
17 from util import uncompact
18 from Hash import PIECE_SIZE
19 from apt_p2p_Khashmir.bencode import bdecode
20 from apt_p2p_conf import config
21
22
23 class PeerError(Exception):
24     """An error occurred downloading from peers."""
25     
26 class GrowingFileStream(stream.FileStream):
27     """Modified to stream data from a file as it becomes available.
28     
29     @ivar CHUNK_SIZE: the maximum size of chunks of data to send at a time
30     @ivar deferred: waiting for the result of the last read attempt
31     @ivar available: the number of bytes that are currently available to read
32     @ivar position: the current position in the file where the next read will begin
33     @ivar finished: True when no more data will be coming available
34     """
35
36     CHUNK_SIZE = 4*1024
37
38     def __init__(self, f, length = None):
39         stream.FileStream.__init__(self, f)
40         self.length = length
41         self.deferred = None
42         self.available = 0L
43         self.position = 0L
44         self.finished = False
45
46     def updateAvailable(self, newlyAvailable):
47         """Update the number of bytes that are available.
48         
49         Call it with 0 to trigger reading of a fully read file.
50         
51         @param newlyAvailable: the number of bytes that just became available
52         """
53         assert not self.finished
54         self.available += newlyAvailable
55         
56         # If a read is pending, let it go
57         if self.deferred and self.position < self.available:
58             # Try to read some data from the file
59             length = self.available - self.position
60             readSize = min(length, self.CHUNK_SIZE)
61             self.f.seek(self.position)
62             b = self.f.read(readSize)
63             bytesRead = len(b)
64             
65             # Check if end of file was reached
66             if bytesRead:
67                 self.position += bytesRead
68                 deferred = self.deferred
69                 self.deferred = None
70                 deferred.callback(b)
71
72     def allAvailable(self):
73         """Indicate that no more data will be coming available."""
74         self.finished = True
75
76         # If a read is pending, let it go
77         if self.deferred:
78             if self.position < self.available:
79                 # Try to read some data from the file
80                 length = self.available - self.position
81                 readSize = min(length, self.CHUNK_SIZE)
82                 self.f.seek(self.position)
83                 b = self.f.read(readSize)
84                 bytesRead = len(b)
85     
86                 # Check if end of file was reached
87                 if bytesRead:
88                     self.position += bytesRead
89                     deferred = self.deferred
90                     self.deferred = None
91                     deferred.callback(b)
92                 else:
93                     # We're done
94                     self._close()
95                     deferred = self.deferred
96                     self.deferred = None
97                     deferred.callback(None)
98             else:
99                 # We're done
100                 self._close()
101                 deferred = self.deferred
102                 self.deferred = None
103                 deferred.callback(None)
104         
105     def read(self, sendfile=False):
106         assert not self.deferred, "A previous read is still deferred."
107
108         if self.f is None:
109             return None
110
111         length = self.available - self.position
112         readSize = min(length, self.CHUNK_SIZE)
113
114         # If we don't have any available, we're done or deferred
115         if readSize <= 0:
116             if self.finished:
117                 self._close()
118                 return None
119             else:
120                 self.deferred = defer.Deferred()
121                 return self.deferred
122
123         # Try to read some data from the file
124         self.f.seek(self.position)
125         b = self.f.read(readSize)
126         bytesRead = len(b)
127         if not bytesRead:
128             # End of file was reached, we're done or deferred
129             if self.finished:
130                 self._close()
131                 return None
132             else:
133                 self.deferred = defer.Deferred()
134                 return self.deferred
135         else:
136             self.position += bytesRead
137             return b
138
139     def _close(self):
140         """Close the temporary file and remove it."""
141         self.f.close()
142         filepath.FilePath(self.f.name).remove()
143         self.f = None
144         
145 class StreamToFile:
146     """Save a stream to a partial file and hash it.
147     
148     @type stream: L{twisted.web2.stream.IByteStream}
149     @ivar stream: the input stream being read
150     @type outFile: L{twisted.python.filepath.FilePath}
151     @ivar outFile: the file being written
152     @type hash: C{sha1}
153     @ivar hash: the hash object for the data
154     @type position: C{int}
155     @ivar position: the current file position to write the next data to
156     @type length: C{int}
157     @ivar length: the position in the file to not write beyond
158     @type doneDefer: L{twisted.internet.defer.Deferred}
159     @ivar doneDefer: the deferred that will fire when done writing
160     """
161     
162     def __init__(self, inputStream, outFile, start = 0, length = None):
163         """Initializes the file.
164         
165         @type inputStream: L{twisted.web2.stream.IByteStream}
166         @param inputStream: the input stream to read from
167         @type outFile: L{twisted.python.filepath.FilePath}
168         @param outFile: the file to write to
169         @type start: C{int}
170         @param start: the file position to start writing at
171             (optional, defaults to the start of the file)
172         @type length: C{int}
173         @param length: the maximum amount of data to write to the file
174             (optional, defaults to not limiting the writing to the file
175         """
176         self.stream = inputStream
177         self.outFile = outFile
178         self.hash = sha.new()
179         self.position = start
180         self.length = None
181         if length is not None:
182             self.length = start + length
183         self.doneDefer = None
184         
185     def run(self):
186         """Start the streaming.
187
188         @rtype: L{twisted.internet.defer.Deferred}
189         """
190         log.msg('Started streaming %r bytes to file at position %d' % (self.length, self.position))
191         self.doneDefer = stream.readStream(self.stream, self._gotData)
192         self.doneDefer.addCallbacks(self._done, self._error)
193         return self.doneDefer
194
195     def _gotData(self, data):
196         """Process the received data."""
197         if self.outFile.closed:
198             raise PeerError, "outFile was unexpectedly closed"
199         
200         if data is None:
201             raise PeerError, "Data is None?"
202         
203         # Make sure we don't go too far
204         if self.length is not None and self.position + len(data) > self.length:
205             data = data[:(self.length - self.position)]
206         
207         # Write and hash the streamed data
208         self.outFile.seek(self.position)
209         self.outFile.write(data)
210         self.hash.update(data)
211         self.position += len(data)
212         
213     def _done(self, result):
214         """Return the result."""
215         log.msg('Streaming is complete')
216         return self.hash.digest()
217     
218     def _error(self, err):
219         """Log the error."""
220         log.msg('Streaming error')
221         log.err(err)
222         return err
223     
224 class FileDownload:
225     """Manage a download from a list of peers or a mirror.
226     
227     @type manager: L{PeerManager}
228     @ivar manager: the manager to send requests for peers to
229     @type hash: L{Hash.HashObject}
230     @ivar hash: the hash object containing the expected hash for the file
231     @ivar mirror: the URI of the file on the mirror
232     @type compact_peers: C{list} of C{dictionary}
233     @ivar compact_peers: a list of the peer info where the file can be found
234     @type file: C{file}
235     @ivar file: the open file to right the download to
236     @type path: C{string}
237     @ivar path: the path to request from peers to access the file
238     @type pieces: C{list} of C{string} 
239     @ivar pieces: the hashes of the pieces in the file
240     @type started: C{boolean}
241     @ivar started: whether the download has begun yet
242     @type defer: L{twisted.internet.defer.Deferred}
243     @ivar defer: the deferred that will callback with the result of the download
244     @type peers: C{dictionary}
245     @ivar peers: information about each of the peers available to download from
246     @type outstanding: C{int}
247     @ivar outstanding: the number of requests to peers currently outstanding
248     @type peerlist: C{list} of L{HTTPDownloader.Peer}
249     @ivar peerlist: the sorted list of peers for this download
250     @type stream: L{GrowingFileStream}
251     @ivar stream: the stream of resulting data from the download
252     @type nextFinish: C{int}
253     @ivar nextFinish: the next piece that is needed to finish for the stream
254     @type completePieces: C{list} of C{boolean} or L{HTTPDownloader.Peer}
255     @ivar completePieces: one per piece, will be False if no requests are
256         outstanding for the piece, True if the piece has been successfully
257         downloaded, or the Peer that a request for this piece has been sent  
258     """
259     
260     def __init__(self, manager, hash, mirror, compact_peers, file):
261         """Initialize the instance and check for piece hashes.
262         
263         @type manager: L{PeerManager}
264         @param manager: the manager to send requests for peers to
265         @type hash: L{Hash.HashObject}
266         @param hash: the hash object containing the expected hash for the file
267         @param mirror: the URI of the file on the mirror
268         @type compact_peers: C{list} of C{dictionary}
269         @param compact_peers: a list of the peer info where the file can be found
270         @type file: L{twisted.python.filepath.FilePath}
271         @param file: the temporary file to use to store the downloaded file
272         """
273         self.manager = manager
274         self.hash = hash
275         self.mirror = mirror
276         self.compact_peers = compact_peers
277         
278         self.path = '/~/' + quote_plus(hash.expected())
279         self.mirror_path = None
280         self.pieces = None
281         self.started = False
282         
283         file.restat(False)
284         if file.exists():
285             file.remove()
286         self.file = file.open('w+')
287
288     def run(self):
289         """Start the downloading process."""
290         log.msg('Checking for pieces for %s' % self.path)
291         self.defer = defer.Deferred()
292         self.peers = {}
293         no_pieces = 0
294         pieces_string = {0: 0}
295         pieces_hash = {0: 0}
296         pieces_dl_hash = {0: 0}
297
298         for compact_peer in self.compact_peers:
299             # Build a list of all the peers for this download
300             site = uncompact(compact_peer['c'])
301             peer = self.manager.getPeer(site)
302             self.peers.setdefault(site, {})['peer'] = peer
303
304             # Extract any piece information from the peers list
305             if 't' in compact_peer:
306                 self.peers[site]['t'] = compact_peer['t']['t']
307                 pieces_string.setdefault(compact_peer['t']['t'], 0)
308                 pieces_string[compact_peer['t']['t']] += 1
309             elif 'h' in compact_peer:
310                 self.peers[site]['h'] = compact_peer['h']
311                 pieces_hash.setdefault(compact_peer['h'], 0)
312                 pieces_hash[compact_peer['h']] += 1
313             elif 'l' in compact_peer:
314                 self.peers[site]['l'] = compact_peer['l']
315                 pieces_dl_hash.setdefault(compact_peer['l'], 0)
316                 pieces_dl_hash[compact_peer['l']] += 1
317             else:
318                 no_pieces += 1
319         
320         # Select the most popular piece info
321         max_found = max(no_pieces, max(pieces_string.values()),
322                         max(pieces_hash.values()), max(pieces_dl_hash.values()))
323
324         if max_found < len(self.peers):
325             log.msg('Misleading piece information found, using most popular %d of %d peers' % 
326                     (max_found, len(self.peers)))
327
328         if max_found == no_pieces:
329             # The file is not split into pieces
330             log.msg('No pieces were found for the file')
331             self.pieces = [self.hash.expected()]
332             self.startDownload()
333         elif max_found == max(pieces_string.values()):
334             # Small number of pieces in a string
335             for pieces, num in pieces_string.items():
336                 # Find the most popular piece string
337                 if num == max_found:
338                     self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
339                     log.msg('Peer info contained %d piece hashes' % len(self.pieces))
340                     self.startDownload()
341                     break
342         elif max_found == max(pieces_hash.values()):
343             # Medium number of pieces stored in the DHT
344             for pieces, num in pieces_hash.items():
345                 # Find the most popular piece hash to lookup
346                 if num == max_found:
347                     log.msg('Found a hash for pieces to lookup in the DHT: %r' % pieces)
348                     self.getDHTPieces(pieces)
349                     break
350         elif max_found == max(pieces_dl_hash.values()):
351             # Large number of pieces stored in peers
352             for pieces, num in pieces_dl_hash.items():
353                 # Find the most popular piece hash to download
354                 if num == max_found:
355                     log.msg('Found a hash for pieces to lookup in peers: %r' % pieces)
356                     self.getPeerPieces(pieces)
357                     break
358         return self.defer
359
360     #{ Downloading the piece hashes
361     def getDHTPieces(self, key):
362         """Retrieve the piece information from the DHT.
363         
364         @param key: the key to lookup in the DHT
365         """
366         # Remove any peers with the wrong piece hash
367         #for site in self.peers.keys():
368         #    if self.peers[site].get('h', '') != key:
369         #        del self.peers[site]
370
371         # Start the DHT lookup
372         lookupDefer = self.manager.dht.getValue(key)
373         lookupDefer.addBoth(self._getDHTPieces, key)
374         
375     def _getDHTPieces(self, results, key):
376         """Check the retrieved values."""
377         if isinstance(results, list):
378             for result in results:
379                 # Make sure the hash matches the key
380                 result_hash = sha.new(result.get('t', '')).digest()
381                 if result_hash == key:
382                     pieces = result['t']
383                     self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
384                     log.msg('Retrieved %d piece hashes from the DHT' % len(self.pieces))
385                     self.startDownload()
386                     return
387                 
388             log.msg('Could not retrieve the piece hashes from the DHT')
389         else:
390             log.msg('Looking up piece hashes in the DHT resulted in an error: %r' % (result, ))
391             
392         # Continue without the piece hashes
393         self.pieces = [None for x in xrange(0, self.hash.expSize, PIECE_SIZE)]
394         self.startDownload()
395
396     def getPeerPieces(self, key, failedSite = None):
397         """Retrieve the piece information from the peers.
398         
399         @param key: the key to request from the peers
400         """
401         if failedSite is None:
402             log.msg('Starting the lookup of piece hashes in peers')
403             self.outstanding = 0
404             # Remove any peers with the wrong piece hash
405             #for site in self.peers.keys():
406             #    if self.peers[site].get('l', '') != key:
407             #        del self.peers[site]
408         else:
409             log.msg('Piece hash lookup failed for peer %r' % (failedSite, ))
410             self.peers[failedSite]['failed'] = True
411             self.outstanding -= 1
412
413         if self.pieces is None:
414             # Send a request to one or more peers
415             for site in self.peers:
416                 if self.peers[site].get('failed', False) != True:
417                     log.msg('Sending a piece hash request to %r' % (site, ))
418                     path = '/~/' + quote_plus(key)
419                     lookupDefer = self.peers[site]['peer'].get(path)
420                     reactor.callLater(0, lookupDefer.addCallbacks,
421                                       *(self._getPeerPieces, self._gotPeerError),
422                                       **{'callbackArgs': (key, site),
423                                          'errbackArgs': (key, site)})
424                     self.outstanding += 1
425                     if self.outstanding >= 4:
426                         break
427         
428         if self.pieces is None and self.outstanding <= 0:
429             # Continue without the piece hashes
430             log.msg('Could not retrieve the piece hashes from the peers')
431             self.pieces = [None for x in xrange(0, self.hash.expSize, PIECE_SIZE)]
432             self.startDownload()
433         
434     def _getPeerPieces(self, response, key, site):
435         """Process the retrieved response from the peer."""
436         log.msg('Got a piece hash response %d from %r' % (response.code, site))
437         if response.code != 200:
438             # Request failed, try a different peer
439             self.getPeerPieces(key, site)
440         else:
441             # Read the response stream to a string
442             self.peers[site]['pieces'] = ''
443             def _gotPeerPiece(data, self = self, site = site):
444                 log.msg('Peer %r got %d bytes of piece hashes' % (site, len(data)))
445                 self.peers[site]['pieces'] += data
446             log.msg('Streaming piece hashes from peer')
447             df = stream.readStream(response.stream, _gotPeerPiece)
448             df.addCallbacks(self._gotPeerPieces, self._gotPeerError,
449                             callbackArgs=(key, site), errbackArgs=(key, site))
450
451     def _gotPeerError(self, err, key, site):
452         """Peer failed, try again."""
453         log.msg('Peer piece hash request failed for %r' % (site, ))
454         log.err(err)
455         self.getPeerPieces(key, site)
456
457     def _gotPeerPieces(self, result, key, site):
458         """Check the retrieved pieces from the peer."""
459         log.msg('Finished streaming piece hashes from peer %r' % (site, ))
460         if self.pieces is not None:
461             # Already done
462             log.msg('Already done')
463             return
464         
465         try:
466             result = bdecode(self.peers[site]['pieces'])
467         except:
468             log.msg('Error bdecoding piece hashes')
469             log.err()
470             self.getPeerPieces(key, site)
471             return
472             
473         result_hash = sha.new(result.get('t', '')).digest()
474         if result_hash == key:
475             pieces = result['t']
476             self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
477             log.msg('Retrieved %d piece hashes from the peer %r' % (len(self.pieces), site))
478             self.startDownload()
479         else:
480             log.msg('Peer returned a piece string that did not match')
481             self.getPeerPieces(key, site)
482
483     #{ Downloading the file
484     def sort(self):
485         """Sort the peers by their rank (highest ranked at the end)."""
486         def sort(a, b):
487             """Sort peers by their rank."""
488             if a.rank > b.rank:
489                 return 1
490             elif a.rank < b.rank:
491                 return -1
492             return 0
493         self.peerlist.sort(sort)
494
495     def startDownload(self):
496         """Start the download from the peers."""
497         # Don't start twice
498         if self.started:
499             return
500         
501         log.msg('Starting to download %s' % self.path)
502         self.started = True
503         assert self.pieces, "You must initialize the piece hashes first"
504         self.peerlist = [self.peers[site]['peer'] for site in self.peers]
505         
506         # Use the mirror if there are few peers
507         if len(self.peerlist) < config.getint('DEFAULT', 'MIN_DOWNLOAD_PEERS'):
508             parsed = urlparse(self.mirror)
509             if parsed[0] == "http":
510                 site = splitHostPort(parsed[0], parsed[1])
511                 self.mirror_path = urlunparse(('', '') + parsed[2:])
512                 peer = self.manager.getPeer(site, mirror = True)
513                 self.peerlist.append(peer)
514         
515         # Special case if there's only one good peer left
516 #        if len(self.peerlist) == 1:
517 #            log.msg('Downloading from peer %r' % (self.peerlist[0], ))
518 #            self.defer.callback(self.peerlist[0].get(self.path))
519 #            return
520         
521         # Start sending the return file
522         self.stream = GrowingFileStream(self.file, self.hash.expSize)
523         resp = Response(200, {}, self.stream)
524         self.defer.callback(resp)
525
526         # Begin to download the pieces
527         self.outstanding = 0
528         self.nextFinish = 0
529         self.completePieces = [False for piece in self.pieces]
530         self.getPieces()
531         
532     #{ Downloading the pieces
533     def getPieces(self):
534         """Download the next pieces from the peers."""
535         self.sort()
536         piece = self.nextFinish
537         while self.outstanding < 4 and self.peerlist and piece < len(self.completePieces):
538             if self.completePieces[piece] == False:
539                 # Send a request to the highest ranked peer
540                 peer = self.peerlist.pop()
541                 self.completePieces[piece] = peer
542                 log.msg('Sending a request for piece %d to peer %r' % (piece, peer))
543                 
544                 self.outstanding += 1
545                 if peer.mirror:
546                     df = peer.getRange(self.mirror_path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
547                 else:
548                     df = peer.getRange(self.path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
549                 reactor.callLater(0, df.addCallbacks,
550                                   *(self._getPiece, self._getError),
551                                   **{'callbackArgs': (piece, peer),
552                                      'errbackArgs': (piece, peer)})
553             piece += 1
554                 
555         # Check if we're done
556         if self.outstanding <= 0 and self.nextFinish >= len(self.completePieces):
557             log.msg('We seem to be done with all pieces')
558             self.stream.allAvailable()
559     
560     def _getPiece(self, response, piece, peer):
561         """Process the retrieved headers from the peer."""
562         log.msg('Got response for piece %d from peer %r' % (piece, peer))
563         if ((len(self.completePieces) > 1 and response.code != 206) or
564             (response.code not in (200, 206))):
565             # Request failed, try a different peer
566             log.msg('Wrong response type %d for piece %d from peer %r' % (response.code, piece, peer))
567             peer.hashError('Peer responded with the wrong type of download: %r' % response.code)
568             self.completePieces[piece] = False
569             if response.stream and response.stream.length:
570                 stream.readAndDiscard(response.stream)
571         else:
572             # Read the response stream to the file
573             log.msg('Streaming piece %d from peer %r' % (piece, peer))
574             if response.code == 206:
575                 df = StreamToFile(response.stream, self.file, piece*PIECE_SIZE,
576                                   PIECE_SIZE).run()
577             else:
578                 df = StreamToFile(response.stream, self.file).run()
579             reactor.callLater(0, df.addCallbacks,
580                               *(self._gotPiece, self._gotError),
581                               **{'callbackArgs': (piece, peer),
582                                  'errbackArgs': (piece, peer)})
583
584         self.outstanding -= 1
585         self.peerlist.append(peer)
586         self.getPieces()
587
588     def _getError(self, err, piece, peer):
589         """Peer failed, try again."""
590         log.msg('Got error for piece %d from peer %r' % (piece, peer))
591         self.outstanding -= 1
592         self.peerlist.append(peer)
593         self.completePieces[piece] = False
594         self.getPieces()
595         log.err(err)
596
597     def _gotPiece(self, response, piece, peer):
598         """Process the retrieved piece from the peer."""
599         log.msg('Finished streaming piece %d from peer %r: %r' % (piece, peer, response))
600         if self.pieces[piece] and response != self.pieces[piece]:
601             # Hash doesn't match
602             log.msg('Hash error for piece %d from peer %r' % (piece, peer))
603             peer.hashError('Piece received from peer does not match expected')
604             self.completePieces[piece] = False
605         else:
606             # Successfully completed one of several pieces
607             log.msg('Finished with piece %d from peer %r' % (piece, peer))
608             self.completePieces[piece] = True
609             while (self.nextFinish < len(self.completePieces) and
610                    self.completePieces[self.nextFinish] == True):
611                 self.nextFinish += 1
612                 self.stream.updateAvailable(PIECE_SIZE)
613
614         self.getPieces()
615
616     def _gotError(self, err, piece, peer):
617         """Piece download failed, try again."""
618         log.msg('Error streaming piece %d from peer %r: %r' % (piece, peer, response))
619         log.err(err)
620         self.completePieces[piece] = False
621         self.getPieces()
622         
623 class PeerManager:
624     """Manage a set of peers and the requests to them.
625     
626     @type cache_dir: L{twisted.python.filepath.FilePath}
627     @ivar cache_dir: the directory to use for storing all files
628     @type dht: L{interfaces.IDHT}
629     @ivar dht: the DHT instance
630     @type stats: L{stats.StatsLogger}
631     @ivar stats: the statistics logger to record sent data to
632     @type clients: C{dictionary}
633     @ivar clients: the available peers that have been previously contacted
634     """
635
636     def __init__(self, cache_dir, dht, stats):
637         """Initialize the instance."""
638         self.cache_dir = cache_dir
639         self.cache_dir.restat(False)
640         if not self.cache_dir.exists():
641             self.cache_dir.makedirs()
642         self.dht = dht
643         self.stats = stats
644         self.clients = {}
645         
646     def get(self, hash, mirror, peers = [], method="GET", modtime=None):
647         """Download from a list of peers or fallback to a mirror.
648         
649         @type hash: L{Hash.HashObject}
650         @param hash: the hash object containing the expected hash for the file
651         @param mirror: the URI of the file on the mirror
652         @type peers: C{list} of C{string}
653         @param peers: a list of the peer info where the file can be found
654             (optional, defaults to downloading from the mirror)
655         @type method: C{string}
656         @param method: the HTTP method to use, 'GET' or 'HEAD'
657             (optional, defaults to 'GET')
658         @type modtime: C{int}
659         @param modtime: the modification time to use for an 'If-Modified-Since'
660             header, as seconds since the epoch
661             (optional, defaults to not sending that header)
662         """
663         if not peers or method != "GET" or modtime is not None:
664             log.msg('Downloading (%s) from mirror %s' % (method, mirror))
665             parsed = urlparse(mirror)
666             assert parsed[0] == "http", "Only HTTP is supported, not '%s'" % parsed[0]
667             site = splitHostPort(parsed[0], parsed[1])
668             path = urlunparse(('', '') + parsed[2:])
669             peer = self.getPeer(site, mirror = True)
670             return peer.get(path, method, modtime)
671 #        elif len(peers) == 1:
672 #            site = uncompact(peers[0]['c'])
673 #            log.msg('Downloading from peer %r' % (site, ))
674 #            path = '/~/' + quote_plus(hash.expected())
675 #            peer = self.getPeer(site)
676 #            return peer.get(path)
677         else:
678             tmpfile = self.cache_dir.child(hash.hexexpected())
679             return FileDownload(self, hash, mirror, peers, tmpfile).run()
680         
681     def getPeer(self, site, mirror = False):
682         """Create a new peer if necessary and return it.
683         
684         @type site: (C{string}, C{int})
685         @param site: the IP address and port of the peer
686         @param mirror: whether the peer is actually a mirror
687             (optional, defaults to False)
688         """
689         if site not in self.clients:
690             self.clients[site] = Peer(site[0], site[1], self.stats)
691             if mirror:
692                 self.clients[site].mirror = True
693         return self.clients[site]
694     
695     def close(self):
696         """Close all the connections to peers."""
697         for site in self.clients:
698             self.clients[site].close()
699         self.clients = {}
700
701 class TestPeerManager(unittest.TestCase):
702     """Unit tests for the PeerManager."""
703     
704     manager = None
705     pending_calls = []
706     
707     def tearDown(self):
708         for p in self.pending_calls:
709             if p.active():
710                 p.cancel()
711         self.pending_calls = []
712         if self.manager:
713             self.manager.close()
714             self.manager = None