2 """Manage a set of peers and the requests to them."""
4 from random import choice
5 from urlparse import urlparse, urlunparse
6 from urllib import quote_plus
7 from binascii import b2a_hex, a2b_hex
10 from twisted.internet import reactor, defer
11 from twisted.python import log
12 from twisted.trial import unittest
13 from twisted.web2 import stream
14 from twisted.web2.http import Response, splitHostPort
16 from HTTPDownloader import Peer
17 from util import uncompact
18 from Hash import PIECE_SIZE
19 from apt_p2p_Khashmir.bencode import bdecode
21 class GrowingFileStream(stream.FileStream):
# NOTE(review): this view of the file is a garbled extraction -- each line is
# prefixed with its original line number and many lines are missing (e.g. the
# CHUNK_SIZE constant around line 31 and parts of the method bodies below).
# Comments describe only what the visible lines establish.
22 """Modified to stream data from a file as it becomes available.
24 @ivar CHUNK_SIZE: the maximum size of chunks of data to send at a time
25 @ivar deferred: waiting for the result of the last read attempt
26 @ivar available: the number of bytes that are currently available to read
27 @ivar position: the current position in the file where the next read will begin
28 @ivar finished: True when no more data will be coming available
# __init__ delegates to FileStream; initialization of available/position/
# finished/deferred happens in lines missing from this view (35-39).
33 def __init__(self, f):
34 stream.FileStream.__init__(self, f)
# NOTE(review): spelled "updateAvaliable" here, but call sites in
# FileDownload._gotPiece invoke self.stream.updateAvailable(...) -- one of
# the two spellings must be wrong; confirm against the full file.
41 def updateAvaliable(self, newlyAvailable):
42 """Update the number of bytes that are available.
44 Call it with 0 to trigger reading of a fully read file.
46 @param newlyAvailable: the number of bytes that just became available
# Grow the readable window; must not be called after allAvailable().
48 assert not self.finished
49 self.available += newlyAvailable
51 # If a read is pending, let it go
52 if self.deferred and self.position < self.available:
53 # Try to read some data from the file
54 length = self.available - self.position
55 readSize = min(length, self.CHUNK_SIZE)
56 self.f.seek(self.position)
57 b = self.f.read(readSize)
60 # Check if end of file was reached
# bytesRead is assigned in lines missing from this view (presumably len(b)).
62 self.position += bytesRead
63 deferred = self.deferred
# (the pending deferred is reset and fired in lines missing from this view)
67 def allAvailable(self):
68 """Indicate that no more data will be coming available."""
# self.finished is presumably set True in missing lines 69-70 -- confirm.
71 # If a read is pending, let it go
73 if self.position < self.available:
74 # Try to read some data from the file
75 length = self.available - self.position
76 readSize = min(length, self.CHUNK_SIZE)
77 self.f.seek(self.position)
78 b = self.f.read(readSize)
81 # Check if end of file was reached
83 self.position += bytesRead
84 deferred = self.deferred
# Fire the pending deferred with None to signal end-of-stream.
89 deferred.callback(None)
92 deferred.callback(None)
94 def read(self, sendfile=False):
95 assert not self.deferred, "A previous read is still deferred."
100 length = self.available - self.position
101 readSize = min(length, self.CHUNK_SIZE)
103 # If we don't have any available, we're done or deferred
# When no data is ready, a Deferred is stored and fired later by
# updateAvaliable()/allAvailable().
108 self.deferred = defer.Deferred()
111 # Try to read some data from the file
112 self.f.seek(self.position)
113 b = self.f.read(readSize)
116 # End of file was reached, we're done or deferred
120 self.deferred = defer.Deferred()
123 self.position += bytesRead
126 class StreamToFile(defer.Deferred):
127 """Save a stream to a partial file and hash it.
129 @type stream: L{twisted.web2.stream.IByteStream}
130 @ivar stream: the input stream being read
131 @type outFile: L{twisted.python.filepath.FilePath}
132 @ivar outFile: the file being written
134 @ivar hash: the hash object for the data
135 @type position: C{int}
136 @ivar position: the current file position to write the next data to
138 @ivar length: the position in the file to not write beyond
139 @type doneDefer: L{twisted.internet.defer.Deferred}
140 @ivar doneDefer: the deferred that will fire when done writing
143 def __init__(self, inputStream, outFile, start = 0, length = None):
144 """Initializes the file.
146 @type inputStream: L{twisted.web2.stream.IByteStream}
147 @param inputStream: the input stream to read from
148 @type outFile: L{twisted.python.filepath.FilePath}
149 @param outFile: the file to write to
151 @param start: the file position to start writing at
152 (optional, defaults to the start of the file)
154 @param length: the maximum amount of data to write to the file
155 (optional, defaults to not limiting the writing to the file
157 self.stream = inputStream
158 self.outFile = outFile
# NOTE(review): sha.new() requires the Python 2 `sha` module; no `import sha`
# is visible in this view's import block (a gap at original line 8) -- confirm.
159 self.hash = sha.new()
160 self.position = start
# The default assignment for self.length (when length is None) is in a line
# missing from this view (presumably line 161: self.length = None).
162 if length is not None:
163 self.length = start + length
164 self.doneDefer = None
# The `def run(self):` line (original 166) is missing from this view.
167 """Start the streaming.
169 @rtype: L{twisted.internet.defer.Deferred}
# readStream drives _gotData for each chunk; _done/_error adapt the result.
171 self.doneDefer = stream.readStream(self.stream, self._gotData)
172 self.doneDefer.addCallbacks(self._done, self._error)
173 return self.doneDefer
175 def _gotData(self, data):
176 """Process the received data."""
177 if self.outFile.closed:
178 raise Exception, "outFile was unexpectedly closed"
# The guard condition for this raise (presumably `if data is None:`) is in a
# line missing from this view (original 180).
181 raise Exception, "Data is None?"
183 # Make sure we don't go too far
184 if self.length is not None and self.position + len(data) > self.length:
185 data = data[:(self.length - self.position)]
187 # Write and hash the streamed data
188 self.outFile.seek(self.position)
189 self.outFile.write(data)
190 self.hash.update(data)
191 self.position += len(data)
193 def _done(self, result):
194 """Return the result."""
# The final callback value is the raw digest of everything written.
195 return self.hash.digest()
# _error's body (originals 198-199) is missing from this view.
197 def _error(self, err):
203 """Manage a download from a list of peers or a mirror.
205 @type manager: L{PeerManager}
206 @ivar manager: the manager to send requests for peers to
207 @type hash: L{Hash.HashObject}
208 @ivar hash: the hash object containing the expected hash for the file
209 @ivar mirror: the URI of the file on the mirror
210 @type compact_peers: C{list} of C{dictionary}
211 @ivar compact_peers: a list of the peer info where the file can be found
213 @ivar file: the open file to right the download to
214 @type path: C{string}
215 @ivar path: the path to request from peers to access the file
216 @type pieces: C{list} of C{string}
217 @ivar pieces: the hashes of the pieces in the file
218 @type started: C{boolean}
219 @ivar started: whether the download has begun yet
220 @type defer: L{twisted.internet.defer.Deferred}
221 @ivar defer: the deferred that will callback with the result of the download
222 @type peers: C{dictionary}
223 @ivar peers: information about each of the peers available to download from
224 @type outstanding: C{int}
225 @ivar outstanding: the number of requests to peers currently outstanding
226 @type peerlist: C{list} of L{HTTPDownloader.Peer}
227 @ivar peerlist: the sorted list of peers for this download
228 @type stream: L{GrowingFileStream}
229 @ivar stream: the stream of resulting data from the download
230 @type nextFinish: C{int}
231 @ivar nextFinish: the next piece that is needed to finish for the stream
232 @type completePieces: C{list} of C{boolean} or L{HTTPDownloader.Peer}
233 @ivar completePieces: one per piece, will be False if no requests are
234 outstanding for the piece, True if the piece has been successfully
235 downloaded, or the Peer that a request for this piece has been sent
238 def __init__(self, manager, hash, mirror, compact_peers, file):
239 """Initialize the instance and check for piece hashes.
241 @type manager: L{PeerManager}
242 @param manager: the manager to send requests for peers to
243 @type hash: L{Hash.HashObject}
244 @param hash: the hash object containing the expected hash for the file
245 @param mirror: the URI of the file on the mirror
246 @type compact_peers: C{list} of C{dictionary}
247 @param compact_peers: a list of the peer info where the file can be found
248 @type file: L{twisted.python.filepath.FilePath}
249 @param file: the temporary file to use to store the downloaded file
251 self.manager = manager
254 self.compact_peers = compact_peers
256 self.path = '/~/' + quote_plus(hash.expected())
263 self.file = file.open('w')
266 """Start the downloading process."""
267 self.defer = defer.Deferred()
274 for compact_peer in self.compact_peers:
275 # Build a list of all the peers for this download
276 site = uncompact(compact_peer['c'])
277 peer = manager.getPeer(site)
278 self.peers.setdefault(site, {})['peer'] = peer
280 # Extract any piece information from the peers list
281 if 't' in compact_peer:
282 self.peers[site]['t'] = compact_peer['t']['t']
283 pieces_string.setdefault(compact_peer['t']['t'], 0)
284 pieces_string[compact_peer['t']['t']] += 1
285 elif 'h' in compact_peer:
286 self.peers[site]['h'] = compact_peer['h']
287 pieces_hash.setdefault(compact_peer['h'], 0)
288 pieces_hash[compact_peer['h']] += 1
289 elif 'l' in compact_peer:
290 self.peers[site]['l'] = compact_peer['l']
291 pieces_dl_hash.setdefault(compact_peer['l'], 0)
292 pieces_dl_hash[compact_peer['l']] += 1
296 # Select the most popular piece info
297 max_found = max(no_pieces, max(pieces_string.values()),
298 max(pieces_hash.values()), max(pieces_dl_hash.values()))
300 if max_found < len(self.peers):
301 log.msg('Misleading piece information found, using most popular %d of %d peers' %
302 (max_found, len(self.peers)))
304 if max_found == no_pieces:
305 # The file is not split into pieces
308 elif max_found == max(pieces_string.values()):
309 # Small number of pieces in a string
310 for pieces, num in pieces_string.items():
311 # Find the most popular piece string
313 self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
314 log.msg('Peer info contained %d piece hashes' % len(self.pieces))
317 elif max_found == max(pieces_hash.values()):
318 # Medium number of pieces stored in the DHT
319 for pieces, num in pieces_hash.items():
320 # Find the most popular piece hash to lookup
322 self.getDHTPieces(pieces)
324 elif max_found == max(pieces_dl_hash.values()):
325 # Large number of pieces stored in peers
326 for pieces, num in pieces_hash.items():
327 # Find the most popular piece hash to download
329 self.getPeerPieces(pieces)
333 #{ Downloading the piece hashes
def getDHTPieces(self, key):
    """Look up the piece hashes for this file in the DHT.

    @param key: the key to fetch from the DHT
    """
    # Peers whose stored 'h' value disagrees with the key could be
    # dropped here, but that filtering is currently disabled.

    # Kick off the asynchronous DHT lookup; the returned values are
    # verified against the key in _getDHTPieces.
    df = self.manager.dht.getValue(key)
    df.addCallback(self._getDHTPieces, key)
348 def _getDHTPieces(self, results, key):
349 """Check the retrieved values."""
350 for result in results:
351 # Make sure the hash matches the key
# A value is trusted only if the SHA1 of its 't' field equals the DHT key.
352 result_hash = sha.new(result.get('t', '')).digest()
353 if result_hash == key:
# The assignment of `pieces` (presumably result['t'], original 354) is
# missing from this view; the string is split into 20-byte SHA1 digests.
355 self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
356 log.msg('Retrieved %d piece hashes from the DHT' % len(self.pieces))
# The success return / else fall-through (originals 357-359) is missing.
360 # Continue without the piece hashes
361 log.msg('Could not retrieve the piece hashes from the DHT')
365 def getPeerPieces(self, key, failedSite = None):
366 """Retrieve the piece information from the peers.
368 @param key: the key to request from the peers
# First-call branch: failedSite is None when starting the lookup; the
# initialization of self.outstanding (original 371) is missing from view.
370 if failedSite is None:
372 # Remove any peers with the wrong piece hash
373 #for site in self.peers.keys():
374 # if self.peers[site].get('l', '') != key:
375 # del self.peers[site]
# Retry branch (the `else:` at original 376 is missing from this view):
# mark the failed peer and free up its outstanding slot.
377 self.peers[failedSite]['failed'] = True
378 self.outstanding -= 1
380 if self.pieces is None:
381 # Send a request to one or more peers
382 for site in self.peers:
383 if self.peers[site].get('failed', False) != True:
384 path = '/~/' + quote_plus(key)
385 lookupDefer = self.peers[site]['peer'].get(path)
386 lookupDefer.addCallbacks(self._getPeerPieces, self._gotPeerError,
387 callbackArgs=(key, site), errbackArgs=(key, site))
388 self.outstanding += 1
# Cap concurrent piece-info requests at 3; the `break` (original 390)
# is missing from this view.
389 if self.outstanding >= 3:
392 if self.pieces is None and self.outstanding == 0:
393 # Continue without the piece hashes
394 log.msg('Could not retrieve the piece hashes from the peers')
# The fallback continuation (originals 395-397) is missing from this view.
398 def _getPeerPieces(self, response, key, site):
399 """Process the retrieved response from the peer."""
400 if response.code != 200:
401 # Request failed, try a different peer
402 self.getPeerPieces(key, site)
# The `else:` separating the failure retry from the success path
# (original 403) is missing from this view.
404 # Read the response stream to a string
405 self.peers[site]['pieces'] = ''
# Default-argument binding captures self/site for the streaming callback
# (the Python 2 idiom for early binding in closures).
406 def _gotPeerPiece(data, self = self, site = site):
407 self.peers[site]['pieces'] += data
408 df = stream.readStream(response.stream, _gotPeerPiece)
409 df.addCallbacks(self._gotPeerPieces, self._gotPeerError,
410 callbackArgs=(key, site), errbackArgs=(key, site))
412 def _gotPeerError(self, err, key, site):
413 """Peer failed, try again."""
# Original line 414 is missing from this view (presumably logs the error)
# before retrying the lookup with this site reported as failed.
415 self.getPeerPieces(key, site)
417 def _gotPeerPieces(self, result, key, site):
418 """Check the retrieved pieces from the peer."""
# If another peer already supplied the pieces, this response is ignored
# (the early return, originals 420-422, is missing from this view).
419 if self.pieces is not None:
# The accumulated response body is bencoded; decode failures fall through
# to the retry below (the try/except lines 423/425-426 are missing).
424 result = bdecode(self.peers[site]['pieces'])
427 self.getPeerPieces(key, site)
# Verify the decoded 't' field hashes to the requested key before use.
430 result_hash = sha.new(result.get('t', '')).digest()
431 if result_hash == key:
# `pieces` is assigned in a line missing from this view (original 432,
# presumably result['t']); it is split into 20-byte SHA1 digests.
433 self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
434 log.msg('Retrieved %d piece hashes from the peer' % len(self.pieces))
437 log.msg('Peer returned a piece string that did not match')
438 self.getPeerPieces(key, site)
440 #{ Downloading the file
# The `def sortPeers(self):` line (original 441) and the nested comparator's
# def/return lines (originals 443, 445-449) are missing from this view.
442 """Sort the peers by their rank (highest ranked at the end)."""
444 """Sort peers by their rank."""
447 elif a.rank < b.rank:
# Python 2 cmp-style sort: `sort` is passed as the comparison function.
450 self.peerlist.sort(sort)
452 def startDownload(self):
453 """Start the download from the peers."""
# Guard lines (originals 454-458, presumably the not-already-started check
# and self.started = True) are missing from this view.
459 assert self.pieces is not None, "You must initialize the piece hashes first"
460 self.peerlist = [self.peers[site]['peer'] for site in self.peers]
462 # Special case if there's only one good peer left
# With a single peer, skip the piece machinery and proxy its response.
463 if len(self.peerlist) == 1:
464 log.msg('Downloading from peer %r' % (self.peerlist[0], ))
465 self.defer.callback(self.peerlist[0].get(self.path))
468 # Start sending the return file
# The response stream grows as pieces complete; callers get it immediately.
469 self.stream = GrowingFileStream(self.file)
470 resp = Response(200, {}, self.stream)
471 self.defer.callback(resp)
473 # Begin to download the pieces
# The branch selecting between per-piece and whole-file tracking
# (originals 474-476, 478) is missing from this view.
477 self.completePieces = [False for piece in self.pieces]
479 self.completePieces = [False]
482 #{ Downloading the pieces
# The `def getPieces(self):` line (original 483) is missing from this view.
484 """Download the next pieces from the peers."""
# Scan forward from the next unfinished piece, keeping at most 4 requests
# outstanding and one request per available peer.
486 piece = self.nextFinish
487 while self.outstanding < 4 and self.peerlist and piece < len(self.completePieces):
488 if self.completePieces[piece] == False:
489 # Send a request to the highest ranked peer
490 peer = self.peerlist.pop()
491 self.completePieces[piece] = peer
493 self.outstanding += 1
# The if/else choosing ranged vs whole-file requests (originals 494/496)
# is missing from this view; multi-piece files use byte-range requests.
495 df = peer.getRange(self.path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
497 df = peer.get(self.path)
# callLater(0, ...) defers attaching the callbacks until the next
# reactor iteration.
498 reactor.callLater(0, df.addCallbacks,
499 *(self._getPiece, self._getError),
500 **{'callbackArgs': (piece, peer),
501 'errbackArgs': (piece, peer)})
# The loop increment (original 502, presumably piece += 1) is missing.
505 if self.outstanding <= 0 and self.nextFinish >= len(self.completePieces):
506 self.stream.allAvailable()
508 def _getPiece(self, response, piece, peer):
509 """Process the retrieved headers from the peer."""
# Multi-piece downloads require a 206 Partial Content; anything outside
# (200, 206) is a failure either way.
510 if ((len(self.completePieces) > 1 and response.code != 206) or
511 (response.code not in (200, 206))):
512 # Request failed, try a different peer
# A logging line (original 513) is missing from this view.
514 self.completePieces[piece] = False
# Drain the body so the connection can be reused.
515 if response.stream and response.stream.length:
516 stream.readAndDiscard(response.stream)
# The `else:` separating failure from success (original 517) is missing.
518 # Read the response stream to the file
# 206 responses are written at the piece's offset and length-limited;
# 200 responses are written from the start of the file.
519 if response.code == 206:
520 df = StreamToFile(response.stream, self.file, piece*PIECE_SIZE, PIECE_SIZE).run()
522 df = StreamToFile(response.stream, self.file).run()
523 df.addCallbacks(self._gotPiece, self._gotError,
524 callbackArgs=(piece, peer), errbackArgs=(piece, peer))
# Return the peer to the pool and look for more pieces to request.
526 self.outstanding -= 1
527 self.peerlist.append(peer)
530 def _getError(self, err, piece, peer):
531 """Peer failed, try again."""
# Free the request slot, return the peer to the pool, and mark the piece
# as not-requested so a later getPieces() pass retries it.
532 self.outstanding -= 1
533 self.peerlist.append(peer)
534 self.completePieces[piece] = False
# Lines 535-537 (presumably logging and a getPieces() retry) are missing
# from this view.
538 def _gotPiece(self, response, piece, peer):
539 """Process the retrieved piece from the peer."""
# `response` here is the digest returned by StreamToFile._done.
# NOTE(review): the second clause uses `==` -- a digest MATCHING the
# expected hash being treated as a failure looks inverted; it should
# presumably be `!=`. Confirm against the full file.
540 if ((self.pieces and response != self.pieces[piece]) or
541 (len(self.pieces) == 0 and response == self.hash.expected())):
# Failure handling lines (originals 542-543, presumably logging) are
# missing from this view.
544 self.completePieces[piece] = False
546 # Successfully completed one of several pieces
547 self.completePieces[piece] = True
# Advance the stream window over every contiguously-finished piece.
# NOTE(review): updateAvailable here vs. updateAvaliable in
# GrowingFileStream -- the spellings disagree; confirm which is right.
548 while (self.nextFinish < len(self.completePieces) and
549 self.completePieces[self.nextFinish] == True):
551 self.stream.updateAvailable(PIECE_SIZE)
553 # Whole download (only one piece) is complete
554 self.completePieces[piece] = True
# 2**30 is an upper bound large enough to release the whole single piece.
555 self.stream.updateAvailable(2**30)
559 def _gotError(self, err, piece, peer):
560 """Piece download failed, try again."""
# A logging line (original 561) is missing from this view. Mark the piece
# as not downloaded so it will be requested again.
562 self.completePieces[piece] = False
566 """Manage a set of peers and the requests to them.
568 @type cache_dir: L{twisted.python.filepath.FilePath}
569 @ivar cache_dir: the directory to use for storing all files
570 @type dht: L{interfaces.IDHT}
571 @ivar dht: the DHT instance
572 @type clients: C{dictionary}
573 @ivar clients: the available peers that have been previously contacted
576 def __init__(self, cache_dir, dht):
577 """Initialize the instance."""
578 self.cache_dir = cache_dir
579 self.cache_dir.restat(False)
580 if not self.cache_dir.exists():
581 self.cache_dir.makedirs()
# NOTE(review): mutable default argument `peers = []` -- benign here since
# peers is only read, but it is a known Python pitfall; consider None.
585 def get(self, hash, mirror, peers = [], method="GET", modtime=None):
586 """Download from a list of peers or fallback to a mirror.
588 @type hash: L{Hash.HashObject}
589 @param hash: the hash object containing the expected hash for the file
590 @param mirror: the URI of the file on the mirror
591 @type peers: C{list} of C{string}
592 @param peers: a list of the peer info where the file can be found
593 (optional, defaults to downloading from the mirror)
594 @type method: C{string}
595 @param method: the HTTP method to use, 'GET' or 'HEAD'
596 (optional, defaults to 'GET')
597 @type modtime: C{int}
598 @param modtime: the modification time to use for an 'If-Modified-Since'
599 header, as seconds since the epoch
600 (optional, defaults to not sending that header)
# Mirror fallback: no peers, non-GET, or conditional requests all bypass
# the peer download machinery.
602 if not peers or method != "GET" or modtime is not None:
603 log.msg('Downloading (%s) from mirror %s' % (method, mirror))
604 parsed = urlparse(mirror)
605 assert parsed[0] == "http", "Only HTTP is supported, not '%s'" % parsed[0]
606 site = splitHostPort(parsed[0], parsed[1])
607 path = urlunparse(('', '') + parsed[2:])
608 peer = self.getPeer(site)
609 return peer.get(path, method, modtime)
# Single peer: request the file directly by its expected-hash path.
610 elif len(peers) == 1:
611 site = uncompact(peers[0]['c'])
612 log.msg('Downloading from peer %r' % (site, ))
613 path = '/~/' + quote_plus(hash.expected())
614 peer = self.getPeer(site)
615 return peer.get(path)
# Multiple peers: a FileDownload coordinates the piece-wise download into
# a temp file named by the hex of the expected hash. (The `else:` at
# original 616 is missing from this view.)
617 tmpfile = self.cache_dir.child(hash.hexexpected())
618 return FileDownload(self, hash, mirror, peers, tmpfile).run()
def getPeer(self, site):
    """Return the cached Peer for a site, creating one on first use.

    @type site: (C{string}, C{int})
    @param site: the IP address and port of the peer
    """
    peer = self.clients.get(site)
    if peer is None:
        # First contact with this site: open a new Peer and cache it.
        peer = Peer(site[0], site[1])
        self.clients[site] = peer
    return peer
631 """Close all the connections to peers."""
632 for site in self.clients:
633 self.clients[site].close()
636 class TestPeerManager(unittest.TestCase):
637 """Unit tests for the PeerManager."""
# Class attributes (originals 638-640, presumably manager/pending_calls and
# a timeout) are missing from this view. These tests hit live hosts over
# the network.
642 def gotResp(self, resp, num, expect):
643 self.failUnless(resp.code >= 200 and resp.code < 300, "Got a non-200 response: %r" % resp.code)
644 if expect is not None:
645 self.failUnless(resp.stream.length == expect, "Length was incorrect, got %r, expected %r" % (resp.stream.length, expect))
# The print_/printdone helpers passed below (originals 646-649) are missing
# from this view.
650 stream.readStream(resp.stream, print_).addCallback(printdone)
652 def test_download(self):
653 """Tests a normal download."""
654 self.manager = PeerManager()
657 host = 'www.ietf.org'
658 d = self.manager.get('', 'http://' + host + '/rfc/rfc0013.txt')
659 d.addCallback(self.gotResp, 1, 1070)
# The `def test_head(self):` line (original 662) is missing from this view.
663 """Tests a 'HEAD' request."""
664 self.manager = PeerManager()
667 host = 'www.ietf.org'
668 d = self.manager.get('', 'http://' + host + '/rfc/rfc0013.txt', method = "HEAD")
669 d.addCallback(self.gotResp, 1, 0)
672 def test_multiple_downloads(self):
673 """Tests multiple downloads with queueing and connection closing."""
674 self.manager = PeerManager()
676 lastDefer = defer.Deferred()
678 def newRequest(host, path, num, expect, last=False):
679 d = self.manager.get('', 'http://' + host + ':' + str(80) + path)
680 d.addCallback(self.gotResp, num, expect)
# Only the final request chains into lastDefer (the `if last:` guard at
# original 681 is missing from this view).
682 d.addBoth(lastDefer.callback)
# Staggered requests exercise queueing and idle-connection closing.
684 newRequest('www.ietf.org', "/rfc/rfc0006.txt", 1, 1776)
685 newRequest('www.ietf.org', "/rfc/rfc2362.txt", 2, 159833)
686 newRequest('www.google.ca', "/", 3, None)
687 self.pending_calls.append(reactor.callLater(1, newRequest, 'www.sfu.ca', '/', 4, None))
688 self.pending_calls.append(reactor.callLater(10, newRequest, 'www.ietf.org', '/rfc/rfc0048.txt', 5, 41696))
689 self.pending_calls.append(reactor.callLater(30, newRequest, 'www.ietf.org', '/rfc/rfc0022.txt', 6, 4606))
690 self.pending_calls.append(reactor.callLater(31, newRequest, 'www.sfu.ca', '/studentcentral/index.html', 7, None))
691 self.pending_calls.append(reactor.callLater(32, newRequest, 'www.ietf.org', '/rfc/rfc0014.txt', 8, 27))
692 self.pending_calls.append(reactor.callLater(32, newRequest, 'www.ietf.org', '/rfc/rfc0001.txt', 9, 21088))
693 self.pending_calls.append(reactor.callLater(62, newRequest, 'www.google.ca', '/intl/en/options/', 0, None, True))
# tearDown fragment: the `def tearDown(self):` line (original ~696) is
# missing from this view; pending delayed calls are cancelled here.
697 for p in self.pending_calls:
700 self.pending_calls = []