2 """Manage a set of peers and the requests to them."""
4 from random import choice
5 from urlparse import urlparse, urlunparse
6 from urllib import quote_plus
7 from binascii import b2a_hex, a2b_hex
10 from twisted.internet import reactor, defer
11 from twisted.python import log, filepath
12 from twisted.trial import unittest
13 from twisted.web2 import stream
14 from twisted.web2.http import Response, splitHostPort
16 from HTTPDownloader import Peer
17 from util import uncompact
18 from Hash import PIECE_SIZE
19 from apt_p2p_Khashmir.bencode import bdecode
20 from apt_p2p_conf import config
class PeerError(Exception):
    """Raised when a download from one or more peers fails."""
class GrowingFileStream(stream.FileStream):
    """Modified to stream data from a file as it becomes available.

    @ivar CHUNK_SIZE: the maximum size of chunks of data to send at a time
    @ivar deferred: waiting for the result of the last read attempt
    @ivar available: the number of bytes that are currently available to read
    @ivar position: the current position in the file where the next read will begin
    @ivar finished: True when no more data will be coming available

    # NOTE(review): this excerpt is missing several interior lines (docstring
    # closers, the assignment of bytesRead, the deferred-firing bodies); the
    # comments below describe only what is visible here.
    def __init__(self, f, length = None):
        # Delegate basic file handling to the parent FileStream.
        stream.FileStream.__init__(self, f)

    def updateAvailable(self, newlyAvailable):
        """Update the number of bytes that are available.

        Call it with 0 to trigger reading of a fully read file.

        @param newlyAvailable: the number of bytes that just became available

        # Must never be called after allAvailable() marks the stream finished.
        assert not self.finished
        self.available += newlyAvailable

        # If a read is pending, let it go
        if self.deferred and self.position < self.available:
            # Try to read some data from the file
            length = self.available - self.position
            readSize = min(length, self.CHUNK_SIZE)
            self.f.seek(self.position)
            b = self.f.read(readSize)

            # Check if end of file was reached
            # NOTE(review): bytesRead is not assigned in this excerpt --
            # presumably len(b) on a line not shown; confirm upstream.
            self.position += bytesRead
            deferred = self.deferred

    def allAvailable(self):
        """Indicate that no more data will be coming available."""

        # If a read is pending, let it go
        if self.position < self.available:
            # Try to read some data from the file
            length = self.available - self.position
            readSize = min(length, self.CHUNK_SIZE)
            self.f.seek(self.position)
            b = self.f.read(readSize)

            # Check if end of file was reached
            self.position += bytesRead
            deferred = self.deferred

        # Fire the pending deferred with None to signal end-of-stream.
        deferred = self.deferred
        deferred.callback(None)

        deferred = self.deferred
        deferred.callback(None)

    def read(self, sendfile=False):
        # Only one outstanding read is allowed at a time.
        assert not self.deferred, "A previous read is still deferred."

        length = self.available - self.position
        readSize = min(length, self.CHUNK_SIZE)

        # If we don't have any available, we're done or deferred
        self.deferred = defer.Deferred()

        # Try to read some data from the file
        self.f.seek(self.position)
        b = self.f.read(readSize)

        # End of file was reached, we're done or deferred
        self.deferred = defer.Deferred()

        self.position += bytesRead

        # NOTE(review): the 'def close(...)' line is outside this excerpt.
        """Close the temporary file and remove it."""
        # Remove the backing temporary file from disk when the stream closes.
        filepath.FilePath(self.f.name).remove()
