2 """Manage a set of peers and the requests to them."""
4 from random import choice
5 from urlparse import urlparse, urlunparse
6 from urllib import quote_plus
7 from binascii import b2a_hex, a2b_hex
10 from twisted.internet import reactor, defer
11 from twisted.python import log
12 from twisted.trial import unittest
13 from twisted.web2 import stream
14 from twisted.web2.http import Response, splitHostPort
16 from HTTPDownloader import Peer
17 from util import uncompact
18 from Hash import PIECE_SIZE
19 from apt_p2p_Khashmir.bencode import bdecode
class GrowingFileStream(stream.FileStream):
    """Modified to stream data from a file as it becomes available.

    @ivar CHUNK_SIZE: the maximum size of chunks of data to send at a time
    @ivar deferred: waiting for the result of the last read attempt
    @ivar available: the number of bytes that are currently available to read
    @ivar position: the current position in the file where the next read will begin
    @ivar finished: True when no more data will be coming available
    """

    def __init__(self, f, length = None):
        stream.FileStream.__init__(self, f)
        # NOTE(review): several initializer lines appear to be missing from
        # this copy (presumably setting self.length, self.available,
        # self.position, self.finished and self.deferred, which the methods
        # below rely on) -- verify against the upstream source.

    def updateAvailable(self, newlyAvailable):
        """Update the number of bytes that are available.

        Call it with 0 to trigger reading of a fully read file.

        @param newlyAvailable: the number of bytes that just became available
        """
        assert not self.finished
        self.available += newlyAvailable

        # If a read is pending, let it go
        if self.deferred and self.position < self.available:
            # Try to read some data from the file
            length = self.available - self.position
            readSize = min(length, self.CHUNK_SIZE)
            self.f.seek(self.position)
            b = self.f.read(readSize)

            # Check if end of file was reached
            # NOTE(review): the assignment of bytesRead (presumably len(b))
            # and the end-of-file handling are missing from this copy.
            self.position += bytesRead
            deferred = self.deferred
            # NOTE(review): the lines that clear self.deferred and fire
            # deferred.callback(b) appear to be missing from this copy.

    def allAvailable(self):
        """Indicate that no more data will be coming available."""
        # NOTE(review): the line marking the stream finished (presumably
        # self.finished = True) is missing from this copy.

        # If a read is pending, let it go
        if self.position < self.available:
            # Try to read some data from the file
            length = self.available - self.position
            readSize = min(length, self.CHUNK_SIZE)
            self.f.seek(self.position)
            b = self.f.read(readSize)

            # Check if end of file was reached
            # NOTE(review): the bytesRead assignment and end-of-file handling
            # are missing from this copy.
            self.position += bytesRead
            deferred = self.deferred
        # NOTE(review): the conditionals that originally guarded the four
        # statements below are missing from this copy; as written they would
        # fire the pending deferred twice.  Verify against upstream.
        deferred = self.deferred
        deferred.callback(None)
        deferred = self.deferred
        deferred.callback(None)

    def read(self, sendfile=False):
        assert not self.deferred, "A previous read is still deferred."
        length = self.available - self.position
        readSize = min(length, self.CHUNK_SIZE)

        # If we don't have any available, we're done or deferred
        # NOTE(review): the guard that decides between finishing and
        # deferring is missing from this copy.
        self.deferred = defer.Deferred()

        # Try to read some data from the file
        self.f.seek(self.position)
        b = self.f.read(readSize)

        # End of file was reached, we're done or deferred
        # NOTE(review): the end-of-file check, the bytesRead assignment, and
        # the return of the read data are missing from this copy.
        self.deferred = defer.Deferred()
        self.position += bytesRead
