2 """Manage a set of peers and the requests to them."""
4 from random import choice
5 from urlparse import urlparse, urlunparse
6 from urllib import quote_plus
7 from binascii import b2a_hex, a2b_hex
10 from twisted.internet import reactor, defer
11 from twisted.python import log
12 from twisted.trial import unittest
13 from twisted.web2 import stream
14 from twisted.web2.http import Response, splitHostPort
16 from HTTPDownloader import Peer
17 from util import uncompact
18 from Hash import PIECE_SIZE
19 from apt_p2p_Khashmir.bencode import bdecode
20 from apt_p2p_conf import config
# NOTE(review): this excerpt is whitespace-mangled and the embedded original
# line numbering skips (e.g. 30-33, 36-41, 59-60, 65-67, 86-89, 101-104,
# 109-112, 119-127), so the method bodies below are incomplete fragments.
# Code is left byte-identical; only review comments are added.
22 class GrowingFileStream(stream.FileStream):
23 """Modified to stream data from a file as it becomes available.
25 @ivar CHUNK_SIZE: the maximum size of chunks of data to send at a time
26 @ivar deferred: waiting for the result of the last read attempt
27 @ivar available: the number of bytes that are currently available to read
28 @ivar position: the current position in the file where the next read will begin
29 @ivar finished: True when no more data will be coming available
# The attribute initializations (original lines 36-41) are elided here;
# presumably they set deferred/available/position/finished — TODO confirm.
34 def __init__(self, f, length = None):
35 stream.FileStream.__init__(self, f)
# Called by the writer each time more bytes have been written to the
# underlying file; wakes up a pending read if one is waiting.
42 def updateAvailable(self, newlyAvailable):
43 """Update the number of bytes that are available.
45 Call it with 0 to trigger reading of a fully read file.
47 @param newlyAvailable: the number of bytes that just became available
49 assert not self.finished
50 self.available += newlyAvailable
52 # If a read is pending, let it go
53 if self.deferred and self.position < self.available:
54 # Try to read some data from the file
55 length = self.available - self.position
56 readSize = min(length, self.CHUNK_SIZE)
57 self.f.seek(self.position)
58 b = self.f.read(readSize)
# bytesRead is presumably len(b), set in an elided line — TODO confirm.
61 # Check if end of file was reached
63 self.position += bytesRead
64 deferred = self.deferred
# Fires any pending read with the remaining data (or None) and marks
# the stream finished; the branches completing this are partly elided.
68 def allAvailable(self):
69 """Indicate that no more data will be coming available."""
72 # If a read is pending, let it go
74 if self.position < self.available:
75 # Try to read some data from the file
76 length = self.available - self.position
77 readSize = min(length, self.CHUNK_SIZE)
78 self.f.seek(self.position)
79 b = self.f.read(readSize)
82 # Check if end of file was reached
84 self.position += bytesRead
85 deferred = self.deferred
90 deferred = self.deferred
# callback(None) signals end-of-stream to the consumer of the Deferred.
92 deferred.callback(None)
95 deferred = self.deferred
97 deferred.callback(None)
# Standard IByteStream read: returns bytes, None at EOF, or a Deferred
# when no data is available yet.
99 def read(self, sendfile=False):
100 assert not self.deferred, "A previous read is still deferred."
105 length = self.available - self.position
106 readSize = min(length, self.CHUNK_SIZE)
108 # If we don't have any available, we're done or deferred
# A new Deferred is stored so updateAvailable/allAvailable can fire it.
113 self.deferred = defer.Deferred()
116 # Try to read some data from the file
117 self.f.seek(self.position)
118 b = self.f.read(readSize)
121 # End of file was reached, we're done or deferred
125 self.deferred = defer.Deferred()
128 self.position += bytesRead
# NOTE(review): the `class StreamToFile:` statement itself is elided from
# this excerpt, as are some of __init__'s assignments; code is byte-identical.
132 """Save a stream to a partial file and hash it.
134 @type stream: L{twisted.web2.stream.IByteStream}
135 @ivar stream: the input stream being read
136 @type outFile: L{twisted.python.filepath.FilePath}
137 @ivar outFile: the file being written
139 @ivar hash: the hash object for the data
140 @type position: C{int}
141 @ivar position: the current file position to write the next data to
143 @ivar length: the position in the file to not write beyond
144 @type doneDefer: L{twisted.internet.defer.Deferred}
145 @ivar doneDefer: the deferred that will fire when done writing
148 def __init__(self, inputStream, outFile, start = 0, length = None):
149 """Initializes the file.
151 @type inputStream: L{twisted.web2.stream.IByteStream}
152 @param inputStream: the input stream to read from
153 @type outFile: L{twisted.python.filepath.FilePath}
154 @param outFile: the file to write to
156 @param start: the file position to start writing at
157 (optional, defaults to the start of the file)
159 @param length: the maximum amount of data to write to the file
160 (optional, defaults to not limiting the writing to the file
162 self.stream = inputStream
163 self.outFile = outFile
# sha.new(): the `import sha` line is presumably in an elided import
# line near the top of the file — verify against the full source.
164 self.hash = sha.new()
165 self.position = start
# self.length is presumably defaulted (to None) in an elided line,
# since only the capped assignment is visible here — TODO confirm.
167 if length is not None:
168 self.length = start + length
169 self.doneDefer = None
# NOTE(review): the `def run(self):` line is elided from this excerpt.
# Starts copying the input stream to the file; the returned Deferred fires
# with the final hash digest (via _done) or a failure (via _error).
172 """Start the streaming.
174 @rtype: L{twisted.internet.defer.Deferred}
176 log.msg('Started streaming %r bytes to file at position %d' % (self.length, self.position))
177 self.doneDefer = stream.readStream(self.stream, self._gotData)
178 self.doneDefer.addCallbacks(self._done, self._error)
179 return self.doneDefer
def _gotData(self, data):
    """Process a chunk of data received from the input stream.

    Writes the chunk to the output file at the current write position
    and folds it into the running hash.

    @param data: the newly received bytes (None signals a problem here)
    @raise Exception: if the output file was closed, or a None chunk
        arrives
    """
    if self.outFile.closed:
        # Modernized from the Python-2-only `raise Exception, "..."` form;
        # the parenthesized call form is equivalent and forward-compatible.
        raise Exception("outFile was unexpectedly closed")

    # BUG FIX (as presented): this raise appeared without its guard and
    # would have fired on every chunk; a None chunk is the only sensible
    # trigger for it.
    if data is None:
        raise Exception("Data is None?")

    # Make sure we don't go too far: truncate the chunk at self.length
    if self.length is not None and self.position + len(data) > self.length:
        data = data[:(self.length - self.position)]

    # Write and hash the streamed data
    self.outFile.seek(self.position)
    self.outFile.write(data)
    self.hash.update(data)
    self.position += len(data)
def _done(self, result):
    """Callback for a completed stream read: report success and hand the
    final hash digest down the callback chain."""
    log.msg('Streaming is complete')
    digest = self.hash.digest()
    return digest
204 def _error(self, err):
# NOTE(review): the body's remainder (original lines 205, 207-210) is
# elided; presumably it logs the failure detail and propagates `err` so
# the Deferred chain sees the error — TODO confirm against the full file.
206 log.msg('Streaming error')
# NOTE(review): the `class FileDownload:` statement is elided from this
# excerpt, as are several of __init__'s assignments (presumably self.hash,
# self.mirror, self.pieces, self.started, counters) — TODO confirm.
211 """Manage a download from a list of peers or a mirror.
213 @type manager: L{PeerManager}
214 @ivar manager: the manager to send requests for peers to
215 @type hash: L{Hash.HashObject}
216 @ivar hash: the hash object containing the expected hash for the file
217 @ivar mirror: the URI of the file on the mirror
218 @type compact_peers: C{list} of C{dictionary}
219 @ivar compact_peers: a list of the peer info where the file can be found
221 @ivar file: the open file to write the download to
222 @type path: C{string}
223 @ivar path: the path to request from peers to access the file
224 @type pieces: C{list} of C{string}
225 @ivar pieces: the hashes of the pieces in the file
226 @type started: C{boolean}
227 @ivar started: whether the download has begun yet
228 @type defer: L{twisted.internet.defer.Deferred}
229 @ivar defer: the deferred that will callback with the result of the download
230 @type peers: C{dictionary}
231 @ivar peers: information about each of the peers available to download from
232 @type outstanding: C{int}
233 @ivar outstanding: the number of requests to peers currently outstanding
234 @type peerlist: C{list} of L{HTTPDownloader.Peer}
235 @ivar peerlist: the sorted list of peers for this download
236 @type stream: L{GrowingFileStream}
237 @ivar stream: the stream of resulting data from the download
238 @type nextFinish: C{int}
239 @ivar nextFinish: the next piece that is needed to finish for the stream
240 @type completePieces: C{list} of C{boolean} or L{HTTPDownloader.Peer}
241 @ivar completePieces: one per piece, will be False if no requests are
242 outstanding for the piece, True if the piece has been successfully
243 downloaded, or the Peer that a request for this piece has been sent
246 def __init__(self, manager, hash, mirror, compact_peers, file):
247 """Initialize the instance and check for piece hashes.
249 @type manager: L{PeerManager}
250 @param manager: the manager to send requests for peers to
251 @type hash: L{Hash.HashObject}
252 @param hash: the hash object containing the expected hash for the file
253 @param mirror: the URI of the file on the mirror
254 @type compact_peers: C{list} of C{dictionary}
255 @param compact_peers: a list of the peer info where the file can be found
256 @type file: L{twisted.python.filepath.FilePath}
257 @param file: the temporary file to use to store the downloaded file
259 self.manager = manager
262 self.compact_peers = compact_peers
# Peers serve files keyed by the expected hash under the '/~/' namespace.
264 self.path = '/~/' + quote_plus(hash.expected())
265 self.mirror_path = None
# Opened read/write so pieces can be written at arbitrary offsets and
# streamed back out via GrowingFileStream.
272 self.file = file.open('w+')
# NOTE(review): the `def run(self):` line is elided. Tallies the piece
# information advertised by the peers, picks the most popular variant, and
# dispatches to the appropriate piece-hash retrieval strategy. The
# initializations of `no_pieces` and `pieces_hash` (and the `if num ==
# max_found:` selectors inside the loops below) are elided — TODO confirm.
275 """Start the downloading process."""
276 log.msg('Checking for pieces for %s' % self.path)
277 self.defer = defer.Deferred()
280 pieces_string = {0: 0}
282 pieces_dl_hash = {0: 0}
284 for compact_peer in self.compact_peers:
285 # Build a list of all the peers for this download
286 site = uncompact(compact_peer['c'])
287 peer = self.manager.getPeer(site)
288 self.peers.setdefault(site, {})['peer'] = peer
290 # Extract any piece information from the peers list
# 't' = literal piece-hash string, 'h' = DHT key, 'l' = peer-download key.
291 if 't' in compact_peer:
292 self.peers[site]['t'] = compact_peer['t']['t']
293 pieces_string.setdefault(compact_peer['t']['t'], 0)
294 pieces_string[compact_peer['t']['t']] += 1
295 elif 'h' in compact_peer:
296 self.peers[site]['h'] = compact_peer['h']
297 pieces_hash.setdefault(compact_peer['h'], 0)
298 pieces_hash[compact_peer['h']] += 1
299 elif 'l' in compact_peer:
300 self.peers[site]['l'] = compact_peer['l']
301 pieces_dl_hash.setdefault(compact_peer['l'], 0)
302 pieces_dl_hash[compact_peer['l']] += 1
306 # Select the most popular piece info
307 max_found = max(no_pieces, max(pieces_string.values()),
308 max(pieces_hash.values()), max(pieces_dl_hash.values()))
# Peers that disagree with the majority are treated as misleading.
310 if max_found < len(self.peers):
311 log.msg('Misleading piece information found, using most popular %d of %d peers' %
312 (max_found, len(self.peers)))
314 if max_found == no_pieces:
315 # The file is not split into pieces
316 log.msg('No pieces were found for the file')
317 self.pieces = [self.hash.expected()]
319 elif max_found == max(pieces_string.values()):
320 # Small number of pieces in a string
321 for pieces, num in pieces_string.items():
322 # Find the most popular piece string
# Piece hashes are 20-byte SHA1 digests concatenated into one string.
324 self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
325 log.msg('Peer info contained %d piece hashes' % len(self.pieces))
328 elif max_found == max(pieces_hash.values()):
329 # Medium number of pieces stored in the DHT
330 for pieces, num in pieces_hash.items():
331 # Find the most popular piece hash to lookup
333 log.msg('Found a hash for pieces to lookup in the DHT: %r' % pieces)
334 self.getDHTPieces(pieces)
336 elif max_found == max(pieces_dl_hash.values()):
337 # Large number of pieces stored in peers
338 for pieces, num in pieces_dl_hash.items():
339 # Find the most popular piece hash to download
341 log.msg('Found a hash for pieces to lookup in peers: %r' % pieces)
342 self.getPeerPieces(pieces)
346 #{ Downloading the piece hashes
def getDHTPieces(self, key):
    """Retrieve the piece information from the DHT.

    @param key: the key to lookup in the DHT
    """
    # (A filter that dropped peers advertising a different piece hash
    # used to live here; it remains disabled.)

    # Start the DHT lookup; success and failure are funnelled into the
    # same handler, which also receives the key for verification.
    lookupDefer = self.manager.dht.getValue(key)
    lookupDefer.addBoth(self._getDHTPieces, key)
361 def _getDHTPieces(self, results, key):
362 """Check the retrieved values."""
# A list result means the lookup succeeded; anything else is a failure.
363 if isinstance(results, list):
364 for result in results:
365 # Make sure the hash matches the key
366 result_hash = sha.new(result.get('t', '')).digest()
367 if result_hash == key:
# NOTE(review): the line binding `pieces` (original line 368, presumably
# from result['t']) is elided here — TODO confirm against the full file.
369 self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
370 log.msg('Retrieved %d piece hashes from the DHT' % len(self.pieces))
374 log.msg('Could not retrieve the piece hashes from the DHT')
376 log.msg('Looking up piece hashes in the DHT resulted in an error: %r' % (result, ))
378 # Continue without the piece hashes
# Fall back to one unverifiable entry per PIECE_SIZE chunk of the file.
379 self.pieces = [None for x in xrange(0, self.hash.expSize, PIECE_SIZE)]
382 def getPeerPieces(self, key, failedSite = None):
383 """Retrieve the piece information from the peers.
385 @param key: the key to request from the peers
# Re-entered as its own errback path: failedSite marks a peer whose
# piece-hash request failed so it won't be retried.
387 if failedSite is None:
388 log.msg('Starting the lookup of piece hashes in peers')
390 # Remove any peers with the wrong piece hash
391 #for site in self.peers.keys():
392 # if self.peers[site].get('l', '') != key:
393 # del self.peers[site]
395 log.msg('Piece hash lookup failed for peer %r' % (failedSite, ))
396 self.peers[failedSite]['failed'] = True
397 self.outstanding -= 1
399 if self.pieces is None:
400 # Send a request to one or more peers
401 log.msg('Checking for a peer to request piece hashes from')
402 for site in self.peers:
403 if self.peers[site].get('failed', False) != True:
404 log.msg('Sending a piece hash request to %r' % (site, ))
405 path = '/~/' + quote_plus(key)
406 lookupDefer = self.peers[site]['peer'].get(path)
# callLater(0) defers attaching callbacks until the next reactor tick.
407 reactor.callLater(0, lookupDefer.addCallbacks,
408 *(self._getPeerPieces, self._gotPeerError),
409 **{'callbackArgs': (key, site),
410 'errbackArgs': (key, site)})
411 self.outstanding += 1
# Cap of 4 concurrent piece-hash requests; the loop-exit statement
# (original line 413, presumably `break`) is elided — TODO confirm.
412 if self.outstanding >= 4:
415 log.msg('Done sending piece hash requests for now, %d outstanding' % self.outstanding)
416 if self.pieces is None and self.outstanding <= 0:
417 # Continue without the piece hashes
418 log.msg('Could not retrieve the piece hashes from the peers')
419 self.pieces = [None for x in xrange(0, self.hash.expSize, PIECE_SIZE)]
422 def _getPeerPieces(self, response, key, site):
423 """Process the retrieved response from the peer."""
424 log.msg('Got a piece hash response %d from %r' % (response.code, site))
425 if response.code != 200:
426 # Request failed, try a different peer
427 log.msg('Did not like response %d from %r' % (response.code, site))
428 self.getPeerPieces(key, site)
# NOTE(review): the `else:` separating the failure path from the success
# path below (original line 429) is elided — TODO confirm.
430 # Read the response stream to a string
431 self.peers[site]['pieces'] = ''
# Default arguments bind self/site at definition time for the closure.
432 def _gotPeerPiece(data, self = self, site = site):
433 log.msg('Peer %r got %d bytes of piece hashes' % (site, len(data)))
434 self.peers[site]['pieces'] += data
435 log.msg('Streaming piece hashes from peer')
436 df = stream.readStream(response.stream, _gotPeerPiece)
437 df.addCallbacks(self._gotPeerPieces, self._gotPeerError,
438 callbackArgs=(key, site), errbackArgs=(key, site))
440 def _gotPeerError(self, err, key, site):
441 """Peer failed, try again."""
442 log.msg('Peer piece hash request failed for %r' % (site, ))
# NOTE(review): original line 443 is elided (presumably logs `err`);
# re-entering getPeerPieces marks this site failed and tries another peer.
444 self.getPeerPieces(key, site)
446 def _gotPeerPieces(self, result, key, site):
447 """Check the retrieved pieces from the peer."""
448 log.msg('Finished streaming piece hashes from peer %r' % (site, ))
# Another peer may have already supplied the piece hashes.
449 if self.pieces is not None:
451 log.msg('Already done')
# NOTE(review): the try/except wrapping this bdecode (original lines
# 453-461) is partly elided; the two log lines below are the error path.
455 result = bdecode(self.peers[site]['pieces'])
457 log.msg('Error bdecoding piece hashes')
459 self.getPeerPieces(key, site)
# Verify the decoded piece string hashes back to the requested key.
462 result_hash = sha.new(result.get('t', '')).digest()
463 if result_hash == key:
# NOTE(review): the line binding `pieces` (original line 464) is elided.
465 self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
466 log.msg('Retrieved %d piece hashes from the peer %r' % (len(self.pieces), site))
469 log.msg('Peer returned a piece string that did not match')
470 self.getPeerPieces(key, site)
472 #{ Downloading the file
# NOTE(review): the enclosing method's `def` line (original 473) and the
# inner comparator's `def sort(a, b):` plus its other branches (475,
# 477-478, 480-481) are elided. Sorts peerlist ascending by rank so the
# highest-ranked peer sits at the end for pop().
474 """Sort the peers by their rank (highest ranked at the end)."""
476 """Sort peers by their rank."""
479 elif a.rank < b.rank:
# Python 2 list.sort with a cmp-style comparator.
482 self.peerlist.sort(sort)
484 def startDownload(self):
485 """Start the download from the peers."""
# NOTE(review): guard lines (original 486-489, presumably the `started`
# re-entrancy check) are elided here — TODO confirm.
490 log.msg('Starting to download %s' % self.path)
492 assert self.pieces, "You must initialize the piece hashes first"
493 self.peerlist = [self.peers[site]['peer'] for site in self.peers]
495 # Use the mirror if there are few peers
496 if len(self.peerlist) < config.getint('DEFAULT', 'MIN_DOWNLOAD_PEERS'):
497 parsed = urlparse(self.mirror)
498 if parsed[0] == "http":
499 site = splitHostPort(parsed[0], parsed[1])
500 self.mirror_path = urlunparse(('', '') + parsed[2:])
501 peer = self.manager.getPeer(site)
# An elided line (original 502) presumably marks this peer as the mirror.
503 self.peerlist.append(peer)
505 # Special case if there's only one good peer left
506 # if len(self.peerlist) == 1:
507 # log.msg('Downloading from peer %r' % (self.peerlist[0], ))
508 # self.defer.callback(self.peerlist[0].get(self.path))
511 # Start sending the return file
# The response is handed back immediately; data is streamed into it as
# pieces complete (see GrowingFileStream.updateAvailable).
512 self.stream = GrowingFileStream(self.file, self.hash.expSize)
513 resp = Response(200, {}, self.stream)
514 self.defer.callback(resp)
516 # Begin to download the pieces
# Elided lines (517-518, 520-521) presumably reset counters and call
# getPieces() — TODO confirm.
519 self.completePieces = [False for piece in self.pieces]
522 #{ Downloading the pieces
# NOTE(review): the `def getPieces(self):` line (original 523) is elided.
# Fills the request pipeline: up to 4 outstanding piece requests, always
# to the current highest-ranked peer (peerlist is sorted ascending).
524 """Download the next pieces from the peers."""
525 log.msg('Checking for more piece requests to send')
527 piece = self.nextFinish
528 while self.outstanding < 4 and self.peerlist and piece < len(self.completePieces):
529 log.msg('Checking piece %d' % piece)
530 if self.completePieces[piece] == False:
531 # Send a request to the highest ranked peer
532 peer = self.peerlist.pop()
# Record which peer owns this piece's in-flight request.
533 self.completePieces[piece] = peer
534 log.msg('Sending a request for piece %d to peer %r' % (piece, peer))
536 self.outstanding += 1
# NOTE(review): the `if`/`else` selecting between these two getRange
# calls (original lines 537, 539 — mirror vs. peer path) is elided.
538 df = peer.getRange(self.mirror_path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
540 df = peer.getRange(self.path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
541 reactor.callLater(0, df.addCallbacks,
542 *(self._getPiece, self._getError),
543 **{'callbackArgs': (piece, peer),
544 'errbackArgs': (piece, peer)})
# An elided line (545/546) presumably advances `piece` — TODO confirm.
547 log.msg('Finished checking pieces, %d outstanding, next piece %d of %d' % (self.outstanding, self.nextFinish, len(self.completePieces)))
548 # Check if we're done
549 if self.outstanding <= 0 and self.nextFinish >= len(self.completePieces):
550 log.msg('We seem to be done with all pieces')
# Signals end-of-stream to the HTTP response consumer.
551 self.stream.allAvailable()
553 def _getPiece(self, response, piece, peer):
554 """Process the retrieved headers from the peer."""
555 log.msg('Got response for piece %d from peer %r' % (piece, peer))
# Multi-piece downloads require a 206 Partial Content; single-piece
# downloads accept 200 or 206.
556 if ((len(self.completePieces) > 1 and response.code != 206) or
557 (response.code not in (200, 206))):
558 # Request failed, try a different peer
559 log.msg('Wrong response type %d for piece %d from peer %r' % (response.code, piece, peer))
560 peer.hashError('Peer responded with the wrong type of download: %r' % response.code)
561 self.completePieces[piece] = False
# Drain the unwanted body so the connection can be reused.
562 if response.stream and response.stream.length:
563 stream.readAndDiscard(response.stream)
# NOTE(review): the `else:` lines separating the branches here (original
# 564 and 569) are elided — TODO confirm.
565 # Read the response stream to the file
566 log.msg('Streaming piece %d from peer %r' % (piece, peer))
567 if response.code == 206:
568 df = StreamToFile(response.stream, self.file, piece*PIECE_SIZE, PIECE_SIZE).run()
570 df = StreamToFile(response.stream, self.file).run()
571 df.addCallbacks(self._gotPiece, self._gotError,
572 callbackArgs=(piece, peer), errbackArgs=(piece, peer))
# Request resolved: free the slot and return the peer to the pool.
574 self.outstanding -= 1
575 self.peerlist.append(peer)
578 def _getError(self, err, piece, peer):
579 """Peer failed, try again."""
580 log.msg('Got error for piece %d from peer %r' % (piece, peer))
# Free the request slot and return the peer to the pool; the piece is
# marked un-requested so it will be retried.
581 self.outstanding -= 1
582 self.peerlist.append(peer)
583 self.completePieces[piece] = False
# NOTE(review): the tail (original 584-586, presumably re-sorting peers
# and calling getPieces()) is elided — TODO confirm.
def _gotPiece(self, response, piece, peer):
    """Process the retrieved piece from the peer.

    Verifies the piece's hash against the expected one, marks it complete
    or failed, and advances the stream over every contiguously finished
    piece.

    @param response: the piece's hash digest as returned by StreamToFile
    @param piece: the index of the piece that finished streaming
    @param peer: the peer the piece was downloaded from
    """
    log.msg('Finished streaming piece %d from peer %r: %r' % (piece, peer, response))
    if self.pieces[piece] and response != self.pieces[piece]:
        # Hash doesn't match the expected value for this piece
        log.msg('Hash error for piece %d from peer %r' % (piece, peer))
        peer.hashError('Piece received from peer does not match expected')
        self.completePieces[piece] = False
    else:
        # Successfully completed one of several pieces
        log.msg('Finished with piece %d from peer %r' % (piece, peer))
        self.completePieces[piece] = True
        # BUG FIX (as presented): the loop body lacked the increment of
        # nextFinish, so the loop could never terminate and the stream
        # never advanced past the first finished piece.
        while (self.nextFinish < len(self.completePieces) and
               self.completePieces[self.nextFinish] == True):
            self.nextFinish += 1
            self.stream.updateAvailable(PIECE_SIZE)
def _gotError(self, err, piece, peer):
    """Handle a failure while streaming a piece from a peer.

    @param err: the failure that occurred while streaming the piece
    @param piece: the index of the piece that failed
    @param peer: the peer the piece was being downloaded from
    """
    # BUG FIX: the log line referenced an undefined name `response`
    # (NameError at runtime); the failure object `err` is what this
    # errback receives.
    log.msg('Error streaming piece %d from peer %r: %r' % (piece, peer, err))
    # Mark the piece as un-requested so it will be downloaded again
    self.completePieces[piece] = False
# NOTE(review): the `class PeerManager:` statement is elided from this
# excerpt, as are the assignments of self.dht and self.clients (original
# lines 630-632, presumably) — TODO confirm against the full file.
614 """Manage a set of peers and the requests to them.
616 @type cache_dir: L{twisted.python.filepath.FilePath}
617 @ivar cache_dir: the directory to use for storing all files
618 @type dht: L{interfaces.IDHT}
619 @ivar dht: the DHT instance
620 @type clients: C{dictionary}
621 @ivar clients: the available peers that have been previously contacted
624 def __init__(self, cache_dir, dht):
625 """Initialize the instance."""
626 self.cache_dir = cache_dir
# restat(False) refreshes cached stat info without raising if missing.
627 self.cache_dir.restat(False)
628 if not self.cache_dir.exists():
629 self.cache_dir.makedirs()
# WARNING: mutable default argument `peers = []` — safe only because the
# method never mutates it; left untouched here.
633 def get(self, hash, mirror, peers = [], method="GET", modtime=None):
634 """Download from a list of peers or fallback to a mirror.
636 @type hash: L{Hash.HashObject}
637 @param hash: the hash object containing the expected hash for the file
638 @param mirror: the URI of the file on the mirror
639 @type peers: C{list} of C{string}
640 @param peers: a list of the peer info where the file can be found
641 (optional, defaults to downloading from the mirror)
642 @type method: C{string}
643 @param method: the HTTP method to use, 'GET' or 'HEAD'
644 (optional, defaults to 'GET')
645 @type modtime: C{int}
646 @param modtime: the modification time to use for an 'If-Modified-Since'
647 header, as seconds since the epoch
648 (optional, defaults to not sending that header)
# Peers only support plain GET of whole files, so HEAD requests and
# conditional requests always go straight to the mirror.
650 if not peers or method != "GET" or modtime is not None:
651 log.msg('Downloading (%s) from mirror %s' % (method, mirror))
652 parsed = urlparse(mirror)
653 assert parsed[0] == "http", "Only HTTP is supported, not '%s'" % parsed[0]
654 site = splitHostPort(parsed[0], parsed[1])
655 path = urlunparse(('', '') + parsed[2:])
656 peer = self.getPeer(site)
# NOTE(review): original line 657 is elided here — TODO confirm.
658 return peer.get(path, method, modtime)
659 # elif len(peers) == 1:
660 # site = uncompact(peers[0]['c'])
661 # log.msg('Downloading from peer %r' % (site, ))
662 # path = '/~/' + quote_plus(hash.expected())
663 # peer = self.getPeer(site)
664 # return peer.get(path)
# Otherwise run a multi-peer FileDownload into a cache temp file.
666 tmpfile = self.cache_dir.child(hash.hexexpected())
667 return FileDownload(self, hash, mirror, peers, tmpfile).run()
def getPeer(self, site):
    """Create a new peer if necessary and return it.

    @type site: (C{string}, C{int})
    @param site: the IP address and port of the peer
    """
    try:
        return self.clients[site]
    except KeyError:
        # First contact with this site: create and cache a Peer for it.
        peer = Peer(site[0], site[1])
        self.clients[site] = peer
        return peer
# NOTE(review): the `def close(self):` line (original 679) is elided.
# Closes every cached peer connection; the clients dict is not cleared here.
680 """Close all the connections to peers."""
681 for site in self.clients:
682 self.clients[site].close()
685 class TestPeerManager(unittest.TestCase):
686 """Unit tests for the PeerManager."""
# NOTE(review): almost the entire test body (original lines 687-691,
# 693-694 and anything after 695) is elided; the visible fragment is the
# tearDown-style cancellation of pending reactor calls — TODO confirm.
692 for p in self.pending_calls:
695 self.pending_calls = []