from twisted.python import log
from twisted.trial import unittest
from twisted.web2 import stream
-from twisted.web2.http import splitHostPort
+from twisted.web2.http import Response, splitHostPort
from HTTPDownloader import Peer
from util import uncompact
deferred.callback(b)
def allAvailable(self):
- """Indicate that no more data is coming available."""
+ """Indicate that no more data will be coming available."""
self.finished = True
# If a read is pending, let it go
return b
class StreamToFile(defer.Deferred):
- """Saves a stream to a file.
+ """Save a stream to a partial file and hash it.
@type stream: L{twisted.web2.stream.IByteStream}
@ivar stream: the input stream being read
@type outFile: L{twisted.python.filepath.FilePath}
@ivar outFile: the file being written
- @type hash: L{Hash.HashObject}
- @ivar hash: the hash object for the file
+ @type hash: C{sha1}
+ @ivar hash: the hash object for the data
+ @type position: C{int}
+ @ivar position: the current file position to write the next data to
@type length: C{int}
- @ivar length: the length of the original (compressed) file
+ @ivar length: the position in the file to not write beyond
@type doneDefer: L{twisted.internet.defer.Deferred}
- @ivar doneDefer: the deferred that will fire when done streaming
+ @ivar doneDefer: the deferred that will fire when done writing
"""
- def __init__(self, inputStream, outFile, hash, start, length):
+ def __init__(self, inputStream, outFile, start = 0, length = None):
"""Initializes the file.
@type inputStream: L{twisted.web2.stream.IByteStream}
@param inputStream: the input stream to read from
@type outFile: L{twisted.python.filepath.FilePath}
@param outFile: the file to write to
- @type hash: L{Hash.HashObject}
- @param hash: the hash object to use for the file
+ @type start: C{int}
+ @param start: the file position to start writing at
+ (optional, defaults to the start of the file)
+ @type length: C{int}
+ @param length: the maximum amount of data to write to the file
+ (optional, defaults to not limiting the writing to the file
"""
self.stream = inputStream
self.outFile = outFile
- self.hash = hash
- self.hash.new()
- self.length = self.stream.length
+ self.hash = sha.new()
+ self.position = start
+ self.length = None
+ if length is not None:
+ self.length = start + length
self.doneDefer = None
def run(self):
- """Start the streaming."""
- self.doneDefer = stream.readStream(self.stream, _gotData)
+ """Start the streaming.
+
+ @rtype: L{twisted.internet.defer.Deferred}
+ """
+ self.doneDefer = stream.readStream(self.stream, self._gotData)
self.doneDefer.addCallbacks(self._done, self._error)
return self.doneDefer
- def _done(self):
- """Close all the output files, return the result."""
- if not self.outFile.closed:
- self.outFile.close()
- self.hash.digest()
- self.doneDefer.callback(self.hash)
-
def _gotData(self, data):
+ """Process the received data."""
if self.outFile.closed:
- return
+ raise Exception, "outFile was unexpectedly closed"
if data is None:
- self._done()
+ raise Exception, "Data is None?"
+
+ # Make sure we don't go too far
+ if self.length is not None and self.position + len(data) > self.length:
+ data = data[:(self.length - self.position)]
# Write and hash the streamed data
+ self.outFile.seek(self.position)
self.outFile.write(data)
self.hash.update(data)
+ self.position += len(data)
+ def _done(self, result):
+ """Return the result."""
+ return self.hash.digest()
+
+ def _error(self, err):
+ """Log the error."""
+ log.err(err)
+ return err
+
class FileDownload:
"""Manage a download from a list of peers or a mirror.
-
+ @type manager: L{PeerManager}
+ @ivar manager: the manager to send requests for peers to
+ @type hash: L{Hash.HashObject}
+ @ivar hash: the hash object containing the expected hash for the file
+ @ivar mirror: the URI of the file on the mirror
+ @type compact_peers: C{list} of C{dictionary}
+ @ivar compact_peers: a list of the peer info where the file can be found
+ @type file: C{file}
+ @ivar file: the open file to right the download to
+ @type path: C{string}
+ @ivar path: the path to request from peers to access the file
+ @type pieces: C{list} of C{string}
+ @ivar pieces: the hashes of the pieces in the file
+ @type started: C{boolean}
+ @ivar started: whether the download has begun yet
+ @type defer: L{twisted.internet.defer.Deferred}
+ @ivar defer: the deferred that will callback with the result of the download
+ @type peers: C{dictionary}
+ @ivar peers: information about each of the peers available to download from
+ @type outstanding: C{int}
+ @ivar outstanding: the number of requests to peers currently outstanding
+ @type peerlist: C{list} of L{HTTPDownloader.Peer}
+ @ivar peerlist: the sorted list of peers for this download
+ @type stream: L{GrowingFileStream}
+ @ivar stream: the stream of resulting data from the download
+ @type nextFinish: C{int}
+ @ivar nextFinish: the next piece that is needed to finish for the stream
+ @type completePieces: C{list} of C{boolean} or L{HTTPDownloader.Peer}
+ @ivar completePieces: one per piece, will be False if no requests are
+ outstanding for the piece, True if the piece has been successfully
+ downloaded, or the Peer that a request for this piece has been sent
"""
def __init__(self, manager, hash, mirror, compact_peers, file):
"""Initialize the instance and check for piece hashes.
+ @type manager: L{PeerManager}
+ @param manager: the manager to send requests for peers to
@type hash: L{Hash.HashObject}
@param hash: the hash object containing the expected hash for the file
@param mirror: the URI of the file on the mirror
- @type compact_peers: C{list} of C{string}
+ @type compact_peers: C{list} of C{dictionary}
@param compact_peers: a list of the peer info where the file can be found
@type file: L{twisted.python.filepath.FilePath}
@param file: the temporary file to use to store the downloaded file
# Find the most popular piece string
if num == max_found:
self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
+ log.msg('Peer info contained %d piece hashes' % len(self.pieces))
self.startDownload()
break
elif max_found == max(pieces_hash.values()):
self.defer.callback(self.peerlist[0].get(self.path))
return
- self.sort()
+ # Start sending the return file
+ self.stream = GrowingFileStream(self.file)
+ resp = Response(200, {}, self.stream)
+ self.defer.callback(resp)
+
+ # Begin to download the pieces
self.outstanding = 0
- self.next_piece = 0
+ self.nextFinish = 0
+ if self.pieces:
+ self.completePieces = [False for piece in self.pieces]
+ else:
+ self.completePieces = [False]
+ self.getPieces()
- while self.outstanding < 3 and self.peerlist and self.next_piece < len(self.pieces):
- peer = self.peerlist.pop()
- piece = self.next_piece
- self.next_piece += 1
-
- self.outstanding += 1
- df = peer.getRange(self.path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
- df.addCallbacks(self._gotPiece, self._gotError,
- callbackArgs=(piece, peer), errbackArgs=(piece, peer))
+ #{ Downloading the pieces
+ def getPieces(self):
+ """Download the next pieces from the peers."""
+ self.sort()
+ piece = self.nextFinish
+ while self.outstanding < 4 and self.peerlist and piece < len(self.completePieces):
+ if self.completePieces[piece] == False:
+ # Send a request to the highest ranked peer
+ peer = self.peerlist.pop()
+ self.completePieces[piece] = peer
+
+ self.outstanding += 1
+ if self.pieces:
+ df = peer.getRange(self.path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
+ else:
+ df = peer.get(self.path)
+ reactor.callLater(0, df.addCallbacks,
+ *(self._getPiece, self._getError),
+ **{'callbackArgs': (piece, peer),
+ 'errbackArgs': (piece, peer)})
+ piece += 1
+
+ # Check if we're don
+ if self.outstanding <= 0 and self.nextFinish >= len(self.completePieces):
+ self.stream.allAvailable()
- def _gotPiece(self, response, piece, peer):
- """Process the retrieved piece from the peer."""
- if response.code != 206:
+ def _getPiece(self, response, piece, peer):
+ """Process the retrieved headers from the peer."""
+ if ((len(self.completePieces) > 1 and response.code != 206) or
+ (response.code not in (200, 206))):
# Request failed, try a different peer
- self.getPeerPieces(key, site)
+ peer.hashError()
+ self.completePieces[piece] = False
+ if response.stream and response.stream.length:
+ stream.readAndDiscard(response.stream)
else:
# Read the response stream to the file
- df = StreamToFile(response.stream, self.file, self.hash, piece*PIECE_SIZE, PIECE_SIZE).run()
- df.addCallbacks(self._gotPeerPieces, self._gotPeerError,
- callbackArgs=(key, site), errbackArgs=(key, site))
+ if response.code == 206:
+ df = StreamToFile(response.stream, self.file, piece*PIECE_SIZE, PIECE_SIZE).run()
+ else:
+ df = StreamToFile(response.stream, self.file).run()
+ df.addCallbacks(self._gotPiece, self._gotError,
+ callbackArgs=(piece, peer), errbackArgs=(piece, peer))
- def _gotError(self, err, piece, peer):
+ self.outstanding -= 1
+ self.peerlist.append(peer)
+ self.getPieces()
+
+ def _getError(self, err, piece, peer):
"""Peer failed, try again."""
+ self.outstanding -= 1
+ self.peerlist.append(peer)
+ self.completePieces[piece] = False
+ self.getPieces()
log.err(err)
+ def _gotPiece(self, response, piece, peer):
+ """Process the retrieved piece from the peer."""
+ if ((self.pieces and response != self.pieces[piece]) or
+ (len(self.pieces) == 0 and response == self.hash.expected())):
+ # Hash doesn't match
+ peer.hashError()
+ self.completePieces[piece] = False
+ elif self.pieces:
+ # Successfully completed one of several pieces
+ self.completePieces[piece] = True
+ while (self.nextFinish < len(self.completePieces) and
+ self.completePieces[self.nextFinish] == True):
+ self.nextFinish += 1
+ self.stream.updateAvailable(PIECE_SIZE)
+ else:
+ # Whole download (only one piece) is complete
+ self.completePieces[piece] = True
+ self.stream.updateAvailable(2**30)
+
+ self.getPieces()
+
+ def _gotError(self, err, piece, peer):
+ """Piece download failed, try again."""
+ log.err(err)
+ self.completePieces[piece] = False
+ self.getPieces()
class PeerManager:
"""Manage a set of peers and the requests to them.
+ @type cache_dir: L{twisted.python.filepath.FilePath}
+ @ivar cache_dir: the directory to use for storing all files
+ @type dht: L{interfaces.IDHT}
+ @ivar dht: the DHT instance
@type clients: C{dictionary}
@ivar clients: the available peers that have been previously contacted
"""