"""Manage a set of peers and the requests to them."""
from binascii import b2a_hex, a2b_hex
from random import choice
from urllib import quote_plus
from urlparse import urlparse, urlunparse
import sha

from twisted.internet import reactor, defer
from twisted.python import log
from twisted.trial import unittest
from twisted.web2 import stream
from twisted.web2.http import Response, splitHostPort

from apt_p2p_Khashmir.bencode import bdecode
from HTTPDownloader import Peer
from Hash import PIECE_SIZE
from util import uncompact
class GrowingFileStream(stream.FileStream):
    """Modified to stream data from a file as it becomes available.

    @ivar CHUNK_SIZE: the maximum size of chunks of data to send at a time
    @ivar deferred: waiting for the result of the last read attempt
    @ivar available: the number of bytes that are currently available to read
    @ivar position: the current position in the file where the next read will begin
    @ivar finished: True when no more data will be coming available
    """

    # Maximum number of bytes handed to the consumer per read
    CHUNK_SIZE = 4*1024

    def __init__(self, f, length = None):
        """Initialize the stream over the (partially written) file.

        @param f: the open file to stream data from
        @param length: the total number of bytes the file will eventually contain
            (optional, defaults to unknown)
        """
        stream.FileStream.__init__(self, f)
        self.length = length
        self.deferred = None
        self.available = 0
        self.position = 0
        self.finished = False

    def updateAvailable(self, newlyAvailable):
        """Update the number of bytes that are available.

        Call it with 0 to trigger reading of a fully read file.

        @param newlyAvailable: the number of bytes that just became available
        """
        assert not self.finished
        self.available += newlyAvailable

        # If a read is pending, let it go
        if self.deferred and self.position < self.available:
            # Try to read some data from the file
            length = self.available - self.position
            readSize = min(length, self.CHUNK_SIZE)
            self.f.seek(self.position)
            b = self.f.read(readSize)
            bytesRead = len(b)

            # Check if end of file was reached
            if bytesRead:
                # Fire the waiting deferred with the data that was read
                self.position += bytesRead
                deferred = self.deferred
                self.deferred = None
                deferred.callback(b)

    def allAvailable(self):
        """Indicate that no more data will be coming available."""
        self.finished = True

        # If a read is pending, let it go
        if self.deferred:
            if self.position < self.available:
                # Try to read some data from the file
                length = self.available - self.position
                readSize = min(length, self.CHUNK_SIZE)
                self.f.seek(self.position)
                b = self.f.read(readSize)
                bytesRead = len(b)

                # Check if end of file was reached
                if bytesRead:
                    self.position += bytesRead
                    deferred = self.deferred
                    self.deferred = None
                    deferred.callback(b)
                else:
                    # We're done, signal the end of the stream
                    deferred = self.deferred
                    self.deferred = None
                    deferred.callback(None)
            else:
                # We're done, signal the end of the stream
                deferred = self.deferred
                self.deferred = None
                deferred.callback(None)

    def read(self, sendfile=False):
        """Read some data, deferring if none is currently available.

        @return: a string of data, None when the stream is exhausted, or a
            Deferred that will fire with the next available data
        """
        assert not self.deferred, "A previous read is still deferred."

        if self.f is None:
            return None

        length = self.available - self.position
        readSize = min(length, self.CHUNK_SIZE)

        # If we don't have any available, we're done or deferred
        if readSize <= 0:
            if self.finished:
                return None
            else:
                self.deferred = defer.Deferred()
                return self.deferred

        # Try to read some data from the file
        self.f.seek(self.position)
        b = self.f.read(readSize)
        bytesRead = len(b)

        # End of file was reached, we're done or deferred
        if not bytesRead:
            if self.finished:
                return None
            else:
                self.deferred = defer.Deferred()
                return self.deferred
        else:
            self.position += bytesRead
            return b
