]> git.mxchange.org Git - quix0rs-apt-p2p.git/blobdiff - apt_p2p/PeerManager.py
New TODO for hashing files with detached GPG signatures.
[quix0rs-apt-p2p.git] / apt_p2p / PeerManager.py
index 7f63c974aadb7c48b12298433627611a20394a0b..cfc0f78103e5a2ce8fed2f0084d71cf2c8f253c3 100644 (file)
@@ -8,219 +8,21 @@ from binascii import b2a_hex, a2b_hex
 import sha
 
 from twisted.internet import reactor, defer
-from twisted.python import log, filepath
+from twisted.python import log
 from twisted.trial import unittest
 from twisted.web2 import stream
 from twisted.web2.http import Response, splitHostPort
 
 from HTTPDownloader import Peer
+from Streams import GrowingFileStream, StreamToFile
 from util import uncompact
 from Hash import PIECE_SIZE
 from apt_p2p_Khashmir.bencode import bdecode
 from apt_p2p_conf import config
 
-
 class PeerError(Exception):
     """An error occurred downloading from peers."""
     
-class GrowingFileStream(stream.FileStream):
-    """Modified to stream data from a file as it becomes available.
-    
-    @ivar CHUNK_SIZE: the maximum size of chunks of data to send at a time
-    @ivar deferred: waiting for the result of the last read attempt
-    @ivar available: the number of bytes that are currently available to read
-    @ivar position: the current position in the file where the next read will begin
-    @ivar finished: True when no more data will be coming available
-    """
-
-    CHUNK_SIZE = 4*1024
-
-    def __init__(self, f, length = None):
-        stream.FileStream.__init__(self, f)
-        self.length = length
-        self.deferred = None
-        self.available = 0L
-        self.position = 0L
-        self.finished = False
-
-    def updateAvailable(self, newlyAvailable):
-        """Update the number of bytes that are available.
-        
-        Call it with 0 to trigger reading of a fully read file.
-        
-        @param newlyAvailable: the number of bytes that just became available
-        """
-        assert not self.finished
-        self.available += newlyAvailable
-        
-        # If a read is pending, let it go
-        if self.deferred and self.position < self.available:
-            # Try to read some data from the file
-            length = self.available - self.position
-            readSize = min(length, self.CHUNK_SIZE)
-            self.f.seek(self.position)
-            b = self.f.read(readSize)
-            bytesRead = len(b)
-            
-            # Check if end of file was reached
-            if bytesRead:
-                self.position += bytesRead
-                deferred = self.deferred
-                self.deferred = None
-                deferred.callback(b)
-
-    def allAvailable(self):
-        """Indicate that no more data will be coming available."""
-        self.finished = True
-
-        # If a read is pending, let it go
-        if self.deferred:
-            if self.position < self.available:
-                # Try to read some data from the file
-                length = self.available - self.position
-                readSize = min(length, self.CHUNK_SIZE)
-                self.f.seek(self.position)
-                b = self.f.read(readSize)
-                bytesRead = len(b)
-    
-                # Check if end of file was reached
-                if bytesRead:
-                    self.position += bytesRead
-                    deferred = self.deferred
-                    self.deferred = None
-                    deferred.callback(b)
-                else:
-                    # We're done
-                    self._close()
-                    deferred = self.deferred
-                    self.deferred = None
-                    deferred.callback(None)
-            else:
-                # We're done
-                self._close()
-                deferred = self.deferred
-                self.deferred = None
-                deferred.callback(None)
-        
-    def read(self, sendfile=False):
-        assert not self.deferred, "A previous read is still deferred."
-
-        if self.f is None:
-            return None
-
-        length = self.available - self.position
-        readSize = min(length, self.CHUNK_SIZE)
-
-        # If we don't have any available, we're done or deferred
-        if readSize <= 0:
-            if self.finished:
-                self._close()
-                return None
-            else:
-                self.deferred = defer.Deferred()
-                return self.deferred
-
-        # Try to read some data from the file
-        self.f.seek(self.position)
-        b = self.f.read(readSize)
-        bytesRead = len(b)
-        if not bytesRead:
-            # End of file was reached, we're done or deferred
-            if self.finished:
-                self._close()
-                return None
-            else:
-                self.deferred = defer.Deferred()
-                return self.deferred
-        else:
-            self.position += bytesRead
-            return b
-
-    def _close(self):
-        """Close the temporary file and remove it."""
-        self.f.close()
-        filepath.FilePath(self.f.name).remove()
-        self.f = None
-        
-class StreamToFile:
-    """Save a stream to a partial file and hash it.
-    
-    @type stream: L{twisted.web2.stream.IByteStream}
-    @ivar stream: the input stream being read
-    @type outFile: L{twisted.python.filepath.FilePath}
-    @ivar outFile: the file being written
-    @type hash: C{sha1}
-    @ivar hash: the hash object for the data
-    @type position: C{int}
-    @ivar position: the current file position to write the next data to
-    @type length: C{int}
-    @ivar length: the position in the file to not write beyond
-    @type doneDefer: L{twisted.internet.defer.Deferred}
-    @ivar doneDefer: the deferred that will fire when done writing
-    """
-    
-    def __init__(self, inputStream, outFile, start = 0, length = None):
-        """Initializes the file.
-        
-        @type inputStream: L{twisted.web2.stream.IByteStream}
-        @param inputStream: the input stream to read from
-        @type outFile: L{twisted.python.filepath.FilePath}
-        @param outFile: the file to write to
-        @type start: C{int}
-        @param start: the file position to start writing at
-            (optional, defaults to the start of the file)
-        @type length: C{int}
-        @param length: the maximum amount of data to write to the file
-            (optional, defaults to not limiting the writing to the file
-        """
-        self.stream = inputStream
-        self.outFile = outFile
-        self.hash = sha.new()
-        self.position = start
-        self.length = None
-        if length is not None:
-            self.length = start + length
-        self.doneDefer = None
-        
-    def run(self):
-        """Start the streaming.
-
-        @rtype: L{twisted.internet.defer.Deferred}
-        """
-        log.msg('Started streaming %r bytes to file at position %d' % (self.length, self.position))
-        self.doneDefer = stream.readStream(self.stream, self._gotData)
-        self.doneDefer.addCallbacks(self._done, self._error)
-        return self.doneDefer
-
-    def _gotData(self, data):
-        """Process the received data."""
-        if self.outFile.closed:
-            raise PeerError, "outFile was unexpectedly closed"
-        
-        if data is None:
-            raise PeerError, "Data is None?"
-        
-        # Make sure we don't go too far
-        if self.length is not None and self.position + len(data) > self.length:
-            data = data[:(self.length - self.position)]
-        
-        # Write and hash the streamed data
-        self.outFile.seek(self.position)
-        self.outFile.write(data)
-        self.hash.update(data)
-        self.position += len(data)
-        
-    def _done(self, result):
-        """Return the result."""
-        log.msg('Streaming is complete')
-        return self.hash.digest()
-    
-    def _error(self, err):
-        """Log the error."""
-        log.msg('Streaming error')
-        log.err(err)
-        return err
-    
 class FileDownload:
     """Manage a download from a list of peers or a mirror.
     
@@ -370,7 +172,7 @@ class FileDownload:
         #        del self.peers[site]
 
         # Start the DHT lookup
-        lookupDefer = self.manager.dht.getValue(key)
+        lookupDefer = self.manager.dht.get(key)
         lookupDefer.addBoth(self._getDHTPieces, key)
         
     def _getDHTPieces(self, results, key):
@@ -528,6 +330,11 @@ class FileDownload:
     #{ Downloading the pieces
     def getPieces(self):
         """Download the next pieces from the peers."""
+        if self.file.closed:
+            log.msg('Download has been aborted for %s' % self.path)
+            self.stream.allAvailable(remove = True)
+            return
+            
         self.sort()
         piece = self.nextFinish
         while self.outstanding < 4 and self.peerlist and piece < len(self.completePieces):
@@ -538,10 +345,13 @@ class FileDownload:
                 log.msg('Sending a request for piece %d to peer %r' % (piece, peer))
                 
                 self.outstanding += 1
+                path = self.path
                 if peer.mirror:
-                    df = peer.getRange(self.mirror_path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
+                    path = self.mirror_path
+                if len(self.completePieces) > 1:
+                    df = peer.getRange(path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
                 else:
-                    df = peer.getRange(self.path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
+                    df = peer.get(path)
                 reactor.callLater(0, df.addCallbacks,
                                   *(self._getPiece, self._getError),
                                   **{'callbackArgs': (piece, peer),
@@ -550,12 +360,11 @@ class FileDownload:
                 
         # Check if we're done
         if self.outstanding <= 0 and self.nextFinish >= len(self.completePieces):
-            log.msg('We seem to be done with all pieces')
-            self.stream.allAvailable()
+            log.msg('Download is complete for %s' % self.path)
+            self.stream.allAvailable(remove = True)
     
     def _getPiece(self, response, piece, peer):
         """Process the retrieved headers from the peer."""
-        log.msg('Got response for piece %d from peer %r' % (piece, peer))
         if ((len(self.completePieces) > 1 and response.code != 206) or
             (response.code not in (200, 206))):
             # Request failed, try a different peer
@@ -575,16 +384,17 @@ class FileDownload:
                 headers = {}
                 if response.headers.hasHeader('last-modified'):
                     headers['last-modified'] = response.headers.getHeader('last-modified')
-                resp = Response(200, {}, self.stream)
+                resp = Response(200, headers, self.stream)
                 df.callback(resp)
 
             # Read the response stream to the file
             log.msg('Streaming piece %d from peer %r' % (piece, peer))
             if response.code == 206:
-                df = StreamToFile(response.stream, self.file, piece*PIECE_SIZE,
-                                  PIECE_SIZE).run()
+                df = StreamToFile(self.hash.newPieceHasher(), response.stream,
+                                  self.file, piece*PIECE_SIZE, PIECE_SIZE).run()
             else:
-                df = StreamToFile(response.stream, self.file).run()
+                df = StreamToFile(self.hash.newHasher(), response.stream,
+                                  self.file).run()
             reactor.callLater(0, df.addCallbacks,
                               *(self._gotPiece, self._gotError),
                               **{'callbackArgs': (piece, peer),
@@ -603,10 +413,9 @@ class FileDownload:
         self.getPieces()
         log.err(err)
 
-    def _gotPiece(self, response, piece, peer):
+    def _gotPiece(self, hash, piece, peer):
         """Process the retrieved piece from the peer."""
-        log.msg('Finished streaming piece %d from peer %r: %r' % (piece, peer, response))
-        if self.pieces[piece] and response != self.pieces[piece]:
+        if self.pieces[piece] and hash.digest() != self.pieces[piece]:
             # Hash doesn't match
             log.msg('Hash error for piece %d from peer %r' % (piece, peer))
             peer.hashError('Piece received from peer does not match expected')
@@ -624,7 +433,7 @@ class FileDownload:
 
     def _gotError(self, err, piece, peer):
         """Piece download failed, try again."""
-        log.msg('Error streaming piece %d from peer %r: %r' % (piece, peer, response))
+        log.msg('Error streaming piece %d from peer %r: %r' % (piece, peer, err))
         log.err(err)
         self.completePieces[piece] = False
         self.getPieces()
@@ -634,7 +443,7 @@ class PeerManager:
     
     @type cache_dir: L{twisted.python.filepath.FilePath}
     @ivar cache_dir: the directory to use for storing all files
-    @type dht: L{interfaces.IDHT}
+    @type dht: L{DHTManager.DHT}
     @ivar dht: the DHT instance
     @type stats: L{stats.StatsLogger}
     @ivar stats: the statistics logger to record sent data to