import sha
from twisted.internet import reactor, defer
-from twisted.python import log, filepath
+from twisted.python import log
from twisted.trial import unittest
from twisted.web2 import stream
from twisted.web2.http import Response, splitHostPort
from HTTPDownloader import Peer
+from Streams import GrowingFileStream, StreamToFile
from util import uncompact
from Hash import PIECE_SIZE
from apt_p2p_Khashmir.bencode import bdecode
from apt_p2p_conf import config
-
class PeerError(Exception):
"""An error occurred downloading from peers."""
-class GrowingFileStream(stream.FileStream):
- """Modified to stream data from a file as it becomes available.
-
- @ivar CHUNK_SIZE: the maximum size of chunks of data to send at a time
- @ivar deferred: waiting for the result of the last read attempt
- @ivar available: the number of bytes that are currently available to read
- @ivar position: the current position in the file where the next read will begin
- @ivar finished: True when no more data will be coming available
- """
-
- CHUNK_SIZE = 4*1024
-
- def __init__(self, f, length = None):
- stream.FileStream.__init__(self, f)
- self.length = length
- self.deferred = None
- self.available = 0L
- self.position = 0L
- self.finished = False
-
- def updateAvailable(self, newlyAvailable):
- """Update the number of bytes that are available.
-
- Call it with 0 to trigger reading of a fully read file.
-
- @param newlyAvailable: the number of bytes that just became available
- """
- assert not self.finished
- self.available += newlyAvailable
-
- # If a read is pending, let it go
- if self.deferred and self.position < self.available:
- # Try to read some data from the file
- length = self.available - self.position
- readSize = min(length, self.CHUNK_SIZE)
- self.f.seek(self.position)
- b = self.f.read(readSize)
- bytesRead = len(b)
-
- # Check if end of file was reached
- if bytesRead:
- self.position += bytesRead
- deferred = self.deferred
- self.deferred = None
- deferred.callback(b)
-
- def allAvailable(self):
- """Indicate that no more data will be coming available."""
- self.finished = True
-
- # If a read is pending, let it go
- if self.deferred:
- if self.position < self.available:
- # Try to read some data from the file
- length = self.available - self.position
- readSize = min(length, self.CHUNK_SIZE)
- self.f.seek(self.position)
- b = self.f.read(readSize)
- bytesRead = len(b)
-
- # Check if end of file was reached
- if bytesRead:
- self.position += bytesRead
- deferred = self.deferred
- self.deferred = None
- deferred.callback(b)
- else:
- # We're done
- self._close()
- deferred = self.deferred
- self.deferred = None
- deferred.callback(None)
- else:
- # We're done
- self._close()
- deferred = self.deferred
- self.deferred = None
- deferred.callback(None)
-
- def read(self, sendfile=False):
- assert not self.deferred, "A previous read is still deferred."
-
- if self.f is None:
- return None
-
- length = self.available - self.position
- readSize = min(length, self.CHUNK_SIZE)
-
- # If we don't have any available, we're done or deferred
- if readSize <= 0:
- if self.finished:
- self._close()
- return None
- else:
- self.deferred = defer.Deferred()
- return self.deferred
-
- # Try to read some data from the file
- self.f.seek(self.position)
- b = self.f.read(readSize)
- bytesRead = len(b)
- if not bytesRead:
- # End of file was reached, we're done or deferred
- if self.finished:
- self._close()
- return None
- else:
- self.deferred = defer.Deferred()
- return self.deferred
- else:
- self.position += bytesRead
- return b
-
- def _close(self):
- """Close the temporary file and remove it."""
- self.f.close()
- filepath.FilePath(self.f.name).remove()
- self.f = None
-
-class StreamToFile:
- """Save a stream to a partial file and hash it.
-
- @type stream: L{twisted.web2.stream.IByteStream}
- @ivar stream: the input stream being read
- @type outFile: L{twisted.python.filepath.FilePath}
- @ivar outFile: the file being written
- @type hasher: hashing object, e.g. C{sha1}
- @ivar hasher: the hash object for the data
- @type position: C{int}
- @ivar position: the current file position to write the next data to
- @type length: C{int}
- @ivar length: the position in the file to not write beyond
- @type doneDefer: L{twisted.internet.defer.Deferred}
- @ivar doneDefer: the deferred that will fire when done writing
- """
-
- def __init__(self, hasher, inputStream, outFile, start = 0, length = None):
- """Initializes the file.
-
- @type hasher: hashing object, e.g. C{sha1}
- @param hasher: the hash object for the data
- @type inputStream: L{twisted.web2.stream.IByteStream}
- @param inputStream: the input stream to read from
- @type outFile: L{twisted.python.filepath.FilePath}
- @param outFile: the file to write to
- @type start: C{int}
- @param start: the file position to start writing at
- (optional, defaults to the start of the file)
- @type length: C{int}
- @param length: the maximum amount of data to write to the file
- (optional, defaults to not limiting the writing to the file
- """
- self.stream = inputStream
- self.outFile = outFile
- self.hasher = hasher
- self.position = start
- self.length = None
- if length is not None:
- self.length = start + length
- self.doneDefer = None
-
- def run(self):
- """Start the streaming.
-
- @rtype: L{twisted.internet.defer.Deferred}
- """
- self.doneDefer = stream.readStream(self.stream, self._gotData)
- self.doneDefer.addCallbacks(self._done, self._error)
- return self.doneDefer
-
- def _gotData(self, data):
- """Process the received data."""
- if self.outFile.closed:
- raise PeerError, "outFile was unexpectedly closed"
-
- if data is None:
- raise PeerError, "Data is None?"
-
- # Make sure we don't go too far
- if self.length is not None and self.position + len(data) > self.length:
- data = data[:(self.length - self.position)]
-
- # Write and hash the streamed data
- self.outFile.seek(self.position)
- self.outFile.write(data)
- self.hasher.update(data)
- self.position += len(data)
-
- def _done(self, result):
- """Return the result."""
- return self.hasher.digest()
-
- def _error(self, err):
- """Log the error."""
- log.msg('Streaming error')
- log.err(err)
- return err
-
class FileDownload:
"""Manage a download from a list of peers or a mirror.
#{ Downloading the pieces
def getPieces(self):
"""Download the next pieces from the peers."""
+ if self.file.closed:
+ log.msg('Download has been aborted for %s' % self.path)
+ self.stream.allAvailable(remove = True)
+ return
+
self.sort()
piece = self.nextFinish
while self.outstanding < 4 and self.peerlist and piece < len(self.completePieces):
# Check if we're done
if self.outstanding <= 0 and self.nextFinish >= len(self.completePieces):
log.msg('Download is complete for %s' % self.path)
- self.stream.allAvailable()
+ self.stream.allAvailable(remove = True)
def _getPiece(self, response, piece, peer):
"""Process the retrieved headers from the peer."""
self.getPieces()
log.err(err)
- def _gotPiece(self, response, piece, peer):
+ def _gotPiece(self, hash, piece, peer):
"""Process the retrieved piece from the peer."""
- if self.pieces[piece] and response != self.pieces[piece]:
+ if self.pieces[piece] and hash.digest() != self.pieces[piece]:
# Hash doesn't match
log.msg('Hash error for piece %d from peer %r' % (piece, peer))
peer.hashError('Piece received from peer does not match expected')
def _gotError(self, err, piece, peer):
"""Piece download failed, try again."""
- log.msg('Error streaming piece %d from peer %r: %r' % (piece, peer, response))
+ log.msg('Error streaming piece %d from peer %r: %r' % (piece, peer, err))
log.err(err)
self.completePieces[piece] = False
self.getPieces()