131 """Save a stream to a partial file and hash it.
133 @type stream: L{twisted.web2.stream.IByteStream}
134 @ivar stream: the input stream being read
135 @type outFile: L{twisted.python.filepath.FilePath}
136 @ivar outFile: the file being written
138 @ivar hash: the hash object for the data
139 @type position: C{int}
140 @ivar position: the current file position to write the next data to
142 @ivar length: the position in the file to not write beyond
143 @type doneDefer: L{twisted.internet.defer.Deferred}
144 @ivar doneDefer: the deferred that will fire when done writing
    def __init__(self, inputStream, outFile, start = 0, length = None):
        """Initializes the file.

        @type inputStream: L{twisted.web2.stream.IByteStream}
        @param inputStream: the input stream to read from
        @type outFile: L{twisted.python.filepath.FilePath}
        @param outFile: the file to write to
        @param start: the file position to start writing at
            (optional, defaults to the start of the file)
        @param length: the maximum amount of data to write to the file
            (optional, defaults to not limiting the writing to the file)
        """
        self.stream = inputStream
        self.outFile = outFile
        self.hash = sha.new()
        self.position = start
        # NOTE(review): a default assignment (presumably self.length = None)
        # is missing from this copy, so self.length may be unset when the
        # length argument is None -- verify against upstream.
        if length is not None:
            self.length = start + length
        self.doneDefer = None
171 """Start the streaming.
173 @rtype: L{twisted.internet.defer.Deferred}
175 log.msg('Started streaming %r bytes to file at position %d' % (self.length, self.position))
176 self.doneDefer = stream.readStream(self.stream, self._gotData)
177 self.doneDefer.addCallbacks(self._done, self._error)
178 return self.doneDefer
180 def _gotData(self, data):
181 """Process the received data."""
182 if self.outFile.closed:
183 raise Exception, "outFile was unexpectedly closed"
186 raise Exception, "Data is None?"
188 # Make sure we don't go too far
189 if self.length is not None and self.position + len(data) > self.length:
190 data = data[:(self.length - self.position)]
192 # Write and hash the streamed data
193 self.outFile.seek(self.position)
194 self.outFile.write(data)
195 self.hash.update(data)
196 self.position += len(data)
198 def _done(self, result):
199 """Return the result."""
200 log.msg('Streaming is complete')
201 return self.hash.digest()
    def _error(self, err):
        """Handle a failure of the streaming."""
        log.msg('Streaming error')
        # NOTE(review): the remainder of this handler (presumably logging the
        # failure details and/or propagating err) is missing from this copy.
