Add statistics reporting to the main program (untested).
[quix0rs-apt-p2p.git] / apt_p2p / PeerManager.py
1
2 """Manage a set of peers and the requests to them."""
3
4 from random import choice
5 from urlparse import urlparse, urlunparse
6 from urllib import quote_plus
7 from binascii import b2a_hex, a2b_hex
8 import sha
9
10 from twisted.internet import reactor, defer
11 from twisted.python import log
12 from twisted.trial import unittest
13 from twisted.web2 import stream
14 from twisted.web2.http import Response, splitHostPort
15
16 from HTTPDownloader import Peer
17 from util import uncompact
18 from Hash import PIECE_SIZE
19 from apt_p2p_Khashmir.bencode import bdecode
20 from apt_p2p_conf import config
21
22 class GrowingFileStream(stream.FileStream):
23     """Modified to stream data from a file as it becomes available.
24     
25     @ivar CHUNK_SIZE: the maximum size of chunks of data to send at a time
26     @ivar deferred: waiting for the result of the last read attempt
27     @ivar available: the number of bytes that are currently available to read
28     @ivar position: the current position in the file where the next read will begin
29     @ivar finished: True when no more data will be coming available
30     """
31
32     CHUNK_SIZE = 4*1024
33
34     def __init__(self, f, length = None):
35         stream.FileStream.__init__(self, f)
36         self.length = length
37         self.deferred = None
38         self.available = 0L
39         self.position = 0L
40         self.finished = False
41
42     def updateAvailable(self, newlyAvailable):
43         """Update the number of bytes that are available.
44         
45         Call it with 0 to trigger reading of a fully read file.
46         
47         @param newlyAvailable: the number of bytes that just became available
48         """
49         assert not self.finished
50         self.available += newlyAvailable
51         
52         # If a read is pending, let it go
53         if self.deferred and self.position < self.available:
54             # Try to read some data from the file
55             length = self.available - self.position
56             readSize = min(length, self.CHUNK_SIZE)
57             self.f.seek(self.position)
58             b = self.f.read(readSize)
59             bytesRead = len(b)
60             
61             # Check if end of file was reached
62             if bytesRead:
63                 self.position += bytesRead
64                 deferred = self.deferred
65                 self.deferred = None
66                 deferred.callback(b)
67
68     def allAvailable(self):
69         """Indicate that no more data will be coming available."""
70         self.finished = True
71
72         # If a read is pending, let it go
73         if self.deferred:
74             if self.position < self.available:
75                 # Try to read some data from the file
76                 length = self.available - self.position
77                 readSize = min(length, self.CHUNK_SIZE)
78                 self.f.seek(self.position)
79                 b = self.f.read(readSize)
80                 bytesRead = len(b)
81     
82                 # Check if end of file was reached
83                 if bytesRead:
84                     self.position += bytesRead
85                     deferred = self.deferred
86                     self.deferred = None
87                     deferred.callback(b)
88                 else:
89                     # We're done
90                     deferred = self.deferred
91                     self.deferred = None
92                     deferred.callback(None)
93             else:
94                 # We're done
95                 deferred = self.deferred
96                 self.deferred = None
97                 deferred.callback(None)
98         
99     def read(self, sendfile=False):
100         assert not self.deferred, "A previous read is still deferred."
101
102         if self.f is None:
103             return None
104
105         length = self.available - self.position
106         readSize = min(length, self.CHUNK_SIZE)
107
108         # If we don't have any available, we're done or deferred
109         if readSize <= 0:
110             if self.finished:
111                 return None
112             else:
113                 self.deferred = defer.Deferred()
114                 return self.deferred
115
116         # Try to read some data from the file
117         self.f.seek(self.position)
118         b = self.f.read(readSize)
119         bytesRead = len(b)
120         if not bytesRead:
121             # End of file was reached, we're done or deferred
122             if self.finished:
123                 return None
124             else:
125                 self.deferred = defer.Deferred()
126                 return self.deferred
127         else:
128             self.position += bytesRead
129             return b
130
131 class StreamToFile:
132     """Save a stream to a partial file and hash it.
133     
134     @type stream: L{twisted.web2.stream.IByteStream}
135     @ivar stream: the input stream being read
136     @type outFile: L{twisted.python.filepath.FilePath}
137     @ivar outFile: the file being written
138     @type hash: C{sha1}
139     @ivar hash: the hash object for the data
140     @type position: C{int}
141     @ivar position: the current file position to write the next data to
142     @type length: C{int}
143     @ivar length: the position in the file to not write beyond
144     @type inform: C{method}
145     @ivar inform: a function to call with the length of data received
146     @type doneDefer: L{twisted.internet.defer.Deferred}
147     @ivar doneDefer: the deferred that will fire when done writing
148     """
149     
150     def __init__(self, inputStream, outFile, start = 0, length = None, inform = None):
151         """Initializes the file.
152         
153         @type inputStream: L{twisted.web2.stream.IByteStream}
154         @param inputStream: the input stream to read from
155         @type outFile: L{twisted.python.filepath.FilePath}
156         @param outFile: the file to write to
157         @type start: C{int}
158         @param start: the file position to start writing at
159             (optional, defaults to the start of the file)
160         @type length: C{int}
161         @param length: the maximum amount of data to write to the file
162             (optional, defaults to not limiting the writing to the file
163         @type inform: C{method}
164         @param inform: a function to call with the length of data received
165             (optional, defaults to calling nothing)
166         """
167         self.stream = inputStream
168         self.outFile = outFile
169         self.hash = sha.new()
170         self.position = start
171         self.length = None
172         if length is not None:
173             self.length = start + length
174         self.inform = inform
175         self.doneDefer = None
176         
177     def run(self):
178         """Start the streaming.
179
180         @rtype: L{twisted.internet.defer.Deferred}
181         """
182         log.msg('Started streaming %r bytes to file at position %d' % (self.length, self.position))
183         self.doneDefer = stream.readStream(self.stream, self._gotData)
184         self.doneDefer.addCallbacks(self._done, self._error)
185         return self.doneDefer
186
187     def _gotData(self, data):
188         """Process the received data."""
189         if self.outFile.closed:
190             raise Exception, "outFile was unexpectedly closed"
191         
192         if data is None:
193             raise Exception, "Data is None?"
194         
195         # Make sure we don't go too far
196         if self.length is not None and self.position + len(data) > self.length:
197             data = data[:(self.length - self.position)]
198         
199         # Write and hash the streamed data
200         self.outFile.seek(self.position)
201         self.outFile.write(data)
202         self.hash.update(data)
203         self.position += len(data)
204         self.inform(len(data))
205         
206     def _done(self, result):
207         """Return the result."""
208         log.msg('Streaming is complete')
209         return self.hash.digest()
210     
211     def _error(self, err):
212         """Log the error."""
213         log.msg('Streaming error')
214         log.err(err)
215         return err
216     
217 class FileDownload:
218     """Manage a download from a list of peers or a mirror.
219     
220     @type manager: L{PeerManager}
221     @ivar manager: the manager to send requests for peers to
222     @type hash: L{Hash.HashObject}
223     @ivar hash: the hash object containing the expected hash for the file
224     @ivar mirror: the URI of the file on the mirror
225     @type compact_peers: C{list} of C{dictionary}
226     @ivar compact_peers: a list of the peer info where the file can be found
227     @type file: C{file}
228     @ivar file: the open file to right the download to
229     @type path: C{string}
230     @ivar path: the path to request from peers to access the file
231     @type pieces: C{list} of C{string} 
232     @ivar pieces: the hashes of the pieces in the file
233     @type started: C{boolean}
234     @ivar started: whether the download has begun yet
235     @type defer: L{twisted.internet.defer.Deferred}
236     @ivar defer: the deferred that will callback with the result of the download
237     @type peers: C{dictionary}
238     @ivar peers: information about each of the peers available to download from
239     @type outstanding: C{int}
240     @ivar outstanding: the number of requests to peers currently outstanding
241     @type peerlist: C{list} of L{HTTPDownloader.Peer}
242     @ivar peerlist: the sorted list of peers for this download
243     @type stream: L{GrowingFileStream}
244     @ivar stream: the stream of resulting data from the download
245     @type nextFinish: C{int}
246     @ivar nextFinish: the next piece that is needed to finish for the stream
247     @type completePieces: C{list} of C{boolean} or L{HTTPDownloader.Peer}
248     @ivar completePieces: one per piece, will be False if no requests are
249         outstanding for the piece, True if the piece has been successfully
250         downloaded, or the Peer that a request for this piece has been sent  
251     """
252     
253     def __init__(self, manager, hash, mirror, compact_peers, file):
254         """Initialize the instance and check for piece hashes.
255         
256         @type manager: L{PeerManager}
257         @param manager: the manager to send requests for peers to
258         @type hash: L{Hash.HashObject}
259         @param hash: the hash object containing the expected hash for the file
260         @param mirror: the URI of the file on the mirror
261         @type compact_peers: C{list} of C{dictionary}
262         @param compact_peers: a list of the peer info where the file can be found
263         @type file: L{twisted.python.filepath.FilePath}
264         @param file: the temporary file to use to store the downloaded file
265         """
266         self.manager = manager
267         self.hash = hash
268         self.mirror = mirror
269         self.compact_peers = compact_peers
270         
271         self.path = '/~/' + quote_plus(hash.expected())
272         self.mirror_path = None
273         self.pieces = None
274         self.started = False
275         
276         file.restat(False)
277         if file.exists():
278             file.remove()
279         self.file = file.open('w+')
280
281     def run(self):
282         """Start the downloading process."""
283         log.msg('Checking for pieces for %s' % self.path)
284         self.defer = defer.Deferred()
285         self.peers = {}
286         no_pieces = 0
287         pieces_string = {0: 0}
288         pieces_hash = {0: 0}
289         pieces_dl_hash = {0: 0}
290
291         for compact_peer in self.compact_peers:
292             # Build a list of all the peers for this download
293             site = uncompact(compact_peer['c'])
294             peer = self.manager.getPeer(site)
295             self.peers.setdefault(site, {})['peer'] = peer
296
297             # Extract any piece information from the peers list
298             if 't' in compact_peer:
299                 self.peers[site]['t'] = compact_peer['t']['t']
300                 pieces_string.setdefault(compact_peer['t']['t'], 0)
301                 pieces_string[compact_peer['t']['t']] += 1
302             elif 'h' in compact_peer:
303                 self.peers[site]['h'] = compact_peer['h']
304                 pieces_hash.setdefault(compact_peer['h'], 0)
305                 pieces_hash[compact_peer['h']] += 1
306             elif 'l' in compact_peer:
307                 self.peers[site]['l'] = compact_peer['l']
308                 pieces_dl_hash.setdefault(compact_peer['l'], 0)
309                 pieces_dl_hash[compact_peer['l']] += 1
310             else:
311                 no_pieces += 1
312         
313         # Select the most popular piece info
314         max_found = max(no_pieces, max(pieces_string.values()),
315                         max(pieces_hash.values()), max(pieces_dl_hash.values()))
316
317         if max_found < len(self.peers):
318             log.msg('Misleading piece information found, using most popular %d of %d peers' % 
319                     (max_found, len(self.peers)))
320
321         if max_found == no_pieces:
322             # The file is not split into pieces
323             log.msg('No pieces were found for the file')
324             self.pieces = [self.hash.expected()]
325             self.startDownload()
326         elif max_found == max(pieces_string.values()):
327             # Small number of pieces in a string
328             for pieces, num in pieces_string.items():
329                 # Find the most popular piece string
330                 if num == max_found:
331                     self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
332                     log.msg('Peer info contained %d piece hashes' % len(self.pieces))
333                     self.startDownload()
334                     break
335         elif max_found == max(pieces_hash.values()):
336             # Medium number of pieces stored in the DHT
337             for pieces, num in pieces_hash.items():
338                 # Find the most popular piece hash to lookup
339                 if num == max_found:
340                     log.msg('Found a hash for pieces to lookup in the DHT: %r' % pieces)
341                     self.getDHTPieces(pieces)
342                     break
343         elif max_found == max(pieces_dl_hash.values()):
344             # Large number of pieces stored in peers
345             for pieces, num in pieces_dl_hash.items():
346                 # Find the most popular piece hash to download
347                 if num == max_found:
348                     log.msg('Found a hash for pieces to lookup in peers: %r' % pieces)
349                     self.getPeerPieces(pieces)
350                     break
351         return self.defer
352
353     #{ Downloading the piece hashes
354     def getDHTPieces(self, key):
355         """Retrieve the piece information from the DHT.
356         
357         @param key: the key to lookup in the DHT
358         """
359         # Remove any peers with the wrong piece hash
360         #for site in self.peers.keys():
361         #    if self.peers[site].get('h', '') != key:
362         #        del self.peers[site]
363
364         # Start the DHT lookup
365         lookupDefer = self.manager.dht.getValue(key)
366         lookupDefer.addBoth(self._getDHTPieces, key)
367         
368     def _getDHTPieces(self, results, key):
369         """Check the retrieved values."""
370         if isinstance(results, list):
371             for result in results:
372                 # Make sure the hash matches the key
373                 result_hash = sha.new(result.get('t', '')).digest()
374                 if result_hash == key:
375                     pieces = result['t']
376                     self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
377                     log.msg('Retrieved %d piece hashes from the DHT' % len(self.pieces))
378                     self.startDownload()
379                     return
380                 
381             log.msg('Could not retrieve the piece hashes from the DHT')
382         else:
383             log.msg('Looking up piece hashes in the DHT resulted in an error: %r' % (result, ))
384             
385         # Continue without the piece hashes
386         self.pieces = [None for x in xrange(0, self.hash.expSize, PIECE_SIZE)]
387         self.startDownload()
388
389     def getPeerPieces(self, key, failedSite = None):
390         """Retrieve the piece information from the peers.
391         
392         @param key: the key to request from the peers
393         """
394         if failedSite is None:
395             log.msg('Starting the lookup of piece hashes in peers')
396             self.outstanding = 0
397             # Remove any peers with the wrong piece hash
398             #for site in self.peers.keys():
399             #    if self.peers[site].get('l', '') != key:
400             #        del self.peers[site]
401         else:
402             log.msg('Piece hash lookup failed for peer %r' % (failedSite, ))
403             self.peers[failedSite]['failed'] = True
404             self.outstanding -= 1
405
406         if self.pieces is None:
407             # Send a request to one or more peers
408             log.msg('Checking for a peer to request piece hashes from')
409             for site in self.peers:
410                 if self.peers[site].get('failed', False) != True:
411                     log.msg('Sending a piece hash request to %r' % (site, ))
412                     path = '/~/' + quote_plus(key)
413                     lookupDefer = self.peers[site]['peer'].get(path)
414                     reactor.callLater(0, lookupDefer.addCallbacks,
415                                       *(self._getPeerPieces, self._gotPeerError),
416                                       **{'callbackArgs': (key, site),
417                                          'errbackArgs': (key, site)})
418                     self.outstanding += 1
419                     if self.outstanding >= 4:
420                         break
421         
422         log.msg('Done sending piece hash requests for now, %d outstanding' % self.outstanding)
423         if self.pieces is None and self.outstanding <= 0:
424             # Continue without the piece hashes
425             log.msg('Could not retrieve the piece hashes from the peers')
426             self.pieces = [None for x in xrange(0, self.hash.expSize, PIECE_SIZE)]
427             self.startDownload()
428         
429     def _getPeerPieces(self, response, key, site):
430         """Process the retrieved response from the peer."""
431         log.msg('Got a piece hash response %d from %r' % (response.code, site))
432         if response.code != 200:
433             # Request failed, try a different peer
434             log.msg('Did not like response %d from %r' % (response.code, site))
435             self.getPeerPieces(key, site)
436         else:
437             # Read the response stream to a string
438             self.peers[site]['pieces'] = ''
439             def _gotPeerPiece(data, self = self, site = site):
440                 log.msg('Peer %r got %d bytes of piece hashes' % (site, len(data)))
441                 self.peers[site]['pieces'] += data
442             log.msg('Streaming piece hashes from peer')
443             df = stream.readStream(response.stream, _gotPeerPiece)
444             df.addCallbacks(self._gotPeerPieces, self._gotPeerError,
445                             callbackArgs=(key, site), errbackArgs=(key, site))
446
447     def _gotPeerError(self, err, key, site):
448         """Peer failed, try again."""
449         log.msg('Peer piece hash request failed for %r' % (site, ))
450         log.err(err)
451         self.getPeerPieces(key, site)
452
453     def _gotPeerPieces(self, result, key, site):
454         """Check the retrieved pieces from the peer."""
455         log.msg('Finished streaming piece hashes from peer %r' % (site, ))
456         if self.pieces is not None:
457             # Already done
458             log.msg('Already done')
459             return
460         
461         try:
462             result = bdecode(self.peers[site]['pieces'])
463         except:
464             log.msg('Error bdecoding piece hashes')
465             log.err()
466             self.getPeerPieces(key, site)
467             return
468             
469         result_hash = sha.new(result.get('t', '')).digest()
470         if result_hash == key:
471             pieces = result['t']
472             self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
473             log.msg('Retrieved %d piece hashes from the peer %r' % (len(self.pieces), site))
474             self.startDownload()
475         else:
476             log.msg('Peer returned a piece string that did not match')
477             self.getPeerPieces(key, site)
478
479     #{ Downloading the file
480     def sort(self):
481         """Sort the peers by their rank (highest ranked at the end)."""
482         def sort(a, b):
483             """Sort peers by their rank."""
484             if a.rank > b.rank:
485                 return 1
486             elif a.rank < b.rank:
487                 return -1
488             return 0
489         self.peerlist.sort(sort)
490
491     def startDownload(self):
492         """Start the download from the peers."""
493         # Don't start twice
494         if self.started:
495             return
496         
497         log.msg('Starting to download %s' % self.path)
498         self.started = True
499         assert self.pieces, "You must initialize the piece hashes first"
500         self.peerlist = [self.peers[site]['peer'] for site in self.peers]
501         
502         # Use the mirror if there are few peers
503         if len(self.peerlist) < config.getint('DEFAULT', 'MIN_DOWNLOAD_PEERS'):
504             parsed = urlparse(self.mirror)
505             if parsed[0] == "http":
506                 site = splitHostPort(parsed[0], parsed[1])
507                 self.mirror_path = urlunparse(('', '') + parsed[2:])
508                 peer = self.manager.getPeer(site)
509                 peer.mirror = True
510                 self.peerlist.append(peer)
511         
512         # Special case if there's only one good peer left
513 #        if len(self.peerlist) == 1:
514 #            log.msg('Downloading from peer %r' % (self.peerlist[0], ))
515 #            self.defer.callback(self.peerlist[0].get(self.path))
516 #            return
517         
518         # Start sending the return file
519         self.stream = GrowingFileStream(self.file, self.hash.expSize)
520         resp = Response(200, {}, self.stream)
521         self.defer.callback(resp)
522
523         # Begin to download the pieces
524         self.outstanding = 0
525         self.nextFinish = 0
526         self.completePieces = [False for piece in self.pieces]
527         self.getPieces()
528         
529     #{ Downloading the pieces
530     def getPieces(self):
531         """Download the next pieces from the peers."""
532         log.msg('Checking for more piece requests to send')
533         self.sort()
534         piece = self.nextFinish
535         while self.outstanding < 4 and self.peerlist and piece < len(self.completePieces):
536             log.msg('Checking piece %d' % piece)
537             if self.completePieces[piece] == False:
538                 # Send a request to the highest ranked peer
539                 peer = self.peerlist.pop()
540                 self.completePieces[piece] = peer
541                 log.msg('Sending a request for piece %d to peer %r' % (piece, peer))
542                 
543                 self.outstanding += 1
544                 if peer.mirror:
545                     df = peer.getRange(self.mirror_path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
546                 else:
547                     df = peer.getRange(self.path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
548                 reactor.callLater(0, df.addCallbacks,
549                                   *(self._getPiece, self._getError),
550                                   **{'callbackArgs': (piece, peer),
551                                      'errbackArgs': (piece, peer)})
552             piece += 1
553                 
554         log.msg('Finished checking pieces, %d outstanding, next piece %d of %d' % (self.outstanding, self.nextFinish, len(self.completePieces)))
555         # Check if we're done
556         if self.outstanding <= 0 and self.nextFinish >= len(self.completePieces):
557             log.msg('We seem to be done with all pieces')
558             self.stream.allAvailable()
559     
560     def _getPiece(self, response, piece, peer):
561         """Process the retrieved headers from the peer."""
562         log.msg('Got response for piece %d from peer %r' % (piece, peer))
563         if ((len(self.completePieces) > 1 and response.code != 206) or
564             (response.code not in (200, 206))):
565             # Request failed, try a different peer
566             log.msg('Wrong response type %d for piece %d from peer %r' % (response.code, piece, peer))
567             peer.hashError('Peer responded with the wrong type of download: %r' % response.code)
568             self.completePieces[piece] = False
569             if response.stream and response.stream.length:
570                 stream.readAndDiscard(response.stream)
571         else:
572             # Read the response stream to the file
573             log.msg('Streaming piece %d from peer %r' % (piece, peer))
574             def statUpdate(bytes, stats = self.manager.stats, mirror = peer.mirror):
575                 stats.receivedBytes(bytes, mirror)
576             if response.code == 206:
577                 df = StreamToFile(response.stream, self.file, piece*PIECE_SIZE,
578                                   PIECE_SIZE, statUpdate).run()
579             else:
580                 df = StreamToFile(response.stream, self.file,
581                                   inform = statUpdate).run()
582             reactor.callLater(0, df.addCallbacks,
583                               *(self._gotPiece, self._gotError),
584                               **{'callbackArgs': (piece, peer),
585                                  'errbackArgs': (piece, peer)})
586
587         self.outstanding -= 1
588         self.peerlist.append(peer)
589         self.getPieces()
590
591     def _getError(self, err, piece, peer):
592         """Peer failed, try again."""
593         log.msg('Got error for piece %d from peer %r' % (piece, peer))
594         self.outstanding -= 1
595         self.peerlist.append(peer)
596         self.completePieces[piece] = False
597         self.getPieces()
598         log.err(err)
599
600     def _gotPiece(self, response, piece, peer):
601         """Process the retrieved piece from the peer."""
602         log.msg('Finished streaming piece %d from peer %r: %r' % (piece, peer, response))
603         if self.pieces[piece] and response != self.pieces[piece]:
604             # Hash doesn't match
605             log.msg('Hash error for piece %d from peer %r' % (piece, peer))
606             peer.hashError('Piece received from peer does not match expected')
607             self.completePieces[piece] = False
608         else:
609             # Successfully completed one of several pieces
610             log.msg('Finished with piece %d from peer %r' % (piece, peer))
611             self.completePieces[piece] = True
612             while (self.nextFinish < len(self.completePieces) and
613                    self.completePieces[self.nextFinish] == True):
614                 self.nextFinish += 1
615                 self.stream.updateAvailable(PIECE_SIZE)
616
617         self.getPieces()
618
619     def _gotError(self, err, piece, peer):
620         """Piece download failed, try again."""
621         log.msg('Error streaming piece %d from peer %r: %r' % (piece, peer, response))
622         log.err(err)
623         self.completePieces[piece] = False
624         self.getPieces()
625         
626 class PeerManager:
627     """Manage a set of peers and the requests to them.
628     
629     @type cache_dir: L{twisted.python.filepath.FilePath}
630     @ivar cache_dir: the directory to use for storing all files
631     @type dht: L{interfaces.IDHT}
632     @ivar dht: the DHT instance
633     @type stats: L{stats.StatsLogger}
634     @ivar stats: the statistics logger to record sent data to
635     @type clients: C{dictionary}
636     @ivar clients: the available peers that have been previously contacted
637     """
638
639     def __init__(self, cache_dir, dht, stats):
640         """Initialize the instance."""
641         self.cache_dir = cache_dir
642         self.cache_dir.restat(False)
643         if not self.cache_dir.exists():
644             self.cache_dir.makedirs()
645         self.dht = dht
646         self.stats = stats
647         self.clients = {}
648         
649     def get(self, hash, mirror, peers = [], method="GET", modtime=None):
650         """Download from a list of peers or fallback to a mirror.
651         
652         @type hash: L{Hash.HashObject}
653         @param hash: the hash object containing the expected hash for the file
654         @param mirror: the URI of the file on the mirror
655         @type peers: C{list} of C{string}
656         @param peers: a list of the peer info where the file can be found
657             (optional, defaults to downloading from the mirror)
658         @type method: C{string}
659         @param method: the HTTP method to use, 'GET' or 'HEAD'
660             (optional, defaults to 'GET')
661         @type modtime: C{int}
662         @param modtime: the modification time to use for an 'If-Modified-Since'
663             header, as seconds since the epoch
664             (optional, defaults to not sending that header)
665         """
666         if not peers or method != "GET" or modtime is not None:
667             log.msg('Downloading (%s) from mirror %s' % (method, mirror))
668             parsed = urlparse(mirror)
669             assert parsed[0] == "http", "Only HTTP is supported, not '%s'" % parsed[0]
670             site = splitHostPort(parsed[0], parsed[1])
671             path = urlunparse(('', '') + parsed[2:])
672             peer = self.getPeer(site)
673             peer.mirror = True
674             return peer.get(path, method, modtime)
675 #        elif len(peers) == 1:
676 #            site = uncompact(peers[0]['c'])
677 #            log.msg('Downloading from peer %r' % (site, ))
678 #            path = '/~/' + quote_plus(hash.expected())
679 #            peer = self.getPeer(site)
680 #            return peer.get(path)
681         else:
682             tmpfile = self.cache_dir.child(hash.hexexpected())
683             return FileDownload(self, hash, mirror, peers, tmpfile).run()
684         
685     def getPeer(self, site):
686         """Create a new peer if necessary and return it.
687         
688         @type site: (C{string}, C{int})
689         @param site: the IP address and port of the peer
690         """
691         if site not in self.clients:
692             self.clients[site] = Peer(site[0], site[1])
693         return self.clients[site]
694     
695     def close(self):
696         """Close all the connections to peers."""
697         for site in self.clients:
698             self.clients[site].close()
699         self.clients = {}
700
701 class TestPeerManager(unittest.TestCase):
702     """Unit tests for the PeerManager."""
703     
704     manager = None
705     pending_calls = []
706     
707     def tearDown(self):
708         for p in self.pending_calls:
709             if p.active():
710                 p.cancel()
711         self.pending_calls = []
712         if self.manager:
713             self.manager.close()
714             self.manager = None