146 """Save a stream to a partial file and hash it.
148 @type stream: L{twisted.web2.stream.IByteStream}
149 @ivar stream: the input stream being read
150 @type outFile: L{twisted.python.filepath.FilePath}
151 @ivar outFile: the file being written
152 @type hasher: hashing object, e.g. C{sha1}
153 @ivar hasher: the hash object for the data
154 @type position: C{int}
155 @ivar position: the current file position to write the next data to
157 @ivar length: the position in the file to not write beyond
158 @type doneDefer: L{twisted.internet.defer.Deferred}
159 @ivar doneDefer: the deferred that will fire when done writing
    def __init__(self, hasher, inputStream, outFile, start = 0, length = None):
        """Initializes the file.

        @type hasher: hashing object, e.g. C{sha1}
        @param hasher: the hash object for the data
        @type inputStream: L{twisted.web2.stream.IByteStream}
        @param inputStream: the input stream to read from
        @type outFile: L{twisted.python.filepath.FilePath}
        @param outFile: the file to write to
        @param start: the file position to start writing at
            (optional, defaults to the start of the file)
        @param length: the maximum amount of data to write to the file
            (optional, defaults to not limiting the writing to the file)

        # NOTE(review): some attribute assignments (e.g. self.hasher,
        # self.length default) are missing from this excerpt.
        self.stream = inputStream
        self.outFile = outFile
        # Next file offset at which incoming data will be written.
        self.position = start
        if length is not None:
            # Absolute offset not to write beyond.
            self.length = start + length
        # Deferred created by run(); fires when streaming completes.
        self.doneDefer = None
        """Start the streaming.

        @rtype: L{twisted.internet.defer.Deferred}

        # NOTE(review): the 'def run(self):' line is outside this excerpt.
        # Read the whole input stream, feeding each chunk to _gotData;
        # _done/_error translate the outcome into the returned deferred.
        self.doneDefer = stream.readStream(self.stream, self._gotData)
        self.doneDefer.addCallbacks(self._done, self._error)
        return self.doneDefer
    def _gotData(self, data):
        """Process the received data."""
        # Refuse to write once the output file has been closed under us.
        if self.outFile.closed:
            raise PeerError, "outFile was unexpectedly closed"

            # NOTE(review): the guard ('if data is None:') for this raise
            # appears to be omitted from this excerpt.
            raise PeerError, "Data is None?"

        # Make sure we don't go too far
        if self.length is not None and self.position + len(data) > self.length:
            # Truncate the chunk so we never write past self.length.
            data = data[:(self.length - self.position)]

        # Write and hash the streamed data
        self.outFile.seek(self.position)
        self.outFile.write(data)
        self.hasher.update(data)
        self.position += len(data)
214 def _done(self, result):
215 """Return the result."""
216 return self.hasher.digest()
    def _error(self, err):
        """Handle an error that occurred while streaming the data."""
        # Errback installed by run(); the remainder of this method is not
        # shown in this excerpt.
        log.msg('Streaming error')
225 """Manage a download from a list of peers or a mirror.
227 @type manager: L{PeerManager}
228 @ivar manager: the manager to send requests for peers to
229 @type hash: L{Hash.HashObject}
230 @ivar hash: the hash object containing the expected hash for the file
231 @ivar mirror: the URI of the file on the mirror
232 @type compact_peers: C{list} of C{dictionary}
233 @ivar compact_peers: a list of the peer info where the file can be found
    @ivar file: the open file to write the download to
