import sha
from twisted.internet import reactor, defer
-from twisted.python import log
+from twisted.python import log, filepath
from twisted.trial import unittest
from twisted.web2 import stream
from twisted.web2.http import Response, splitHostPort
from apt_p2p_Khashmir.bencode import bdecode
from apt_p2p_conf import config
+
+class PeerError(Exception):
+    """Raised when an error occurs while downloading a file from peers."""
+
class GrowingFileStream(stream.FileStream):
"""Modified to stream data from a file as it becomes available.
deferred.callback(b)
else:
# We're done
+ self._close()
deferred = self.deferred
self.deferred = None
deferred.callback(None)
else:
# We're done
+ self._close()
deferred = self.deferred
self.deferred = None
deferred.callback(None)
# If we don't have any available, we're done or deferred
if readSize <= 0:
if self.finished:
+ self._close()
return None
else:
self.deferred = defer.Deferred()
if not bytesRead:
# End of file was reached, we're done or deferred
if self.finished:
+ self._close()
return None
else:
self.deferred = defer.Deferred()
self.position += bytesRead
return b
+    def _close(self):
+        """Close the temporary file and remove it from disk.
+
+        Guarded so a second call is a no-op: _close() is reached from
+        several end-of-stream paths (both the deferred-callback branches
+        and the finished checks in read()), so a double invocation is
+        possible and would otherwise raise AttributeError on None.
+        """
+        if self.f is not None:
+            self.f.close()
+            filepath.FilePath(self.f.name).remove()
+            self.f = None
+
class StreamToFile:
"""Save a stream to a partial file and hash it.
@ivar stream: the input stream being read
@type outFile: L{twisted.python.filepath.FilePath}
@ivar outFile: the file being written
- @type hash: C{sha1}
- @ivar hash: the hash object for the data
+ @type hasher: hashing object, e.g. C{sha1}
+ @ivar hasher: the hash object for the data
@type position: C{int}
@ivar position: the current file position to write the next data to
@type length: C{int}
@ivar doneDefer: the deferred that will fire when done writing
"""
- def __init__(self, inputStream, outFile, start = 0, length = None):
+ def __init__(self, hasher, inputStream, outFile, start = 0, length = None):
"""Initializes the file.
+ @type hasher: hashing object, e.g. C{sha1}
+ @param hasher: the hash object for the data
@type inputStream: L{twisted.web2.stream.IByteStream}
@param inputStream: the input stream to read from
@type outFile: L{twisted.python.filepath.FilePath}
"""
self.stream = inputStream
self.outFile = outFile
- self.hash = sha.new()
+ self.hasher = hasher
self.position = start
self.length = None
if length is not None:
@rtype: L{twisted.internet.defer.Deferred}
"""
- log.msg('Started streaming %r bytes to file at position %d' % (self.length, self.position))
self.doneDefer = stream.readStream(self.stream, self._gotData)
self.doneDefer.addCallbacks(self._done, self._error)
return self.doneDefer
def _gotData(self, data):
"""Process the received data."""
if self.outFile.closed:
- raise Exception, "outFile was unexpectedly closed"
+ raise PeerError, "outFile was unexpectedly closed"
if data is None:
- raise Exception, "Data is None?"
+ raise PeerError, "Data is None?"
# Make sure we don't go too far
if self.length is not None and self.position + len(data) > self.length:
# Write and hash the streamed data
self.outFile.seek(self.position)
self.outFile.write(data)
- self.hash.update(data)
+ self.hasher.update(data)
self.position += len(data)
def _done(self, result):
"""Return the result."""
- log.msg('Streaming is complete')
- return self.hash.digest()
+ return self.hasher.digest()
def _error(self, err):
"""Log the error."""
self.compact_peers = compact_peers
self.path = '/~/' + quote_plus(hash.expected())
+ self.defer = None
self.mirror_path = None
self.pieces = None
self.started = False
if self.pieces is None:
# Send a request to one or more peers
- log.msg('Checking for a peer to request piece hashes from')
for site in self.peers:
if self.peers[site].get('failed', False) != True:
log.msg('Sending a piece hash request to %r' % (site, ))
if self.outstanding >= 4:
break
- log.msg('Done sending piece hash requests for now, %d outstanding' % self.outstanding)
if self.pieces is None and self.outstanding <= 0:
# Continue without the piece hashes
log.msg('Could not retrieve the piece hashes from the peers')
log.msg('Got a piece hash response %d from %r' % (response.code, site))
if response.code != 200:
# Request failed, try a different peer
- log.msg('Did not like response %d from %r' % (response.code, site))
self.getPeerPieces(key, site)
else:
# Read the response stream to a string
# self.defer.callback(self.peerlist[0].get(self.path))
# return
- # Start sending the return file
- self.stream = GrowingFileStream(self.file, self.hash.expSize)
- resp = Response(200, {}, self.stream)
- self.defer.callback(resp)
-
# Begin to download the pieces
self.outstanding = 0
self.nextFinish = 0
#{ Downloading the pieces
def getPieces(self):
"""Download the next pieces from the peers."""
- log.msg('Checking for more piece requests to send')
self.sort()
piece = self.nextFinish
while self.outstanding < 4 and self.peerlist and piece < len(self.completePieces):
- log.msg('Checking piece %d' % piece)
if self.completePieces[piece] == False:
# Send a request to the highest ranked peer
peer = self.peerlist.pop()
log.msg('Sending a request for piece %d to peer %r' % (piece, peer))
self.outstanding += 1
+ path = self.path
if peer.mirror:
- df = peer.getRange(self.mirror_path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
+ path = self.mirror_path
+ if len(self.completePieces) > 1:
+ df = peer.getRange(path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
else:
- df = peer.getRange(self.path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
+ df = peer.get(path)
reactor.callLater(0, df.addCallbacks,
*(self._getPiece, self._getError),
**{'callbackArgs': (piece, peer),
'errbackArgs': (piece, peer)})
piece += 1
- log.msg('Finished checking pieces, %d outstanding, next piece %d of %d' % (self.outstanding, self.nextFinish, len(self.completePieces)))
# Check if we're done
if self.outstanding <= 0 and self.nextFinish >= len(self.completePieces):
- log.msg('We seem to be done with all pieces')
+ log.msg('Download is complete for %s' % self.path)
self.stream.allAvailable()
def _getPiece(self, response, piece, peer):
"""Process the retrieved headers from the peer."""
- log.msg('Got response for piece %d from peer %r' % (piece, peer))
if ((len(self.completePieces) > 1 and response.code != 206) or
(response.code not in (200, 206))):
# Request failed, try a different peer
if response.stream and response.stream.length:
stream.readAndDiscard(response.stream)
else:
+ if self.defer:
+ # Start sending the return file
+ df = self.defer
+ self.defer = None
+ self.stream = GrowingFileStream(self.file, self.hash.expSize)
+
+ # Get the headers from the peer's response
+ headers = {}
+ if response.headers.hasHeader('last-modified'):
+ headers['last-modified'] = response.headers.getHeader('last-modified')
+ resp = Response(200, headers, self.stream)
+ df.callback(resp)
+
# Read the response stream to the file
log.msg('Streaming piece %d from peer %r' % (piece, peer))
if response.code == 206:
- df = StreamToFile(response.stream, self.file, piece*PIECE_SIZE,
- PIECE_SIZE).run()
+ df = StreamToFile(self.hash.newPieceHasher(), response.stream,
+ self.file, piece*PIECE_SIZE, PIECE_SIZE).run()
else:
- df = StreamToFile(response.stream, self.file).run()
+ df = StreamToFile(self.hash.newHasher(), response.stream,
+ self.file).run()
reactor.callLater(0, df.addCallbacks,
*(self._gotPiece, self._gotError),
**{'callbackArgs': (piece, peer),
def _gotPiece(self, response, piece, peer):
"""Process the retrieved piece from the peer."""
- log.msg('Finished streaming piece %d from peer %r: %r' % (piece, peer, response))
if self.pieces[piece] and response != self.pieces[piece]:
# Hash doesn't match
log.msg('Hash error for piece %d from peer %r' % (piece, peer))