git.mxchange.org Git - quix0rs-apt-p2p.git/blobdiff - apt_p2p/PeerManager.py
Don't try and add ourself to the routing table during an action.
[quix0rs-apt-p2p.git] / apt_p2p / PeerManager.py
index deeb668a54c4ae670a4e501c5e106fe7b6a300ff..cfc0f78103e5a2ce8fed2f0084d71cf2c8f253c3 100644 (file)
@@ -8,219 +8,21 @@ from binascii import b2a_hex, a2b_hex
 import sha
 
 from twisted.internet import reactor, defer
-from twisted.python import log, filepath
+from twisted.python import log
 from twisted.trial import unittest
 from twisted.web2 import stream
 from twisted.web2.http import Response, splitHostPort
 
 from HTTPDownloader import Peer
+from Streams import GrowingFileStream, StreamToFile
 from util import uncompact
 from Hash import PIECE_SIZE
 from apt_p2p_Khashmir.bencode import bdecode
 from apt_p2p_conf import config
 
-
 class PeerError(Exception):
     """An error occurred downloading from peers."""
     
-class GrowingFileStream(stream.FileStream):
-    """Modified to stream data from a file as it becomes available.
-    
-    @ivar CHUNK_SIZE: the maximum size of chunks of data to send at a time
-    @ivar deferred: waiting for the result of the last read attempt
-    @ivar available: the number of bytes that are currently available to read
-    @ivar position: the current position in the file where the next read will begin
-    @ivar finished: True when no more data will be coming available
-    """
-
-    CHUNK_SIZE = 4*1024
-
-    def __init__(self, f, length = None):
-        stream.FileStream.__init__(self, f)
-        self.length = length
-        self.deferred = None
-        self.available = 0L
-        self.position = 0L
-        self.finished = False
-
-    def updateAvailable(self, newlyAvailable):
-        """Update the number of bytes that are available.
-        
-        Call it with 0 to trigger reading of a fully read file.
-        
-        @param newlyAvailable: the number of bytes that just became available
-        """
-        assert not self.finished
-        self.available += newlyAvailable
-        
-        # If a read is pending, let it go
-        if self.deferred and self.position < self.available:
-            # Try to read some data from the file
-            length = self.available - self.position
-            readSize = min(length, self.CHUNK_SIZE)
-            self.f.seek(self.position)
-            b = self.f.read(readSize)
-            bytesRead = len(b)
-            
-            # Check if end of file was reached
-            if bytesRead:
-                self.position += bytesRead
-                deferred = self.deferred
-                self.deferred = None
-                deferred.callback(b)
-
-    def allAvailable(self):
-        """Indicate that no more data will be coming available."""
-        self.finished = True
-
-        # If a read is pending, let it go
-        if self.deferred:
-            if self.position < self.available:
-                # Try to read some data from the file
-                length = self.available - self.position
-                readSize = min(length, self.CHUNK_SIZE)
-                self.f.seek(self.position)
-                b = self.f.read(readSize)
-                bytesRead = len(b)
-    
-                # Check if end of file was reached
-                if bytesRead:
-                    self.position += bytesRead
-                    deferred = self.deferred
-                    self.deferred = None
-                    deferred.callback(b)
-                else:
-                    # We're done
-                    self._close()
-                    deferred = self.deferred
-                    self.deferred = None
-                    deferred.callback(None)
-            else:
-                # We're done
-                self._close()
-                deferred = self.deferred
-                self.deferred = None
-                deferred.callback(None)
-        
-    def read(self, sendfile=False):
-        assert not self.deferred, "A previous read is still deferred."
-
-        if self.f is None:
-            return None
-
-        length = self.available - self.position
-        readSize = min(length, self.CHUNK_SIZE)
-
-        # If we don't have any available, we're done or deferred
-        if readSize <= 0:
-            if self.finished:
-                self._close()
-                return None
-            else:
-                self.deferred = defer.Deferred()
-                return self.deferred
-
-        # Try to read some data from the file
-        self.f.seek(self.position)
-        b = self.f.read(readSize)
-        bytesRead = len(b)
-        if not bytesRead:
-            # End of file was reached, we're done or deferred
-            if self.finished:
-                self._close()
-                return None
-            else:
-                self.deferred = defer.Deferred()
-                return self.deferred
-        else:
-            self.position += bytesRead
-            return b
-
-    def _close(self):
-        """Close the temporary file and remove it."""
-        self.f.close()
-        filepath.FilePath(self.f.name).remove()
-        self.f = None
-        
-class StreamToFile:
-    """Save a stream to a partial file and hash it.
-    
-    @type stream: L{twisted.web2.stream.IByteStream}
-    @ivar stream: the input stream being read
-    @type outFile: L{twisted.python.filepath.FilePath}
-    @ivar outFile: the file being written
-    @type hasher: hashing object, e.g. C{sha1}
-    @ivar hasher: the hash object for the data
-    @type position: C{int}
-    @ivar position: the current file position to write the next data to
-    @type length: C{int}
-    @ivar length: the position in the file to not write beyond
-    @type doneDefer: L{twisted.internet.defer.Deferred}
-    @ivar doneDefer: the deferred that will fire when done writing
-    """
-    
-    def __init__(self, hasher, inputStream, outFile, start = 0, length = None):
-        """Initializes the file.
-        
-        @type hasher: hashing object, e.g. C{sha1}
-        @param hasher: the hash object for the data
-        @type inputStream: L{twisted.web2.stream.IByteStream}
-        @param inputStream: the input stream to read from
-        @type outFile: L{twisted.python.filepath.FilePath}
-        @param outFile: the file to write to
-        @type start: C{int}
-        @param start: the file position to start writing at
-            (optional, defaults to the start of the file)
-        @type length: C{int}
-        @param length: the maximum amount of data to write to the file
-            (optional, defaults to not limiting the writing to the file
-        """
-        self.stream = inputStream
-        self.outFile = outFile
-        self.hasher = hasher
-        self.position = start
-        self.length = None
-        if length is not None:
-            self.length = start + length
-        self.doneDefer = None
-        
-    def run(self):
-        """Start the streaming.
-
-        @rtype: L{twisted.internet.defer.Deferred}
-        """
-        self.doneDefer = stream.readStream(self.stream, self._gotData)
-        self.doneDefer.addCallbacks(self._done, self._error)
-        return self.doneDefer
-
-    def _gotData(self, data):
-        """Process the received data."""
-        if self.outFile.closed:
-            raise PeerError, "outFile was unexpectedly closed"
-        
-        if data is None:
-            raise PeerError, "Data is None?"
-        
-        # Make sure we don't go too far
-        if self.length is not None and self.position + len(data) > self.length:
-            data = data[:(self.length - self.position)]
-        
-        # Write and hash the streamed data
-        self.outFile.seek(self.position)
-        self.outFile.write(data)
-        self.hasher.update(data)
-        self.position += len(data)
-        
-    def _done(self, result):
-        """Return the result."""
-        return self.hasher.digest()
-    
-    def _error(self, err):
-        """Log the error."""
-        log.msg('Streaming error')
-        log.err(err)
-        return err
-    
 class FileDownload:
     """Manage a download from a list of peers or a mirror.
     
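The two classes removed above now live in the new Streams module (see the added import). Below is a minimal usage sketch of how they cooperate, assuming the relocated classes keep the signatures shown in the removed code (GrowingFileStream(f, length) and StreamToFile(hasher, inputStream, outFile, start, length)); the download_to_growing_file helper itself is hypothetical, not part of the diff.

import sha

from Streams import GrowingFileStream, StreamToFile

def download_to_growing_file(inputStream, tempFile, totalLength):
    """Write inputStream into tempFile while serving the file as it grows."""
    # Readers of outStream receive a Deferred whenever no data is available yet
    outStream = GrowingFileStream(tempFile, totalLength)

    # Copy the network stream into the same file, hashing it as it arrives
    writer = StreamToFile(sha.new(), inputStream, tempFile)
    doneDefer = writer.run()

    # The downloader is expected to call outStream.updateAvailable(n) after
    # each n bytes are written, and allAvailable() once the writer finishes.
    def _finished(result):
        outStream.allAvailable()
        return result
    doneDefer.addBoth(_finished)

    return outStream, doneDefer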
@@ -370,7 +172,7 @@ class FileDownload:
         #        del self.peers[site]
 
         # Start the DHT lookup
-        lookupDefer = self.manager.dht.getValue(key)
+        lookupDefer = self.manager.dht.get(key)
         lookupDefer.addBoth(self._getDHTPieces, key)
         
     def _getDHTPieces(self, results, key):
@@ -528,6 +330,11 @@ class FileDownload:
     #{ Downloading the pieces
     def getPieces(self):
         """Download the next pieces from the peers."""
+        if self.file.closed:
+            log.msg('Download has been aborted for %s' % self.path)
+            self.stream.allAvailable(remove = True)
+            return
+            
         self.sort()
         piece = self.nextFinish
         while self.outstanding < 4 and self.peerlist and piece < len(self.completePieces):
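The new guard above bails out when the output file has already been closed and tells the stream that nothing more is coming, passing a new remove flag to allAvailable(). A plausible reconstruction of that method in the relocated GrowingFileStream, based on the removed allAvailable()/_close() further up; the remove parameter and self.remove attribute are assumptions about the new Streams code, which is not shown in this diff.

    def allAvailable(self, remove = False):
        """Indicate that no more data will be coming available.

        @param remove: whether to delete the temporary file once it is read
        """
        self.finished = True
        self.remove = remove

        # If a read is pending, release it now (same logic as the removed code)
        if self.deferred:
            deferred = self.deferred
            self.deferred = None
            if self.position < self.available:
                self.f.seek(self.position)
                b = self.f.read(min(self.available - self.position, self.CHUNK_SIZE))
                if b:
                    self.position += len(b)
                    deferred.callback(b)
                    return
            # Nothing left to read: close the file, removing it if requested
            self._close()
            deferred.callback(None)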
@@ -554,7 +361,7 @@ class FileDownload:
         # Check if we're done
         if self.outstanding <= 0 and self.nextFinish >= len(self.completePieces):
             log.msg('Download is complete for %s' % self.path)
-            self.stream.allAvailable()
+            self.stream.allAvailable(remove = True)
     
     def _getPiece(self, response, piece, peer):
         """Process the retrieved headers from the peer."""
     
     def _getPiece(self, response, piece, peer):
         """Process the retrieved headers from the peer."""
@@ -606,9 +413,9 @@ class FileDownload:
         self.getPieces()
         log.err(err)
 
-    def _gotPiece(self, response, piece, peer):
+    def _gotPiece(self, hash, piece, peer):
         """Process the retrieved piece from the peer."""
         """Process the retrieved piece from the peer."""
-        if self.pieces[piece] and response != self.pieces[piece]:
+        if self.pieces[piece] and hash.digest() != self.pieces[piece]:
             # Hash doesn't match
             log.msg('Hash error for piece %d from peer %r' % (piece, peer))
             peer.hashError('Piece received from peer does not match expected')
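The signature change above implies that the relocated StreamToFile now fires its completion deferred with the hash object itself rather than its digest (the removed _done() earlier in this diff returned self.hasher.digest()). The assumed counterpart in the new Streams module would be a one-line change along these lines:

    def _done(self, result):
        """Return the hash object so callers can call .digest() themselves."""
        return self.hasher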
@@ -626,7 +433,7 @@ class FileDownload:
 
     def _gotError(self, err, piece, peer):
         """Piece download failed, try again."""
 
     def _gotError(self, err, piece, peer):
         """Piece download failed, try again."""
-        log.msg('Error streaming piece %d from peer %r: %r' % (piece, peer, response))
+        log.msg('Error streaming piece %d from peer %r: %r' % (piece, peer, err))
         log.err(err)
         self.completePieces[piece] = False
         self.getPieces()
@@ -636,7 +443,7 @@ class PeerManager:
     
     @type cache_dir: L{twisted.python.filepath.FilePath}
     @ivar cache_dir: the directory to use for storing all files
-    @type dht: L{interfaces.IDHT}
+    @type dht: L{DHTManager.DHT}
     @ivar dht: the DHT instance
     @type stats: L{stats.StatsLogger}
     @ivar stats: the statistics logger to record sent data to