210 """Manage a download from a list of peers or a mirror.
212 @type manager: L{PeerManager}
213 @ivar manager: the manager to send requests for peers to
214 @type hash: L{Hash.HashObject}
215 @ivar hash: the hash object containing the expected hash for the file
216 @ivar mirror: the URI of the file on the mirror
217 @type compact_peers: C{list} of C{dictionary}
218 @ivar compact_peers: a list of the peer info where the file can be found
@ivar file: the open file to write the download to
221 @type path: C{string}
222 @ivar path: the path to request from peers to access the file
223 @type pieces: C{list} of C{string}
224 @ivar pieces: the hashes of the pieces in the file
225 @type started: C{boolean}
226 @ivar started: whether the download has begun yet
227 @type defer: L{twisted.internet.defer.Deferred}
228 @ivar defer: the deferred that will callback with the result of the download
229 @type peers: C{dictionary}
230 @ivar peers: information about each of the peers available to download from
231 @type outstanding: C{int}
232 @ivar outstanding: the number of requests to peers currently outstanding
233 @type peerlist: C{list} of L{HTTPDownloader.Peer}
234 @ivar peerlist: the sorted list of peers for this download
235 @type stream: L{GrowingFileStream}
236 @ivar stream: the stream of resulting data from the download
237 @type nextFinish: C{int}
238 @ivar nextFinish: the next piece that is needed to finish for the stream
239 @type completePieces: C{list} of C{boolean} or L{HTTPDownloader.Peer}
240 @ivar completePieces: one per piece, will be False if no requests are
241 outstanding for the piece, True if the piece has been successfully
242 downloaded, or the Peer that a request for this piece has been sent
    def __init__(self, manager, hash, mirror, compact_peers, file):
        """Initialize the instance and check for piece hashes.

        @type manager: L{PeerManager}
        @param manager: the manager to send requests for peers to
        @type hash: L{Hash.HashObject}
        @param hash: the hash object containing the expected hash for the file
        @param mirror: the URI of the file on the mirror
        @type compact_peers: C{list} of C{dictionary}
        @param compact_peers: a list of the peer info where the file can be found
        @type file: L{twisted.python.filepath.FilePath}
        @param file: the temporary file to use to store the downloaded file
        """
        self.manager = manager
        # NOTE(review): assignments of self.hash, self.mirror and the other
        # attributes documented on the class (pieces, started, defer, peers,
        # outstanding, peerlist, ...) appear to be missing from this copy;
        # the methods below depend on them -- verify against upstream.
        self.compact_peers = compact_peers
        # Peers serve the file under a path derived from its expected hash
        self.path = '/~/' + quote_plus(hash.expected())
        self.file = file.open('w+')
273 """Start the downloading process."""
274 log.msg('Checking for pieces for %s' % self.path)
275 self.defer = defer.Deferred()
278 pieces_string = {0: 0}
280 pieces_dl_hash = {0: 0}
282 for compact_peer in self.compact_peers:
283 # Build a list of all the peers for this download
284 site = uncompact(compact_peer['c'])
285 peer = self.manager.getPeer(site)
286 self.peers.setdefault(site, {})['peer'] = peer
288 # Extract any piece information from the peers list
289 if 't' in compact_peer:
290 self.peers[site]['t'] = compact_peer['t']['t']
291 pieces_string.setdefault(compact_peer['t']['t'], 0)
292 pieces_string[compact_peer['t']['t']] += 1
293 elif 'h' in compact_peer:
294 self.peers[site]['h'] = compact_peer['h']
295 pieces_hash.setdefault(compact_peer['h'], 0)
296 pieces_hash[compact_peer['h']] += 1
297 elif 'l' in compact_peer:
298 self.peers[site]['l'] = compact_peer['l']
299 pieces_dl_hash.setdefault(compact_peer['l'], 0)
300 pieces_dl_hash[compact_peer['l']] += 1
304 # Select the most popular piece info
305 max_found = max(no_pieces, max(pieces_string.values()),
306 max(pieces_hash.values()), max(pieces_dl_hash.values()))
308 if max_found < len(self.peers):
309 log.msg('Misleading piece information found, using most popular %d of %d peers' %
310 (max_found, len(self.peers)))
312 if max_found == no_pieces:
313 # The file is not split into pieces
314 log.msg('No pieces were found for the file')
317 elif max_found == max(pieces_string.values()):
318 # Small number of pieces in a string
319 for pieces, num in pieces_string.items():
320 # Find the most popular piece string
322 self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
323 log.msg('Peer info contained %d piece hashes' % len(self.pieces))
326 elif max_found == max(pieces_hash.values()):
327 # Medium number of pieces stored in the DHT
328 for pieces, num in pieces_hash.items():
329 # Find the most popular piece hash to lookup
331 log.msg('Found a hash for pieces to lookup in the DHT: %r' % pieces)
332 self.getDHTPieces(pieces)
334 elif max_found == max(pieces_dl_hash.values()):
335 # Large number of pieces stored in peers
336 for pieces, num in pieces_dl_hash.items():
337 # Find the most popular piece hash to download
339 log.msg('Found a hash for pieces to lookup in peers: %r' % pieces)
340 self.getPeerPieces(pieces)
344 #{ Downloading the piece hashes
    def getDHTPieces(self, key):
        """Retrieve the piece information from the DHT.

        @param key: the key to lookup in the DHT
        """
        # Remove any peers with the wrong piece hash
        #for site in self.peers.keys():
        #    if self.peers[site].get('h', '') != key:
        #        del self.peers[site]

        # Start the DHT lookup
        lookupDefer = self.manager.dht.getValue(key)
        lookupDefer.addCallback(self._getDHTPieces, key)
    def _getDHTPieces(self, results, key):
        """Check the retrieved values."""
        for result in results:
            # Make sure the hash matches the key
            result_hash = sha.new(result.get('t', '')).digest()
            if result_hash == key:
                # NOTE(review): the assignment of `pieces` (presumably the
                # 't' value of the result) is missing from this copy.
                # Piece hashes are 20-byte slices of the concatenated string
                self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
                log.msg('Retrieved %d piece hashes from the DHT' % len(self.pieces))
                # NOTE(review): an early exit after a successful match
                # appears to be missing from this copy.

        # Continue without the piece hashes
        # NOTE(review): this fallthrough was presumably guarded by a check
        # that no pieces were found; the guard and the call that proceeds
        # with the download are missing from this copy.
        log.msg('Could not retrieve the piece hashes from the DHT')
    def getPeerPieces(self, key, failedSite = None):
        """Retrieve the piece information from the peers.

        @param key: the key to request from the peers
        """
        if failedSite is None:
            log.msg('Starting the lookup of piece hashes in peers')
            # Remove any peers with the wrong piece hash
            #for site in self.peers.keys():
            #    if self.peers[site].get('l', '') != key:
            #        del self.peers[site]
        # NOTE(review): an `else:` branch appears to be missing here in this
        # copy -- the failure bookkeeping below presumably belongs to it.
        log.msg('Piece hash lookup failed for peer %r' % (failedSite, ))
        self.peers[failedSite]['failed'] = True
        self.outstanding -= 1

        if self.pieces is None:
            # Send a request to one or more peers
            log.msg('Checking for a peer to request piece hashes from')
            for site in self.peers:
                if self.peers[site].get('failed', False) != True:
                    log.msg('Sending a piece hash request to %r' % (site, ))
                    path = '/~/' + quote_plus(key)
                    lookupDefer = self.peers[site]['peer'].get(path)
                    # Defer attaching the callbacks so the request is sent
                    # from the reactor loop, not from this call frame
                    reactor.callLater(0, lookupDefer.addCallbacks,
                                      *(self._getPeerPieces, self._gotPeerError),
                                      **{'callbackArgs': (key, site),
                                         'errbackArgs': (key, site)})
                    self.outstanding += 1
                    # Cap the number of simultaneous piece-hash requests at 4
                    if self.outstanding >= 4:
                        # NOTE(review): the loop exit (presumably `break`)
                        # is missing from this copy.

        log.msg('Done sending piece hash requests for now, %d outstanding' % self.outstanding)
        if self.pieces is None and self.outstanding <= 0:
            # Continue without the piece hashes
            # NOTE(review): the lines that proceed with the download are
            # missing from this copy.
            log.msg('Could not retrieve the piece hashes from the peers')
    def _getPeerPieces(self, response, key, site):
        """Process the retrieved response from the peer."""
        log.msg('Got a piece hash response %d from %r' % (response.code, site))
        if response.code != 200:
            # Request failed, try a different peer
            log.msg('Did not like response %d from %r' % (response.code, site))
            self.getPeerPieces(key, site)
        # NOTE(review): an `else:` appears to be missing here in this copy --
        # the streaming below presumably only runs for a 200 response.
        # Read the response stream to a string
        self.peers[site]['pieces'] = ''
        # Default args bind self/site now to avoid late-binding surprises
        def _gotPeerPiece(data, self = self, site = site):
            log.msg('Peer %r got %d bytes of piece hashes' % (site, len(data)))
            self.peers[site]['pieces'] += data
        log.msg('Streaming piece hashes from peer')
        df = stream.readStream(response.stream, _gotPeerPiece)
        df.addCallbacks(self._gotPeerPieces, self._gotPeerError,
                        callbackArgs=(key, site), errbackArgs=(key, site))
    def _gotPeerError(self, err, key, site):
        """Peer failed, try again."""
        log.msg('Peer piece hash request failed for %r' % (site, ))
        # NOTE(review): a line (likely logging err itself) is missing from
        # this copy.
        self.getPeerPieces(key, site)
    def _gotPeerPieces(self, result, key, site):
        """Check the retrieved pieces from the peer."""
        log.msg('Finished streaming piece hashes from peer %r' % (site, ))
        if self.pieces is not None:
            # Another request already supplied the pieces
            log.msg('Already done')
            # NOTE(review): an early return appears to be missing here.

        # NOTE(review): this decode was almost certainly wrapped in a
        # try/except in the original; the two statements following it read
        # like the except-handler (log the failure and retry elsewhere).
        result = bdecode(self.peers[site]['pieces'])
        log.msg('Error bdecoding piece hashes')
        self.getPeerPieces(key, site)

        # Make sure the hash of the piece string matches the requested key
        result_hash = sha.new(result.get('t', '')).digest()
        if result_hash == key:
            # NOTE(review): the assignment of `pieces` (presumably the 't'
            # value of the result) is missing from this copy.
            self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
            log.msg('Retrieved %d piece hashes from the peer %r' % (len(self.pieces), site))
        # NOTE(review): an `else:` appears to be missing before this
        # mismatch-handling path.
        log.msg('Peer returned a piece string that did not match')
        self.getPeerPieces(key, site)
466 #{ Downloading the file
468 """Sort the peers by their rank (highest ranked at the end)."""
470 """Sort peers by their rank."""
473 elif a.rank < b.rank:
476 self.peerlist.sort(sort)
    def startDownload(self):
        """Start the download from the peers."""
        # NOTE(review): guard lines (presumably checking and setting
        # self.started) appear to be missing from this copy.
        log.msg('Starting to download %s' % self.path)
        assert self.pieces is not None, "You must initialize the piece hashes first"
        self.peerlist = [self.peers[site]['peer'] for site in self.peers]

        # Special case if there's only one good peer left
        if len(self.peerlist) == 1:
            log.msg('Downloading from peer %r' % (self.peerlist[0], ))
            self.defer.callback(self.peerlist[0].get(self.path))
            # NOTE(review): an early return appears to be missing here.

        # Start sending the return file
        self.stream = GrowingFileStream(self.file, self.hash.expSize)
        resp = Response(200, {}, self.stream)
        self.defer.callback(resp)

        # Begin to download the pieces
        # NOTE(review): the conditional that chooses between the two
        # completePieces initializations below (multi-piece file vs. a
        # single unsegmented blob), and the call that begins requesting
        # pieces, are missing from this copy.
        self.completePieces = [False for piece in self.pieces]
        self.completePieces = [False]
509 #{ Downloading the pieces
511 """Download the next pieces from the peers."""
512 log.msg('Checking for more piece requests to send')
514 piece = self.nextFinish
515 while self.outstanding < 4 and self.peerlist and piece < len(self.completePieces):
516 log.msg('Checking piece %d' % piece)
517 if self.completePieces[piece] == False:
518 # Send a request to the highest ranked peer
519 peer = self.peerlist.pop()
520 self.completePieces[piece] = peer
521 log.msg('Sending a request for piece %d to peer %r' % (piece, peer))
523 self.outstanding += 1
525 df = peer.getRange(self.path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
527 df = peer.get(self.path)
528 reactor.callLater(0, df.addCallbacks,
529 *(self._getPiece, self._getError),
530 **{'callbackArgs': (piece, peer),
531 'errbackArgs': (piece, peer)})
534 log.msg('Finished checking pieces, %d outstanding, next piece %d of %d' % (self.outstanding, self.nextFinish, len(self.completePieces)))
535 # Check if we're done
536 if self.outstanding <= 0 and self.nextFinish >= len(self.completePieces):
537 log.msg('We seem to be done with all pieces')
538 self.stream.allAvailable()
    def _getPiece(self, response, piece, peer):
        """Process the retrieved headers from the peer."""
        log.msg('Got response for piece %d from peer %r' % (piece, peer))
        # A multi-piece download must come back as a partial (206) response;
        # otherwise anything but 200/206 is a failure
        if ((len(self.completePieces) > 1 and response.code != 206) or
            (response.code not in (200, 206))):
            # Request failed, try a different peer
            log.msg('Wrong response type %d for piece %d from peer %r' % (response.code, piece, peer))
            peer.hashError('Peer responded with the wrong type of download: %r' % response.code)
            self.completePieces[piece] = False
            if response.stream and response.stream.length:
                # Drain the unusable body so the connection can be reused
                stream.readAndDiscard(response.stream)
        # NOTE(review): an `else:` appears to be missing before the success
        # path below.
        # Read the response stream to the file
        log.msg('Streaming piece %d from peer %r' % (piece, peer))
        if response.code == 206:
            # Partial response: write at this piece's offset, one piece max
            df = StreamToFile(response.stream, self.file, piece*PIECE_SIZE, PIECE_SIZE).run()
        # NOTE(review): an `else:` appears to be missing here (whole-file
        # 200 response written from the start of the file).
        df = StreamToFile(response.stream, self.file).run()
        df.addCallbacks(self._gotPiece, self._gotError,
                        callbackArgs=(piece, peer), errbackArgs=(piece, peer))

        self.outstanding -= 1
        self.peerlist.append(peer)
        # NOTE(review): the call that requests more pieces appears to be
        # missing from this copy.
    def _getError(self, err, piece, peer):
        """Peer failed, try again."""
        log.msg('Got error for piece %d from peer %r' % (piece, peer))
        # The request is no longer outstanding; the peer may still be usable
        self.outstanding -= 1
        self.peerlist.append(peer)
        self.completePieces[piece] = False
        # NOTE(review): error logging and the retry that requests more
        # pieces appear to be missing from this copy.
    def _gotPiece(self, response, piece, peer):
        """Process the retrieved piece from the peer."""
        log.msg('Finished streaming piece %d from peer %r: %r' % (piece, peer, response))
        # `response` here is the digest returned by StreamToFile._done;
        # compare it to the expected hash for this piece (or the whole file)
        if ((self.pieces and response != self.pieces[piece]) or
            (len(self.pieces) == 0 and response != self.hash.expected())):
            # Hash doesn't match -- penalize the peer and re-request the piece
            log.msg('Hash error for piece %d from peer %r' % (piece, peer))
            peer.hashError('Piece received from peer does not match expected')
            self.completePieces[piece] = False
        # NOTE(review): the branch selector (presumably `elif self.pieces:`)
        # appears to be missing before the multi-piece success path below.
        # Successfully completed one of several pieces
        log.msg('Finished with piece %d from peer %r' % (piece, peer))
        self.completePieces[piece] = True
        # Release every contiguously-finished piece to the output stream
        while (self.nextFinish < len(self.completePieces) and
               self.completePieces[self.nextFinish] == True):
            # NOTE(review): the increment of self.nextFinish is missing from
            # this copy; as written this loop would not terminate.
            self.stream.updateAvailable(PIECE_SIZE)
        # NOTE(review): an `else:` appears to be missing -- the lines below
        # handle the single-piece (whole download) case.
        # Whole download (only one piece) is complete
        log.msg('Piece %d from peer %r is the last piece' % (piece, peer))
        self.completePieces[piece] = True
        self.stream.updateAvailable(2**30)
        # NOTE(review): the call that requests more pieces appears to be
        # missing from this copy.
600 def _gotError(self, err, piece, peer):
601 """Piece download failed, try again."""
602 log.msg('Error streaming piece %d from peer %r: %r' % (piece, peer, response))
604 self.completePieces[piece] = False
608 """Manage a set of peers and the requests to them.
610 @type cache_dir: L{twisted.python.filepath.FilePath}
611 @ivar cache_dir: the directory to use for storing all files
612 @type dht: L{interfaces.IDHT}
613 @ivar dht: the DHT instance
614 @type clients: C{dictionary}
615 @ivar clients: the available peers that have been previously contacted
    def __init__(self, cache_dir, dht):
        """Initialize the instance.

        @type cache_dir: L{twisted.python.filepath.FilePath}
        @param cache_dir: the directory to use for storing all files
        @type dht: L{interfaces.IDHT}
        @param dht: the DHT instance
        """
        self.cache_dir = cache_dir
        # Refresh cached stat info, then make sure the directory exists
        self.cache_dir.restat(False)
        if not self.cache_dir.exists():
            self.cache_dir.makedirs()
        # NOTE(review): assignments of self.dht and self.clients (both used
        # by the methods below) appear to be missing from this copy.
    def get(self, hash, mirror, peers = [], method="GET", modtime=None):
        """Download from a list of peers or fallback to a mirror.

        @type hash: L{Hash.HashObject}
        @param hash: the hash object containing the expected hash for the file
        @param mirror: the URI of the file on the mirror
        @type peers: C{list} of C{string}
        @param peers: a list of the peer info where the file can be found
            (optional, defaults to downloading from the mirror)
        @type method: C{string}
        @param method: the HTTP method to use, 'GET' or 'HEAD'
            (optional, defaults to 'GET')
        @type modtime: C{int}
        @param modtime: the modification time to use for an 'If-Modified-Since'
            header, as seconds since the epoch
            (optional, defaults to not sending that header)
        """
        # NOTE(review): `peers = []` is a mutable default argument; it is
        # only read here, but a None default would be safer.
        if not peers or method != "GET" or modtime is not None:
            # Only plain GETs with no modtime can be served by peers
            log.msg('Downloading (%s) from mirror %s' % (method, mirror))
            parsed = urlparse(mirror)
            assert parsed[0] == "http", "Only HTTP is supported, not '%s'" % parsed[0]
            site = splitHostPort(parsed[0], parsed[1])
            path = urlunparse(('', '') + parsed[2:])
            peer = self.getPeer(site)
            return peer.get(path, method, modtime)
        elif len(peers) == 1:
            # A single peer: request the file from it directly
            site = uncompact(peers[0]['c'])
            log.msg('Downloading from peer %r' % (site, ))
            path = '/~/' + quote_plus(hash.expected())
            peer = self.getPeer(site)
            return peer.get(path)
        # Multiple peers: run a managed multi-source download to a temp file
        tmpfile = self.cache_dir.child(hash.hexexpected())
        return FileDownload(self, hash, mirror, peers, tmpfile).run()
    def getPeer(self, site):
        """Create a new peer if necessary and return it.

        @type site: (C{string}, C{int})
        @param site: the IP address and port of the peer
        """
        # Cache Peer instances so connections to a site are reused
        if site not in self.clients:
            self.clients[site] = Peer(site[0], site[1])
        return self.clients[site]
673 """Close all the connections to peers."""
674 for site in self.clients:
675 self.clients[site].close()
678 class TestPeerManager(unittest.TestCase):
679 """Unit tests for the PeerManager."""
685 for p in self.pending_calls:
688 self.pending_calls = []