More multiple peer downloading work, finished but still untested.
authorCameron Dale <camrdale@gmail.com>
Thu, 10 Apr 2008 00:27:29 +0000 (17:27 -0700)
committerCameron Dale <camrdale@gmail.com>
Thu, 10 Apr 2008 00:27:29 +0000 (17:27 -0700)
apt_p2p/HTTPDownloader.py
apt_p2p/PeerManager.py

index 2a53897..e3eb66e 100644 (file)
@@ -105,8 +105,6 @@ class Peer(ClientFactory):
         req = self.response_queue.pop(0)
         log.msg('%s of %s completed with code %d' % (req.method, req.uri, resp.code))
         self._completed += 1
-        if resp.code >= 400:
-            self._errors += 1
         now = datetime.now()
         self._responseTimes.append((now, now - req.submissionTime))
         self._lastResponse = (now, resp.stream.length)
index 90a35c5..0f64e8f 100644 (file)
@@ -11,7 +11,7 @@ from twisted.internet import reactor, defer
 from twisted.python import log
 from twisted.trial import unittest
 from twisted.web2 import stream
-from twisted.web2.http import splitHostPort
+from twisted.web2.http import Response, splitHostPort
 
 from HTTPDownloader import Peer
 from util import uncompact
@@ -65,7 +65,7 @@ class GrowingFileStream(stream.FileStream):
                 deferred.callback(b)
 
     def allAvailable(self):
-        """Indicate that no more data is coming available."""
+        """Indicate that no more data will be coming available."""
         self.finished = True
 
         # If a read is pending, let it go
@@ -124,74 +124,126 @@ class GrowingFileStream(stream.FileStream):
             return b
 
 class StreamToFile(defer.Deferred):
-    """Saves a stream to a file.
+    """Save a stream to a partial file and hash it.
     
     @type stream: L{twisted.web2.stream.IByteStream}
     @ivar stream: the input stream being read
     @type outFile: L{twisted.python.filepath.FilePath}
     @ivar outFile: the file being written
-    @type hash: L{Hash.HashObject}
-    @ivar hash: the hash object for the file
+    @type hash: C{sha1}
+    @ivar hash: the hash object for the data
+    @type position: C{int}
+    @ivar position: the current file position to write the next data to
     @type length: C{int}
-    @ivar length: the length of the original (compressed) file
+    @ivar length: the position in the file to not write beyond
     @type doneDefer: L{twisted.internet.defer.Deferred}
-    @ivar doneDefer: the deferred that will fire when done streaming
+    @ivar doneDefer: the deferred that will fire when done writing
     """
     
-    def __init__(self, inputStream, outFile, hash, start, length):
+    def __init__(self, inputStream, outFile, start = 0, length = None):
         """Initializes the file.
         
         @type inputStream: L{twisted.web2.stream.IByteStream}
         @param inputStream: the input stream to read from
         @type outFile: L{twisted.python.filepath.FilePath}
         @param outFile: the file to write to
