2 """Manage a set of peers and the requests to them."""
4 from random import choice
5 from urlparse import urlparse, urlunparse
6 from urllib import quote_plus
7 from binascii import b2a_hex, a2b_hex
10 from twisted.internet import reactor, defer
11 from twisted.python import log
12 from twisted.trial import unittest
13 from twisted.web2 import stream
14 from twisted.web2.http import Response, splitHostPort
16 from HTTPDownloader import Peer
17 from util import uncompact
18 from Hash import PIECE_SIZE
19 from apt_p2p_Khashmir.bencode import bdecode
20 from apt_p2p_conf import config
22 class GrowingFileStream(stream.FileStream):
23 """Modified to stream data from a file as it becomes available.
25 @ivar CHUNK_SIZE: the maximum size of chunks of data to send at a time
26 @ivar deferred: waiting for the result of the last read attempt
27 @ivar available: the number of bytes that are currently available to read
28 @ivar position: the current position in the file where the next read will begin
29 @ivar finished: True when no more data will be coming available
34 def __init__(self, f, length = None):
35 stream.FileStream.__init__(self, f)
42 def updateAvailable(self, newlyAvailable):
43 """Update the number of bytes that are available.
45 Call it with 0 to trigger reading of a fully read file.
47 @param newlyAvailable: the number of bytes that just became available
49 assert not self.finished
50 self.available += newlyAvailable
52 # If a read is pending, let it go
53 if self.deferred and self.position < self.available:
54 # Try to read some data from the file
55 length = self.available - self.position
56 readSize = min(length, self.CHUNK_SIZE)
57 self.f.seek(self.position)
58 b = self.f.read(readSize)
61 # Check if end of file was reached
63 self.position += bytesRead
64 deferred = self.deferred
68 def allAvailable(self):
69 """Indicate that no more data will be coming available."""
72 # If a read is pending, let it go
74 if self.position < self.available:
75 # Try to read some data from the file
76 length = self.available - self.position
77 readSize = min(length, self.CHUNK_SIZE)
78 self.f.seek(self.position)
79 b = self.f.read(readSize)
82 # Check if end of file was reached
84 self.position += bytesRead
85 deferred = self.deferred
90 deferred = self.deferred
92 deferred.callback(None)
95 deferred = self.deferred
97 deferred.callback(None)
99 def read(self, sendfile=False):
100 assert not self.deferred, "A previous read is still deferred."
105 length = self.available - self.position
106 readSize = min(length, self.CHUNK_SIZE)
108 # If we don't have any available, we're done or deferred
113 self.deferred = defer.Deferred()
116 # Try to read some data from the file
117 self.f.seek(self.position)
118 b = self.f.read(readSize)
121 # End of file was reached, we're done or deferred
125 self.deferred = defer.Deferred()
128 self.position += bytesRead
132 """Save a stream to a partial file and hash it.
134 @type stream: L{twisted.web2.stream.IByteStream}
135 @ivar stream: the input stream being read
136 @type outFile: L{twisted.python.filepath.FilePath}
137 @ivar outFile: the file being written
139 @ivar hash: the hash object for the data
140 @type position: C{int}
141 @ivar position: the current file position to write the next data to
143 @ivar length: the position in the file to not write beyond
144 @type doneDefer: L{twisted.internet.defer.Deferred}
145 @ivar doneDefer: the deferred that will fire when done writing
148 def __init__(self, inputStream, outFile, start = 0, length = None):
149 """Initializes the file.
151 @type inputStream: L{twisted.web2.stream.IByteStream}
152 @param inputStream: the input stream to read from
153 @type outFile: L{twisted.python.filepath.FilePath}
154 @param outFile: the file to write to
156 @param start: the file position to start writing at
157 (optional, defaults to the start of the file)
159 @param length: the maximum amount of data to write to the file
160 (optional, defaults to not limiting the writing to the file)
162 self.stream = inputStream
163 self.outFile = outFile
164 self.hash = sha.new()
165 self.position = start
167 if length is not None:
168 self.length = start + length
169 self.doneDefer = None
172 """Start the streaming.
174 @rtype: L{twisted.internet.defer.Deferred}
176 log.msg('Started streaming %r bytes to file at position %d' % (self.length, self.position))
177 self.doneDefer = stream.readStream(self.stream, self._gotData)
178 self.doneDefer.addCallbacks(self._done, self._error)
179 return self.doneDefer
181 def _gotData(self, data):
182 """Process the received data."""
183 if self.outFile.closed:
184 raise Exception, "outFile was unexpectedly closed"
187 raise Exception, "Data is None?"
189 # Make sure we don't go too far
190 if self.length is not None and self.position + len(data) > self.length:
191 data = data[:(self.length - self.position)]
193 # Write and hash the streamed data
194 self.outFile.seek(self.position)
195 self.outFile.write(data)
196 self.hash.update(data)
197 self.position += len(data)
def _done(self, result):
    """Report the end of streaming and return the hash of the written data.

    @param result: the value passed along when the stream read finishes
        (not used)
    @return: the digest of all the data that has been hashed
    """
    log.msg('Streaming is complete')
    digest = self.hash.digest()
    return digest
204 def _error(self, err):
206 log.msg('Streaming error')
211 """Manage a download from a list of peers or a mirror.
213 @type manager: L{PeerManager}
214 @ivar manager: the manager to send requests for peers to
215 @type hash: L{Hash.HashObject}
216 @ivar hash: the hash object containing the expected hash for the file
217 @ivar mirror: the URI of the file on the mirror
218 @type compact_peers: C{list} of C{dictionary}
219 @ivar compact_peers: a list of the peer info where the file can be found
221 @ivar file: the open file to write the download to
222 @type path: C{string}
223 @ivar path: the path to request from peers to access the file
224 @type pieces: C{list} of C{string}
225 @ivar pieces: the hashes of the pieces in the file
226 @type started: C{boolean}
227 @ivar started: whether the download has begun yet
228 @type defer: L{twisted.internet.defer.Deferred}
229 @ivar defer: the deferred that will callback with the result of the download
230 @type peers: C{dictionary}
231 @ivar peers: information about each of the peers available to download from
232 @type outstanding: C{int}
233 @ivar outstanding: the number of requests to peers currently outstanding
234 @type peerlist: C{list} of L{HTTPDownloader.Peer}
235 @ivar peerlist: the sorted list of peers for this download
236 @type stream: L{GrowingFileStream}
237 @ivar stream: the stream of resulting data from the download
238 @type nextFinish: C{int}
239 @ivar nextFinish: the next piece that is needed to finish for the stream
240 @type completePieces: C{list} of C{boolean} or L{HTTPDownloader.Peer}
241 @ivar completePieces: one per piece, will be False if no requests are
242 outstanding for the piece, True if the piece has been successfully
243 downloaded, or the Peer that a request for this piece has been sent
246 def __init__(self, manager, hash, mirror, compact_peers, file):
247 """Initialize the instance and check for piece hashes.
249 @type manager: L{PeerManager}
250 @param manager: the manager to send requests for peers to
251 @type hash: L{Hash.HashObject}
252 @param hash: the hash object containing the expected hash for the file
253 @param mirror: the URI of the file on the mirror
254 @type compact_peers: C{list} of C{dictionary}
255 @param compact_peers: a list of the peer info where the file can be found
256 @type file: L{twisted.python.filepath.FilePath}
257 @param file: the temporary file to use to store the downloaded file
259 self.manager = manager
262 self.compact_peers = compact_peers
264 self.path = '/~/' + quote_plus(hash.expected())
265 self.mirror_path = None
272 self.file = file.open('w+')
275 """Start the downloading process."""
276 log.msg('Checking for pieces for %s' % self.path)
277 self.defer = defer.Deferred()
280 pieces_string = {0: 0}
282 pieces_dl_hash = {0: 0}
284 for compact_peer in self.compact_peers:
285 # Build a list of all the peers for this download
286 site = uncompact(compact_peer['c'])
287 peer = self.manager.getPeer(site)
288 self.peers.setdefault(site, {})['peer'] = peer
290 # Extract any piece information from the peers list
291 if 't' in compact_peer:
292 self.peers[site]['t'] = compact_peer['t']['t']
293 pieces_string.setdefault(compact_peer['t']['t'], 0)
294 pieces_string[compact_peer['t']['t']] += 1
295 elif 'h' in compact_peer:
296 self.peers[site]['h'] = compact_peer['h']
297 pieces_hash.setdefault(compact_peer['h'], 0)
298 pieces_hash[compact_peer['h']] += 1
299 elif 'l' in compact_peer:
300 self.peers[site]['l'] = compact_peer['l']
301 pieces_dl_hash.setdefault(compact_peer['l'], 0)
302 pieces_dl_hash[compact_peer['l']] += 1
306 # Select the most popular piece info
307 max_found = max(no_pieces, max(pieces_string.values()),
308 max(pieces_hash.values()), max(pieces_dl_hash.values()))
310 if max_found < len(self.peers):
311 log.msg('Misleading piece information found, using most popular %d of %d peers' %
312 (max_found, len(self.peers)))
314 if max_found == no_pieces:
315 # The file is not split into pieces
316 log.msg('No pieces were found for the file')
317 self.pieces = [self.hash.expected()]
319 elif max_found == max(pieces_string.values()):
320 # Small number of pieces in a string
321 for pieces, num in pieces_string.items():
322 # Find the most popular piece string
324 self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
325 log.msg('Peer info contained %d piece hashes' % len(self.pieces))
328 elif max_found == max(pieces_hash.values()):
329 # Medium number of pieces stored in the DHT
330 for pieces, num in pieces_hash.items():
331 # Find the most popular piece hash to lookup
333 log.msg('Found a hash for pieces to lookup in the DHT: %r' % pieces)
334 self.getDHTPieces(pieces)
336 elif max_found == max(pieces_dl_hash.values()):
337 # Large number of pieces stored in peers
338 for pieces, num in pieces_dl_hash.items():
339 # Find the most popular piece hash to download
341 log.msg('Found a hash for pieces to lookup in peers: %r' % pieces)
342 self.getPeerPieces(pieces)
346 #{ Downloading the piece hashes
def getDHTPieces(self, key):
    """Look up the piece hashes for the file in the DHT.

    The values found are handed to L{_getDHTPieces} together with the key.

    @param key: the key to lookup in the DHT
    """
    # Pruning of peers with a different piece hash is currently disabled
    #for site in self.peers.keys():
    #    if self.peers[site].get('h', '') != key:
    #        del self.peers[site]

    # Kick off the DHT lookup and chain the result handler onto it
    dht = self.manager.dht
    dht.getValue(key).addCallback(self._getDHTPieces, key)
361 def _getDHTPieces(self, results, key):
362 """Check the retrieved values."""
363 for result in results:
364 # Make sure the hash matches the key
365 result_hash = sha.new(result.get('t', '')).digest()
366 if result_hash == key:
368 self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
369 log.msg('Retrieved %d piece hashes from the DHT' % len(self.pieces))
373 # Continue without the piece hashes
374 log.msg('Could not retrieve the piece hashes from the DHT')
375 self.pieces = [None for x in xrange(0, self.hash.expSize, PIECE_SIZE)]
378 def getPeerPieces(self, key, failedSite = None):
379 """Retrieve the piece information from the peers.
381 @param key: the key to request from the peers
383 if failedSite is None:
384 log.msg('Starting the lookup of piece hashes in peers')
386 # Remove any peers with the wrong piece hash
387 #for site in self.peers.keys():
388 # if self.peers[site].get('l', '') != key:
389 # del self.peers[site]
391 log.msg('Piece hash lookup failed for peer %r' % (failedSite, ))
392 self.peers[failedSite]['failed'] = True
393 self.outstanding -= 1
395 if self.pieces is None:
396 # Send a request to one or more peers
397 log.msg('Checking for a peer to request piece hashes from')
398 for site in self.peers:
399 if self.peers[site].get('failed', False) != True:
400 log.msg('Sending a piece hash request to %r' % (site, ))
401 path = '/~/' + quote_plus(key)
402 lookupDefer = self.peers[site]['peer'].get(path)
403 reactor.callLater(0, lookupDefer.addCallbacks,
404 *(self._getPeerPieces, self._gotPeerError),
405 **{'callbackArgs': (key, site),
406 'errbackArgs': (key, site)})
407 self.outstanding += 1
408 if self.outstanding >= 4:
411 log.msg('Done sending piece hash requests for now, %d outstanding' % self.outstanding)
412 if self.pieces is None and self.outstanding <= 0:
413 # Continue without the piece hashes
414 log.msg('Could not retrieve the piece hashes from the peers')
415 self.pieces = [None for x in xrange(0, self.hash.expSize, PIECE_SIZE)]
418 def _getPeerPieces(self, response, key, site):
419 """Process the retrieved response from the peer."""
420 log.msg('Got a piece hash response %d from %r' % (response.code, site))
421 if response.code != 200:
422 # Request failed, try a different peer
423 log.msg('Did not like response %d from %r' % (response.code, site))
424 self.getPeerPieces(key, site)
426 # Read the response stream to a string
427 self.peers[site]['pieces'] = ''
428 def _gotPeerPiece(data, self = self, site = site):
429 log.msg('Peer %r got %d bytes of piece hashes' % (site, len(data)))
430 self.peers[site]['pieces'] += data
431 log.msg('Streaming piece hashes from peer')
432 df = stream.readStream(response.stream, _gotPeerPiece)
433 df.addCallbacks(self._gotPeerPieces, self._gotPeerError,
434 callbackArgs=(key, site), errbackArgs=(key, site))
436 def _gotPeerError(self, err, key, site):
437 """Peer failed, try again."""
438 log.msg('Peer piece hash request failed for %r' % (site, ))
440 self.getPeerPieces(key, site)
442 def _gotPeerPieces(self, result, key, site):
443 """Check the retrieved pieces from the peer."""
444 log.msg('Finished streaming piece hashes from peer %r' % (site, ))
445 if self.pieces is not None:
447 log.msg('Already done')
451 result = bdecode(self.peers[site]['pieces'])
453 log.msg('Error bdecoding piece hashes')
455 self.getPeerPieces(key, site)
458 result_hash = sha.new(result.get('t', '')).digest()
459 if result_hash == key:
461 self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
462 log.msg('Retrieved %d piece hashes from the peer %r' % (len(self.pieces), site))
465 log.msg('Peer returned a piece string that did not match')
466 self.getPeerPieces(key, site)
468 #{ Downloading the file
470 """Sort the peers by their rank (highest ranked at the end)."""
472 """Sort peers by their rank."""
475 elif a.rank < b.rank:
478 self.peerlist.sort(sort)
480 def startDownload(self):
481 """Start the download from the peers."""
486 log.msg('Starting to download %s' % self.path)
488 assert self.pieces, "You must initialize the piece hashes first"
489 self.peerlist = [self.peers[site]['peer'] for site in self.peers]
491 # Use the mirror if there are few peers
492 if len(self.peerlist) < config.getint('DEFAULT', 'MIN_DOWNLOAD_PEERS'):
493 parsed = urlparse(self.mirror)
494 if parsed[0] == "http":
495 site = splitHostPort(parsed[0], parsed[1])
496 self.mirror_path = urlunparse(('', '') + parsed[2:])
497 peer = self.manager.getPeer(site)
499 self.peerlist.append(peer)
501 # Special case if there's only one good peer left
502 # if len(self.peerlist) == 1:
503 # log.msg('Downloading from peer %r' % (self.peerlist[0], ))
504 # self.defer.callback(self.peerlist[0].get(self.path))
507 # Start sending the return file
508 self.stream = GrowingFileStream(self.file, self.hash.expSize)
509 resp = Response(200, {}, self.stream)
510 self.defer.callback(resp)
512 # Begin to download the pieces
515 self.completePieces = [False for piece in self.pieces]
518 #{ Downloading the pieces
520 """Download the next pieces from the peers."""
521 log.msg('Checking for more piece requests to send')
523 piece = self.nextFinish
524 while self.outstanding < 4 and self.peerlist and piece < len(self.completePieces):
525 log.msg('Checking piece %d' % piece)
526 if self.completePieces[piece] == False:
527 # Send a request to the highest ranked peer
528 peer = self.peerlist.pop()
529 self.completePieces[piece] = peer
530 log.msg('Sending a request for piece %d to peer %r' % (piece, peer))
532 self.outstanding += 1
534 df = peer.getRange(self.mirror_path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
536 df = peer.getRange(self.path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
537 reactor.callLater(0, df.addCallbacks,
538 *(self._getPiece, self._getError),
539 **{'callbackArgs': (piece, peer),
540 'errbackArgs': (piece, peer)})
543 log.msg('Finished checking pieces, %d outstanding, next piece %d of %d' % (self.outstanding, self.nextFinish, len(self.completePieces)))
544 # Check if we're done
545 if self.outstanding <= 0 and self.nextFinish >= len(self.completePieces):
546 log.msg('We seem to be done with all pieces')
547 self.stream.allAvailable()
549 def _getPiece(self, response, piece, peer):
550 """Process the retrieved headers from the peer."""
551 log.msg('Got response for piece %d from peer %r' % (piece, peer))
552 if ((len(self.completePieces) > 1 and response.code != 206) or
553 (response.code not in (200, 206))):
554 # Request failed, try a different peer
555 log.msg('Wrong response type %d for piece %d from peer %r' % (response.code, piece, peer))
556 peer.hashError('Peer responded with the wrong type of download: %r' % response.code)
557 self.completePieces[piece] = False
558 if response.stream and response.stream.length:
559 stream.readAndDiscard(response.stream)
561 # Read the response stream to the file
562 log.msg('Streaming piece %d from peer %r' % (piece, peer))
563 if response.code == 206:
564 df = StreamToFile(response.stream, self.file, piece*PIECE_SIZE, PIECE_SIZE).run()
566 df = StreamToFile(response.stream, self.file).run()
567 df.addCallbacks(self._gotPiece, self._gotError,
568 callbackArgs=(piece, peer), errbackArgs=(piece, peer))
570 self.outstanding -= 1
571 self.peerlist.append(peer)
574 def _getError(self, err, piece, peer):
575 """Peer failed, try again."""
576 log.msg('Got error for piece %d from peer %r' % (piece, peer))
577 self.outstanding -= 1
578 self.peerlist.append(peer)
579 self.completePieces[piece] = False
583 def _gotPiece(self, response, piece, peer):
584 """Process the retrieved piece from the peer."""
585 log.msg('Finished streaming piece %d from peer %r: %r' % (piece, peer, response))
586 if self.pieces[piece] and response != self.pieces[piece]:
588 log.msg('Hash error for piece %d from peer %r' % (piece, peer))
589 peer.hashError('Piece received from peer does not match expected')
590 self.completePieces[piece] = False
592 # Successfully completed one of several pieces
593 log.msg('Finished with piece %d from peer %r' % (piece, peer))
594 self.completePieces[piece] = True
595 while (self.nextFinish < len(self.completePieces) and
596 self.completePieces[self.nextFinish] == True):
598 self.stream.updateAvailable(PIECE_SIZE)
def _gotError(self, err, piece, peer):
    """Piece download failed, try again.

    @param err: the failure raised while streaming the piece
    @param piece: the index of the piece that failed
    @param peer: the peer the piece was requested from
    """
    # Log the failure itself: this errback has no 'response', so formatting
    # one here raised a NameError instead of writing the log message
    log.msg('Error streaming piece %d from peer %r: %r' % (piece, peer, err))
    # Mark the piece as having no outstanding request so it can be retried
    self.completePieces[piece] = False
610 """Manage a set of peers and the requests to them.
612 @type cache_dir: L{twisted.python.filepath.FilePath}
613 @ivar cache_dir: the directory to use for storing all files
614 @type dht: L{interfaces.IDHT}
615 @ivar dht: the DHT instance
616 @type clients: C{dictionary}
617 @ivar clients: the available peers that have been previously contacted
620 def __init__(self, cache_dir, dht):
621 """Initialize the instance."""
622 self.cache_dir = cache_dir
623 self.cache_dir.restat(False)
624 if not self.cache_dir.exists():
625 self.cache_dir.makedirs()
629 def get(self, hash, mirror, peers = [], method="GET", modtime=None):
630 """Download from a list of peers or fallback to a mirror.
632 @type hash: L{Hash.HashObject}
633 @param hash: the hash object containing the expected hash for the file
634 @param mirror: the URI of the file on the mirror
635 @type peers: C{list} of C{string}
636 @param peers: a list of the peer info where the file can be found
637 (optional, defaults to downloading from the mirror)
638 @type method: C{string}
639 @param method: the HTTP method to use, 'GET' or 'HEAD'
640 (optional, defaults to 'GET')
641 @type modtime: C{int}
642 @param modtime: the modification time to use for an 'If-Modified-Since'
643 header, as seconds since the epoch
644 (optional, defaults to not sending that header)
646 if not peers or method != "GET" or modtime is not None:
647 log.msg('Downloading (%s) from mirror %s' % (method, mirror))
648 parsed = urlparse(mirror)
649 assert parsed[0] == "http", "Only HTTP is supported, not '%s'" % parsed[0]
650 site = splitHostPort(parsed[0], parsed[1])
651 path = urlunparse(('', '') + parsed[2:])
652 peer = self.getPeer(site)
654 return peer.get(path, method, modtime)
655 # elif len(peers) == 1:
656 # site = uncompact(peers[0]['c'])
657 # log.msg('Downloading from peer %r' % (site, ))
658 # path = '/~/' + quote_plus(hash.expected())
659 # peer = self.getPeer(site)
660 # return peer.get(path)
662 tmpfile = self.cache_dir.child(hash.hexexpected())
663 return FileDownload(self, hash, mirror, peers, tmpfile).run()
def getPeer(self, site):
    """Return the peer for a site, creating and remembering it if needed.

    @type site: (C{string}, C{int})
    @param site: the IP address and port of the peer
    @return: the peer stored in C{self.clients} for the site
    """
    known = self.clients
    if site in known:
        return known[site]

    # First request for this site: build the peer and keep it for reuse
    known[site] = Peer(site[0], site[1])
    return known[site]
676 """Close all the connections to peers."""
677 for site in self.clients:
678 self.clients[site].close()
681 class TestPeerManager(unittest.TestCase):
682 """Unit tests for the PeerManager."""
688 for p in self.pending_calls:
691 self.pending_calls = []