236 @type path: C{string}
237 @ivar path: the path to request from peers to access the file
238 @type pieces: C{list} of C{string}
239 @ivar pieces: the hashes of the pieces in the file
240 @type started: C{boolean}
241 @ivar started: whether the download has begun yet
242 @type defer: L{twisted.internet.defer.Deferred}
243 @ivar defer: the deferred that will callback with the result of the download
244 @type peers: C{dictionary}
245 @ivar peers: information about each of the peers available to download from
246 @type outstanding: C{int}
247 @ivar outstanding: the number of requests to peers currently outstanding
248 @type peerlist: C{list} of L{HTTPDownloader.Peer}
249 @ivar peerlist: the sorted list of peers for this download
250 @type stream: L{GrowingFileStream}
251 @ivar stream: the stream of resulting data from the download
252 @type nextFinish: C{int}
253 @ivar nextFinish: the next piece that is needed to finish for the stream
254 @type completePieces: C{list} of C{boolean} or L{HTTPDownloader.Peer}
255 @ivar completePieces: one per piece, will be False if no requests are
256 outstanding for the piece, True if the piece has been successfully
        downloaded, or the Peer that a request for this piece has been sent to
    def __init__(self, manager, hash, mirror, compact_peers, file):
        """Initialize the instance and check for piece hashes.

        @type manager: L{PeerManager}
        @param manager: the manager to send requests for peers to
        @type hash: L{Hash.HashObject}
        @param hash: the hash object containing the expected hash for the file
        @param mirror: the URI of the file on the mirror
        @type compact_peers: C{list} of C{dictionary}
        @param compact_peers: a list of the peer info where the file can be found
        @type file: L{twisted.python.filepath.FilePath}
        @param file: the temporary file to use to store the downloaded file

        # NOTE(review): several attribute assignments are missing from this
        # excerpt (e.g. self.hash, self.mirror, self.peers, self.outstanding).
        self.manager = manager
        self.compact_peers = compact_peers
        # Peers serve files keyed by their expected hash under the /~/ namespace.
        self.path = '/~/' + quote_plus(hash.expected())
        # Path on the mirror; set by startDownload() if the mirror is used.
        self.mirror_path = None
        # Open the temp file for reading and writing ('w+' truncates it).
        self.file = file.open('w+')
        """Start the downloading process."""
        # NOTE(review): the 'def run(self):' line and some initializations
        # (no_pieces, pieces_hash) are outside this excerpt; comments below
        # describe only the visible code.
        log.msg('Checking for pieces for %s' % self.path)
        self.defer = defer.Deferred()

        # Seed the tally dicts so max() below never sees an empty sequence.
        pieces_string = {0: 0}
        pieces_dl_hash = {0: 0}

        for compact_peer in self.compact_peers:
            # Build a list of all the peers for this download
            site = uncompact(compact_peer['c'])
            peer = self.manager.getPeer(site)
            self.peers.setdefault(site, {})['peer'] = peer

            # Extract any piece information from the peers list
            if 't' in compact_peer:
                # Full piece-hash string sent inline by the peer.
                self.peers[site]['t'] = compact_peer['t']['t']
                pieces_string.setdefault(compact_peer['t']['t'], 0)
                pieces_string[compact_peer['t']['t']] += 1
            elif 'h' in compact_peer:
                # Hash of the piece string, to be looked up in the DHT.
                self.peers[site]['h'] = compact_peer['h']
                pieces_hash.setdefault(compact_peer['h'], 0)
                pieces_hash[compact_peer['h']] += 1
            elif 'l' in compact_peer:
                # Hash of the piece string, to be downloaded from the peers.
                self.peers[site]['l'] = compact_peer['l']
                pieces_dl_hash.setdefault(compact_peer['l'], 0)
                pieces_dl_hash[compact_peer['l']] += 1

        # Select the most popular piece info
        max_found = max(no_pieces, max(pieces_string.values()),
                        max(pieces_hash.values()), max(pieces_dl_hash.values()))
        # Peers disagreeing on the piece info suggests stale or bad data.
        if max_found < len(self.peers):
            log.msg('Misleading piece information found, using most popular %d of %d peers' %
                    (max_found, len(self.peers)))

        if max_found == no_pieces:
            # The file is not split into pieces
            log.msg('No pieces were found for the file')
            # Single "piece" covering the whole file, checked by the full hash.
            self.pieces = [self.hash.expected()]
        elif max_found == max(pieces_string.values()):
            # Small number of pieces in a string
            for pieces, num in pieces_string.items():
                # Find the most popular piece string
                # (the 'if num == max_found:' guard is not in this excerpt)
                # Split the concatenated string into 20-byte SHA1 hashes.
                self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
            log.msg('Peer info contained %d piece hashes' % len(self.pieces))
        elif max_found == max(pieces_hash.values()):
            # Medium number of pieces stored in the DHT
            for pieces, num in pieces_hash.items():
                # Find the most popular piece hash to lookup
                log.msg('Found a hash for pieces to lookup in the DHT: %r' % pieces)
                self.getDHTPieces(pieces)
        elif max_found == max(pieces_dl_hash.values()):
            # Large number of pieces stored in peers
            for pieces, num in pieces_dl_hash.items():
                # Find the most popular piece hash to download
                log.msg('Found a hash for pieces to lookup in peers: %r' % pieces)
                self.getPeerPieces(pieces)
361 #{ Downloading the piece hashes
    def getDHTPieces(self, key):
        """Retrieve the piece information from the DHT.

        @param key: the key to lookup in the DHT

        # Remove any peers with the wrong piece hash
        #for site in self.peers.keys():
        #    if self.peers[site].get('h', '') != key:
        #        del self.peers[site]

        # Start the DHT lookup
        lookupDefer = self.manager.dht.getValue(key)
        # addBoth: _getDHTPieces handles both the results and lookup errors.
        lookupDefer.addBoth(self._getDHTPieces, key)
    def _getDHTPieces(self, results, key):
        """Check the retrieved values."""
        # NOTE(review): several lines are missing from this excerpt (the
        # binding of 'pieces', the else/error branches); comments describe
        # only the visible code.
        if isinstance(results, list):
            for result in results:
                # Make sure the hash matches the key
                result_hash = sha.new(result.get('t', '')).digest()
                if result_hash == key:
                    # Split the piece string into 20-byte SHA1 hashes.
                    self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
                    log.msg('Retrieved %d piece hashes from the DHT' % len(self.pieces))

            log.msg('Could not retrieve the piece hashes from the DHT')
            log.msg('Looking up piece hashes in the DHT resulted in an error: %r' % (result, ))

        # Continue without the piece hashes
        # One None per PIECE_SIZE chunk: download proceeds unchecked per piece.
        self.pieces = [None for x in xrange(0, self.hash.expSize, PIECE_SIZE)]
    def getPeerPieces(self, key, failedSite = None):
        """Retrieve the piece information from the peers.

        @param key: the key to request from the peers
        @param failedSite: the site of a peer whose previous piece-hash
            request failed (optional, defaults to starting a fresh lookup)

        # NOTE(review): the 'else:' separating the two branches below and the
        # loop-break line are missing from this excerpt.
        if failedSite is None:
            log.msg('Starting the lookup of piece hashes in peers')

            # Remove any peers with the wrong piece hash
            #for site in self.peers.keys():
            #    if self.peers[site].get('l', '') != key:
            #        del self.peers[site]

            # A previous request failed: mark that peer and free its slot.
            log.msg('Piece hash lookup failed for peer %r' % (failedSite, ))
            self.peers[failedSite]['failed'] = True
            self.outstanding -= 1

        if self.pieces is None:
            # Send a request to one or more peers
            for site in self.peers:
                if self.peers[site].get('failed', False) != True:
                    log.msg('Sending a piece hash request to %r' % (site, ))
                    path = '/~/' + quote_plus(key)
                    lookupDefer = self.peers[site]['peer'].get(path)
                    # Attach callbacks on the next reactor tick so the request
                    # is issued before the callbacks can fire.
                    reactor.callLater(0, lookupDefer.addCallbacks,
                                      *(self._getPeerPieces, self._gotPeerError),
                                      **{'callbackArgs': (key, site),
                                         'errbackArgs': (key, site)})
                    self.outstanding += 1
                    # Cap the number of concurrent piece-hash requests at 4.
                    if self.outstanding >= 4:

        if self.pieces is None and self.outstanding <= 0:
            # Continue without the piece hashes
            log.msg('Could not retrieve the piece hashes from the peers')
            self.pieces = [None for x in xrange(0, self.hash.expSize, PIECE_SIZE)]
    def _getPeerPieces(self, response, key, site):
        """Process the retrieved response from the peer."""
        # NOTE(review): the 'else:' between the failure and success branches
        # is missing from this excerpt.
        log.msg('Got a piece hash response %d from %r' % (response.code, site))
        if response.code != 200:
            # Request failed, try a different peer
            self.getPeerPieces(key, site)

            # Read the response stream to a string
            self.peers[site]['pieces'] = ''
            # Default args bind self/site at definition time for the callback.
            def _gotPeerPiece(data, self = self, site = site):
                log.msg('Peer %r got %d bytes of piece hashes' % (site, len(data)))
                self.peers[site]['pieces'] += data
            log.msg('Streaming piece hashes from peer')
            df = stream.readStream(response.stream, _gotPeerPiece)
            df.addCallbacks(self._gotPeerPieces, self._gotPeerError,
                            callbackArgs=(key, site), errbackArgs=(key, site))
    def _gotPeerError(self, err, key, site):
        """Peer failed, try again."""
        log.msg('Peer piece hash request failed for %r' % (site, ))
        # Retry the lookup; passing the site marks this peer as failed.
        self.getPeerPieces(key, site)
    def _gotPeerPieces(self, result, key, site):
        """Check the retrieved pieces from the peer."""
        # NOTE(review): the try/except around bdecode and the else branches
        # are missing from this excerpt; comments describe visible code only.
        log.msg('Finished streaming piece hashes from peer %r' % (site, ))
        # Another peer may already have supplied the piece hashes.
        if self.pieces is not None:
            log.msg('Already done')

            # Decode the accumulated bencoded piece-hash response.
            result = bdecode(self.peers[site]['pieces'])
            log.msg('Error bdecoding piece hashes')
            self.getPeerPieces(key, site)

        # Verify the downloaded piece string hashes to the requested key.
        result_hash = sha.new(result.get('t', '')).digest()
        if result_hash == key:
            # Split the piece string into 20-byte SHA1 hashes.
            self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
            log.msg('Retrieved %d piece hashes from the peer %r' % (len(self.pieces), site))

            log.msg('Peer returned a piece string that did not match')
            self.getPeerPieces(key, site)
484 #{ Downloading the file
        """Sort the peers by their rank (highest ranked at the end)."""
        # NOTE(review): this excerpt omits the enclosing method's 'def' line
        # and most of the inner comparison function; only fragments remain.
            """Sort peers by their rank."""
            elif a.rank < b.rank:

        # Sort in place so pop() retrieves the highest-ranked peer.
        self.peerlist.sort(sort)
    def startDownload(self):
        """Start the download from the peers."""
        # NOTE(review): some lines are missing from this excerpt (guards,
        # stream/counter initializations); comments describe visible code only.
        log.msg('Starting to download %s' % self.path)

        assert self.pieces, "You must initialize the piece hashes first"
        # Every known peer starts out as a download candidate.
        self.peerlist = [self.peers[site]['peer'] for site in self.peers]

        # Use the mirror if there are few peers
        if len(self.peerlist) < config.getint('DEFAULT', 'MIN_DOWNLOAD_PEERS'):
            parsed = urlparse(self.mirror)
            if parsed[0] == "http":
                site = splitHostPort(parsed[0], parsed[1])
                # Keep only the path portion; the Peer object holds the host.
                self.mirror_path = urlunparse(('', '') + parsed[2:])
                peer = self.manager.getPeer(site, mirror = True)
                self.peerlist.append(peer)

        # Special case if there's only one good peer left
        # if len(self.peerlist) == 1:
        #     log.msg('Downloading from peer %r' % (self.peerlist[0], ))
        #     self.defer.callback(self.peerlist[0].get(self.path))

        # Begin to download the pieces
        # One slot per piece: False = not requested, True = done,
        # a Peer = request in flight (see class docstring).
        self.completePieces = [False for piece in self.pieces]
528 #{ Downloading the pieces
        """Download the next pieces from the peers."""
        # NOTE(review): the 'def getPieces(self):' line and a few interior
        # lines (path selection guard, single-piece request branch, loop
        # increment) are outside this excerpt.
        piece = self.nextFinish
        # Keep at most 4 requests in flight while peers and pieces remain.
        while self.outstanding < 4 and self.peerlist and piece < len(self.completePieces):
            if self.completePieces[piece] == False:
                # Send a request to the highest ranked peer
                peer = self.peerlist.pop()
                self.completePieces[piece] = peer
                log.msg('Sending a request for piece %d to peer %r' % (piece, peer))

                self.outstanding += 1
                # NOTE(review): the guard choosing self.path vs. the mirror
                # path appears to be missing here -- confirm upstream.
                path = self.mirror_path
                if len(self.completePieces) > 1:
                    # Multi-piece file: request only this piece's byte range.
                    df = peer.getRange(path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)

                # Attach callbacks on the next tick so the request goes out first.
                reactor.callLater(0, df.addCallbacks,
                                  *(self._getPiece, self._getError),
                                  **{'callbackArgs': (piece, peer),
                                     'errbackArgs': (piece, peer)})

        # Check if we're done
        if self.outstanding <= 0 and self.nextFinish >= len(self.completePieces):
            log.msg('Download is complete for %s' % self.path)
            # Tell the consumer stream that no more data is coming.
            self.stream.allAvailable()
    def _getPiece(self, response, piece, peer):
        """Process the retrieved headers from the peer."""
        # NOTE(review): this excerpt is missing several lines (the else
        # branches, headers initialization, the continuation of the second
        # StreamToFile call); comments describe visible code only.
        # Multi-piece downloads must come back as 206 Partial Content;
        # anything other than 200/206 is treated as a failure.
        if ((len(self.completePieces) > 1 and response.code != 206) or
            (response.code not in (200, 206))):
            # Request failed, try a different peer
            log.msg('Wrong response type %d for piece %d from peer %r' % (response.code, piece, peer))
            peer.hashError('Peer responded with the wrong type of download: %r' % response.code)
            self.completePieces[piece] = False
            # Drain the unwanted body so the connection can be reused.
            if response.stream and response.stream.length:
                stream.readAndDiscard(response.stream)

            # Start sending the return file
            # The stream grows as verified pieces become available.
            self.stream = GrowingFileStream(self.file, self.hash.expSize)

            # Get the headers from the peer's response
            if response.headers.hasHeader('last-modified'):
                headers['last-modified'] = response.headers.getHeader('last-modified')
            resp = Response(200, headers, self.stream)

        # Read the response stream to the file
        log.msg('Streaming piece %d from peer %r' % (piece, peer))
        if response.code == 206:
            # Partial content: hash and store only this piece at its offset.
            df = StreamToFile(self.hash.newPieceHasher(), response.stream,
                              self.file, piece*PIECE_SIZE, PIECE_SIZE).run()

            # Whole file in one response: hash the entire download.
            df = StreamToFile(self.hash.newHasher(), response.stream,

        reactor.callLater(0, df.addCallbacks,
                          *(self._gotPiece, self._gotError),
                          **{'callbackArgs': (piece, peer),
                             'errbackArgs': (piece, peer)})

        # Free the request slot and return the peer to the candidate pool.
        self.outstanding -= 1
        self.peerlist.append(peer)
    def _getError(self, err, piece, peer):
        """Peer failed, try again."""
        log.msg('Got error for piece %d from peer %r' % (piece, peer))
        # Free the request slot, return the peer to the pool, and mark the
        # piece as not downloaded so it will be requested again.
        self.outstanding -= 1
        self.peerlist.append(peer)
        self.completePieces[piece] = False
    def _gotPiece(self, response, piece, peer):
        """Process the retrieved piece from the peer."""
        # 'response' is the digest returned by StreamToFile._done, compared
        # against the expected 20-byte piece hash (when one is known).
        if self.pieces[piece] and response != self.pieces[piece]:
            # Hash mismatch: penalize the peer and re-queue the piece.
            log.msg('Hash error for piece %d from peer %r' % (piece, peer))
            peer.hashError('Piece received from peer does not match expected')
            self.completePieces[piece] = False

            # Successfully completed one of several pieces
            log.msg('Finished with piece %d from peer %r' % (piece, peer))
            self.completePieces[piece] = True
            # Advance past every contiguous completed piece, releasing its
            # data to the output stream.
            # NOTE(review): the nextFinish increment inside this loop is
            # missing from this excerpt.
            while (self.nextFinish < len(self.completePieces) and
                   self.completePieces[self.nextFinish] == True):
                self.stream.updateAvailable(PIECE_SIZE)
627 def _gotError(self, err, piece, peer):
628 """Piece download failed, try again."""
629 log.msg('Error streaming piece %d from peer %r: %r' % (piece, peer, response))
631 self.completePieces[piece] = False
635 """Manage a set of peers and the requests to them.
637 @type cache_dir: L{twisted.python.filepath.FilePath}
638 @ivar cache_dir: the directory to use for storing all files
639 @type dht: L{interfaces.IDHT}
640 @ivar dht: the DHT instance
641 @type stats: L{stats.StatsLogger}
642 @ivar stats: the statistics logger to record sent data to
643 @type clients: C{dictionary}
644 @ivar clients: the available peers that have been previously contacted
    def __init__(self, cache_dir, dht, stats):
        """Initialize the instance."""
        # NOTE(review): the assignments for dht, stats and the clients dict
        # are outside this excerpt.
        self.cache_dir = cache_dir
        # Refresh the cached stat info without raising if the path is gone.
        self.cache_dir.restat(False)
        if not self.cache_dir.exists():
            self.cache_dir.makedirs()