class StreamToFile:
    """Save a stream to a partial file and hash it.

    @type stream: L{twisted.web2.stream.IByteStream}
    @ivar stream: the input stream being read
    @type outFile: L{twisted.python.filepath.FilePath}
    @ivar outFile: the file being written
    @ivar hash: the hash object for the data
    @type position: C{int}
    @ivar position: the current file position to write the next data to
    @type length: C{int}
    @ivar length: the position in the file to not write beyond
    @type doneDefer: L{twisted.internet.defer.Deferred}
    @ivar doneDefer: the deferred that will fire when done writing
    """

    def __init__(self, inputStream, outFile, start = 0, length = None):
        """Initializes the file.

        @type inputStream: L{twisted.web2.stream.IByteStream}
        @param inputStream: the input stream to read from
        @type outFile: L{twisted.python.filepath.FilePath}
        @param outFile: the file to write to
        @type start: C{int}
        @param start: the file position to start writing at
            (optional, defaults to the start of the file)
        @type length: C{int}
        @param length: the maximum amount of data to write to the file
            (optional, defaults to not limiting the writing to the file)
        """
        self.stream = inputStream
        self.outFile = outFile
        self.hash = sha.new()
        self.position = start
        # BUG FIX: length must default to None, otherwise run() and
        # _gotData() read an attribute that was never set
        self.length = None
        if length is not None:
            self.length = start + length
        self.doneDefer = None

    def run(self):
        """Start the streaming.

        @rtype: L{twisted.internet.defer.Deferred}
        @return: a deferred that fires with the hash of the data when done
        """
        log.msg('Started streaming %r bytes to file at position %d' % (self.length, self.position))
        self.doneDefer = stream.readStream(self.stream, self._gotData)
        self.doneDefer.addCallbacks(self._done, self._error)
        return self.doneDefer

    def _gotData(self, data):
        """Process the received data."""
        if self.outFile.closed:
            raise Exception("outFile was unexpectedly closed")

        if data is None:
            raise Exception("Data is None?")

        # Make sure we don't go too far
        if self.length is not None and self.position + len(data) > self.length:
            data = data[:(self.length - self.position)]

        # Write and hash the streamed data
        self.outFile.seek(self.position)
        self.outFile.write(data)
        self.hash.update(data)
        self.position += len(data)

    def _done(self, result):
        """Return the result."""
        log.msg('Streaming is complete')
        return self.hash.digest()

    def _error(self, err):
        """Log the error and pass it on."""
        log.msg('Streaming error')
        log.err(err)
        return err
class FileDownload:
    """Manage a download from a list of peers or a mirror.

    @type manager: L{PeerManager}
    @ivar manager: the manager to send requests for peers to
    @type hash: L{Hash.HashObject}
    @ivar hash: the hash object containing the expected hash for the file
    @ivar mirror: the URI of the file on the mirror
    @type compact_peers: C{list} of C{dictionary}
    @ivar compact_peers: a list of the peer info where the file can be found
    @ivar file: the open file to write the download to
    @type path: C{string}
    @ivar path: the path to request from peers to access the file
    @type pieces: C{list} of C{string}
    @ivar pieces: the hashes of the pieces in the file
    @type started: C{boolean}
    @ivar started: whether the download has begun yet
    @type defer: L{twisted.internet.defer.Deferred}
    @ivar defer: the deferred that will callback with the result of the download
    @type peers: C{dictionary}
    @ivar peers: information about each of the peers available to download from
    @type outstanding: C{int}
    @ivar outstanding: the number of requests to peers currently outstanding
    @type peerlist: C{list} of L{HTTPDownloader.Peer}
    @ivar peerlist: the sorted list of peers for this download
    @type stream: L{GrowingFileStream}
    @ivar stream: the stream of resulting data from the download
    @type nextFinish: C{int}
    @ivar nextFinish: the next piece that is needed to finish for the stream
    @type completePieces: C{list} of C{boolean} or L{HTTPDownloader.Peer}
    @ivar completePieces: one per piece, will be False if no requests are
        outstanding for the piece, True if the piece has been successfully
        downloaded, or the Peer that a request for this piece has been sent
    """

    def __init__(self, manager, hash, mirror, compact_peers, file):
        """Initialize the instance and check for piece hashes.

        @type manager: L{PeerManager}
        @param manager: the manager to send requests for peers to
        @type hash: L{Hash.HashObject}
        @param hash: the hash object containing the expected hash for the file
        @param mirror: the URI of the file on the mirror
        @type compact_peers: C{list} of C{dictionary}
        @param compact_peers: a list of the peer info where the file can be found
        @type file: L{twisted.python.filepath.FilePath}
        @param file: the temporary file to use to store the downloaded file
        """
        self.manager = manager
        self.hash = hash
        self.mirror = mirror
        self.compact_peers = compact_peers

        self.path = '/~/' + quote_plus(hash.expected())
        self.defer = None
        self.pieces = None
        self.started = False

        # Start with a fresh temporary file for the download
        file.restat(False)
        if file.exists():
            file.remove()
        self.file = file.open('w+')
