31d926168954c953c4ef3a6a314462c34a8a4693
[quix0rs-apt-p2p.git] / apt_p2p / PeerManager.py
1
2 """Manage a set of peers and the requests to them."""
3
4 from random import choice
5 from urlparse import urlparse, urlunparse
6 from urllib import quote_plus
7 from binascii import b2a_hex, a2b_hex
8 import sha
9
10 from twisted.internet import reactor, defer
11 from twisted.python import log, filepath
12 from twisted.trial import unittest
13 from twisted.web2 import stream
14 from twisted.web2.http import Response, splitHostPort
15
16 from HTTPDownloader import Peer
17 from util import uncompact
18 from Hash import PIECE_SIZE
19 from apt_p2p_Khashmir.bencode import bdecode
20 from apt_p2p_conf import config
21
22
23 class PeerError(Exception):
24     """An error occurred downloading from peers."""
25     
26 class GrowingFileStream(stream.FileStream):
27     """Modified to stream data from a file as it becomes available.
28     
29     @ivar CHUNK_SIZE: the maximum size of chunks of data to send at a time
30     @ivar deferred: waiting for the result of the last read attempt
31     @ivar available: the number of bytes that are currently available to read
32     @ivar position: the current position in the file where the next read will begin
33     @ivar finished: True when no more data will be coming available
34     """
35
36     CHUNK_SIZE = 4*1024
37
38     def __init__(self, f, length = None):
39         stream.FileStream.__init__(self, f)
40         self.length = length
41         self.deferred = None
42         self.available = 0L
43         self.position = 0L
44         self.finished = False
45
46     def updateAvailable(self, newlyAvailable):
47         """Update the number of bytes that are available.
48         
49         Call it with 0 to trigger reading of a fully read file.
50         
51         @param newlyAvailable: the number of bytes that just became available
52         """
53         assert not self.finished
54         self.available += newlyAvailable
55         
56         # If a read is pending, let it go
57         if self.deferred and self.position < self.available:
58             # Try to read some data from the file
59             length = self.available - self.position
60             readSize = min(length, self.CHUNK_SIZE)
61             self.f.seek(self.position)
62             b = self.f.read(readSize)
63             bytesRead = len(b)
64             
65             # Check if end of file was reached
66             if bytesRead:
67                 self.position += bytesRead
68                 deferred = self.deferred
69                 self.deferred = None
70                 deferred.callback(b)
71
72     def allAvailable(self):
73         """Indicate that no more data will be coming available."""
74         self.finished = True
75
76         # If a read is pending, let it go
77         if self.deferred:
78             if self.position < self.available:
79                 # Try to read some data from the file
80                 length = self.available - self.position
81                 readSize = min(length, self.CHUNK_SIZE)
82                 self.f.seek(self.position)
83                 b = self.f.read(readSize)
84                 bytesRead = len(b)
85     
86                 # Check if end of file was reached
87                 if bytesRead:
88                     self.position += bytesRead
89                     deferred = self.deferred
90                     self.deferred = None
91                     deferred.callback(b)
92                 else:
93                     # We're done
94                     self._close()
95                     deferred = self.deferred
96                     self.deferred = None
97                     deferred.callback(None)
98             else:
99                 # We're done
100                 self._close()
101                 deferred = self.deferred
102                 self.deferred = None
103                 deferred.callback(None)
104         
105     def read(self, sendfile=False):
106         assert not self.deferred, "A previous read is still deferred."
107
108         if self.f is None:
109             return None
110
111         length = self.available - self.position
112         readSize = min(length, self.CHUNK_SIZE)
113
114         # If we don't have any available, we're done or deferred
115         if readSize <= 0:
116             if self.finished:
117                 self._close()
118                 return None
119             else:
120                 self.deferred = defer.Deferred()
121                 return self.deferred
122
123         # Try to read some data from the file
124         self.f.seek(self.position)
125         b = self.f.read(readSize)
126         bytesRead = len(b)
127         if not bytesRead:
128             # End of file was reached, we're done or deferred
129             if self.finished:
130                 self._close()
131                 return None
132             else:
133                 self.deferred = defer.Deferred()
134                 return self.deferred
135         else:
136             self.position += bytesRead
137             return b
138
139     def _close(self):
140         """Close the temporary file and remove it."""
141         self.f.close()
142         filepath.FilePath(self.f.name).remove()
143         self.f = None
144         
145 class StreamToFile:
146     """Save a stream to a partial file and hash it.
147     
148     @type stream: L{twisted.web2.stream.IByteStream}
149     @ivar stream: the input stream being read
150     @type outFile: L{twisted.python.filepath.FilePath}
151     @ivar outFile: the file being written
152     @type hasher: hashing object, e.g. C{sha1}
153     @ivar hasher: the hash object for the data
154     @type position: C{int}
155     @ivar position: the current file position to write the next data to
156     @type length: C{int}
157     @ivar length: the position in the file to not write beyond
158     @type doneDefer: L{twisted.internet.defer.Deferred}
159     @ivar doneDefer: the deferred that will fire when done writing
160     """
161     
162     def __init__(self, hasher, inputStream, outFile, start = 0, length = None):
163         """Initializes the file.
164         
165         @type hasher: hashing object, e.g. C{sha1}
166         @param hasher: the hash object for the data
167         @type inputStream: L{twisted.web2.stream.IByteStream}
168         @param inputStream: the input stream to read from
169         @type outFile: L{twisted.python.filepath.FilePath}
170         @param outFile: the file to write to
171         @type start: C{int}
172         @param start: the file position to start writing at
173             (optional, defaults to the start of the file)
174         @type length: C{int}
175         @param length: the maximum amount of data to write to the file
176             (optional, defaults to not limiting the writing to the file
177         """
178         self.stream = inputStream
179         self.outFile = outFile
180         self.hasher = hasher
181         self.position = start
182         self.length = None
183         if length is not None:
184             self.length = start + length
185         self.doneDefer = None
186         
187     def run(self):
188         """Start the streaming.
189
190         @rtype: L{twisted.internet.defer.Deferred}
191         """
192         self.doneDefer = stream.readStream(self.stream, self._gotData)
193         self.doneDefer.addCallbacks(self._done, self._error)
194         return self.doneDefer
195
196     def _gotData(self, data):
197         """Process the received data."""
198         if self.outFile.closed:
199             raise PeerError, "outFile was unexpectedly closed"
200         
201         if data is None:
202             raise PeerError, "Data is None?"
203         
204         # Make sure we don't go too far
205         if self.length is not None and self.position + len(data) > self.length:
206             data = data[:(self.length - self.position)]
207         
208         # Write and hash the streamed data
209         self.outFile.seek(self.position)
210         self.outFile.write(data)
211         self.hasher.update(data)
212         self.position += len(data)
213         
214     def _done(self, result):
215         """Return the result."""
216         return self.hasher.digest()
217     
218     def _error(self, err):
219         """Log the error."""
220         log.msg('Streaming error')
221         log.err(err)
222         return err
223     
224 class FileDownload:
225     """Manage a download from a list of peers or a mirror.
226     
227     @type manager: L{PeerManager}
228     @ivar manager: the manager to send requests for peers to
229     @type hash: L{Hash.HashObject}
230     @ivar hash: the hash object containing the expected hash for the file
231     @ivar mirror: the URI of the file on the mirror
232     @type compact_peers: C{list} of C{dictionary}
233     @ivar compact_peers: a list of the peer info where the file can be found
234     @type file: C{file}
235     @ivar file: the open file to right the download to
236     @type path: C{string}
237     @ivar path: the path to request from peers to access the file
238     @type pieces: C{list} of C{string} 
239     @ivar pieces: the hashes of the pieces in the file
240     @type started: C{boolean}
241     @ivar started: whether the download has begun yet
242     @type defer: L{twisted.internet.defer.Deferred}
243     @ivar defer: the deferred that will callback with the result of the download
244     @type peers: C{dictionary}
245     @ivar peers: information about each of the peers available to download from
246     @type outstanding: C{int}
247     @ivar outstanding: the number of requests to peers currently outstanding
248     @type peerlist: C{list} of L{HTTPDownloader.Peer}
249     @ivar peerlist: the sorted list of peers for this download
250     @type stream: L{GrowingFileStream}
251     @ivar stream: the stream of resulting data from the download
252     @type nextFinish: C{int}
253     @ivar nextFinish: the next piece that is needed to finish for the stream
254     @type completePieces: C{list} of C{boolean} or L{HTTPDownloader.Peer}
255     @ivar completePieces: one per piece, will be False if no requests are
256         outstanding for the piece, True if the piece has been successfully
257         downloaded, or the Peer that a request for this piece has been sent  
258     """
259     
260     def __init__(self, manager, hash, mirror, compact_peers, file):
261         """Initialize the instance and check for piece hashes.
262         
263         @type manager: L{PeerManager}
264         @param manager: the manager to send requests for peers to
265         @type hash: L{Hash.HashObject}
266         @param hash: the hash object containing the expected hash for the file
267         @param mirror: the URI of the file on the mirror
268         @type compact_peers: C{list} of C{dictionary}
269         @param compact_peers: a list of the peer info where the file can be found
270         @type file: L{twisted.python.filepath.FilePath}
271         @param file: the temporary file to use to store the downloaded file
272         """
273         self.manager = manager
274         self.hash = hash
275         self.mirror = mirror
276         self.compact_peers = compact_peers
277         
278         self.path = '/~/' + quote_plus(hash.expected())
279         self.defer = None
280         self.mirror_path = None
281         self.pieces = None
282         self.started = False
283         
284         file.restat(False)
285         if file.exists():
286             file.remove()
287         self.file = file.open('w+')
288
289     def run(self):
290         """Start the downloading process."""
291         log.msg('Checking for pieces for %s' % self.path)
292         self.defer = defer.Deferred()
293         self.peers = {}
294         no_pieces = 0
295         pieces_string = {0: 0}
296         pieces_hash = {0: 0}
297         pieces_dl_hash = {0: 0}
298
299         for compact_peer in self.compact_peers:
300             # Build a list of all the peers for this download
301             site = uncompact(compact_peer['c'])
302             peer = self.manager.getPeer(site)
303             self.peers.setdefault(site, {})['peer'] = peer
304
305             # Extract any piece information from the peers list
306             if 't' in compact_peer:
307                 self.peers[site]['t'] = compact_peer['t']['t']
308                 pieces_string.setdefault(compact_peer['t']['t'], 0)
309                 pieces_string[compact_peer['t']['t']] += 1
310             elif 'h' in compact_peer:
311                 self.peers[site]['h'] = compact_peer['h']
312                 pieces_hash.setdefault(compact_peer['h'], 0)
313                 pieces_hash[compact_peer['h']] += 1
314             elif 'l' in compact_peer:
315                 self.peers[site]['l'] = compact_peer['l']
316                 pieces_dl_hash.setdefault(compact_peer['l'], 0)
317                 pieces_dl_hash[compact_peer['l']] += 1
318             else:
319                 no_pieces += 1
320         
321         # Select the most popular piece info
322         max_found = max(no_pieces, max(pieces_string.values()),
323                         max(pieces_hash.values()), max(pieces_dl_hash.values()))
324
325         if max_found < len(self.peers):
326             log.msg('Misleading piece information found, using most popular %d of %d peers' % 
327                     (max_found, len(self.peers)))
328
329         if max_found == no_pieces:
330             # The file is not split into pieces
331             log.msg('No pieces were found for the file')
332             self.pieces = [self.hash.expected()]
333             self.startDownload()
334         elif max_found == max(pieces_string.values()):
335             # Small number of pieces in a string
336             for pieces, num in pieces_string.items():
337                 # Find the most popular piece string
338                 if num == max_found:
339                     self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
340                     log.msg('Peer info contained %d piece hashes' % len(self.pieces))
341                     self.startDownload()
342                     break
343         elif max_found == max(pieces_hash.values()):
344             # Medium number of pieces stored in the DHT
345             for pieces, num in pieces_hash.items():
346                 # Find the most popular piece hash to lookup
347                 if num == max_found:
348                     log.msg('Found a hash for pieces to lookup in the DHT: %r' % pieces)
349                     self.getDHTPieces(pieces)
350                     break
351         elif max_found == max(pieces_dl_hash.values()):
352             # Large number of pieces stored in peers
353             for pieces, num in pieces_dl_hash.items():
354                 # Find the most popular piece hash to download
355                 if num == max_found:
356                     log.msg('Found a hash for pieces to lookup in peers: %r' % pieces)
357                     self.getPeerPieces(pieces)
358                     break
359         return self.defer
360
361     #{ Downloading the piece hashes
362     def getDHTPieces(self, key):
363         """Retrieve the piece information from the DHT.
364         
365         @param key: the key to lookup in the DHT
366         """
367         # Remove any peers with the wrong piece hash
368         #for site in self.peers.keys():
369         #    if self.peers[site].get('h', '') != key:
370         #        del self.peers[site]
371
372         # Start the DHT lookup
373         lookupDefer = self.manager.dht.getValue(key)
374         lookupDefer.addBoth(self._getDHTPieces, key)
375         
376     def _getDHTPieces(self, results, key):
377         """Check the retrieved values."""
378         if isinstance(results, list):
379             for result in results:
380                 # Make sure the hash matches the key
381                 result_hash = sha.new(result.get('t', '')).digest()
382                 if result_hash == key:
383                     pieces = result['t']
384                     self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
385                     log.msg('Retrieved %d piece hashes from the DHT' % len(self.pieces))
386                     self.startDownload()
387                     return
388                 
389             log.msg('Could not retrieve the piece hashes from the DHT')
390         else:
391             log.msg('Looking up piece hashes in the DHT resulted in an error: %r' % (result, ))
392             
393         # Continue without the piece hashes
394         self.pieces = [None for x in xrange(0, self.hash.expSize, PIECE_SIZE)]
395         self.startDownload()
396
397     def getPeerPieces(self, key, failedSite = None):
398         """Retrieve the piece information from the peers.
399         
400         @param key: the key to request from the peers
401         """
402         if failedSite is None:
403             log.msg('Starting the lookup of piece hashes in peers')
404             self.outstanding = 0
405             # Remove any peers with the wrong piece hash
406             #for site in self.peers.keys():
407             #    if self.peers[site].get('l', '') != key:
408             #        del self.peers[site]
409         else:
410             log.msg('Piece hash lookup failed for peer %r' % (failedSite, ))
411             self.peers[failedSite]['failed'] = True
412             self.outstanding -= 1
413
414         if self.pieces is None:
415             # Send a request to one or more peers
416             for site in self.peers:
417                 if self.peers[site].get('failed', False) != True:
418                     log.msg('Sending a piece hash request to %r' % (site, ))
419                     path = '/~/' + quote_plus(key)
420                     lookupDefer = self.peers[site]['peer'].get(path)
421                     reactor.callLater(0, lookupDefer.addCallbacks,
422                                       *(self._getPeerPieces, self._gotPeerError),
423                                       **{'callbackArgs': (key, site),
424                                          'errbackArgs': (key, site)})
425                     self.outstanding += 1
426                     if self.outstanding >= 4:
427                         break
428         
429         if self.pieces is None and self.outstanding <= 0:
430             # Continue without the piece hashes
431             log.msg('Could not retrieve the piece hashes from the peers')
432             self.pieces = [None for x in xrange(0, self.hash.expSize, PIECE_SIZE)]
433             self.startDownload()
434         
435     def _getPeerPieces(self, response, key, site):
436         """Process the retrieved response from the peer."""
437         log.msg('Got a piece hash response %d from %r' % (response.code, site))
438         if response.code != 200:
439             # Request failed, try a different peer
440             self.getPeerPieces(key, site)
441         else:
442             # Read the response stream to a string
443             self.peers[site]['pieces'] = ''
444             def _gotPeerPiece(data, self = self, site = site):
445                 log.msg('Peer %r got %d bytes of piece hashes' % (site, len(data)))
446                 self.peers[site]['pieces'] += data
447             log.msg('Streaming piece hashes from peer')
448             df = stream.readStream(response.stream, _gotPeerPiece)
449             df.addCallbacks(self._gotPeerPieces, self._gotPeerError,
450                             callbackArgs=(key, site), errbackArgs=(key, site))
451
452     def _gotPeerError(self, err, key, site):
453         """Peer failed, try again."""
454         log.msg('Peer piece hash request failed for %r' % (site, ))
455         log.err(err)
456         self.getPeerPieces(key, site)
457
458     def _gotPeerPieces(self, result, key, site):
459         """Check the retrieved pieces from the peer."""
460         log.msg('Finished streaming piece hashes from peer %r' % (site, ))
461         if self.pieces is not None:
462             # Already done
463             log.msg('Already done')
464             return
465         
466         try:
467             result = bdecode(self.peers[site]['pieces'])
468         except:
469             log.msg('Error bdecoding piece hashes')
470             log.err()
471             self.getPeerPieces(key, site)
472             return
473             
474         result_hash = sha.new(result.get('t', '')).digest()
475         if result_hash == key:
476             pieces = result['t']
477             self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
478             log.msg('Retrieved %d piece hashes from the peer %r' % (len(self.pieces), site))
479             self.startDownload()
480         else:
481             log.msg('Peer returned a piece string that did not match')
482             self.getPeerPieces(key, site)
483
484     #{ Downloading the file
485     def sort(self):
486         """Sort the peers by their rank (highest ranked at the end)."""
487         def sort(a, b):
488             """Sort peers by their rank."""
489             if a.rank > b.rank:
490                 return 1
491             elif a.rank < b.rank:
492                 return -1
493             return 0
494         self.peerlist.sort(sort)
495
496     def startDownload(self):
497         """Start the download from the peers."""
498         # Don't start twice
499         if self.started:
500             return
501         
502         log.msg('Starting to download %s' % self.path)
503         self.started = True
504         assert self.pieces, "You must initialize the piece hashes first"
505         self.peerlist = [self.peers[site]['peer'] for site in self.peers]
506         
507         # Use the mirror if there are few peers
508         if len(self.peerlist) < config.getint('DEFAULT', 'MIN_DOWNLOAD_PEERS'):
509             parsed = urlparse(self.mirror)
510             if parsed[0] == "http":
511                 site = splitHostPort(parsed[0], parsed[1])
512                 self.mirror_path = urlunparse(('', '') + parsed[2:])
513                 peer = self.manager.getPeer(site, mirror = True)
514                 self.peerlist.append(peer)
515         
516         # Special case if there's only one good peer left
517 #        if len(self.peerlist) == 1:
518 #            log.msg('Downloading from peer %r' % (self.peerlist[0], ))
519 #            self.defer.callback(self.peerlist[0].get(self.path))
520 #            return
521         
522         # Begin to download the pieces
523         self.outstanding = 0
524         self.nextFinish = 0
525         self.completePieces = [False for piece in self.pieces]
526         self.getPieces()
527         
528     #{ Downloading the pieces
529     def getPieces(self):
530         """Download the next pieces from the peers."""
531         self.sort()
532         piece = self.nextFinish
533         while self.outstanding < 4 and self.peerlist and piece < len(self.completePieces):
534             if self.completePieces[piece] == False:
535                 # Send a request to the highest ranked peer
536                 peer = self.peerlist.pop()
537                 self.completePieces[piece] = peer
538                 log.msg('Sending a request for piece %d to peer %r' % (piece, peer))
539                 
540                 self.outstanding += 1
541                 path = self.path
542                 if peer.mirror:
543                     path = self.mirror_path
544                 if len(self.completePieces) > 1:
545                     df = peer.getRange(path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
546                 else:
547                     df = peer.get(path)
548                 reactor.callLater(0, df.addCallbacks,
549                                   *(self._getPiece, self._getError),
550                                   **{'callbackArgs': (piece, peer),
551                                      'errbackArgs': (piece, peer)})
552             piece += 1
553                 
554         # Check if we're done
555         if self.outstanding <= 0 and self.nextFinish >= len(self.completePieces):
556             log.msg('Download is complete for %s' % self.path)
557             self.stream.allAvailable()
558     
559     def _getPiece(self, response, piece, peer):
560         """Process the retrieved headers from the peer."""
561         if ((len(self.completePieces) > 1 and response.code != 206) or
562             (response.code not in (200, 206))):
563             # Request failed, try a different peer
564             log.msg('Wrong response type %d for piece %d from peer %r' % (response.code, piece, peer))
565             peer.hashError('Peer responded with the wrong type of download: %r' % response.code)
566             self.completePieces[piece] = False
567             if response.stream and response.stream.length:
568                 stream.readAndDiscard(response.stream)
569         else:
570             if self.defer:
571                 # Start sending the return file
572                 df = self.defer
573                 self.defer = None
574                 self.stream = GrowingFileStream(self.file, self.hash.expSize)
575
576                 # Get the headers from the peer's response
577                 headers = {}
578                 if response.headers.hasHeader('last-modified'):
579                     headers['last-modified'] = response.headers.getHeader('last-modified')
580                 resp = Response(200, {}, self.stream)
581                 df.callback(resp)
582
583             # Read the response stream to the file
584             log.msg('Streaming piece %d from peer %r' % (piece, peer))
585             if response.code == 206:
586                 df = StreamToFile(self.hash.newPieceHasher(), response.stream,
587                                   self.file, piece*PIECE_SIZE, PIECE_SIZE).run()
588             else:
589                 df = StreamToFile(self.hash.newHasher(), response.stream,
590                                   self.file).run()
591             reactor.callLater(0, df.addCallbacks,
592                               *(self._gotPiece, self._gotError),
593                               **{'callbackArgs': (piece, peer),
594                                  'errbackArgs': (piece, peer)})
595
596         self.outstanding -= 1
597         self.peerlist.append(peer)
598         self.getPieces()
599
600     def _getError(self, err, piece, peer):
601         """Peer failed, try again."""
602         log.msg('Got error for piece %d from peer %r' % (piece, peer))
603         self.outstanding -= 1
604         self.peerlist.append(peer)
605         self.completePieces[piece] = False
606         self.getPieces()
607         log.err(err)
608
609     def _gotPiece(self, response, piece, peer):
610         """Process the retrieved piece from the peer."""
611         if self.pieces[piece] and response != self.pieces[piece]:
612             # Hash doesn't match
613             log.msg('Hash error for piece %d from peer %r' % (piece, peer))
614             peer.hashError('Piece received from peer does not match expected')
615             self.completePieces[piece] = False
616         else:
617             # Successfully completed one of several pieces
618             log.msg('Finished with piece %d from peer %r' % (piece, peer))
619             self.completePieces[piece] = True
620             while (self.nextFinish < len(self.completePieces) and
621                    self.completePieces[self.nextFinish] == True):
622                 self.nextFinish += 1
623                 self.stream.updateAvailable(PIECE_SIZE)
624
625         self.getPieces()
626
627     def _gotError(self, err, piece, peer):
628         """Piece download failed, try again."""
629         log.msg('Error streaming piece %d from peer %r: %r' % (piece, peer, response))
630         log.err(err)
631         self.completePieces[piece] = False
632         self.getPieces()
633         
634 class PeerManager:
635     """Manage a set of peers and the requests to them.
636     
637     @type cache_dir: L{twisted.python.filepath.FilePath}
638     @ivar cache_dir: the directory to use for storing all files
639     @type dht: L{interfaces.IDHT}
640     @ivar dht: the DHT instance
641     @type stats: L{stats.StatsLogger}
642     @ivar stats: the statistics logger to record sent data to
643     @type clients: C{dictionary}
644     @ivar clients: the available peers that have been previously contacted
645     """
646
647     def __init__(self, cache_dir, dht, stats):
648         """Initialize the instance."""
649         self.cache_dir = cache_dir
650         self.cache_dir.restat(False)
651         if not self.cache_dir.exists():
652             self.cache_dir.makedirs()
653         self.dht = dht
654         self.stats = stats
655         self.clients = {}
656         
657     def get(self, hash, mirror, peers = [], method="GET", modtime=None):
658         """Download from a list of peers or fallback to a mirror.
659         
660         @type hash: L{Hash.HashObject}
661         @param hash: the hash object containing the expected hash for the file
662         @param mirror: the URI of the file on the mirror
663         @type peers: C{list} of C{string}
664         @param peers: a list of the peer info where the file can be found
665             (optional, defaults to downloading from the mirror)
666         @type method: C{string}
667         @param method: the HTTP method to use, 'GET' or 'HEAD'
668             (optional, defaults to 'GET')
669         @type modtime: C{int}
670         @param modtime: the modification time to use for an 'If-Modified-Since'
671             header, as seconds since the epoch
672             (optional, defaults to not sending that header)
673         """
674         if not peers or method != "GET" or modtime is not None:
675             log.msg('Downloading (%s) from mirror %s' % (method, mirror))
676             parsed = urlparse(mirror)
677             assert parsed[0] == "http", "Only HTTP is supported, not '%s'" % parsed[0]
678             site = splitHostPort(parsed[0], parsed[1])
679             path = urlunparse(('', '') + parsed[2:])
680             peer = self.getPeer(site, mirror = True)
681             return peer.get(path, method, modtime)
682 #        elif len(peers) == 1:
683 #            site = uncompact(peers[0]['c'])
684 #            log.msg('Downloading from peer %r' % (site, ))
685 #            path = '/~/' + quote_plus(hash.expected())
686 #            peer = self.getPeer(site)
687 #            return peer.get(path)
688         else:
689             tmpfile = self.cache_dir.child(hash.hexexpected())
690             return FileDownload(self, hash, mirror, peers, tmpfile).run()
691         
692     def getPeer(self, site, mirror = False):
693         """Create a new peer if necessary and return it.
694         
695         @type site: (C{string}, C{int})
696         @param site: the IP address and port of the peer
697         @param mirror: whether the peer is actually a mirror
698             (optional, defaults to False)
699         """
700         if site not in self.clients:
701             self.clients[site] = Peer(site[0], site[1], self.stats)
702             if mirror:
703                 self.clients[site].mirror = True
704         return self.clients[site]
705     
706     def close(self):
707         """Close all the connections to peers."""
708         for site in self.clients:
709             self.clients[site].close()
710         self.clients = {}
711
712 class TestPeerManager(unittest.TestCase):
713     """Unit tests for the PeerManager."""
714     
715     manager = None
716     pending_calls = []
717     
718     def tearDown(self):
719         for p in self.pending_calls:
720             if p.active():
721                 p.cancel()
722         self.pending_calls = []
723         if self.manager:
724             self.manager.close()
725             self.manager = None