-        @type hash: L{Hash.HashObject}
-        @param hash: the hash object to use for the file
+        @type start: C{int}
+        @param start: the file position to start writing at
+            (optional, defaults to the start of the file)
+        @type length: C{int}
+        @param length: the maximum amount of data to write to the file
+            (optional, defaults to not limiting the writing to the file
         """
         self.stream = inputStream
         self.outFile = outFile
-        self.hash = hash
-        self.hash.new()
-        self.length = self.stream.length
+        self.hash = sha.new()
+        self.position = start
+        self.length = None
+        if length is not None:
+            self.length = start + length
         self.doneDefer = None
         
     def run(self):
-        """Start the streaming."""
-        self.doneDefer = stream.readStream(self.stream, _gotData)
+        """Start the streaming.
+
+        @rtype: L{twisted.internet.defer.Deferred}
+        """
+        self.doneDefer = stream.readStream(self.stream, self._gotData)
         self.doneDefer.addCallbacks(self._done, self._error)
         return self.doneDefer
 
-    def _done(self):
-        """Close all the output files, return the result."""
-        if not self.outFile.closed:
-            self.outFile.close()
-            self.hash.digest()
-            self.doneDefer.callback(self.hash)
-    
     def _gotData(self, data):
+        """Process the received data."""
         if self.outFile.closed:
-            return
+            raise Exception, "outFile was unexpectedly closed"
         
         if data is None:
-            self._done()
+            raise Exception, "Data is None?"
+        
+        # Make sure we don't go too far
+        if self.length is not None and self.position + len(data) > self.length:
+            data = data[:(self.length - self.position)]
         
         # Write and hash the streamed data
+        self.outFile.seek(self.position)
         self.outFile.write(data)
         self.hash.update(data)
+        self.position += len(data)
         
+    def _done(self, result):
+        """Return the result."""
+        return self.hash.digest()
+    
+    def _error(self, err):
+        """Log the error."""
+        log.err(err)
+        return err
+    
 class FileDownload:
     """Manage a download from a list of peers or a mirror.
     
-    
+    @type manager: L{PeerManager}
+    @ivar manager: the manager to send requests for peers to
+    @type hash: L{Hash.HashObject}
+    @ivar hash: the hash object containing the expected hash for the file
+    @ivar mirror: the URI of the file on the mirror
+    @type compact_peers: C{list} of C{dictionary}
+    @ivar compact_peers: a list of the peer info where the file can be found
+    @type file: C{file}
+    @ivar file: the open file to right the download to
+    @type path: C{string}
+    @ivar path: the path to request from peers to access the file
+    @type pieces: C{list} of C{string} 
+    @ivar pieces: the hashes of the pieces in the file
+    @type started: C{boolean}
+    @ivar started: whether the download has begun yet
+    @type defer: L{twisted.internet.defer.Deferred}
+    @ivar defer: the deferred that will callback with the result of the download
+    @type peers: C{dictionary}
+    @ivar peers: information about each of the peers available to download from
+    @type outstanding: C{int}
+    @ivar outstanding: the number of requests to peers currently outstanding
+    @type peerlist: C{list} of L{HTTPDownloader.Peer}
+    @ivar peerlist: the sorted list of peers for this download
+    @type stream: L{GrowingFileStream}
+    @ivar stream: the stream of resulting data from the download
+    @type nextFinish: C{int}
+    @ivar nextFinish: the next piece that is needed to finish for the stream
+    @type completePieces: C{list} of C{boolean} or L{HTTPDownloader.Peer}
+    @ivar completePieces: one per piece, will be False if no requests are
+        outstanding for the piece, True if the piece has been successfully
+        downloaded, or the Peer that a request for this piece has been sent  
     """
     
     def __init__(self, manager, hash, mirror, compact_peers, file):
         """Initialize the instance and check for piece hashes.
         
+        @type manager: L{PeerManager}
+        @param manager: the manager to send requests for peers to
         @type hash: L{Hash.HashObject}
         @param hash: the hash object containing the expected hash for the file
         @param mirror: the URI of the file on the mirror
-        @type compact_peers: C{list} of C{string}
+        @type compact_peers: C{list} of C{dictionary}
         @param compact_peers: a list of the peer info where the file can be found
         @type file: L{twisted.python.filepath.FilePath}
         @param file: the temporary file to use to store the downloaded file
@@ -259,6 +311,7 @@ class FileDownload:
                 # Find the most popular piece string
                 if num == max_found:
                     self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
+                    log.msg('Peer info contained %d piece hashes' % len(self.pieces))
                     self.startDownload()
                     break
         elif max_found == max(pieces_hash.values()):
@@ -412,39 +465,110 @@ class FileDownload:
             self.defer.callback(self.peerlist[0].get(self.path))
             return
         
-        self.sort()
+        # Start sending the return file
+        self.stream = GrowingFileStream(self.file)
+        resp = Response(200, {}, self.stream)
+        self.defer.callback(resp)
+
+        # Begin to download the pieces
         self.outstanding = 0
-        self.next_piece = 0
+        self.nextFinish = 0
+        if self.pieces:
+            self.completePieces = [False for piece in self.pieces]
+        else:
+            self.completePieces = [False]
+        self.getPieces()
         
-        while self.outstanding < 3 and self.peerlist and self.next_piece < len(self.pieces):
-            peer = self.peerlist.pop()
-            piece = self.next_piece
-            self.next_piece += 1
-            
-            self.outstanding += 1
-            df = peer.getRange(self.path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
-            df.addCallbacks(self._gotPiece, self._gotError,
-                            callbackArgs=(piece, peer), errbackArgs=(piece, peer))
+    #{ Downloading the pieces
+    def getPieces(self):
+        """Download the next pieces from the peers."""
+        self.sort()
+        piece = self.nextFinish
+        while self.outstanding < 4 and self.peerlist and piece < len(self.completePieces):
+            if self.completePieces[piece] == False:
+                # Send a request to the highest ranked peer
+                peer = self.peerlist.pop()
+                self.completePieces[piece] = peer
+                
+                self.outstanding += 1
+                if self.pieces:
+                    df = peer.getRange(self.path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
+                else:
+                    df = peer.get(self.path)
+                reactor.callLater(0, df.addCallbacks,
+                                  *(self._getPiece, self._getError),
+                                  **{'callbackArgs': (piece, peer),
+                                     'errbackArgs': (piece, peer)})
+                piece += 1
+                
+        # Check if we're don
+        if self.outstanding <= 0 and self.nextFinish >= len(self.completePieces):
+            self.stream.allAvailable()
     
-    def _gotPiece(self, response, piece, peer):
-        """Process the retrieved piece from the peer."""
-        if response.code != 206:
+    def _getPiece(self, response, piece, peer):
+        """Process the retrieved headers from the peer."""
+        if ((len(self.completePieces) > 1 and response.code != 206) or
+            (response.code not in (200, 206))):
             # Request failed, try a different peer
-            self.getPeerPieces(key, site)
+            peer.hashError()
+            self.completePieces[piece] = False
+            if response.stream and response.stream.length:
+                stream.readAndDiscard(response.stream)
         else:
             # Read the response stream to the file
-            df = StreamToFile(response.stream, self.file, self.hash, piece*PIECE_SIZE, PIECE_SIZE).run()
-            df.addCallbacks(self._gotPeerPieces, self._gotPeerError,
-                            callbackArgs=(key, site), errbackArgs=(key, site))
+            if response.code == 206:
+                df = StreamToFile(response.stream, self.file, piece*PIECE_SIZE, PIECE_SIZE).run()
+            else:
+                df = StreamToFile(response.stream, self.file).run()
+            df.addCallbacks(self._gotPiece, self._gotError,
+                            callbackArgs=(piece, peer), errbackArgs=(piece, peer))
 
-    def _gotError(self, err, piece, peer):
+        self.outstanding -= 1
+        self.peerlist.append(peer)
+        self.getPieces()
+
+    def _getError(self, err, piece, peer):
         """Peer failed, try again."""
+        self.outstanding -= 1
+        self.peerlist.append(peer)
+        self.completePieces[piece] = False
+        self.getPieces()
         log.err(err)
 
+    def _gotPiece(self, response, piece, peer):
+        """Process the retrieved piece from the peer."""
+        if ((self.pieces and response != self.pieces[piece]) or
+            (len(self.pieces) == 0 and response == self.hash.expected())):
+            # Hash doesn't match
+            peer.hashError()
+            self.completePieces[piece] = False
+        elif self.pieces:
+            # Successfully completed one of several pieces
+            self.completePieces[piece] = True
+            while (self.nextFinish < len(self.completePieces) and
+                   self.completePieces[self.nextFinish] == True):
+                self.nextFinish += 1
+                self.stream.updateAvailable(PIECE_SIZE)
+        else:
+            # Whole download (only one piece) is complete
+            self.completePieces[piece] = True
+            self.stream.updateAvailable(2**30)
+
+        self.getPieces()
+
+    def _gotError(self, err, piece, peer):
+        """Piece download failed, try again."""
+        log.err(err)
+        self.completePieces[piece] = False
+        self.getPieces()
         
 class PeerManager:
     """Manage a set of peers and the requests to them.
     
+    @type cache_dir: L{twisted.python.filepath.FilePath}
+    @ivar cache_dir: the directory to use for storing all files
+    @type dht: L{interfaces.IDHT}
+    @ivar dht: the DHT instance
     @type clients: C{dictionary}
     @ivar clients: the available peers that have been previously contacted
     """