273 """Start the downloading process."""
274 log.msg('Checking for pieces for %s' % self.path)
275 self.defer = defer.Deferred()
278 pieces_string = {0: 0}
280 pieces_dl_hash = {0: 0}
282 for compact_peer in self.compact_peers:
283 # Build a list of all the peers for this download
284 site = uncompact(compact_peer['c'])
285 peer = self.manager.getPeer(site)
286 self.peers.setdefault(site, {})['peer'] = peer
288 # Extract any piece information from the peers list
289 if 't' in compact_peer:
290 self.peers[site]['t'] = compact_peer['t']['t']
291 pieces_string.setdefault(compact_peer['t']['t'], 0)
292 pieces_string[compact_peer['t']['t']] += 1
293 elif 'h' in compact_peer:
294 self.peers[site]['h'] = compact_peer['h']
295 pieces_hash.setdefault(compact_peer['h'], 0)
296 pieces_hash[compact_peer['h']] += 1
297 elif 'l' in compact_peer:
298 self.peers[site]['l'] = compact_peer['l']
299 pieces_dl_hash.setdefault(compact_peer['l'], 0)
300 pieces_dl_hash[compact_peer['l']] += 1
304 # Select the most popular piece info
305 max_found = max(no_pieces, max(pieces_string.values()),
306 max(pieces_hash.values()), max(pieces_dl_hash.values()))
308 if max_found < len(self.peers):
309 log.msg('Misleading piece information found, using most popular %d of %d peers' %
310 (max_found, len(self.peers)))
312 if max_found == no_pieces:
313 # The file is not split into pieces
314 log.msg('No pieces were found for the file')
315 self.pieces = [self.hash.expected()]
317 elif max_found == max(pieces_string.values()):
318 # Small number of pieces in a string
319 for pieces, num in pieces_string.items():
320 # Find the most popular piece string
322 self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
323 log.msg('Peer info contained %d piece hashes' % len(self.pieces))
326 elif max_found == max(pieces_hash.values()):
327 # Medium number of pieces stored in the DHT
328 for pieces, num in pieces_hash.items():
329 # Find the most popular piece hash to lookup
331 log.msg('Found a hash for pieces to lookup in the DHT: %r' % pieces)
332 self.getDHTPieces(pieces)
334 elif max_found == max(pieces_dl_hash.values()):
335 # Large number of pieces stored in peers
336 for pieces, num in pieces_dl_hash.items():
337 # Find the most popular piece hash to download
339 log.msg('Found a hash for pieces to lookup in peers: %r' % pieces)
340 self.getPeerPieces(pieces)
344 #{ Downloading the piece hashes
345 def getDHTPieces(self, key):
346 """Retrieve the piece information from the DHT.
348 @param key: the key to lookup in the DHT
350 # Remove any peers with the wrong piece hash
351 #for site in self.peers.keys():
352 # if self.peers[site].get('h', '') != key:
353 # del self.peers[site]
355 # Start the DHT lookup
356 lookupDefer = self.manager.dht.getValue(key)
357 lookupDefer.addCallback(self._getDHTPieces, key)
359 def _getDHTPieces(self, results, key):
360 """Check the retrieved values."""
361 for result in results:
362 # Make sure the hash matches the key
363 result_hash = sha.new(result.get('t', '')).digest()
364 if result_hash == key:
366 self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
367 log.msg('Retrieved %d piece hashes from the DHT' % len(self.pieces))
371 # Continue without the piece hashes
372 log.msg('Could not retrieve the piece hashes from the DHT')
373 self.pieces = [None for x in xrange(0, self.hash.expSize, PIECE_SIZE)]
376 def getPeerPieces(self, key, failedSite = None):
377 """Retrieve the piece information from the peers.
379 @param key: the key to request from the peers
381 if failedSite is None:
382 log.msg('Starting the lookup of piece hashes in peers')
384 # Remove any peers with the wrong piece hash
385 #for site in self.peers.keys():
386 # if self.peers[site].get('l', '') != key:
387 # del self.peers[site]
389 log.msg('Piece hash lookup failed for peer %r' % (failedSite, ))
390 self.peers[failedSite]['failed'] = True
391 self.outstanding -= 1
393 if self.pieces is None:
394 # Send a request to one or more peers
395 log.msg('Checking for a peer to request piece hashes from')
396 for site in self.peers:
397 if self.peers[site].get('failed', False) != True:
398 log.msg('Sending a piece hash request to %r' % (site, ))
399 path = '/~/' + quote_plus(key)
400 lookupDefer = self.peers[site]['peer'].get(path)
401 reactor.callLater(0, lookupDefer.addCallbacks,
402 *(self._getPeerPieces, self._gotPeerError),
403 **{'callbackArgs': (key, site),
404 'errbackArgs': (key, site)})
405 self.outstanding += 1
406 if self.outstanding >= 4:
409 log.msg('Done sending piece hash requests for now, %d outstanding' % self.outstanding)
410 if self.pieces is None and self.outstanding <= 0:
411 # Continue without the piece hashes
412 log.msg('Could not retrieve the piece hashes from the peers')
413 self.pieces = [None for x in xrange(0, self.hash.expSize, PIECE_SIZE)]
416 def _getPeerPieces(self, response, key, site):
417 """Process the retrieved response from the peer."""
418 log.msg('Got a piece hash response %d from %r' % (response.code, site))
419 if response.code != 200:
420 # Request failed, try a different peer
421 log.msg('Did not like response %d from %r' % (response.code, site))
422 self.getPeerPieces(key, site)
424 # Read the response stream to a string
425 self.peers[site]['pieces'] = ''
426 def _gotPeerPiece(data, self = self, site = site):
427 log.msg('Peer %r got %d bytes of piece hashes' % (site, len(data)))
428 self.peers[site]['pieces'] += data
429 log.msg('Streaming piece hashes from peer')
430 df = stream.readStream(response.stream, _gotPeerPiece)
431 df.addCallbacks(self._gotPeerPieces, self._gotPeerError,
432 callbackArgs=(key, site), errbackArgs=(key, site))
434 def _gotPeerError(self, err, key, site):
435 """Peer failed, try again."""
436 log.msg('Peer piece hash request failed for %r' % (site, ))
438 self.getPeerPieces(key, site)
440 def _gotPeerPieces(self, result, key, site):
441 """Check the retrieved pieces from the peer."""
442 log.msg('Finished streaming piece hashes from peer %r' % (site, ))
443 if self.pieces is not None:
445 log.msg('Already done')
449 result = bdecode(self.peers[site]['pieces'])
451 log.msg('Error bdecoding piece hashes')
453 self.getPeerPieces(key, site)
456 result_hash = sha.new(result.get('t', '')).digest()
457 if result_hash == key:
459 self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
460 log.msg('Retrieved %d piece hashes from the peer %r' % (len(self.pieces), site))
463 log.msg('Peer returned a piece string that did not match')
464 self.getPeerPieces(key, site)
466 #{ Downloading the file
468 """Sort the peers by their rank (highest ranked at the end)."""
470 """Sort peers by their rank."""
473 elif a.rank < b.rank:
476 self.peerlist.sort(sort)
478 def startDownload(self):
479 """Start the download from the peers."""
484 log.msg('Starting to download %s' % self.path)
486 assert self.pieces, "You must initialize the piece hashes first"
487 self.peerlist = [self.peers[site]['peer'] for site in self.peers]
489 # Special case if there's only one good peer left
490 # if len(self.peerlist) == 1:
491 # log.msg('Downloading from peer %r' % (self.peerlist[0], ))
492 # self.defer.callback(self.peerlist[0].get(self.path))
495 # Start sending the return file
496 self.stream = GrowingFileStream(self.file, self.hash.expSize)
497 resp = Response(200, {}, self.stream)
498 self.defer.callback(resp)
500 # Begin to download the pieces
503 self.completePieces = [False for piece in self.pieces]
506 #{ Downloading the pieces
508 """Download the next pieces from the peers."""
509 log.msg('Checking for more piece requests to send')
511 piece = self.nextFinish
512 while self.outstanding < 4 and self.peerlist and piece < len(self.completePieces):
513 log.msg('Checking piece %d' % piece)
514 if self.completePieces[piece] == False:
515 # Send a request to the highest ranked peer
516 peer = self.peerlist.pop()
517 self.completePieces[piece] = peer
518 log.msg('Sending a request for piece %d to peer %r' % (piece, peer))
520 self.outstanding += 1
521 df = peer.getRange(self.path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
522 reactor.callLater(0, df.addCallbacks,
523 *(self._getPiece, self._getError),
524 **{'callbackArgs': (piece, peer),
525 'errbackArgs': (piece, peer)})
528 log.msg('Finished checking pieces, %d outstanding, next piece %d of %d' % (self.outstanding, self.nextFinish, len(self.completePieces)))
529 # Check if we're done
530 if self.outstanding <= 0 and self.nextFinish >= len(self.completePieces):
531 log.msg('We seem to be done with all pieces')
532 self.stream.allAvailable()
534 def _getPiece(self, response, piece, peer):
535 """Process the retrieved headers from the peer."""
536 log.msg('Got response for piece %d from peer %r' % (piece, peer))
537 if ((len(self.completePieces) > 1 and response.code != 206) or
538 (response.code not in (200, 206))):
539 # Request failed, try a different peer
540 log.msg('Wrong response type %d for piece %d from peer %r' % (response.code, piece, peer))
541 peer.hashError('Peer responded with the wrong type of download: %r' % response.code)
542 self.completePieces[piece] = False
543 if response.stream and response.stream.length:
544 stream.readAndDiscard(response.stream)
546 # Read the response stream to the file
547 log.msg('Streaming piece %d from peer %r' % (piece, peer))
548 if response.code == 206:
549 df = StreamToFile(response.stream, self.file, piece*PIECE_SIZE, PIECE_SIZE).run()
551 df = StreamToFile(response.stream, self.file).run()
552 df.addCallbacks(self._gotPiece, self._gotError,
553 callbackArgs=(piece, peer), errbackArgs=(piece, peer))
555 self.outstanding -= 1
556 self.peerlist.append(peer)
559 def _getError(self, err, piece, peer):
560 """Peer failed, try again."""
561 log.msg('Got error for piece %d from peer %r' % (piece, peer))
562 self.outstanding -= 1
563 self.peerlist.append(peer)
564 self.completePieces[piece] = False
568 def _gotPiece(self, response, piece, peer):
569 """Process the retrieved piece from the peer."""
570 log.msg('Finished streaming piece %d from peer %r: %r' % (piece, peer, response))
571 if self.pieces[piece] and response != self.pieces[piece]:
573 log.msg('Hash error for piece %d from peer %r' % (piece, peer))
574 peer.hashError('Piece received from peer does not match expected')
575 self.completePieces[piece] = False
577 # Successfully completed one of several pieces
578 log.msg('Finished with piece %d from peer %r' % (piece, peer))
579 self.completePieces[piece] = True
580 while (self.nextFinish < len(self.completePieces) and
581 self.completePieces[self.nextFinish] == True):
583 self.stream.updateAvailable(PIECE_SIZE)
587 def _gotError(self, err, piece, peer):
588 """Piece download failed, try again."""
589 log.msg('Error streaming piece %d from peer %r: %r' % (piece, peer, response))
591 self.completePieces[piece] = False
class PeerManager:
    """Manage a set of peers and the requests to them.

    @type cache_dir: L{twisted.python.filepath.FilePath}
    @ivar cache_dir: the directory to use for storing all files
    @type dht: L{interfaces.IDHT}
    @ivar dht: the DHT instance
    @type clients: C{dictionary}
    @ivar clients: the available peers that have been previously contacted
    """

    def __init__(self, cache_dir, dht):
        """Initialize the instance.

        @type cache_dir: L{twisted.python.filepath.FilePath}
        @param cache_dir: the directory to use for storing all files
        @type dht: L{interfaces.IDHT}
        @param dht: the DHT instance
        """
        self.cache_dir = cache_dir
        self.cache_dir.restat(False)
        if not self.cache_dir.exists():
            self.cache_dir.makedirs()
        self.dht = dht
        # Peers are cached by site so their connections can be reused
        self.clients = {}
614 def get(self, hash, mirror, peers = [], method="GET", modtime=None):
615 """Download from a list of peers or fallback to a mirror.
617 @type hash: L{Hash.HashObject}
618 @param hash: the hash object containing the expected hash for the file
619 @param mirror: the URI of the file on the mirror
620 @type peers: C{list} of C{string}
621 @param peers: a list of the peer info where the file can be found
622 (optional, defaults to downloading from the mirror)
623 @type method: C{string}
624 @param method: the HTTP method to use, 'GET' or 'HEAD'
625 (optional, defaults to 'GET')
626 @type modtime: C{int}
627 @param modtime: the modification time to use for an 'If-Modified-Since'
628 header, as seconds since the epoch
629 (optional, defaults to not sending that header)
631 if not peers or method != "GET" or modtime is not None:
632 log.msg('Downloading (%s) from mirror %s' % (method, mirror))
633 parsed = urlparse(mirror)
634 assert parsed[0] == "http", "Only HTTP is supported, not '%s'" % parsed[0]
635 site = splitHostPort(parsed[0], parsed[1])
636 path = urlunparse(('', '') + parsed[2:])
637 peer = self.getPeer(site)
638 return peer.get(path, method, modtime)
639 # elif len(peers) == 1:
640 # site = uncompact(peers[0]['c'])
641 # log.msg('Downloading from peer %r' % (site, ))
642 # path = '/~/' + quote_plus(hash.expected())
643 # peer = self.getPeer(site)
644 # return peer.get(path)
646 tmpfile = self.cache_dir.child(hash.hexexpected())
647 return FileDownload(self, hash, mirror, peers, tmpfile).run()
649 def getPeer(self, site):
650 """Create a new peer if necessary and return it.
652 @type site: (C{string}, C{int})
653 @param site: the IP address and port of the peer
655 if site not in self.clients:
656 self.clients[site] = Peer(site[0], site[1])
657 return self.clients[site]
660 """Close all the connections to peers."""
661 for site in self.clients:
662 self.clients[site].close()
665 class TestPeerManager(unittest.TestCase):
666 """Unit tests for the PeerManager."""
672 for p in self.pending_calls:
675 self.pending_calls = []