2 """Manage a set of peers and the requests to them."""
4 from random import choice
5 from urlparse import urlparse, urlunparse
6 from urllib import quote_plus
7 from binascii import b2a_hex, a2b_hex
10 from twisted.internet import reactor, defer
11 from twisted.python import log
12 from twisted.trial import unittest
13 from twisted.web2 import stream
14 from twisted.web2.http import Response, splitHostPort
16 from HTTPDownloader import Peer
17 from util import uncompact
18 from Hash import PIECE_SIZE
19 from apt_p2p_Khashmir.bencode import bdecode
20 from apt_p2p_conf import config
class PeerError(Exception):
    """Raised when downloading a file from one or more peers fails."""
class GrowingFileStream(stream.FileStream):
    """Modified to stream data from a file as it becomes available.

    @ivar CHUNK_SIZE: the maximum size of chunks of data to send at a time
    @ivar deferred: waiting for the result of the last read attempt
    @ivar available: the number of bytes that are currently available to read
    @ivar position: the current position in the file where the next read will begin
    @ivar finished: True when no more data will be coming available
    """

    def __init__(self, f, length = None):
        """Wrap the open file *f* for incremental streaming.

        NOTE(review): only the superclass initialization is visible in this
        view; the attributes documented above (deferred, available,
        position, finished) are presumably initialized in lines not shown
        here -- confirm against the full file.
        """
        stream.FileStream.__init__(self, f)
    def updateAvailable(self, newlyAvailable):
        """Update the number of bytes that are available.

        Call it with 0 to trigger reading of a fully read file.

        @param newlyAvailable: the number of bytes that just became available
        """
        assert not self.finished
        self.available += newlyAvailable

        # If a read is pending, let it go
        if self.deferred and self.position < self.available:
            # Try to read some data from the file
            length = self.available - self.position
            readSize = min(length, self.CHUNK_SIZE)
            self.f.seek(self.position)
            b = self.f.read(readSize)

            # Check if end of file was reached
            # NOTE(review): `bytesRead` is never assigned in the visible
            # lines -- presumably `bytesRead = len(b)` was dropped from this
            # view; confirm against the full file.
            self.position += bytesRead
            deferred = self.deferred
            # NOTE(review): the code that clears self.deferred and fires
            # `deferred` with the read data is not visible in this view.
    def allAvailable(self):
        """Indicate that no more data will be coming available.

        NOTE(review): interior lines of this method (e.g. setting
        self.finished and the branch structure around the deferred
        callbacks below) are not visible in this view; comments mark the
        apparent gaps.
        """
        # If a read is pending, let it go
        if self.position < self.available:
            # Try to read some data from the file
            length = self.available - self.position
            readSize = min(length, self.CHUNK_SIZE)
            self.f.seek(self.position)
            b = self.f.read(readSize)

            # Check if end of file was reached
            # NOTE(review): `bytesRead` is not assigned in the visible lines.
            self.position += bytesRead
            deferred = self.deferred
            # NOTE(review): the repeated deferred/callback pairs below
            # appear to come from separate branches (pending read with no
            # data left vs. no pending read); their original guarding
            # conditions and indentation are not visible in this view.
            deferred = self.deferred
            deferred.callback(None)
            deferred = self.deferred
            deferred.callback(None)
    def read(self, sendfile=False):
        """Read the next chunk of data, deferring if none is available yet.

        NOTE(review): interior lines of this method (the finished/return
        branches, the actual return of the data, and the assignment of
        `bytesRead`) are not visible in this view; comments mark the
        apparent gaps.
        """
        assert not self.deferred, "A previous read is still deferred."

        length = self.available - self.position
        readSize = min(length, self.CHUNK_SIZE)

        # If we don't have any available, we're done or deferred
        self.deferred = defer.Deferred()

        # Try to read some data from the file
        self.f.seek(self.position)
        b = self.f.read(readSize)

        # End of file was reached, we're done or deferred
        self.deferred = defer.Deferred()

        self.position += bytesRead
    # NOTE(review): the `class StreamToFile:` header is not visible in this
    # view; the docstring and initializer below belong to that class.
    """Save a stream to a partial file and hash it.

    @type stream: L{twisted.web2.stream.IByteStream}
    @ivar stream: the input stream being read
    @type outFile: L{twisted.python.filepath.FilePath}
    @ivar outFile: the file being written
    @ivar hash: the hash object for the data
    @type position: C{int}
    @ivar position: the current file position to write the next data to
    @ivar length: the position in the file to not write beyond
    @type doneDefer: L{twisted.internet.defer.Deferred}
    @ivar doneDefer: the deferred that will fire when done writing
    """

    def __init__(self, inputStream, outFile, start = 0, length = None):
        """Initializes the file.

        @type inputStream: L{twisted.web2.stream.IByteStream}
        @param inputStream: the input stream to read from
        @type outFile: L{twisted.python.filepath.FilePath}
        @param outFile: the file to write to
        @param start: the file position to start writing at
            (optional, defaults to the start of the file)
        @param length: the maximum amount of data to write to the file
            (optional, defaults to not limiting the writing to the file)
        """
        self.stream = inputStream
        self.outFile = outFile
        self.hash = sha.new()
        self.position = start
        # NOTE(review): the default `self.length = None` assignment is not
        # visible in this view but is implied by the conditional below.
        if length is not None:
            self.length = start + length
        self.doneDefer = None
    # NOTE(review): the `def run(self):` line is not visible in this view.
        """Start the streaming.

        @rtype: L{twisted.internet.defer.Deferred}
        @return: a deferred that fires with the final hash digest (via
            _done) or the failure (via _error) when streaming completes
        """
        log.msg('Started streaming %r bytes to file at position %d' % (self.length, self.position))
        self.doneDefer = stream.readStream(self.stream, self._gotData)
        self.doneDefer.addCallbacks(self._done, self._error)
        return self.doneDefer
    def _gotData(self, data):
        """Process the received data."""
        if self.outFile.closed:
            raise PeerError, "outFile was unexpectedly closed"
        # NOTE(review): the guarding `if data is None:` for the following
        # raise is not visible in this view -- confirm.
        raise PeerError, "Data is None?"

        # Make sure we don't go too far
        if self.length is not None and self.position + len(data) > self.length:
            data = data[:(self.length - self.position)]

        # Write and hash the streamed data
        self.outFile.seek(self.position)
        self.outFile.write(data)
        self.hash.update(data)
        self.position += len(data)