657 def get(self, hash, mirror, peers = [], method="GET", modtime=None):
658 """Download from a list of peers or fallback to a mirror.
660 @type hash: L{Hash.HashObject}
661 @param hash: the hash object containing the expected hash for the file
662 @param mirror: the URI of the file on the mirror
663 @type peers: C{list} of C{string}
664 @param peers: a list of the peer info where the file can be found
665 (optional, defaults to downloading from the mirror)
666 @type method: C{string}
667 @param method: the HTTP method to use, 'GET' or 'HEAD'
668 (optional, defaults to 'GET')
669 @type modtime: C{int}
670 @param modtime: the modification time to use for an 'If-Modified-Since'
671 header, as seconds since the epoch
672 (optional, defaults to not sending that header)
674 if not peers or method != "GET" or modtime is not None:
675 log.msg('Downloading (%s) from mirror %s' % (method, mirror))
676 parsed = urlparse(mirror)
677 assert parsed[0] == "http", "Only HTTP is supported, not '%s'" % parsed[0]
678 site = splitHostPort(parsed[0], parsed[1])
679 path = urlunparse(('', '') + parsed[2:])
680 peer = self.getPeer(site, mirror = True)
681 return peer.get(path, method, modtime)
682 # elif len(peers) == 1:
683 # site = uncompact(peers[0]['c'])
684 # log.msg('Downloading from peer %r' % (site, ))
685 # path = '/~/' + quote_plus(hash.expected())
686 # peer = self.getPeer(site)
687 # return peer.get(path)
689 tmpfile = self.cache_dir.child(hash.hexexpected())
690 return FileDownload(self, hash, mirror, peers, tmpfile).run()
    def getPeer(self, site, mirror = False):
        """Create a new peer if necessary and return it.

        @type site: (C{string}, C{int})
        @param site: the IP address and port of the peer
        @param mirror: whether the peer is actually a mirror
            (optional, defaults to False)

        if site not in self.clients:
            # First contact with this site: create and cache a Peer for it.
            self.clients[site] = Peer(site[0], site[1], self.stats)
            # NOTE(review): an 'if mirror:' guard appears to be missing from
            # this excerpt before the next line; as shown, every newly created
            # peer would be flagged as a mirror -- confirm against full source.
            self.clients[site].mirror = True
        return self.clients[site]
        """Close all the connections to peers."""
        # NOTE(review): the 'def close(self):' line is outside this excerpt.
        for site in self.clients:
            self.clients[site].close()
712 class TestPeerManager(unittest.TestCase):
713 """Unit tests for the PeerManager."""
719 for p in self.pending_calls:
722 self.pending_calls = []