2 """Manage a set of peers and the requests to them."""
4 from random import choice
5 from urlparse import urlparse, urlunparse
6 from urllib import quote_plus
7 from binascii import b2a_hex, a2b_hex
10 from twisted.internet import reactor, defer
11 from twisted.python import log
12 from twisted.trial import unittest
13 from twisted.web2 import stream
14 from twisted.web2.http import Response, splitHostPort
16 from HTTPDownloader import Peer
17 from util import uncompact
18 from Hash import PIECE_SIZE
19 from apt_p2p_Khashmir.bencode import bdecode
20 from apt_p2p_conf import config
# NOTE(review): this capture is an elided, line-numbered listing -- the leading
# integers on each line are original line numbers, and gaps in them mean lines
# (blank lines, if/else branches, assignments such as 'bytesRead = len(b)')
# are missing from this view.  Code below is kept byte-identical.
#
# A stream.FileStream subclass that can serve a file while it is still being
# written: a reader that outruns the written data gets a Deferred that fires
# once more bytes arrive (updateAvailable) or the writer finishes (allAvailable).
22 class GrowingFileStream(stream.FileStream):
23 """Modified to stream data from a file as it becomes available.
25 @ivar CHUNK_SIZE: the maximum size of chunks of data to send at a time
26 @ivar deferred: waiting for the result of the last read attempt
27 @ivar available: the number of bytes that are currently available to read
28 @ivar position: the current position in the file where the next read will begin
29 @ivar finished: True when no more data will be coming available
# NOTE(review): only the superclass chain-up is visible here; the
# initialization of deferred/available/position/finished (and handling of
# 'length') is presumably in elided lines 36-40 -- TODO confirm.
34 def __init__(self, f, length = None):
35 stream.FileStream.__init__(self, f)
# Writer-side notification: grow the available byte count and, if a reader is
# parked on self.deferred, satisfy it with up to CHUNK_SIZE bytes.
42 def updateAvailable(self, newlyAvailable):
43 """Update the number of bytes that are available.
45 Call it with 0 to trigger reading of a fully read file.
47 @param newlyAvailable: the number of bytes that just became available
# Must not be called after allAvailable(); 'finished' guards that contract.
49 assert not self.finished
50 self.available += newlyAvailable
52 # If a read is pending, let it go
53 if self.deferred and self.position < self.available:
54 # Try to read some data from the file
55 length = self.available - self.position
56 readSize = min(length, self.CHUNK_SIZE)
57 self.f.seek(self.position)
58 b = self.f.read(readSize)
# NOTE(review): 'bytesRead' is assigned in an elided line (presumably
# len(b)); the branch firing the saved deferred with the data is also elided.
61 # Check if end of file was reached
63 self.position += bytesRead
64 deferred = self.deferred
# Writer-side completion: no more data will arrive; flush any pending read
# (with remaining data or None-for-EOF) so the waiting reader is released.
68 def allAvailable(self):
69 """Indicate that no more data will be coming available."""
72 # If a read is pending, let it go
74 if self.position < self.available:
75 # Try to read some data from the file
76 length = self.available - self.position
77 readSize = min(length, self.CHUNK_SIZE)
78 self.f.seek(self.position)
79 b = self.f.read(readSize)
# NOTE(review): as above, 'bytesRead' comes from an elided line; the repeated
# 'deferred = self.deferred' / 'deferred.callback(None)' pairs below belong to
# elided if/else branches (EOF vs. nothing-pending cases) -- confirm against
# the full source.
82 # Check if end of file was reached
84 self.position += bytesRead
85 deferred = self.deferred
90 deferred = self.deferred
92 deferred.callback(None)
95 deferred = self.deferred
97 deferred.callback(None)
# Reader side: return available data, None at EOF, or a Deferred when the
# reader has caught up with the writer.  Overlapping reads are forbidden.
99 def read(self, sendfile=False):
100 assert not self.deferred, "A previous read is still deferred."
105 length = self.available - self.position
106 readSize = min(length, self.CHUNK_SIZE)
108 # If we don't have any available, we're done or deferred
# Park the reader until updateAvailable()/allAvailable() fires it.
113 self.deferred = defer.Deferred()
116 # Try to read some data from the file
117 self.f.seek(self.position)
118 b = self.f.read(readSize)
121 # End of file was reached, we're done or deferred
125 self.deferred = defer.Deferred()
# NOTE(review): 'bytesRead' again assigned in an elided line.
128 self.position += bytesRead
# NOTE(review): the 'class StreamToFile:' header (original line 131) is elided
# from this capture; what follows is the class body.  Leading line numbers are
# artifacts of the capture; code is kept byte-identical.
132 """Save a stream to a partial file and hash it.
134 @type stream: L{twisted.web2.stream.IByteStream}
135 @ivar stream: the input stream being read
136 @type outFile: L{twisted.python.filepath.FilePath}
137 @ivar outFile: the file being written
139 @ivar hash: the hash object for the data
140 @type position: C{int}
141 @ivar position: the current file position to write the next data to
143 @ivar length: the position in the file to not write beyond
144 @type inform: C{method}
145 @ivar inform: a function to call with the length of data received
146 @type doneDefer: L{twisted.internet.defer.Deferred}
147 @ivar doneDefer: the deferred that will fire when done writing
150 def __init__(self, inputStream, outFile, start = 0, length = None, inform = None):
151 """Initializes the file.
153 @type inputStream: L{twisted.web2.stream.IByteStream}
154 @param inputStream: the input stream to read from
155 @type outFile: L{twisted.python.filepath.FilePath}
156 @param outFile: the file to write to
158 @param start: the file position to start writing at
159 (optional, defaults to the start of the file)
161 @param length: the maximum amount of data to write to the file
162 (optional, defaults to not limiting the writing to the file)
163 @type inform: C{method}
164 @param inform: a function to call with the length of data received
165 (optional, defaults to calling nothing)
167 self.stream = inputStream
168 self.outFile = outFile
# NOTE(review): uses the Python 2 'sha' module (SHA-1); its import is not
# visible in this capture -- presumably 'import sha' in an elided header line.
169 self.hash = sha.new()
170 self.position = start
# NOTE(review): length is stored as an absolute end offset (start + length);
# the 'self.length = None' else-branch is presumably in an elided line.
172 if length is not None:
173 self.length = start + length
175 self.doneDefer = None
# NOTE(review): the 'def run(self):' header (original ~177) is elided.
178 """Start the streaming.
180 @rtype: L{twisted.internet.defer.Deferred}
182 log.msg('Started streaming %r bytes to file at position %d' % (self.length, self.position))
# readStream feeds each received chunk to _gotData; the resulting deferred
# fires when the input stream is exhausted or errors.
183 self.doneDefer = stream.readStream(self.stream, self._gotData)
184 self.doneDefer.addCallbacks(self._done, self._error)
185 return self.doneDefer
187 def _gotData(self, data):
188 """Process the received data."""
# Python 2 raise syntax; sanity checks on the output file and input chunk.
189 if self.outFile.closed:
190 raise Exception, "outFile was unexpectedly closed"
193 raise Exception, "Data is None?"
195 # Make sure we don't go too far
# Truncate the chunk so writing never passes the absolute 'length' limit.
196 if self.length is not None and self.position + len(data) > self.length:
197 data = data[:(self.length - self.position)]
199 # Write and hash the streamed data
200 self.outFile.seek(self.position)
201 self.outFile.write(data)
202 self.hash.update(data)
203 self.position += len(data)
# NOTE(review): self.inform is called unconditionally here, so the
# "defaults to calling nothing" behavior is presumably realized by an elided
# default (e.g. a no-op lambda) in __init__ -- confirm.
204 self.inform(len(data))
# Success path for doneDefer: result of the whole stream is the SHA-1 digest.
206 def _done(self, result):
207 """Return the result."""
208 log.msg('Streaming is complete')
209 return self.hash.digest()
# Error path: the lines after the log call (presumably re-raising/returning
# the failure) are elided from this capture.
211 def _error(self, err):
213 log.msg('Streaming error')
# NOTE(review): the 'class FileDownload:' header (original line ~217) is
# elided from this capture; what follows is the class docstring and
# initializer.  Code lines are kept byte-identical.
218 """Manage a download from a list of peers or a mirror.
220 @type manager: L{PeerManager}
221 @ivar manager: the manager to send requests for peers to
222 @type hash: L{Hash.HashObject}
223 @ivar hash: the hash object containing the expected hash for the file
224 @ivar mirror: the URI of the file on the mirror
225 @type compact_peers: C{list} of C{dictionary}
226 @ivar compact_peers: a list of the peer info where the file can be found
228 @ivar file: the open file to write the download to
229 @type path: C{string}
230 @ivar path: the path to request from peers to access the file
231 @type pieces: C{list} of C{string}
232 @ivar pieces: the hashes of the pieces in the file
233 @type started: C{boolean}
234 @ivar started: whether the download has begun yet
235 @type defer: L{twisted.internet.defer.Deferred}
236 @ivar defer: the deferred that will callback with the result of the download
237 @type peers: C{dictionary}
238 @ivar peers: information about each of the peers available to download from
239 @type outstanding: C{int}
240 @ivar outstanding: the number of requests to peers currently outstanding
241 @type peerlist: C{list} of L{HTTPDownloader.Peer}
242 @ivar peerlist: the sorted list of peers for this download
243 @type stream: L{GrowingFileStream}
244 @ivar stream: the stream of resulting data from the download
245 @type nextFinish: C{int}
246 @ivar nextFinish: the next piece that is needed to finish for the stream
247 @type completePieces: C{list} of C{boolean} or L{HTTPDownloader.Peer}
248 @ivar completePieces: one per piece, will be False if no requests are
249 outstanding for the piece, True if the piece has been successfully
250 downloaded, or the Peer that a request for this piece has been sent
# NOTE(review): 'hash' and 'file' shadow builtins; kept for interface
# compatibility with callers.
253 def __init__(self, manager, hash, mirror, compact_peers, file):
254 """Initialize the instance and check for piece hashes.
256 @type manager: L{PeerManager}
257 @param manager: the manager to send requests for peers to
258 @type hash: L{Hash.HashObject}
259 @param hash: the hash object containing the expected hash for the file
260 @param mirror: the URI of the file on the mirror
261 @type compact_peers: C{list} of C{dictionary}
262 @param compact_peers: a list of the peer info where the file can be found
263 @type file: L{twisted.python.filepath.FilePath}
264 @param file: the temporary file to use to store the downloaded file
266 self.manager = manager
269 self.compact_peers = compact_peers
# Peers are asked for the file by the quoted expected-hash path.
271 self.path = '/~/' + quote_plus(hash.expected())
272 self.mirror_path = None
# NOTE(review): initialization of hash/mirror/pieces/started/defer/peers/
# outstanding/peerlist etc. is presumably in the elided lines (267-278);
# only the file-open survives in this capture.
279 self.file = file.open('w+')
# NOTE(review): the 'def run(self):' header (original line ~281) is elided
# from this capture.  run() tallies the piece info advertised by each peer
# and picks the most popular representation.  Code kept byte-identical.
282 """Start the downloading process."""
283 log.msg('Checking for pieces for %s' % self.path)
284 self.defer = defer.Deferred()
# Tallies are seeded with a dummy {0: 0} entry so max(...values()) below is
# safe on an empty tally.  NOTE(review): 'no_pieces' and 'pieces_hash' are
# used later but their initializations (original 286/288) are elided here.
287 pieces_string = {0: 0}
289 pieces_dl_hash = {0: 0}
291 for compact_peer in self.compact_peers:
292 # Build a list of all the peers for this download
293 site = uncompact(compact_peer['c'])
294 peer = self.manager.getPeer(site)
295 self.peers.setdefault(site, {})['peer'] = peer
297 # Extract any piece information from the peers list
# 't' = literal piece-hash string, 'h' = DHT lookup key, 'l' = key to fetch
# from the peers themselves; count votes for each distinct value.
298 if 't' in compact_peer:
299 self.peers[site]['t'] = compact_peer['t']['t']
300 pieces_string.setdefault(compact_peer['t']['t'], 0)
301 pieces_string[compact_peer['t']['t']] += 1
302 elif 'h' in compact_peer:
303 self.peers[site]['h'] = compact_peer['h']
304 pieces_hash.setdefault(compact_peer['h'], 0)
305 pieces_hash[compact_peer['h']] += 1
306 elif 'l' in compact_peer:
307 self.peers[site]['l'] = compact_peer['l']
308 pieces_dl_hash.setdefault(compact_peer['l'], 0)
309 pieces_dl_hash[compact_peer['l']] += 1
313 # Select the most popular piece info
314 max_found = max(no_pieces, max(pieces_string.values()),
315 max(pieces_hash.values()), max(pieces_dl_hash.values()))
# Peers that disagree with the majority gave misleading info; log it.
317 if max_found < len(self.peers):
318 log.msg('Misleading piece information found, using most popular %d of %d peers' %
319 (max_found, len(self.peers)))
321 if max_found == no_pieces:
322 # The file is not split into pieces
323 log.msg('No pieces were found for the file')
324 self.pieces = [self.hash.expected()]
326 elif max_found == max(pieces_string.values()):
327 # Small number of pieces in a string
# NOTE(review): the 'if num == max_found:' filter inside each of these loops
# is presumably in elided lines (330/339/347) -- only the winning entry's
# handling survives in this capture.
328 for pieces, num in pieces_string.items():
329 # Find the most popular piece string
# Piece hashes are fixed 20-byte SHA-1 digests concatenated in one string.
331 self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
332 log.msg('Peer info contained %d piece hashes' % len(self.pieces))
335 elif max_found == max(pieces_hash.values()):
336 # Medium number of pieces stored in the DHT
337 for pieces, num in pieces_hash.items():
338 # Find the most popular piece hash to lookup
340 log.msg('Found a hash for pieces to lookup in the DHT: %r' % pieces)
341 self.getDHTPieces(pieces)
343 elif max_found == max(pieces_dl_hash.values()):
344 # Large number of pieces stored in peers
345 for pieces, num in pieces_dl_hash.items():
346 # Find the most popular piece hash to download
348 log.msg('Found a hash for pieces to lookup in peers: %r' % pieces)
349 self.getPeerPieces(pieces)
353 #{ Downloading the piece hashes
# Look up the concatenated piece-hash string in the DHT under 'key'.
354 def getDHTPieces(self, key):
355 """Retrieve the piece information from the DHT.
357 @param key: the key to lookup in the DHT
359 # Remove any peers with the wrong piece hash
360 #for site in self.peers.keys():
361 # if self.peers[site].get('h', '') != key:
362 # del self.peers[site]
364 # Start the DHT lookup
# addBoth: _getDHTPieces handles both the value list and a lookup failure.
365 lookupDefer = self.manager.dht.getValue(key)
366 lookupDefer.addBoth(self._getDHTPieces, key)
368 def _getDHTPieces(self, results, key):
369 """Check the retrieved values."""
370 if isinstance(results, list):
371 for result in results:
372 # Make sure the hash matches the key
# The DHT key is the SHA-1 of the piece string, so a matching digest
# authenticates the retrieved value.
373 result_hash = sha.new(result.get('t', '')).digest()
374 if result_hash == key:
# NOTE(review): the line binding 'pieces' (presumably result['t'], original
# 375) is elided from this capture.
376 self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
377 log.msg('Retrieved %d piece hashes from the DHT' % len(self.pieces))
# NOTE(review): the control flow between these log calls (early return on
# success, else/error branches) is in elided lines 378-384.
381 log.msg('Could not retrieve the piece hashes from the DHT')
383 log.msg('Looking up piece hashes in the DHT resulted in an error: %r' % (result, ))
385 # Continue without the piece hashes
# Fallback: one None placeholder per PIECE_SIZE chunk of the expected size,
# i.e. download without per-piece verification.
386 self.pieces = [None for x in xrange(0, self.hash.expSize, PIECE_SIZE)]
# Ask peers directly for the piece-hash string; re-entered with failedSite
# set when a previous attempt against that site failed.
389 def getPeerPieces(self, key, failedSite = None):
390 """Retrieve the piece information from the peers.
392 @param key: the key to request from the peers
394 if failedSite is None:
395 log.msg('Starting the lookup of piece hashes in peers')
397 # Remove any peers with the wrong piece hash
398 #for site in self.peers.keys():
399 # if self.peers[site].get('l', '') != key:
400 # del self.peers[site]
# Retry path: mark the failed site so it is skipped below.
402 log.msg('Piece hash lookup failed for peer %r' % (failedSite, ))
403 self.peers[failedSite]['failed'] = True
404 self.outstanding -= 1
406 if self.pieces is None:
407 # Send a request to one or more peers
408 log.msg('Checking for a peer to request piece hashes from')
409 for site in self.peers:
410 if self.peers[site].get('failed', False) != True:
411 log.msg('Sending a piece hash request to %r' % (site, ))
412 path = '/~/' + quote_plus(key)
413 lookupDefer = self.peers[site]['peer'].get(path)
# Attach callbacks on the next reactor tick so the request is fully issued
# first; cap concurrent piece-hash requests at 4.
414 reactor.callLater(0, lookupDefer.addCallbacks,
415 *(self._getPeerPieces, self._gotPeerError),
416 **{'callbackArgs': (key, site),
417 'errbackArgs': (key, site)})
418 self.outstanding += 1
419 if self.outstanding >= 4:
# NOTE(review): the 'break' for the >= 4 case (original 420) is elided.
422 log.msg('Done sending piece hash requests for now, %d outstanding' % self.outstanding)
# All candidate peers exhausted: give up and fall back to unverified pieces.
423 if self.pieces is None and self.outstanding <= 0:
424 # Continue without the piece hashes
425 log.msg('Could not retrieve the piece hashes from the peers')
426 self.pieces = [None for x in xrange(0, self.hash.expSize, PIECE_SIZE)]
429 def _getPeerPieces(self, response, key, site):
430 """Process the retrieved response from the peer."""
431 log.msg('Got a piece hash response %d from %r' % (response.code, site))
432 if response.code != 200:
433 # Request failed, try a different peer
434 log.msg('Did not like response %d from %r' % (response.code, site))
435 self.getPeerPieces(key, site)
437 # Read the response stream to a string
438 self.peers[site]['pieces'] = ''
# Default-argument binding captures self/site at definition time (a common
# Python 2 closure idiom); accumulates the body into peers[site]['pieces'].
439 def _gotPeerPiece(data, self = self, site = site):
440 log.msg('Peer %r got %d bytes of piece hashes' % (site, len(data)))
441 self.peers[site]['pieces'] += data
442 log.msg('Streaming piece hashes from peer')
443 df = stream.readStream(response.stream, _gotPeerPiece)
444 df.addCallbacks(self._gotPeerPieces, self._gotPeerError,
445 callbackArgs=(key, site), errbackArgs=(key, site))
447 def _gotPeerError(self, err, key, site):
448 """Peer failed, try again."""
449 log.msg('Peer piece hash request failed for %r' % (site, ))
# Re-enter getPeerPieces with this site marked failed to try another peer.
451 self.getPeerPieces(key, site)
453 def _gotPeerPieces(self, result, key, site):
454 """Check the retrieved pieces from the peer."""
455 log.msg('Finished streaming piece hashes from peer %r' % (site, ))
# Another request already produced the pieces; nothing to do.
456 if self.pieces is not None:
458 log.msg('Already done')
# NOTE(review): the try/except around bdecode (original 461/463) is elided;
# on a decode failure the site is retried via getPeerPieces.
462 result = bdecode(self.peers[site]['pieces'])
464 log.msg('Error bdecoding piece hashes')
466 self.getPeerPieces(key, site)
# Verify the downloaded string hashes to the requested key before trusting it.
469 result_hash = sha.new(result.get('t', '')).digest()
470 if result_hash == key:
# NOTE(review): the binding of 'pieces' (original 471) is elided.
472 self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
473 log.msg('Retrieved %d piece hashes from the peer %r' % (len(self.pieces), site))
476 log.msg('Peer returned a piece string that did not match')
477 self.getPeerPieces(key, site)
479 #{ Downloading the file
# NOTE(review): the method header for the peer-sorting routine (original 480)
# and the nested 'def sort(a, b)' header (482) are elided; what remains is a
# Python 2 cmp-style comparator passed to list.sort (py2-only API).
481 """Sort the peers by their rank (highest ranked at the end)."""
483 """Sort peers by their rank."""
486 elif a.rank < b.rank:
489 self.peerlist.sort(sort)
491 def startDownload(self):
492 """Start the download from the peers."""
497 log.msg('Starting to download %s' % self.path)
# NOTE(review): assert is stripped under 'python -O'; pieces must be set by
# run()/the piece-hash lookups before this is called.
499 assert self.pieces, "You must initialize the piece hashes first"
500 self.peerlist = [self.peers[site]['peer'] for site in self.peers]
502 # Use the mirror if there are few peers
503 if len(self.peerlist) < config.getint('DEFAULT', 'MIN_DOWNLOAD_PEERS'):
504 parsed = urlparse(self.mirror)
505 if parsed[0] == "http":
506 site = splitHostPort(parsed[0], parsed[1])
# mirror_path is the path-only URL used when requesting ranges of the mirror.
507 self.mirror_path = urlunparse(('', '') + parsed[2:])
508 peer = self.manager.getPeer(site)
510 self.peerlist.append(peer)
512 # Special case if there's only one good peer left
513 # if len(self.peerlist) == 1:
514 # log.msg('Downloading from peer %r' % (self.peerlist[0], ))
515 # self.defer.callback(self.peerlist[0].get(self.path))
518 # Start sending the return file
# The response is handed back immediately; data flows into the caller through
# the GrowingFileStream as pieces complete.
519 self.stream = GrowingFileStream(self.file, self.hash.expSize)
520 resp = Response(200, {}, self.stream)
521 self.defer.callback(resp)
523 # Begin to download the pieces
526 self.completePieces = [False for piece in self.pieces]
529 #{ Downloading the pieces
# NOTE(review): the 'def getPieces(self):' header (original 530) is elided;
# this loop fans out up to 4 concurrent piece requests to ranked peers.
531 """Download the next pieces from the peers."""
532 log.msg('Checking for more piece requests to send')
534 piece = self.nextFinish
535 while self.outstanding < 4 and self.peerlist and piece < len(self.completePieces):
536 log.msg('Checking piece %d' % piece)
537 if self.completePieces[piece] == False:
538 # Send a request to the highest ranked peer
# Peers are sorted with the best at the END, so pop() takes the best one.
539 peer = self.peerlist.pop()
540 self.completePieces[piece] = peer
541 log.msg('Sending a request for piece %d to peer %r' % (piece, peer))
543 self.outstanding += 1
# NOTE(review): the if/else choosing between these two getRange calls
# (mirror path vs. peer path, originals 544/546) is elided.
545 df = peer.getRange(self.mirror_path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
547 df = peer.getRange(self.path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
# Attach callbacks on the next reactor tick, after the request is issued.
548 reactor.callLater(0, df.addCallbacks,
549 *(self._getPiece, self._getError),
550 **{'callbackArgs': (piece, peer),
551 'errbackArgs': (piece, peer)})
554 log.msg('Finished checking pieces, %d outstanding, next piece %d of %d' % (self.outstanding, self.nextFinish, len(self.completePieces)))
555 # Check if we're done
556 if self.outstanding <= 0 and self.nextFinish >= len(self.completePieces):
557 log.msg('We seem to be done with all pieces')
558 self.stream.allAvailable()
560 def _getPiece(self, response, piece, peer):
561 """Process the retrieved headers from the peer."""
562 log.msg('Got response for piece %d from peer %r' % (piece, peer))
# Multi-piece downloads must be 206 (partial content); single-piece may be
# 200 or 206.  Anything else counts against the peer's reputation.
563 if ((len(self.completePieces) > 1 and response.code != 206) or
564 (response.code not in (200, 206))):
565 # Request failed, try a different peer
566 log.msg('Wrong response type %d for piece %d from peer %r' % (response.code, piece, peer))
567 peer.hashError('Peer responded with the wrong type of download: %r' % response.code)
568 self.completePieces[piece] = False
# Drain the unwanted body so the connection can be reused.
569 if response.stream and response.stream.length:
570 stream.readAndDiscard(response.stream)
572 # Read the response stream to the file
573 log.msg('Streaming piece %d from peer %r' % (piece, peer))
# Default-arg binding captures stats/mirror now; note 'bytes' shadows the
# builtin (harmless in this py2 code).
574 def statUpdate(bytes, stats = self.manager.stats, mirror = peer.mirror):
575 stats.receivedBytes(bytes, mirror)
# 206: write the ranged piece at its offset; otherwise (elided else branch,
# original 579) stream the whole file from the start.
576 if response.code == 206:
577 df = StreamToFile(response.stream, self.file, piece*PIECE_SIZE,
578 PIECE_SIZE, statUpdate).run()
580 df = StreamToFile(response.stream, self.file,
581 inform = statUpdate).run()
582 reactor.callLater(0, df.addCallbacks,
583 *(self._gotPiece, self._gotError),
584 **{'callbackArgs': (piece, peer),
585 'errbackArgs': (piece, peer)})
# Bookkeeping: the request slot is free and the peer can be reused.
587 self.outstanding -= 1
588 self.peerlist.append(peer)
591 def _getError(self, err, piece, peer):
592 """Peer failed, try again."""
593 log.msg('Got error for piece %d from peer %r' % (piece, peer))
594 self.outstanding -= 1
595 self.peerlist.append(peer)
# Mark the piece as needed again; rescheduling (original 597-599) is elided.
596 self.completePieces[piece] = False
600 def _gotPiece(self, response, piece, peer):
601 """Process the retrieved piece from the peer."""
602 log.msg('Finished streaming piece %d from peer %r: %r' % (piece, peer, response))
# 'response' here is StreamToFile's result, i.e. the SHA-1 digest of the
# streamed data; compare against the expected piece hash when one is known.
603 if self.pieces[piece] and response != self.pieces[piece]:
605 log.msg('Hash error for piece %d from peer %r' % (piece, peer))
606 peer.hashError('Piece received from peer does not match expected')
607 self.completePieces[piece] = False
609 # Successfully completed one of several pieces
610 log.msg('Finished with piece %d from peer %r' % (piece, peer))
611 self.completePieces[piece] = True
# Advance nextFinish over every contiguous finished piece, releasing each
# PIECE_SIZE chunk to the output stream.
612 while (self.nextFinish < len(self.completePieces) and
613 self.completePieces[self.nextFinish] == True):
615 self.stream.updateAvailable(PIECE_SIZE)
def _gotError(self, err, piece, peer):
    """Piece download (streaming) failed; mark the piece as needed again.

    @param err: the failure reported by the StreamToFile deferred
    @param piece: the index of the piece that failed
    @param peer: the L{HTTPDownloader.Peer} the piece was requested from
    """
    # BUG FIX: the original logged the undefined name 'response' here --
    # this method's parameter is 'err' -- so any streaming error would have
    # raised a NameError instead of being logged.
    log.msg('Error streaming piece %d from peer %r: %r' % (piece, peer, err))
    # Allow the piece to be requested again from another peer.
    # NOTE(review): lines elided from this capture (originals 622, 624+)
    # presumably also decrement self.outstanding and reschedule getPieces --
    # confirm against the full source.
    self.completePieces[piece] = False
# NOTE(review): the 'class PeerManager:' header (original line ~626) is
# elided from this capture; what follows is the class body.  Code lines are
# kept byte-identical.
627 """Manage a set of peers and the requests to them.
629 @type cache_dir: L{twisted.python.filepath.FilePath}
630 @ivar cache_dir: the directory to use for storing all files
631 @type dht: L{interfaces.IDHT}
632 @ivar dht: the DHT instance
633 @type stats: L{stats.StatsLogger}
634 @ivar stats: the statistics logger to record sent data to
635 @type clients: C{dictionary}
636 @ivar clients: the available peers that have been previously contacted
639 def __init__(self, cache_dir, dht, stats):
640 """Initialize the instance."""
641 self.cache_dir = cache_dir
# restat(False): refresh cached stat info without raising if it is missing.
642 self.cache_dir.restat(False)
643 if not self.cache_dir.exists():
644 self.cache_dir.makedirs()
# NOTE(review): dht/stats/clients assignments (originals 645-648, presumably
# including 'self.clients = {}') are elided from this capture.
#
# NOTE(review): 'peers = []' is a mutable default argument -- it is never
# mutated in the visible code, but 'peers = None' would be safer style.
# 'hash' also shadows the builtin; kept for interface compatibility.
649 def get(self, hash, mirror, peers = [], method="GET", modtime=None):
650 """Download from a list of peers or fallback to a mirror.
652 @type hash: L{Hash.HashObject}
653 @param hash: the hash object containing the expected hash for the file
654 @param mirror: the URI of the file on the mirror
655 @type peers: C{list} of C{string}
656 @param peers: a list of the peer info where the file can be found
657 (optional, defaults to downloading from the mirror)
658 @type method: C{string}
659 @param method: the HTTP method to use, 'GET' or 'HEAD'
660 (optional, defaults to 'GET')
661 @type modtime: C{int}
662 @param modtime: the modification time to use for an 'If-Modified-Since'
663 header, as seconds since the epoch
664 (optional, defaults to not sending that header)
# No peers, a HEAD, or a conditional request: go straight to the mirror.
666 if not peers or method != "GET" or modtime is not None:
667 log.msg('Downloading (%s) from mirror %s' % (method, mirror))
668 parsed = urlparse(mirror)
# NOTE(review): assert is stripped under 'python -O'; an explicit raise
# would be more robust for validating the mirror scheme.
669 assert parsed[0] == "http", "Only HTTP is supported, not '%s'" % parsed[0]
670 site = splitHostPort(parsed[0], parsed[1])
671 path = urlunparse(('', '') + parsed[2:])
672 peer = self.getPeer(site)
674 return peer.get(path, method, modtime)
675 # elif len(peers) == 1:
676 # site = uncompact(peers[0]['c'])
677 # log.msg('Downloading from peer %r' % (site, ))
678 # path = '/~/' + quote_plus(hash.expected())
679 # peer = self.getPeer(site)
680 # return peer.get(path)
# NOTE(review): the 'else:' introducing this branch (original 681) is elided.
# Peer download: stage into a temp file named by the expected hex hash.
682 tmpfile = self.cache_dir.child(hash.hexexpected())
683 return FileDownload(self, hash, mirror, peers, tmpfile).run()
# Connection cache: one Peer object per (host, port), reused across requests.
685 def getPeer(self, site):
686 """Create a new peer if necessary and return it.
688 @type site: (C{string}, C{int})
689 @param site: the IP address and port of the peer
691 if site not in self.clients:
692 self.clients[site] = Peer(site[0], site[1])
693 return self.clients[site]
# NOTE(review): the 'def close(self):' header (original ~695) is elided.
696 """Close all the connections to peers."""
697 for site in self.clients:
698 self.clients[site].close()
701 class TestPeerManager(unittest.TestCase):
702 """Unit tests for the PeerManager."""
708 for p in self.pending_calls:
711 self.pending_calls = []