]> git.mxchange.org Git - quix0rs-apt-p2p.git/blobdiff - apt_p2p/PeerManager.py
HTTP client no longer keeps a response queue of requests.
[quix0rs-apt-p2p.git] / apt_p2p / PeerManager.py
index d4c7aace9ad04dce45fceb1ac944c61803a7cc96..deeb668a54c4ae670a4e501c5e106fe7b6a300ff 100644 (file)
@@ -8,7 +8,7 @@ from binascii import b2a_hex, a2b_hex
 import sha
 
 from twisted.internet import reactor, defer
-from twisted.python import log
+from twisted.python import log, filepath
 from twisted.trial import unittest
 from twisted.web2 import stream
 from twisted.web2.http import Response, splitHostPort
@@ -91,11 +91,13 @@ class GrowingFileStream(stream.FileStream):
                     deferred.callback(b)
                 else:
                     # We're done
+                    self._close()
                     deferred = self.deferred
                     self.deferred = None
                     deferred.callback(None)
             else:
                 # We're done
+                self._close()
                 deferred = self.deferred
                 self.deferred = None
                 deferred.callback(None)
@@ -112,6 +114,7 @@ class GrowingFileStream(stream.FileStream):
         # If we don't have any available, we're done or deferred
         if readSize <= 0:
             if self.finished:
+                self._close()
                 return None
             else:
                 self.deferred = defer.Deferred()
@@ -124,6 +127,7 @@ class GrowingFileStream(stream.FileStream):
         if not bytesRead:
             # End of file was reached, we're done or deferred
             if self.finished:
+                self._close()
                 return None
             else:
                 self.deferred = defer.Deferred()
@@ -132,6 +136,12 @@ class GrowingFileStream(stream.FileStream):
             self.position += bytesRead
             return b
 
+    def _close(self):
+        """Close the temporary file and remove it."""
+        self.f.close()
+        filepath.FilePath(self.f.name).remove()
+        self.f = None
+        
 class StreamToFile:
     """Save a stream to a partial file and hash it.
     
@@ -139,8 +149,8 @@ class StreamToFile:
     @ivar stream: the input stream being read
     @type outFile: L{twisted.python.filepath.FilePath}
     @ivar outFile: the file being written
-    @type hash: C{sha1}
-    @ivar hash: the hash object for the data
+    @type hasher: hashing object, e.g. C{sha1}
+    @ivar hasher: the hash object for the data
     @type position: C{int}
     @ivar position: the current file position to write the next data to
     @type length: C{int}
@@ -149,9 +159,11 @@ class StreamToFile:
     @ivar doneDefer: the deferred that will fire when done writing
     """
     
-    def __init__(self, inputStream, outFile, start = 0, length = None):
+    def __init__(self, hasher, inputStream, outFile, start = 0, length = None):
         """Initializes the file.
         
+        @type hasher: hashing object, e.g. C{sha1}
+        @param hasher: the hash object for the data
         @type inputStream: L{twisted.web2.stream.IByteStream}
         @param inputStream: the input stream to read from
         @type outFile: L{twisted.python.filepath.FilePath}
@@ -165,7 +177,7 @@ class StreamToFile:
         """
         self.stream = inputStream
         self.outFile = outFile
-        self.hash = sha.new()
+        self.hasher = hasher
         self.position = start
         self.length = None
         if length is not None:
@@ -177,7 +189,6 @@ class StreamToFile:
 
         @rtype: L{twisted.internet.defer.Deferred}
         """
-        log.msg('Started streaming %r bytes to file at position %d' % (self.length, self.position))
         self.doneDefer = stream.readStream(self.stream, self._gotData)
         self.doneDefer.addCallbacks(self._done, self._error)
         return self.doneDefer
@@ -197,13 +208,12 @@ class StreamToFile:
         # Write and hash the streamed data
         self.outFile.seek(self.position)
         self.outFile.write(data)
-        self.hash.update(data)
+        self.hasher.update(data)
         self.position += len(data)
         
     def _done(self, result):
         """Return the result."""
-        log.msg('Streaming is complete')
-        return self.hash.digest()
+        return self.hasher.digest()
     
     def _error(self, err):
         """Log the error."""
@@ -266,6 +276,7 @@ class FileDownload:
         self.compact_peers = compact_peers
         
         self.path = '/~/' + quote_plus(hash.expected())
+        self.defer = None
         self.mirror_path = None
         self.pieces = None
         self.started = False
@@ -508,11 +519,6 @@ class FileDownload:
 #            self.defer.callback(self.peerlist[0].get(self.path))
 #            return
         
-        # Start sending the return file
-        self.stream = GrowingFileStream(self.file, self.hash.expSize)
-        resp = Response(200, {}, self.stream)
-        self.defer.callback(resp)
-
         # Begin to download the pieces
         self.outstanding = 0
         self.nextFinish = 0
@@ -532,10 +538,13 @@ class FileDownload:
                 log.msg('Sending a request for piece %d to peer %r' % (piece, peer))
                 
                 self.outstanding += 1
+                path = self.path
                 if peer.mirror:
-                    df = peer.getRange(self.mirror_path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
+                    path = self.mirror_path
+                if len(self.completePieces) > 1:
+                    df = peer.getRange(path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
                 else:
-                    df = peer.getRange(self.path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
+                    df = peer.get(path)
                 reactor.callLater(0, df.addCallbacks,
                                   *(self._getPiece, self._getError),
                                   **{'callbackArgs': (piece, peer),
@@ -544,12 +553,11 @@ class FileDownload:
                 
         # Check if we're done
         if self.outstanding <= 0 and self.nextFinish >= len(self.completePieces):
-            log.msg('We seem to be done with all pieces')
+            log.msg('Download is complete for %s' % self.path)
             self.stream.allAvailable()
     
     def _getPiece(self, response, piece, peer):
         """Process the retrieved headers from the peer."""
-        log.msg('Got response for piece %d from peer %r' % (piece, peer))
         if ((len(self.completePieces) > 1 and response.code != 206) or
             (response.code not in (200, 206))):
             # Request failed, try a different peer
@@ -559,13 +567,27 @@ class FileDownload:
             if response.stream and response.stream.length:
                 stream.readAndDiscard(response.stream)
         else:
+            if self.defer:
+                # Start sending the return file
+                df = self.defer
+                self.defer = None
+                self.stream = GrowingFileStream(self.file, self.hash.expSize)
+
+                # Get the headers from the peer's response
+                headers = {}
+                if response.headers.hasHeader('last-modified'):
+                    headers['last-modified'] = response.headers.getHeader('last-modified')
+                resp = Response(200, headers, self.stream)
+                df.callback(resp)
+
             # Read the response stream to the file
             log.msg('Streaming piece %d from peer %r' % (piece, peer))
             if response.code == 206:
-                df = StreamToFile(response.stream, self.file, piece*PIECE_SIZE,
-                                  PIECE_SIZE).run()
+                df = StreamToFile(self.hash.newPieceHasher(), response.stream,
+                                  self.file, piece*PIECE_SIZE, PIECE_SIZE).run()
             else:
-                df = StreamToFile(response.stream, self.file).run()
+                df = StreamToFile(self.hash.newHasher(), response.stream,
+                                  self.file).run()
             reactor.callLater(0, df.addCallbacks,
                               *(self._gotPiece, self._gotError),
                               **{'callbackArgs': (piece, peer),
@@ -586,7 +608,6 @@ class FileDownload:
 
     def _gotPiece(self, response, piece, peer):
         """Process the retrieved piece from the peer."""
-        log.msg('Finished streaming piece %d from peer %r: %r' % (piece, peer, response))
         if self.pieces[piece] and response != self.pieces[piece]:
             # Hash doesn't match
             log.msg('Hash error for piece %d from peer %r' % (piece, peer))