]> git.mxchange.org Git - quix0rs-apt-p2p.git/blobdiff - apt_p2p/PeerManager.py
Remove some unnecessary log messages and use better Exceptions.
[quix0rs-apt-p2p.git] / apt_p2p / PeerManager.py
index 9108fde198128ad1531c9ba7861e2962255ab1ce..d4c7aace9ad04dce45fceb1ac944c61803a7cc96 100644 (file)
@@ -19,6 +19,10 @@ from Hash import PIECE_SIZE
 from apt_p2p_Khashmir.bencode import bdecode
 from apt_p2p_conf import config
 
+
+class PeerError(Exception):
+    """An error occurred downloading from peers."""
+    
 class GrowingFileStream(stream.FileStream):
     """Modified to stream data from a file as it becomes available.
     
@@ -141,13 +145,11 @@ class StreamToFile:
     @ivar position: the current file position to write the next data to
     @type length: C{int}
     @ivar length: the position in the file to not write beyond
-    @type inform: C{method}
-    @ivar inform: a function to call with the length of data received
     @type doneDefer: L{twisted.internet.defer.Deferred}
     @ivar doneDefer: the deferred that will fire when done writing
     """
     
-    def __init__(self, inputStream, outFile, start = 0, length = None, inform = None):
+    def __init__(self, inputStream, outFile, start = 0, length = None):
         """Initializes the file.
         
         @type inputStream: L{twisted.web2.stream.IByteStream}
@@ -160,9 +162,6 @@ class StreamToFile:
         @type length: C{int}
         @param length: the maximum amount of data to write to the file
             (optional, defaults to not limiting the writing to the file
-        @type inform: C{method}
-        @param inform: a function to call with the length of data received
-            (optional, defaults to calling nothing)
         """
         self.stream = inputStream
         self.outFile = outFile
@@ -171,7 +170,6 @@ class StreamToFile:
         self.length = None
         if length is not None:
             self.length = start + length
-        self.inform = inform
         self.doneDefer = None
         
     def run(self):
@@ -187,10 +185,10 @@ class StreamToFile:
     def _gotData(self, data):
         """Process the received data."""
         if self.outFile.closed:
-            raise Exception, "outFile was unexpectedly closed"
+            raise PeerError, "outFile was unexpectedly closed"
         
         if data is None:
-            raise Exception, "Data is None?"
+            raise PeerError, "Data is None?"
         
         # Make sure we don't go too far
         if self.length is not None and self.position + len(data) > self.length:
@@ -201,7 +199,6 @@ class StreamToFile:
         self.outFile.write(data)
         self.hash.update(data)
         self.position += len(data)
-        self.inform(len(data))
         
     def _done(self, result):
         """Return the result."""
@@ -405,7 +402,6 @@ class FileDownload:
 
         if self.pieces is None:
             # Send a request to one or more peers
-            log.msg('Checking for a peer to request piece hashes from')
             for site in self.peers:
                 if self.peers[site].get('failed', False) != True:
                     log.msg('Sending a piece hash request to %r' % (site, ))
@@ -419,7 +415,6 @@ class FileDownload:
                     if self.outstanding >= 4:
                         break
         
-        log.msg('Done sending piece hash requests for now, %d outstanding' % self.outstanding)
         if self.pieces is None and self.outstanding <= 0:
             # Continue without the piece hashes
             log.msg('Could not retrieve the piece hashes from the peers')
@@ -431,7 +426,6 @@ class FileDownload:
         log.msg('Got a piece hash response %d from %r' % (response.code, site))
         if response.code != 200:
             # Request failed, try a different peer
-            log.msg('Did not like response %d from %r' % (response.code, site))
             self.getPeerPieces(key, site)
         else:
             # Read the response stream to a string
@@ -505,8 +499,7 @@ class FileDownload:
             if parsed[0] == "http":
                 site = splitHostPort(parsed[0], parsed[1])
                 self.mirror_path = urlunparse(('', '') + parsed[2:])
-                peer = self.manager.getPeer(site)
-                peer.mirror = True
+                peer = self.manager.getPeer(site, mirror = True)
                 self.peerlist.append(peer)
         
         # Special case if there's only one good peer left
@@ -529,11 +522,9 @@ class FileDownload:
     #{ Downloading the pieces
     def getPieces(self):
         """Download the next pieces from the peers."""
-        log.msg('Checking for more piece requests to send')
         self.sort()
         piece = self.nextFinish
         while self.outstanding < 4 and self.peerlist and piece < len(self.completePieces):
-            log.msg('Checking piece %d' % piece)
             if self.completePieces[piece] == False:
                 # Send a request to the highest ranked peer
                 peer = self.peerlist.pop()
@@ -551,7 +542,6 @@ class FileDownload:
                                      'errbackArgs': (piece, peer)})
             piece += 1
                 
-        log.msg('Finished checking pieces, %d outstanding, next piece %d of %d' % (self.outstanding, self.nextFinish, len(self.completePieces)))
         # Check if we're done
         if self.outstanding <= 0 and self.nextFinish >= len(self.completePieces):
             log.msg('We seem to be done with all pieces')
@@ -571,14 +561,11 @@ class FileDownload:
         else:
             # Read the response stream to the file
             log.msg('Streaming piece %d from peer %r' % (piece, peer))
-            def statUpdate(bytes, stats = self.manager.stats, mirror = peer.mirror):
-                stats.receivedBytes(bytes, mirror)
             if response.code == 206:
                 df = StreamToFile(response.stream, self.file, piece*PIECE_SIZE,
-                                  PIECE_SIZE, statUpdate).run()
+                                  PIECE_SIZE).run()
             else:
-                df = StreamToFile(response.stream, self.file,
-                                  inform = statUpdate).run()
+                df = StreamToFile(response.stream, self.file).run()
             reactor.callLater(0, df.addCallbacks,
                               *(self._gotPiece, self._gotError),
                               **{'callbackArgs': (piece, peer),
@@ -669,8 +656,7 @@ class PeerManager:
             assert parsed[0] == "http", "Only HTTP is supported, not '%s'" % parsed[0]
             site = splitHostPort(parsed[0], parsed[1])
             path = urlunparse(('', '') + parsed[2:])
-            peer = self.getPeer(site)
-            peer.mirror = True
+            peer = self.getPeer(site, mirror = True)
             return peer.get(path, method, modtime)
 #        elif len(peers) == 1:
 #            site = uncompact(peers[0]['c'])
@@ -682,14 +668,18 @@ class PeerManager:
             tmpfile = self.cache_dir.child(hash.hexexpected())
             return FileDownload(self, hash, mirror, peers, tmpfile).run()
         
-    def getPeer(self, site):
+    def getPeer(self, site, mirror = False):
         """Create a new peer if necessary and return it.
         
         @type site: (C{string}, C{int})
         @param site: the IP address and port of the peer
+        @param mirror: whether the peer is actually a mirror
+            (optional, defaults to False)
         """
         if site not in self.clients:
-            self.clients[site] = Peer(site[0], site[1])
+            self.clients[site] = Peer(site[0], site[1], self.stats)
+            if mirror:
+                self.clients[site].mirror = True
         return self.clients[site]
     
     def close(self):