203 def _done(self, result):
204 """Return the result."""
205 log.msg('Streaming is complete')
206 return self.hash.digest()
    def _error(self, err):
        """Log that streaming failed.

        NOTE(review): additional error handling after the visible log line
        (e.g. logging the failure details and propagating *err*) is not
        visible in this view.
        """
        log.msg('Streaming error')
    # NOTE(review): the `class FileDownload:` header is not visible in this
    # view; the docstring and initializer below belong to that class.
    """Manage a download from a list of peers or a mirror.

    @type manager: L{PeerManager}
    @ivar manager: the manager to send requests for peers to
    @type hash: L{Hash.HashObject}
    @ivar hash: the hash object containing the expected hash for the file
    @ivar mirror: the URI of the file on the mirror
    @type compact_peers: C{list} of C{dictionary}
    @ivar compact_peers: a list of the peer info where the file can be found
    @ivar file: the open file to write the download to
    @type path: C{string}
    @ivar path: the path to request from peers to access the file
    @type pieces: C{list} of C{string}
    @ivar pieces: the hashes of the pieces in the file
    @type started: C{boolean}
    @ivar started: whether the download has begun yet
    @type defer: L{twisted.internet.defer.Deferred}
    @ivar defer: the deferred that will callback with the result of the download
    @type peers: C{dictionary}
    @ivar peers: information about each of the peers available to download from
    @type outstanding: C{int}
    @ivar outstanding: the number of requests to peers currently outstanding
    @type peerlist: C{list} of L{HTTPDownloader.Peer}
    @ivar peerlist: the sorted list of peers for this download
    @type stream: L{GrowingFileStream}
    @ivar stream: the stream of resulting data from the download
    @type nextFinish: C{int}
    @ivar nextFinish: the next piece that is needed to finish for the stream
    @type completePieces: C{list} of C{boolean} or L{HTTPDownloader.Peer}
    @ivar completePieces: one per piece, will be False if no requests are
        outstanding for the piece, True if the piece has been successfully
        downloaded, or the Peer that a request for this piece has been sent
    """

    def __init__(self, manager, hash, mirror, compact_peers, file):
        """Initialize the instance and check for piece hashes.

        @type manager: L{PeerManager}
        @param manager: the manager to send requests for peers to
        @type hash: L{Hash.HashObject}
        @param hash: the hash object containing the expected hash for the file
        @param mirror: the URI of the file on the mirror
        @type compact_peers: C{list} of C{dictionary}
        @param compact_peers: a list of the peer info where the file can be found
        @type file: L{twisted.python.filepath.FilePath}
        @param file: the temporary file to use to store the downloaded file
        """
        self.manager = manager
        # NOTE(review): assignments for self.hash, self.mirror and the other
        # attributes documented above (pieces, started, peers, outstanding,
        # nextFinish, ...) are not visible in this view.
        self.compact_peers = compact_peers
        # Peers serve files keyed by their expected hash under the '/~/' path.
        self.path = '/~/' + quote_plus(hash.expected())
        self.mirror_path = None
        self.file = file.open('w+')
    # NOTE(review): the `def run(self):` line is not visible in this view.
        """Start the downloading process.

        Tally the kinds of piece information advertised by the peers and
        act on the most popular one.

        NOTE(review): several interior lines are not visible in this view
        (e.g. the initialization of `no_pieces` and the `pieces_hash`
        dictionary, and the `if num == max_found:` guards inside the loops
        below); comments mark the apparent gaps.
        """
        log.msg('Checking for pieces for %s' % self.path)
        self.defer = defer.Deferred()
        # Tallies of each kind of piece info: a literal piece-hash string
        # ('t'), a DHT lookup key ('h'), or a peer-download key ('l').
        pieces_string = {0: 0}
        pieces_dl_hash = {0: 0}

        for compact_peer in self.compact_peers:
            # Build a list of all the peers for this download
            site = uncompact(compact_peer['c'])
            peer = self.manager.getPeer(site)
            self.peers.setdefault(site, {})['peer'] = peer

            # Extract any piece information from the peers list
            if 't' in compact_peer:
                self.peers[site]['t'] = compact_peer['t']['t']
                pieces_string.setdefault(compact_peer['t']['t'], 0)
                pieces_string[compact_peer['t']['t']] += 1
            elif 'h' in compact_peer:
                self.peers[site]['h'] = compact_peer['h']
                pieces_hash.setdefault(compact_peer['h'], 0)
                pieces_hash[compact_peer['h']] += 1
            elif 'l' in compact_peer:
                self.peers[site]['l'] = compact_peer['l']
                pieces_dl_hash.setdefault(compact_peer['l'], 0)
                pieces_dl_hash[compact_peer['l']] += 1

        # Select the most popular piece info
        max_found = max(no_pieces, max(pieces_string.values()),
                        max(pieces_hash.values()), max(pieces_dl_hash.values()))

        if max_found < len(self.peers):
            log.msg('Misleading piece information found, using most popular %d of %d peers' %
                    (max_found, len(self.peers)))

        if max_found == no_pieces:
            # The file is not split into pieces
            log.msg('No pieces were found for the file')
            self.pieces = [self.hash.expected()]
        elif max_found == max(pieces_string.values()):
            # Small number of pieces in a string
            for pieces, num in pieces_string.items():
                # Find the most popular piece string
                # NOTE(review): the guard selecting the entry with
                # num == max_found is not visible in this view.
                self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
            log.msg('Peer info contained %d piece hashes' % len(self.pieces))
        elif max_found == max(pieces_hash.values()):
            # Medium number of pieces stored in the DHT
            for pieces, num in pieces_hash.items():
                # Find the most popular piece hash to lookup
                log.msg('Found a hash for pieces to lookup in the DHT: %r' % pieces)
                self.getDHTPieces(pieces)
        elif max_found == max(pieces_dl_hash.values()):
            # Large number of pieces stored in peers
            for pieces, num in pieces_dl_hash.items():
                # Find the most popular piece hash to download
                log.msg('Found a hash for pieces to lookup in peers: %r' % pieces)
                self.getPeerPieces(pieces)
    #{ Downloading the piece hashes
    def getDHTPieces(self, key):
        """Retrieve the piece information from the DHT.

        @param key: the key to lookup in the DHT
        """
        # Remove any peers with the wrong piece hash
        #for site in self.peers.keys():
        #    if self.peers[site].get('h', '') != key:
        #        del self.peers[site]

        # Start the DHT lookup; addBoth routes success and failure to the
        # same handler, which distinguishes them by the result type.
        lookupDefer = self.manager.dht.getValue(key)
        lookupDefer.addBoth(self._getDHTPieces, key)
    def _getDHTPieces(self, results, key):
        """Check the retrieved values.

        NOTE(review): the branch structure of this method (the assignment
        `pieces = result['t']`, the early exit on success, and the
        error-path `else:`) is not visible in this view; comments mark the
        apparent gaps.
        """
        if isinstance(results, list):
            for result in results:
                # Make sure the hash matches the key
                result_hash = sha.new(result.get('t', '')).digest()
                if result_hash == key:
                    # NOTE(review): `pieces` is not assigned in the visible
                    # lines -- presumably `pieces = result['t']` here.
                    self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
                    log.msg('Retrieved %d piece hashes from the DHT' % len(self.pieces))

            log.msg('Could not retrieve the piece hashes from the DHT')
        # NOTE(review): the following log apparently belongs to an error
        # branch that is not visible in this view; note it references
        # `result`, which may be unbound if the branch is ever taken before
        # the loop runs -- confirm against the full file.
        log.msg('Looking up piece hashes in the DHT resulted in an error: %r' % (result, ))

        # Continue without the piece hashes
        self.pieces = [None for x in xrange(0, self.hash.expSize, PIECE_SIZE)]
    def getPeerPieces(self, key, failedSite = None):
        """Retrieve the piece information from the peers.

        @param key: the key to request from the peers
        @param failedSite: the site of a peer whose piece-hash request just
            failed (optional, defaults to None for the initial call)
        """
        if failedSite is None:
            log.msg('Starting the lookup of piece hashes in peers')
            # Remove any peers with the wrong piece hash
            #for site in self.peers.keys():
            #    if self.peers[site].get('l', '') != key:
            #        del self.peers[site]
            # NOTE(review): the `else:` introducing the retry branch below
            # is not visible in this view.
            log.msg('Piece hash lookup failed for peer %r' % (failedSite, ))
            self.peers[failedSite]['failed'] = True
            self.outstanding -= 1

        if self.pieces is None:
            # Send a request to one or more peers
            for site in self.peers:
                if self.peers[site].get('failed', False) != True:
                    log.msg('Sending a piece hash request to %r' % (site, ))
                    path = '/~/' + quote_plus(key)
                    lookupDefer = self.peers[site]['peer'].get(path)
                    # Attach the callbacks on the next reactor iteration so
                    # a synchronous error cannot re-enter this method.
                    reactor.callLater(0, lookupDefer.addCallbacks,
                                      *(self._getPeerPieces, self._gotPeerError),
                                      **{'callbackArgs': (key, site),
                                         'errbackArgs': (key, site)})
                    self.outstanding += 1
                    if self.outstanding >= 4:
                        # NOTE(review): the statement limiting concurrent
                        # requests (presumably `break`) is not visible in
                        # this view.

        if self.pieces is None and self.outstanding <= 0:
            # Continue without the piece hashes
            log.msg('Could not retrieve the piece hashes from the peers')
            self.pieces = [None for x in xrange(0, self.hash.expSize, PIECE_SIZE)]
            # NOTE(review): the follow-up call that starts the download is
            # not visible in this view.
    def _getPeerPieces(self, response, key, site):
        """Process the retrieved response from the peer."""
        log.msg('Got a piece hash response %d from %r' % (response.code, site))
        if response.code != 200:
            # Request failed, try a different peer
            self.getPeerPieces(key, site)
            # NOTE(review): the `else:` (or early return) separating the
            # failure path above from the streaming path below is not
            # visible in this view.
            # Read the response stream to a string
            self.peers[site]['pieces'] = ''
            # Accumulate streamed chunks into the per-site 'pieces' buffer;
            # self/site are bound as defaults to freeze their values.
            def _gotPeerPiece(data, self = self, site = site):
                log.msg('Peer %r got %d bytes of piece hashes' % (site, len(data)))
                self.peers[site]['pieces'] += data
            log.msg('Streaming piece hashes from peer')
            df = stream.readStream(response.stream, _gotPeerPiece)
            df.addCallbacks(self._gotPeerPieces, self._gotPeerError,
                            callbackArgs=(key, site), errbackArgs=(key, site))
    def _gotPeerError(self, err, key, site):
        """Peer failed, try again.

        NOTE(review): a line between the two statements below (presumably
        logging the failure details) is not visible in this view.
        """
        log.msg('Peer piece hash request failed for %r' % (site, ))
        self.getPeerPieces(key, site)
    def _gotPeerPieces(self, result, key, site):
        """Check the retrieved pieces from the peer.

        NOTE(review): this method's try/except around the bdecode call and
        several return statements are not visible in this view; comments
        mark the apparent gaps.
        """
        log.msg('Finished streaming piece hashes from peer %r' % (site, ))
        if self.pieces is not None:
            # Piece hashes already arrived from another source
            log.msg('Already done')

        result = bdecode(self.peers[site]['pieces'])
        # NOTE(review): the next two statements presumably sit inside an
        # exception handler for the bdecode above -- confirm.
        log.msg('Error bdecoding piece hashes')
        self.getPeerPieces(key, site)

        # Make sure the hash of the piece string matches the lookup key
        result_hash = sha.new(result.get('t', '')).digest()
        if result_hash == key:
            # NOTE(review): `pieces` is not assigned in the visible lines --
            # presumably `pieces = result['t']` here.
            self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
            log.msg('Retrieved %d piece hashes from the peer %r' % (len(self.pieces), site))

        # NOTE(review): the mismatch branch below lacks its visible `else:`.
        log.msg('Peer returned a piece string that did not match')
        self.getPeerPieces(key, site)
    #{ Downloading the file
    # NOTE(review): the enclosing `def sortPeers(self):` line, the nested
    # comparator's `def sort(a, b):` line, and the comparator's return
    # statements are not visible in this view; only fragments remain.
    """Sort the peers by their rank (highest ranked at the end)."""
    """Sort peers by their rank."""
    elif a.rank < b.rank:
        # NOTE(review): the surrounding if/elif/return structure of the
        # comparator is not visible in this view.
    self.peerlist.sort(sort)
    def startDownload(self):
        """Start the download from the peers.

        NOTE(review): several interior lines (e.g. a `started` guard, the
        initialization of outstanding/nextFinish, and the initial
        getPieces call) are not visible in this view.
        """
        log.msg('Starting to download %s' % self.path)
        assert self.pieces, "You must initialize the piece hashes first"
        self.peerlist = [self.peers[site]['peer'] for site in self.peers]

        # Use the mirror if there are few peers
        if len(self.peerlist) < config.getint('DEFAULT', 'MIN_DOWNLOAD_PEERS'):
            parsed = urlparse(self.mirror)
            if parsed[0] == "http":
                site = splitHostPort(parsed[0], parsed[1])
                self.mirror_path = urlunparse(('', '') + parsed[2:])
                peer = self.manager.getPeer(site, mirror = True)
                self.peerlist.append(peer)

        # Special case if there's only one good peer left
        # if len(self.peerlist) == 1:
        #     log.msg('Downloading from peer %r' % (self.peerlist[0], ))
        #     self.defer.callback(self.peerlist[0].get(self.path))

        # Start sending the return file: callers get a 200 response whose
        # body grows as pieces complete.
        self.stream = GrowingFileStream(self.file, self.hash.expSize)
        resp = Response(200, {}, self.stream)
        self.defer.callback(resp)

        # Begin to download the pieces
        self.completePieces = [False for piece in self.pieces]
    #{ Downloading the pieces
    # NOTE(review): the `def getPieces(self):` line is not visible in this
    # view.
        """Download the next pieces from the peers."""
        piece = self.nextFinish
        # Keep at most four requests in flight, walking forward from the
        # next unfinished piece while ranked peers remain available.
        while self.outstanding < 4 and self.peerlist and piece < len(self.completePieces):
            if self.completePieces[piece] == False:
                # Send a request to the highest ranked peer
                peer = self.peerlist.pop()
                self.completePieces[piece] = peer
                log.msg('Sending a request for piece %d to peer %r' % (piece, peer))

                self.outstanding += 1
                # NOTE(review): the branch selecting between the mirror path
                # and the peer path (presumably `if peer.mirror:` / `else:`)
                # is not visible in this view.
                df = peer.getRange(self.mirror_path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
                df = peer.getRange(self.path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
                reactor.callLater(0, df.addCallbacks,
                                  *(self._getPiece, self._getError),
                                  **{'callbackArgs': (piece, peer),
                                     'errbackArgs': (piece, peer)})
            # NOTE(review): the statement advancing `piece` is not visible
            # in this view.

        # Check if we're done
        if self.outstanding <= 0 and self.nextFinish >= len(self.completePieces):
            log.msg('We seem to be done with all pieces')
            self.stream.allAvailable()
    def _getPiece(self, response, piece, peer):
        """Process the retrieved headers from the peer."""
        log.msg('Got response for piece %d from peer %r' % (piece, peer))
        # A multi-piece download must come back as a 206 partial response;
        # anything outside 200/206 is outright wrong.
        if ((len(self.completePieces) > 1 and response.code != 206) or
            (response.code not in (200, 206))):
            # Request failed, try a different peer
            log.msg('Wrong response type %d for piece %d from peer %r' % (response.code, piece, peer))
            peer.hashError('Peer responded with the wrong type of download: %r' % response.code)
            self.completePieces[piece] = False
            if response.stream and response.stream.length:
                stream.readAndDiscard(response.stream)
            # NOTE(review): the `else:` separating the failure path above
            # from the streaming path below is not visible in this view.
            # Read the response stream to the file
            log.msg('Streaming piece %d from peer %r' % (piece, peer))
            # NOTE(review): the 206 branch's constructor is truncated in
            # this view (its remaining arguments and `.run()` call are
            # missing); the second call apparently belongs to an `else:`.
            if response.code == 206:
                df = StreamToFile(response.stream, self.file, piece*PIECE_SIZE,
                df = StreamToFile(response.stream, self.file).run()
            reactor.callLater(0, df.addCallbacks,
                              *(self._gotPiece, self._gotError),
                              **{'callbackArgs': (piece, peer),
                                 'errbackArgs': (piece, peer)})

        self.outstanding -= 1
        self.peerlist.append(peer)
        # NOTE(review): follow-up calls (re-sorting peers / requesting more
        # pieces) are not visible in this view.
    def _getError(self, err, piece, peer):
        """Peer failed, try again.

        The peer goes back into the pool and the piece is marked as not
        requested so another peer can pick it up.

        NOTE(review): the retry call that follows (presumably getPieces())
        is not visible in this view.
        """
        log.msg('Got error for piece %d from peer %r' % (piece, peer))
        self.outstanding -= 1
        self.peerlist.append(peer)
        self.completePieces[piece] = False
    def _gotPiece(self, response, piece, peer):
        """Process the retrieved piece from the peer.

        @param response: the digest produced by the StreamToFile that wrote
            the piece (see StreamToFile._done)
        """
        log.msg('Finished streaming piece %d from peer %r: %r' % (piece, peer, response))
        if self.pieces[piece] and response != self.pieces[piece]:
            # Hash doesn't match the expected piece hash
            log.msg('Hash error for piece %d from peer %r' % (piece, peer))
            peer.hashError('Piece received from peer does not match expected')
            self.completePieces[piece] = False
            # NOTE(review): the `else:` introducing the success path below
            # is not visible in this view.
            # Successfully completed one of several pieces
            log.msg('Finished with piece %d from peer %r' % (piece, peer))
            self.completePieces[piece] = True
            # Release every contiguously-complete piece to the output stream
            while (self.nextFinish < len(self.completePieces) and
                   self.completePieces[self.nextFinish] == True):
                # NOTE(review): the statement advancing self.nextFinish is
                # not visible in this view.
                self.stream.updateAvailable(PIECE_SIZE)
606 def _gotError(self, err, piece, peer):
607 """Piece download failed, try again."""
608 log.msg('Error streaming piece %d from peer %r: %r' % (piece, peer, response))
610 self.completePieces[piece] = False
    # NOTE(review): the `class PeerManager:` header is not visible in this
    # view; the docstring and initializer below belong to that class.
    """Manage a set of peers and the requests to them.

    @type cache_dir: L{twisted.python.filepath.FilePath}
    @ivar cache_dir: the directory to use for storing all files
    @type dht: L{interfaces.IDHT}
    @ivar dht: the DHT instance
    @type stats: L{stats.StatsLogger}
    @ivar stats: the statistics logger to record sent data to
    @type clients: C{dictionary}
    @ivar clients: the available peers that have been previously contacted
    """

    def __init__(self, cache_dir, dht, stats):
        """Initialize the instance."""
        self.cache_dir = cache_dir
        # Refresh the cached stat so the existence check below is current.
        self.cache_dir.restat(False)
        if not self.cache_dir.exists():
            self.cache_dir.makedirs()
        # NOTE(review): the assignments of self.dht, self.stats and the
        # self.clients dictionary are not visible in this view.
    def get(self, hash, mirror, peers = [], method="GET", modtime=None):
        """Download from a list of peers or fallback to a mirror.

        NOTE(review): the mutable default `peers = []` is only safe because
        the visible code never mutates it.

        @type hash: L{Hash.HashObject}
        @param hash: the hash object containing the expected hash for the file
        @param mirror: the URI of the file on the mirror
        @type peers: C{list} of C{string}
        @param peers: a list of the peer info where the file can be found
            (optional, defaults to downloading from the mirror)
        @type method: C{string}
        @param method: the HTTP method to use, 'GET' or 'HEAD'
            (optional, defaults to 'GET')
        @type modtime: C{int}
        @param modtime: the modification time to use for an 'If-Modified-Since'
            header, as seconds since the epoch
            (optional, defaults to not sending that header)
        """
        # Peers can only serve plain GETs without conditional headers;
        # anything else goes straight to the mirror.
        if not peers or method != "GET" or modtime is not None:
            log.msg('Downloading (%s) from mirror %s' % (method, mirror))
            parsed = urlparse(mirror)
            assert parsed[0] == "http", "Only HTTP is supported, not '%s'" % parsed[0]
            site = splitHostPort(parsed[0], parsed[1])
            path = urlunparse(('', '') + parsed[2:])
            peer = self.getPeer(site, mirror = True)
            return peer.get(path, method, modtime)
        # elif len(peers) == 1:
        #     site = uncompact(peers[0]['c'])
        #     log.msg('Downloading from peer %r' % (site, ))
        #     path = '/~/' + quote_plus(hash.expected())
        #     peer = self.getPeer(site)
        #     return peer.get(path)

        # Otherwise run a multi-peer download into a temp file named after
        # the expected hash.
        tmpfile = self.cache_dir.child(hash.hexexpected())
        return FileDownload(self, hash, mirror, peers, tmpfile).run()
671 def getPeer(self, site, mirror = False):
672 """Create a new peer if necessary and return it.
674 @type site: (C{string}, C{int})
675 @param site: the IP address and port of the peer
676 @param mirror: whether the peer is actually a mirror
677 (optional, defaults to False)
679 if site not in self.clients:
680 self.clients[site] = Peer(site[0], site[1], self.stats)
682 self.clients[site].mirror = True
683 return self.clients[site]
    # NOTE(review): the `def close(self):` line is not visible in this view.
        """Close all the connections to peers."""
        for site in self.clients:
            self.clients[site].close()
class TestPeerManager(unittest.TestCase):
    """Unit tests for the PeerManager."""

    # NOTE(review): the class attributes and the enclosing teardown-style
    # method for the loop below are not visible in this view; only the
    # pending-call cleanup fragment remains.
        for p in self.pending_calls:
            # NOTE(review): the cancellation of the pending call (e.g.
            # `if p.active(): p.cancel()`) is not visible in this view.
        self.pending_calls = []