737669751da78ee9a3242996aed50e41dc5aa18b
[quix0rs-apt-p2p.git] / apt_p2p / PeerManager.py
1
2 """Manage a set of peers and the requests to them."""
3
4 from random import choice
5 from urlparse import urlparse, urlunparse
6 from urllib import quote_plus
7 from binascii import b2a_hex, a2b_hex
8 import sha
9
10 from twisted.internet import reactor, defer
11 from twisted.python import log
12 from twisted.trial import unittest
13 from twisted.web2 import stream
14 from twisted.web2.http import Response, splitHostPort
15
16 from HTTPDownloader import Peer
17 from util import uncompact
18 from Hash import PIECE_SIZE
19 from apt_p2p_Khashmir.bencode import bdecode
20 from apt_p2p_conf import config
21
22 class GrowingFileStream(stream.FileStream):
23     """Modified to stream data from a file as it becomes available.
24     
25     @ivar CHUNK_SIZE: the maximum size of chunks of data to send at a time
26     @ivar deferred: waiting for the result of the last read attempt
27     @ivar available: the number of bytes that are currently available to read
28     @ivar position: the current position in the file where the next read will begin
29     @ivar finished: True when no more data will be coming available
30     """
31
32     CHUNK_SIZE = 4*1024
33
34     def __init__(self, f, length = None):
35         stream.FileStream.__init__(self, f)
36         self.length = length
37         self.deferred = None
38         self.available = 0L
39         self.position = 0L
40         self.finished = False
41
42     def updateAvailable(self, newlyAvailable):
43         """Update the number of bytes that are available.
44         
45         Call it with 0 to trigger reading of a fully read file.
46         
47         @param newlyAvailable: the number of bytes that just became available
48         """
49         assert not self.finished
50         self.available += newlyAvailable
51         
52         # If a read is pending, let it go
53         if self.deferred and self.position < self.available:
54             # Try to read some data from the file
55             length = self.available - self.position
56             readSize = min(length, self.CHUNK_SIZE)
57             self.f.seek(self.position)
58             b = self.f.read(readSize)
59             bytesRead = len(b)
60             
61             # Check if end of file was reached
62             if bytesRead:
63                 self.position += bytesRead
64                 deferred = self.deferred
65                 self.deferred = None
66                 deferred.callback(b)
67
68     def allAvailable(self):
69         """Indicate that no more data will be coming available."""
70         self.finished = True
71
72         # If a read is pending, let it go
73         if self.deferred:
74             if self.position < self.available:
75                 # Try to read some data from the file
76                 length = self.available - self.position
77                 readSize = min(length, self.CHUNK_SIZE)
78                 self.f.seek(self.position)
79                 b = self.f.read(readSize)
80                 bytesRead = len(b)
81     
82                 # Check if end of file was reached
83                 if bytesRead:
84                     self.position += bytesRead
85                     deferred = self.deferred
86                     self.deferred = None
87                     deferred.callback(b)
88                 else:
89                     # We're done
90                     deferred = self.deferred
91                     self.deferred = None
92                     deferred.callback(None)
93             else:
94                 # We're done
95                 deferred = self.deferred
96                 self.deferred = None
97                 deferred.callback(None)
98         
99     def read(self, sendfile=False):
100         assert not self.deferred, "A previous read is still deferred."
101
102         if self.f is None:
103             return None
104
105         length = self.available - self.position
106         readSize = min(length, self.CHUNK_SIZE)
107
108         # If we don't have any available, we're done or deferred
109         if readSize <= 0:
110             if self.finished:
111                 return None
112             else:
113                 self.deferred = defer.Deferred()
114                 return self.deferred
115
116         # Try to read some data from the file
117         self.f.seek(self.position)
118         b = self.f.read(readSize)
119         bytesRead = len(b)
120         if not bytesRead:
121             # End of file was reached, we're done or deferred
122             if self.finished:
123                 return None
124             else:
125                 self.deferred = defer.Deferred()
126                 return self.deferred
127         else:
128             self.position += bytesRead
129             return b
130
131 class StreamToFile:
132     """Save a stream to a partial file and hash it.
133     
134     @type stream: L{twisted.web2.stream.IByteStream}
135     @ivar stream: the input stream being read
136     @type outFile: L{twisted.python.filepath.FilePath}
137     @ivar outFile: the file being written
138     @type hash: C{sha1}
139     @ivar hash: the hash object for the data
140     @type position: C{int}
141     @ivar position: the current file position to write the next data to
142     @type length: C{int}
143     @ivar length: the position in the file to not write beyond
144     @type doneDefer: L{twisted.internet.defer.Deferred}
145     @ivar doneDefer: the deferred that will fire when done writing
146     """
147     
148     def __init__(self, inputStream, outFile, start = 0, length = None):
149         """Initializes the file.
150         
151         @type inputStream: L{twisted.web2.stream.IByteStream}
152         @param inputStream: the input stream to read from
153         @type outFile: L{twisted.python.filepath.FilePath}
154         @param outFile: the file to write to
155         @type start: C{int}
156         @param start: the file position to start writing at
157             (optional, defaults to the start of the file)
158         @type length: C{int}
159         @param length: the maximum amount of data to write to the file
160             (optional, defaults to not limiting the writing to the file
161         """
162         self.stream = inputStream
163         self.outFile = outFile
164         self.hash = sha.new()
165         self.position = start
166         self.length = None
167         if length is not None:
168             self.length = start + length
169         self.doneDefer = None
170         
171     def run(self):
172         """Start the streaming.
173
174         @rtype: L{twisted.internet.defer.Deferred}
175         """
176         log.msg('Started streaming %r bytes to file at position %d' % (self.length, self.position))
177         self.doneDefer = stream.readStream(self.stream, self._gotData)
178         self.doneDefer.addCallbacks(self._done, self._error)
179         return self.doneDefer
180
181     def _gotData(self, data):
182         """Process the received data."""
183         if self.outFile.closed:
184             raise Exception, "outFile was unexpectedly closed"
185         
186         if data is None:
187             raise Exception, "Data is None?"
188         
189         # Make sure we don't go too far
190         if self.length is not None and self.position + len(data) > self.length:
191             data = data[:(self.length - self.position)]
192         
193         # Write and hash the streamed data
194         self.outFile.seek(self.position)
195         self.outFile.write(data)
196         self.hash.update(data)
197         self.position += len(data)
198         
199     def _done(self, result):
200         """Return the result."""
201         log.msg('Streaming is complete')
202         return self.hash.digest()
203     
204     def _error(self, err):
205         """Log the error."""
206         log.msg('Streaming error')
207         log.err(err)
208         return err
209     
210 class FileDownload:
211     """Manage a download from a list of peers or a mirror.
212     
213     @type manager: L{PeerManager}
214     @ivar manager: the manager to send requests for peers to
215     @type hash: L{Hash.HashObject}
216     @ivar hash: the hash object containing the expected hash for the file
217     @ivar mirror: the URI of the file on the mirror
218     @type compact_peers: C{list} of C{dictionary}
219     @ivar compact_peers: a list of the peer info where the file can be found
220     @type file: C{file}
221     @ivar file: the open file to right the download to
222     @type path: C{string}
223     @ivar path: the path to request from peers to access the file
224     @type pieces: C{list} of C{string} 
225     @ivar pieces: the hashes of the pieces in the file
226     @type started: C{boolean}
227     @ivar started: whether the download has begun yet
228     @type defer: L{twisted.internet.defer.Deferred}
229     @ivar defer: the deferred that will callback with the result of the download
230     @type peers: C{dictionary}
231     @ivar peers: information about each of the peers available to download from
232     @type outstanding: C{int}
233     @ivar outstanding: the number of requests to peers currently outstanding
234     @type peerlist: C{list} of L{HTTPDownloader.Peer}
235     @ivar peerlist: the sorted list of peers for this download
236     @type stream: L{GrowingFileStream}
237     @ivar stream: the stream of resulting data from the download
238     @type nextFinish: C{int}
239     @ivar nextFinish: the next piece that is needed to finish for the stream
240     @type completePieces: C{list} of C{boolean} or L{HTTPDownloader.Peer}
241     @ivar completePieces: one per piece, will be False if no requests are
242         outstanding for the piece, True if the piece has been successfully
243         downloaded, or the Peer that a request for this piece has been sent  
244     """
245     
246     def __init__(self, manager, hash, mirror, compact_peers, file):
247         """Initialize the instance and check for piece hashes.
248         
249         @type manager: L{PeerManager}
250         @param manager: the manager to send requests for peers to
251         @type hash: L{Hash.HashObject}
252         @param hash: the hash object containing the expected hash for the file
253         @param mirror: the URI of the file on the mirror
254         @type compact_peers: C{list} of C{dictionary}
255         @param compact_peers: a list of the peer info where the file can be found
256         @type file: L{twisted.python.filepath.FilePath}
257         @param file: the temporary file to use to store the downloaded file
258         """
259         self.manager = manager
260         self.hash = hash
261         self.mirror = mirror
262         self.compact_peers = compact_peers
263         
264         self.path = '/~/' + quote_plus(hash.expected())
265         self.mirror_path = None
266         self.pieces = None
267         self.started = False
268         
269         file.restat(False)
270         if file.exists():
271             file.remove()
272         self.file = file.open('w+')
273
274     def run(self):
275         """Start the downloading process."""
276         log.msg('Checking for pieces for %s' % self.path)
277         self.defer = defer.Deferred()
278         self.peers = {}
279         no_pieces = 0
280         pieces_string = {0: 0}
281         pieces_hash = {0: 0}
282         pieces_dl_hash = {0: 0}
283
284         for compact_peer in self.compact_peers:
285             # Build a list of all the peers for this download
286             site = uncompact(compact_peer['c'])
287             peer = self.manager.getPeer(site)
288             self.peers.setdefault(site, {})['peer'] = peer
289
290             # Extract any piece information from the peers list
291             if 't' in compact_peer:
292                 self.peers[site]['t'] = compact_peer['t']['t']
293                 pieces_string.setdefault(compact_peer['t']['t'], 0)
294                 pieces_string[compact_peer['t']['t']] += 1
295             elif 'h' in compact_peer:
296                 self.peers[site]['h'] = compact_peer['h']
297                 pieces_hash.setdefault(compact_peer['h'], 0)
298                 pieces_hash[compact_peer['h']] += 1
299             elif 'l' in compact_peer:
300                 self.peers[site]['l'] = compact_peer['l']
301                 pieces_dl_hash.setdefault(compact_peer['l'], 0)
302                 pieces_dl_hash[compact_peer['l']] += 1
303             else:
304                 no_pieces += 1
305         
306         # Select the most popular piece info
307         max_found = max(no_pieces, max(pieces_string.values()),
308                         max(pieces_hash.values()), max(pieces_dl_hash.values()))
309
310         if max_found < len(self.peers):
311             log.msg('Misleading piece information found, using most popular %d of %d peers' % 
312                     (max_found, len(self.peers)))
313
314         if max_found == no_pieces:
315             # The file is not split into pieces
316             log.msg('No pieces were found for the file')
317             self.pieces = [self.hash.expected()]
318             self.startDownload()
319         elif max_found == max(pieces_string.values()):
320             # Small number of pieces in a string
321             for pieces, num in pieces_string.items():
322                 # Find the most popular piece string
323                 if num == max_found:
324                     self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
325                     log.msg('Peer info contained %d piece hashes' % len(self.pieces))
326                     self.startDownload()
327                     break
328         elif max_found == max(pieces_hash.values()):
329             # Medium number of pieces stored in the DHT
330             for pieces, num in pieces_hash.items():
331                 # Find the most popular piece hash to lookup
332                 if num == max_found:
333                     log.msg('Found a hash for pieces to lookup in the DHT: %r' % pieces)
334                     self.getDHTPieces(pieces)
335                     break
336         elif max_found == max(pieces_dl_hash.values()):
337             # Large number of pieces stored in peers
338             for pieces, num in pieces_dl_hash.items():
339                 # Find the most popular piece hash to download
340                 if num == max_found:
341                     log.msg('Found a hash for pieces to lookup in peers: %r' % pieces)
342                     self.getPeerPieces(pieces)
343                     break
344         return self.defer
345
346     #{ Downloading the piece hashes
347     def getDHTPieces(self, key):
348         """Retrieve the piece information from the DHT.
349         
350         @param key: the key to lookup in the DHT
351         """
352         # Remove any peers with the wrong piece hash
353         #for site in self.peers.keys():
354         #    if self.peers[site].get('h', '') != key:
355         #        del self.peers[site]
356
357         # Start the DHT lookup
358         lookupDefer = self.manager.dht.getValue(key)
359         lookupDefer.addBoth(self._getDHTPieces, key)
360         
361     def _getDHTPieces(self, results, key):
362         """Check the retrieved values."""
363         if isinstance(results, list):
364             for result in results:
365                 # Make sure the hash matches the key
366                 result_hash = sha.new(result.get('t', '')).digest()
367                 if result_hash == key:
368                     pieces = result['t']
369                     self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
370                     log.msg('Retrieved %d piece hashes from the DHT' % len(self.pieces))
371                     self.startDownload()
372                     return
373                 
374             log.msg('Could not retrieve the piece hashes from the DHT')
375         else:
376             log.msg('Looking up piece hashes in the DHT resulted in an error: %r' % (result, ))
377             
378         # Continue without the piece hashes
379         self.pieces = [None for x in xrange(0, self.hash.expSize, PIECE_SIZE)]
380         self.startDownload()
381
382     def getPeerPieces(self, key, failedSite = None):
383         """Retrieve the piece information from the peers.
384         
385         @param key: the key to request from the peers
386         """
387         if failedSite is None:
388             log.msg('Starting the lookup of piece hashes in peers')
389             self.outstanding = 0
390             # Remove any peers with the wrong piece hash
391             #for site in self.peers.keys():
392             #    if self.peers[site].get('l', '') != key:
393             #        del self.peers[site]
394         else:
395             log.msg('Piece hash lookup failed for peer %r' % (failedSite, ))
396             self.peers[failedSite]['failed'] = True
397             self.outstanding -= 1
398
399         if self.pieces is None:
400             # Send a request to one or more peers
401             log.msg('Checking for a peer to request piece hashes from')
402             for site in self.peers:
403                 if self.peers[site].get('failed', False) != True:
404                     log.msg('Sending a piece hash request to %r' % (site, ))
405                     path = '/~/' + quote_plus(key)
406                     lookupDefer = self.peers[site]['peer'].get(path)
407                     reactor.callLater(0, lookupDefer.addCallbacks,
408                                       *(self._getPeerPieces, self._gotPeerError),
409                                       **{'callbackArgs': (key, site),
410                                          'errbackArgs': (key, site)})
411                     self.outstanding += 1
412                     if self.outstanding >= 4:
413                         break
414         
415         log.msg('Done sending piece hash requests for now, %d outstanding' % self.outstanding)
416         if self.pieces is None and self.outstanding <= 0:
417             # Continue without the piece hashes
418             log.msg('Could not retrieve the piece hashes from the peers')
419             self.pieces = [None for x in xrange(0, self.hash.expSize, PIECE_SIZE)]
420             self.startDownload()
421         
422     def _getPeerPieces(self, response, key, site):
423         """Process the retrieved response from the peer."""
424         log.msg('Got a piece hash response %d from %r' % (response.code, site))
425         if response.code != 200:
426             # Request failed, try a different peer
427             log.msg('Did not like response %d from %r' % (response.code, site))
428             self.getPeerPieces(key, site)
429         else:
430             # Read the response stream to a string
431             self.peers[site]['pieces'] = ''
432             def _gotPeerPiece(data, self = self, site = site):
433                 log.msg('Peer %r got %d bytes of piece hashes' % (site, len(data)))
434                 self.peers[site]['pieces'] += data
435             log.msg('Streaming piece hashes from peer')
436             df = stream.readStream(response.stream, _gotPeerPiece)
437             df.addCallbacks(self._gotPeerPieces, self._gotPeerError,
438                             callbackArgs=(key, site), errbackArgs=(key, site))
439
440     def _gotPeerError(self, err, key, site):
441         """Peer failed, try again."""
442         log.msg('Peer piece hash request failed for %r' % (site, ))
443         log.err(err)
444         self.getPeerPieces(key, site)
445
446     def _gotPeerPieces(self, result, key, site):
447         """Check the retrieved pieces from the peer."""
448         log.msg('Finished streaming piece hashes from peer %r' % (site, ))
449         if self.pieces is not None:
450             # Already done
451             log.msg('Already done')
452             return
453         
454         try:
455             result = bdecode(self.peers[site]['pieces'])
456         except:
457             log.msg('Error bdecoding piece hashes')
458             log.err()
459             self.getPeerPieces(key, site)
460             return
461             
462         result_hash = sha.new(result.get('t', '')).digest()
463         if result_hash == key:
464             pieces = result['t']
465             self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
466             log.msg('Retrieved %d piece hashes from the peer %r' % (len(self.pieces), site))
467             self.startDownload()
468         else:
469             log.msg('Peer returned a piece string that did not match')
470             self.getPeerPieces(key, site)
471
472     #{ Downloading the file
473     def sort(self):
474         """Sort the peers by their rank (highest ranked at the end)."""
475         def sort(a, b):
476             """Sort peers by their rank."""
477             if a.rank > b.rank:
478                 return 1
479             elif a.rank < b.rank:
480                 return -1
481             return 0
482         self.peerlist.sort(sort)
483
484     def startDownload(self):
485         """Start the download from the peers."""
486         # Don't start twice
487         if self.started:
488             return
489         
490         log.msg('Starting to download %s' % self.path)
491         self.started = True
492         assert self.pieces, "You must initialize the piece hashes first"
493         self.peerlist = [self.peers[site]['peer'] for site in self.peers]
494         
495         # Use the mirror if there are few peers
496         if len(self.peerlist) < config.getint('DEFAULT', 'MIN_DOWNLOAD_PEERS'):
497             parsed = urlparse(self.mirror)
498             if parsed[0] == "http":
499                 site = splitHostPort(parsed[0], parsed[1])
500                 self.mirror_path = urlunparse(('', '') + parsed[2:])
501                 peer = self.manager.getPeer(site, mirror = True)
502                 self.peerlist.append(peer)
503         
504         # Special case if there's only one good peer left
505 #        if len(self.peerlist) == 1:
506 #            log.msg('Downloading from peer %r' % (self.peerlist[0], ))
507 #            self.defer.callback(self.peerlist[0].get(self.path))
508 #            return
509         
510         # Start sending the return file
511         self.stream = GrowingFileStream(self.file, self.hash.expSize)
512         resp = Response(200, {}, self.stream)
513         self.defer.callback(resp)
514
515         # Begin to download the pieces
516         self.outstanding = 0
517         self.nextFinish = 0
518         self.completePieces = [False for piece in self.pieces]
519         self.getPieces()
520         
521     #{ Downloading the pieces
522     def getPieces(self):
523         """Download the next pieces from the peers."""
524         log.msg('Checking for more piece requests to send')
525         self.sort()
526         piece = self.nextFinish
527         while self.outstanding < 4 and self.peerlist and piece < len(self.completePieces):
528             log.msg('Checking piece %d' % piece)
529             if self.completePieces[piece] == False:
530                 # Send a request to the highest ranked peer
531                 peer = self.peerlist.pop()
532                 self.completePieces[piece] = peer
533                 log.msg('Sending a request for piece %d to peer %r' % (piece, peer))
534                 
535                 self.outstanding += 1
536                 if peer.mirror:
537                     df = peer.getRange(self.mirror_path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
538                 else:
539                     df = peer.getRange(self.path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
540                 reactor.callLater(0, df.addCallbacks,
541                                   *(self._getPiece, self._getError),
542                                   **{'callbackArgs': (piece, peer),
543                                      'errbackArgs': (piece, peer)})
544             piece += 1
545                 
546         log.msg('Finished checking pieces, %d outstanding, next piece %d of %d' % (self.outstanding, self.nextFinish, len(self.completePieces)))
547         # Check if we're done
548         if self.outstanding <= 0 and self.nextFinish >= len(self.completePieces):
549             log.msg('We seem to be done with all pieces')
550             self.stream.allAvailable()
551     
552     def _getPiece(self, response, piece, peer):
553         """Process the retrieved headers from the peer."""
554         log.msg('Got response for piece %d from peer %r' % (piece, peer))
555         if ((len(self.completePieces) > 1 and response.code != 206) or
556             (response.code not in (200, 206))):
557             # Request failed, try a different peer
558             log.msg('Wrong response type %d for piece %d from peer %r' % (response.code, piece, peer))
559             peer.hashError('Peer responded with the wrong type of download: %r' % response.code)
560             self.completePieces[piece] = False
561             if response.stream and response.stream.length:
562                 stream.readAndDiscard(response.stream)
563         else:
564             # Read the response stream to the file
565             log.msg('Streaming piece %d from peer %r' % (piece, peer))
566             if response.code == 206:
567                 df = StreamToFile(response.stream, self.file, piece*PIECE_SIZE,
568                                   PIECE_SIZE).run()
569             else:
570                 df = StreamToFile(response.stream, self.file).run()
571             reactor.callLater(0, df.addCallbacks,
572                               *(self._gotPiece, self._gotError),
573                               **{'callbackArgs': (piece, peer),
574                                  'errbackArgs': (piece, peer)})
575
576         self.outstanding -= 1
577         self.peerlist.append(peer)
578         self.getPieces()
579
580     def _getError(self, err, piece, peer):
581         """Peer failed, try again."""
582         log.msg('Got error for piece %d from peer %r' % (piece, peer))
583         self.outstanding -= 1
584         self.peerlist.append(peer)
585         self.completePieces[piece] = False
586         self.getPieces()
587         log.err(err)
588
589     def _gotPiece(self, response, piece, peer):
590         """Process the retrieved piece from the peer."""
591         log.msg('Finished streaming piece %d from peer %r: %r' % (piece, peer, response))
592         if self.pieces[piece] and response != self.pieces[piece]:
593             # Hash doesn't match
594             log.msg('Hash error for piece %d from peer %r' % (piece, peer))
595             peer.hashError('Piece received from peer does not match expected')
596             self.completePieces[piece] = False
597         else:
598             # Successfully completed one of several pieces
599             log.msg('Finished with piece %d from peer %r' % (piece, peer))
600             self.completePieces[piece] = True
601             while (self.nextFinish < len(self.completePieces) and
602                    self.completePieces[self.nextFinish] == True):
603                 self.nextFinish += 1
604                 self.stream.updateAvailable(PIECE_SIZE)
605
606         self.getPieces()
607
608     def _gotError(self, err, piece, peer):
609         """Piece download failed, try again."""
610         log.msg('Error streaming piece %d from peer %r: %r' % (piece, peer, response))
611         log.err(err)
612         self.completePieces[piece] = False
613         self.getPieces()
614         
615 class PeerManager:
616     """Manage a set of peers and the requests to them.
617     
618     @type cache_dir: L{twisted.python.filepath.FilePath}
619     @ivar cache_dir: the directory to use for storing all files
620     @type dht: L{interfaces.IDHT}
621     @ivar dht: the DHT instance
622     @type stats: L{stats.StatsLogger}
623     @ivar stats: the statistics logger to record sent data to
624     @type clients: C{dictionary}
625     @ivar clients: the available peers that have been previously contacted
626     """
627
628     def __init__(self, cache_dir, dht, stats):
629         """Initialize the instance."""
630         self.cache_dir = cache_dir
631         self.cache_dir.restat(False)
632         if not self.cache_dir.exists():
633             self.cache_dir.makedirs()
634         self.dht = dht
635         self.stats = stats
636         self.clients = {}
637         
638     def get(self, hash, mirror, peers = [], method="GET", modtime=None):
639         """Download from a list of peers or fallback to a mirror.
640         
641         @type hash: L{Hash.HashObject}
642         @param hash: the hash object containing the expected hash for the file
643         @param mirror: the URI of the file on the mirror
644         @type peers: C{list} of C{string}
645         @param peers: a list of the peer info where the file can be found
646             (optional, defaults to downloading from the mirror)
647         @type method: C{string}
648         @param method: the HTTP method to use, 'GET' or 'HEAD'
649             (optional, defaults to 'GET')
650         @type modtime: C{int}
651         @param modtime: the modification time to use for an 'If-Modified-Since'
652             header, as seconds since the epoch
653             (optional, defaults to not sending that header)
654         """
655         if not peers or method != "GET" or modtime is not None:
656             log.msg('Downloading (%s) from mirror %s' % (method, mirror))
657             parsed = urlparse(mirror)
658             assert parsed[0] == "http", "Only HTTP is supported, not '%s'" % parsed[0]
659             site = splitHostPort(parsed[0], parsed[1])
660             path = urlunparse(('', '') + parsed[2:])
661             peer = self.getPeer(site, mirror = True)
662             return peer.get(path, method, modtime)
663 #        elif len(peers) == 1:
664 #            site = uncompact(peers[0]['c'])
665 #            log.msg('Downloading from peer %r' % (site, ))
666 #            path = '/~/' + quote_plus(hash.expected())
667 #            peer = self.getPeer(site)
668 #            return peer.get(path)
669         else:
670             tmpfile = self.cache_dir.child(hash.hexexpected())
671             return FileDownload(self, hash, mirror, peers, tmpfile).run()
672         
673     def getPeer(self, site, mirror = False):
674         """Create a new peer if necessary and return it.
675         
676         @type site: (C{string}, C{int})
677         @param site: the IP address and port of the peer
678         @param mirror: whether the peer is actually a mirror
679             (optional, defaults to False)
680         """
681         if site not in self.clients:
682             self.clients[site] = Peer(site[0], site[1], self.stats)
683             if mirror:
684                 self.clients[site].mirror = True
685         return self.clients[site]
686     
687     def close(self):
688         """Close all the connections to peers."""
689         for site in self.clients:
690             self.clients[site].close()
691         self.clients = {}
692
693 class TestPeerManager(unittest.TestCase):
694     """Unit tests for the PeerManager."""
695     
696     manager = None
697     pending_calls = []
698     
699     def tearDown(self):
700         for p in self.pending_calls:
701             if p.active():
702                 p.cancel()
703         self.pending_calls = []
704         if self.manager:
705             self.manager.close()
706             self.manager = None