e9d1ecb590702de992bec32c14f3803fa0d6b6b9
[quix0rs-apt-p2p.git] / apt_p2p / PeerManager.py
1
2 """Manage a set of peers and the requests to them."""
3
4 from random import choice
5 from urlparse import urlparse, urlunparse
6 from urllib import quote_plus
7 from binascii import b2a_hex, a2b_hex
8 import sha
9
10 from twisted.internet import reactor, defer
11 from twisted.python import log
12 from twisted.trial import unittest
13 from twisted.web2 import stream
14 from twisted.web2.http import Response, splitHostPort
15
16 from HTTPDownloader import Peer
17 from util import uncompact
18 from Hash import PIECE_SIZE
19 from apt_p2p_Khashmir.bencode import bdecode
20 from apt_p2p_conf import config
21
22 class GrowingFileStream(stream.FileStream):
23     """Modified to stream data from a file as it becomes available.
24     
25     @ivar CHUNK_SIZE: the maximum size of chunks of data to send at a time
26     @ivar deferred: waiting for the result of the last read attempt
27     @ivar available: the number of bytes that are currently available to read
28     @ivar position: the current position in the file where the next read will begin
29     @ivar finished: True when no more data will be coming available
30     """
31
32     CHUNK_SIZE = 4*1024
33
34     def __init__(self, f, length = None):
35         stream.FileStream.__init__(self, f)
36         self.length = length
37         self.deferred = None
38         self.available = 0L
39         self.position = 0L
40         self.finished = False
41
42     def updateAvailable(self, newlyAvailable):
43         """Update the number of bytes that are available.
44         
45         Call it with 0 to trigger reading of a fully read file.
46         
47         @param newlyAvailable: the number of bytes that just became available
48         """
49         assert not self.finished
50         self.available += newlyAvailable
51         
52         # If a read is pending, let it go
53         if self.deferred and self.position < self.available:
54             # Try to read some data from the file
55             length = self.available - self.position
56             readSize = min(length, self.CHUNK_SIZE)
57             self.f.seek(self.position)
58             b = self.f.read(readSize)
59             bytesRead = len(b)
60             
61             # Check if end of file was reached
62             if bytesRead:
63                 self.position += bytesRead
64                 deferred = self.deferred
65                 self.deferred = None
66                 deferred.callback(b)
67
68     def allAvailable(self):
69         """Indicate that no more data will be coming available."""
70         self.finished = True
71
72         # If a read is pending, let it go
73         if self.deferred:
74             if self.position < self.available:
75                 # Try to read some data from the file
76                 length = self.available - self.position
77                 readSize = min(length, self.CHUNK_SIZE)
78                 self.f.seek(self.position)
79                 b = self.f.read(readSize)
80                 bytesRead = len(b)
81     
82                 # Check if end of file was reached
83                 if bytesRead:
84                     self.position += bytesRead
85                     deferred = self.deferred
86                     self.deferred = None
87                     deferred.callback(b)
88                 else:
89                     # We're done
90                     deferred = self.deferred
91                     self.deferred = None
92                     deferred.callback(None)
93             else:
94                 # We're done
95                 deferred = self.deferred
96                 self.deferred = None
97                 deferred.callback(None)
98         
99     def read(self, sendfile=False):
100         assert not self.deferred, "A previous read is still deferred."
101
102         if self.f is None:
103             return None
104
105         length = self.available - self.position
106         readSize = min(length, self.CHUNK_SIZE)
107
108         # If we don't have any available, we're done or deferred
109         if readSize <= 0:
110             if self.finished:
111                 return None
112             else:
113                 self.deferred = defer.Deferred()
114                 return self.deferred
115
116         # Try to read some data from the file
117         self.f.seek(self.position)
118         b = self.f.read(readSize)
119         bytesRead = len(b)
120         if not bytesRead:
121             # End of file was reached, we're done or deferred
122             if self.finished:
123                 return None
124             else:
125                 self.deferred = defer.Deferred()
126                 return self.deferred
127         else:
128             self.position += bytesRead
129             return b
130
131 class StreamToFile:
132     """Save a stream to a partial file and hash it.
133     
134     @type stream: L{twisted.web2.stream.IByteStream}
135     @ivar stream: the input stream being read
136     @type outFile: L{twisted.python.filepath.FilePath}
137     @ivar outFile: the file being written
138     @type hash: C{sha1}
139     @ivar hash: the hash object for the data
140     @type position: C{int}
141     @ivar position: the current file position to write the next data to
142     @type length: C{int}
143     @ivar length: the position in the file to not write beyond
144     @type doneDefer: L{twisted.internet.defer.Deferred}
145     @ivar doneDefer: the deferred that will fire when done writing
146     """
147     
148     def __init__(self, inputStream, outFile, start = 0, length = None):
149         """Initializes the file.
150         
151         @type inputStream: L{twisted.web2.stream.IByteStream}
152         @param inputStream: the input stream to read from
153         @type outFile: L{twisted.python.filepath.FilePath}
154         @param outFile: the file to write to
155         @type start: C{int}
156         @param start: the file position to start writing at
157             (optional, defaults to the start of the file)
158         @type length: C{int}
159         @param length: the maximum amount of data to write to the file
160             (optional, defaults to not limiting the writing to the file
161         """
162         self.stream = inputStream
163         self.outFile = outFile
164         self.hash = sha.new()
165         self.position = start
166         self.length = None
167         if length is not None:
168             self.length = start + length
169         self.doneDefer = None
170         
171     def run(self):
172         """Start the streaming.
173
174         @rtype: L{twisted.internet.defer.Deferred}
175         """
176         log.msg('Started streaming %r bytes to file at position %d' % (self.length, self.position))
177         self.doneDefer = stream.readStream(self.stream, self._gotData)
178         self.doneDefer.addCallbacks(self._done, self._error)
179         return self.doneDefer
180
181     def _gotData(self, data):
182         """Process the received data."""
183         if self.outFile.closed:
184             raise Exception, "outFile was unexpectedly closed"
185         
186         if data is None:
187             raise Exception, "Data is None?"
188         
189         # Make sure we don't go too far
190         if self.length is not None and self.position + len(data) > self.length:
191             data = data[:(self.length - self.position)]
192         
193         # Write and hash the streamed data
194         self.outFile.seek(self.position)
195         self.outFile.write(data)
196         self.hash.update(data)
197         self.position += len(data)
198         
199     def _done(self, result):
200         """Return the result."""
201         log.msg('Streaming is complete')
202         return self.hash.digest()
203     
204     def _error(self, err):
205         """Log the error."""
206         log.msg('Streaming error')
207         log.err(err)
208         return err
209     
210 class FileDownload:
211     """Manage a download from a list of peers or a mirror.
212     
213     @type manager: L{PeerManager}
214     @ivar manager: the manager to send requests for peers to
215     @type hash: L{Hash.HashObject}
216     @ivar hash: the hash object containing the expected hash for the file
217     @ivar mirror: the URI of the file on the mirror
218     @type compact_peers: C{list} of C{dictionary}
219     @ivar compact_peers: a list of the peer info where the file can be found
220     @type file: C{file}
221     @ivar file: the open file to right the download to
222     @type path: C{string}
223     @ivar path: the path to request from peers to access the file
224     @type pieces: C{list} of C{string} 
225     @ivar pieces: the hashes of the pieces in the file
226     @type started: C{boolean}
227     @ivar started: whether the download has begun yet
228     @type defer: L{twisted.internet.defer.Deferred}
229     @ivar defer: the deferred that will callback with the result of the download
230     @type peers: C{dictionary}
231     @ivar peers: information about each of the peers available to download from
232     @type outstanding: C{int}
233     @ivar outstanding: the number of requests to peers currently outstanding
234     @type peerlist: C{list} of L{HTTPDownloader.Peer}
235     @ivar peerlist: the sorted list of peers for this download
236     @type stream: L{GrowingFileStream}
237     @ivar stream: the stream of resulting data from the download
238     @type nextFinish: C{int}
239     @ivar nextFinish: the next piece that is needed to finish for the stream
240     @type completePieces: C{list} of C{boolean} or L{HTTPDownloader.Peer}
241     @ivar completePieces: one per piece, will be False if no requests are
242         outstanding for the piece, True if the piece has been successfully
243         downloaded, or the Peer that a request for this piece has been sent  
244     """
245     
246     def __init__(self, manager, hash, mirror, compact_peers, file):
247         """Initialize the instance and check for piece hashes.
248         
249         @type manager: L{PeerManager}
250         @param manager: the manager to send requests for peers to
251         @type hash: L{Hash.HashObject}
252         @param hash: the hash object containing the expected hash for the file
253         @param mirror: the URI of the file on the mirror
254         @type compact_peers: C{list} of C{dictionary}
255         @param compact_peers: a list of the peer info where the file can be found
256         @type file: L{twisted.python.filepath.FilePath}
257         @param file: the temporary file to use to store the downloaded file
258         """
259         self.manager = manager
260         self.hash = hash
261         self.mirror = mirror
262         self.compact_peers = compact_peers
263         
264         self.path = '/~/' + quote_plus(hash.expected())
265         self.mirror_path = None
266         self.pieces = None
267         self.started = False
268         
269         file.restat(False)
270         if file.exists():
271             file.remove()
272         self.file = file.open('w+')
273
274     def run(self):
275         """Start the downloading process."""
276         log.msg('Checking for pieces for %s' % self.path)
277         self.defer = defer.Deferred()
278         self.peers = {}
279         no_pieces = 0
280         pieces_string = {0: 0}
281         pieces_hash = {0: 0}
282         pieces_dl_hash = {0: 0}
283
284         for compact_peer in self.compact_peers:
285             # Build a list of all the peers for this download
286             site = uncompact(compact_peer['c'])
287             peer = self.manager.getPeer(site)
288             self.peers.setdefault(site, {})['peer'] = peer
289
290             # Extract any piece information from the peers list
291             if 't' in compact_peer:
292                 self.peers[site]['t'] = compact_peer['t']['t']
293                 pieces_string.setdefault(compact_peer['t']['t'], 0)
294                 pieces_string[compact_peer['t']['t']] += 1
295             elif 'h' in compact_peer:
296                 self.peers[site]['h'] = compact_peer['h']
297                 pieces_hash.setdefault(compact_peer['h'], 0)
298                 pieces_hash[compact_peer['h']] += 1
299             elif 'l' in compact_peer:
300                 self.peers[site]['l'] = compact_peer['l']
301                 pieces_dl_hash.setdefault(compact_peer['l'], 0)
302                 pieces_dl_hash[compact_peer['l']] += 1
303             else:
304                 no_pieces += 1
305         
306         # Select the most popular piece info
307         max_found = max(no_pieces, max(pieces_string.values()),
308                         max(pieces_hash.values()), max(pieces_dl_hash.values()))
309
310         if max_found < len(self.peers):
311             log.msg('Misleading piece information found, using most popular %d of %d peers' % 
312                     (max_found, len(self.peers)))
313
314         if max_found == no_pieces:
315             # The file is not split into pieces
316             log.msg('No pieces were found for the file')
317             self.pieces = [self.hash.expected()]
318             self.startDownload()
319         elif max_found == max(pieces_string.values()):
320             # Small number of pieces in a string
321             for pieces, num in pieces_string.items():
322                 # Find the most popular piece string
323                 if num == max_found:
324                     self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
325                     log.msg('Peer info contained %d piece hashes' % len(self.pieces))
326                     self.startDownload()
327                     break
328         elif max_found == max(pieces_hash.values()):
329             # Medium number of pieces stored in the DHT
330             for pieces, num in pieces_hash.items():
331                 # Find the most popular piece hash to lookup
332                 if num == max_found:
333                     log.msg('Found a hash for pieces to lookup in the DHT: %r' % pieces)
334                     self.getDHTPieces(pieces)
335                     break
336         elif max_found == max(pieces_dl_hash.values()):
337             # Large number of pieces stored in peers
338             for pieces, num in pieces_dl_hash.items():
339                 # Find the most popular piece hash to download
340                 if num == max_found:
341                     log.msg('Found a hash for pieces to lookup in peers: %r' % pieces)
342                     self.getPeerPieces(pieces)
343                     break
344         return self.defer
345
346     #{ Downloading the piece hashes
347     def getDHTPieces(self, key):
348         """Retrieve the piece information from the DHT.
349         
350         @param key: the key to lookup in the DHT
351         """
352         # Remove any peers with the wrong piece hash
353         #for site in self.peers.keys():
354         #    if self.peers[site].get('h', '') != key:
355         #        del self.peers[site]
356
357         # Start the DHT lookup
358         lookupDefer = self.manager.dht.getValue(key)
359         lookupDefer.addCallback(self._getDHTPieces, key)
360         
361     def _getDHTPieces(self, results, key):
362         """Check the retrieved values."""
363         for result in results:
364             # Make sure the hash matches the key
365             result_hash = sha.new(result.get('t', '')).digest()
366             if result_hash == key:
367                 pieces = result['t']
368                 self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
369                 log.msg('Retrieved %d piece hashes from the DHT' % len(self.pieces))
370                 self.startDownload()
371                 return
372             
373         # Continue without the piece hashes
374         log.msg('Could not retrieve the piece hashes from the DHT')
375         self.pieces = [None for x in xrange(0, self.hash.expSize, PIECE_SIZE)]
376         self.startDownload()
377
378     def getPeerPieces(self, key, failedSite = None):
379         """Retrieve the piece information from the peers.
380         
381         @param key: the key to request from the peers
382         """
383         if failedSite is None:
384             log.msg('Starting the lookup of piece hashes in peers')
385             self.outstanding = 0
386             # Remove any peers with the wrong piece hash
387             #for site in self.peers.keys():
388             #    if self.peers[site].get('l', '') != key:
389             #        del self.peers[site]
390         else:
391             log.msg('Piece hash lookup failed for peer %r' % (failedSite, ))
392             self.peers[failedSite]['failed'] = True
393             self.outstanding -= 1
394
395         if self.pieces is None:
396             # Send a request to one or more peers
397             log.msg('Checking for a peer to request piece hashes from')
398             for site in self.peers:
399                 if self.peers[site].get('failed', False) != True:
400                     log.msg('Sending a piece hash request to %r' % (site, ))
401                     path = '/~/' + quote_plus(key)
402                     lookupDefer = self.peers[site]['peer'].get(path)
403                     reactor.callLater(0, lookupDefer.addCallbacks,
404                                       *(self._getPeerPieces, self._gotPeerError),
405                                       **{'callbackArgs': (key, site),
406                                          'errbackArgs': (key, site)})
407                     self.outstanding += 1
408                     if self.outstanding >= 4:
409                         break
410         
411         log.msg('Done sending piece hash requests for now, %d outstanding' % self.outstanding)
412         if self.pieces is None and self.outstanding <= 0:
413             # Continue without the piece hashes
414             log.msg('Could not retrieve the piece hashes from the peers')
415             self.pieces = [None for x in xrange(0, self.hash.expSize, PIECE_SIZE)]
416             self.startDownload()
417         
418     def _getPeerPieces(self, response, key, site):
419         """Process the retrieved response from the peer."""
420         log.msg('Got a piece hash response %d from %r' % (response.code, site))
421         if response.code != 200:
422             # Request failed, try a different peer
423             log.msg('Did not like response %d from %r' % (response.code, site))
424             self.getPeerPieces(key, site)
425         else:
426             # Read the response stream to a string
427             self.peers[site]['pieces'] = ''
428             def _gotPeerPiece(data, self = self, site = site):
429                 log.msg('Peer %r got %d bytes of piece hashes' % (site, len(data)))
430                 self.peers[site]['pieces'] += data
431             log.msg('Streaming piece hashes from peer')
432             df = stream.readStream(response.stream, _gotPeerPiece)
433             df.addCallbacks(self._gotPeerPieces, self._gotPeerError,
434                             callbackArgs=(key, site), errbackArgs=(key, site))
435
436     def _gotPeerError(self, err, key, site):
437         """Peer failed, try again."""
438         log.msg('Peer piece hash request failed for %r' % (site, ))
439         log.err(err)
440         self.getPeerPieces(key, site)
441
442     def _gotPeerPieces(self, result, key, site):
443         """Check the retrieved pieces from the peer."""
444         log.msg('Finished streaming piece hashes from peer %r' % (site, ))
445         if self.pieces is not None:
446             # Already done
447             log.msg('Already done')
448             return
449         
450         try:
451             result = bdecode(self.peers[site]['pieces'])
452         except:
453             log.msg('Error bdecoding piece hashes')
454             log.err()
455             self.getPeerPieces(key, site)
456             return
457             
458         result_hash = sha.new(result.get('t', '')).digest()
459         if result_hash == key:
460             pieces = result['t']
461             self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
462             log.msg('Retrieved %d piece hashes from the peer %r' % (len(self.pieces), site))
463             self.startDownload()
464         else:
465             log.msg('Peer returned a piece string that did not match')
466             self.getPeerPieces(key, site)
467
468     #{ Downloading the file
469     def sort(self):
470         """Sort the peers by their rank (highest ranked at the end)."""
471         def sort(a, b):
472             """Sort peers by their rank."""
473             if a.rank > b.rank:
474                 return 1
475             elif a.rank < b.rank:
476                 return -1
477             return 0
478         self.peerlist.sort(sort)
479
480     def startDownload(self):
481         """Start the download from the peers."""
482         # Don't start twice
483         if self.started:
484             return
485         
486         log.msg('Starting to download %s' % self.path)
487         self.started = True
488         assert self.pieces, "You must initialize the piece hashes first"
489         self.peerlist = [self.peers[site]['peer'] for site in self.peers]
490         
491         # Use the mirror if there are few peers
492         if len(self.peerlist) < config.getint('DEFAULT', 'MIN_DOWNLOAD_PEERS'):
493             parsed = urlparse(self.mirror)
494             if parsed[0] == "http":
495                 site = splitHostPort(parsed[0], parsed[1])
496                 self.mirror_path = urlunparse(('', '') + parsed[2:])
497                 peer = self.manager.getPeer(site)
498                 peer.mirror = True
499                 self.peerlist.append(peer)
500         
501         # Special case if there's only one good peer left
502 #        if len(self.peerlist) == 1:
503 #            log.msg('Downloading from peer %r' % (self.peerlist[0], ))
504 #            self.defer.callback(self.peerlist[0].get(self.path))
505 #            return
506         
507         # Start sending the return file
508         self.stream = GrowingFileStream(self.file, self.hash.expSize)
509         resp = Response(200, {}, self.stream)
510         self.defer.callback(resp)
511
512         # Begin to download the pieces
513         self.outstanding = 0
514         self.nextFinish = 0
515         self.completePieces = [False for piece in self.pieces]
516         self.getPieces()
517         
518     #{ Downloading the pieces
519     def getPieces(self):
520         """Download the next pieces from the peers."""
521         log.msg('Checking for more piece requests to send')
522         self.sort()
523         piece = self.nextFinish
524         while self.outstanding < 4 and self.peerlist and piece < len(self.completePieces):
525             log.msg('Checking piece %d' % piece)
526             if self.completePieces[piece] == False:
527                 # Send a request to the highest ranked peer
528                 peer = self.peerlist.pop()
529                 self.completePieces[piece] = peer
530                 log.msg('Sending a request for piece %d to peer %r' % (piece, peer))
531                 
532                 self.outstanding += 1
533                 if peer.mirror:
534                     df = peer.getRange(self.mirror_path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
535                 else:
536                     df = peer.getRange(self.path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
537                 reactor.callLater(0, df.addCallbacks,
538                                   *(self._getPiece, self._getError),
539                                   **{'callbackArgs': (piece, peer),
540                                      'errbackArgs': (piece, peer)})
541             piece += 1
542                 
543         log.msg('Finished checking pieces, %d outstanding, next piece %d of %d' % (self.outstanding, self.nextFinish, len(self.completePieces)))
544         # Check if we're done
545         if self.outstanding <= 0 and self.nextFinish >= len(self.completePieces):
546             log.msg('We seem to be done with all pieces')
547             self.stream.allAvailable()
548     
549     def _getPiece(self, response, piece, peer):
550         """Process the retrieved headers from the peer."""
551         log.msg('Got response for piece %d from peer %r' % (piece, peer))
552         if ((len(self.completePieces) > 1 and response.code != 206) or
553             (response.code not in (200, 206))):
554             # Request failed, try a different peer
555             log.msg('Wrong response type %d for piece %d from peer %r' % (response.code, piece, peer))
556             peer.hashError('Peer responded with the wrong type of download: %r' % response.code)
557             self.completePieces[piece] = False
558             if response.stream and response.stream.length:
559                 stream.readAndDiscard(response.stream)
560         else:
561             # Read the response stream to the file
562             log.msg('Streaming piece %d from peer %r' % (piece, peer))
563             if response.code == 206:
564                 df = StreamToFile(response.stream, self.file, piece*PIECE_SIZE, PIECE_SIZE).run()
565             else:
566                 df = StreamToFile(response.stream, self.file).run()
567             df.addCallbacks(self._gotPiece, self._gotError,
568                             callbackArgs=(piece, peer), errbackArgs=(piece, peer))
569
570         self.outstanding -= 1
571         self.peerlist.append(peer)
572         self.getPieces()
573
574     def _getError(self, err, piece, peer):
575         """Peer failed, try again."""
576         log.msg('Got error for piece %d from peer %r' % (piece, peer))
577         self.outstanding -= 1
578         self.peerlist.append(peer)
579         self.completePieces[piece] = False
580         self.getPieces()
581         log.err(err)
582
583     def _gotPiece(self, response, piece, peer):
584         """Process the retrieved piece from the peer."""
585         log.msg('Finished streaming piece %d from peer %r: %r' % (piece, peer, response))
586         if self.pieces[piece] and response != self.pieces[piece]:
587             # Hash doesn't match
588             log.msg('Hash error for piece %d from peer %r' % (piece, peer))
589             peer.hashError('Piece received from peer does not match expected')
590             self.completePieces[piece] = False
591         else:
592             # Successfully completed one of several pieces
593             log.msg('Finished with piece %d from peer %r' % (piece, peer))
594             self.completePieces[piece] = True
595             while (self.nextFinish < len(self.completePieces) and
596                    self.completePieces[self.nextFinish] == True):
597                 self.nextFinish += 1
598                 self.stream.updateAvailable(PIECE_SIZE)
599
600         self.getPieces()
601
602     def _gotError(self, err, piece, peer):
603         """Piece download failed, try again."""
604         log.msg('Error streaming piece %d from peer %r: %r' % (piece, peer, response))
605         log.err(err)
606         self.completePieces[piece] = False
607         self.getPieces()
608         
609 class PeerManager:
610     """Manage a set of peers and the requests to them.
611     
612     @type cache_dir: L{twisted.python.filepath.FilePath}
613     @ivar cache_dir: the directory to use for storing all files
614     @type dht: L{interfaces.IDHT}
615     @ivar dht: the DHT instance
616     @type clients: C{dictionary}
617     @ivar clients: the available peers that have been previously contacted
618     """
619
620     def __init__(self, cache_dir, dht):
621         """Initialize the instance."""
622         self.cache_dir = cache_dir
623         self.cache_dir.restat(False)
624         if not self.cache_dir.exists():
625             self.cache_dir.makedirs()
626         self.dht = dht
627         self.clients = {}
628         
629     def get(self, hash, mirror, peers = [], method="GET", modtime=None):
630         """Download from a list of peers or fallback to a mirror.
631         
632         @type hash: L{Hash.HashObject}
633         @param hash: the hash object containing the expected hash for the file
634         @param mirror: the URI of the file on the mirror
635         @type peers: C{list} of C{string}
636         @param peers: a list of the peer info where the file can be found
637             (optional, defaults to downloading from the mirror)
638         @type method: C{string}
639         @param method: the HTTP method to use, 'GET' or 'HEAD'
640             (optional, defaults to 'GET')
641         @type modtime: C{int}
642         @param modtime: the modification time to use for an 'If-Modified-Since'
643             header, as seconds since the epoch
644             (optional, defaults to not sending that header)
645         """
646         if not peers or method != "GET" or modtime is not None:
647             log.msg('Downloading (%s) from mirror %s' % (method, mirror))
648             parsed = urlparse(mirror)
649             assert parsed[0] == "http", "Only HTTP is supported, not '%s'" % parsed[0]
650             site = splitHostPort(parsed[0], parsed[1])
651             path = urlunparse(('', '') + parsed[2:])
652             peer = self.getPeer(site)
653             peer.mirror = True
654             return peer.get(path, method, modtime)
655 #        elif len(peers) == 1:
656 #            site = uncompact(peers[0]['c'])
657 #            log.msg('Downloading from peer %r' % (site, ))
658 #            path = '/~/' + quote_plus(hash.expected())
659 #            peer = self.getPeer(site)
660 #            return peer.get(path)
661         else:
662             tmpfile = self.cache_dir.child(hash.hexexpected())
663             return FileDownload(self, hash, mirror, peers, tmpfile).run()
664         
665     def getPeer(self, site):
666         """Create a new peer if necessary and return it.
667         
668         @type site: (C{string}, C{int})
669         @param site: the IP address and port of the peer
670         """
671         if site not in self.clients:
672             self.clients[site] = Peer(site[0], site[1])
673         return self.clients[site]
674     
675     def close(self):
676         """Close all the connections to peers."""
677         for site in self.clients:
678             self.clients[site].close()
679         self.clients = {}
680
681 class TestPeerManager(unittest.TestCase):
682     """Unit tests for the PeerManager."""
683     
684     manager = None
685     pending_calls = []
686     
687     def tearDown(self):
688         for p in self.pending_calls:
689             if p.active():
690                 p.cancel()
691         self.pending_calls = []
692         if self.manager:
693             self.manager.close()
694             self.manager = None