]> git.mxchange.org Git - quix0rs-apt-p2p.git/blobdiff - apt_p2p/PeerManager.py
Make the downloader statistics work.
[quix0rs-apt-p2p.git] / apt_p2p / PeerManager.py
index 5df37fe4b11594c4eb272a287b73a090390d5d48..737669751da78ee9a3242996aed50e41dc5aa18b 100644 (file)
@@ -498,8 +498,7 @@ class FileDownload:
             if parsed[0] == "http":
                 site = splitHostPort(parsed[0], parsed[1])
                 self.mirror_path = urlunparse(('', '') + parsed[2:])
-                peer = self.manager.getPeer(site)
-                peer.mirror = True
+                peer = self.manager.getPeer(site, mirror = True)
                 self.peerlist.append(peer)
         
         # Special case if there's only one good peer left
@@ -565,11 +564,14 @@ class FileDownload:
             # Read the response stream to the file
             log.msg('Streaming piece %d from peer %r' % (piece, peer))
             if response.code == 206:
-                df = StreamToFile(response.stream, self.file, piece*PIECE_SIZE, PIECE_SIZE).run()
+                df = StreamToFile(response.stream, self.file, piece*PIECE_SIZE,
+                                  PIECE_SIZE).run()
             else:
                 df = StreamToFile(response.stream, self.file).run()
-            df.addCallbacks(self._gotPiece, self._gotError,
-                            callbackArgs=(piece, peer), errbackArgs=(piece, peer))
+            reactor.callLater(0, df.addCallbacks,
+                              *(self._gotPiece, self._gotError),
+                              **{'callbackArgs': (piece, peer),
+                                 'errbackArgs': (piece, peer)})
 
         self.outstanding -= 1
         self.peerlist.append(peer)
@@ -617,17 +619,20 @@ class PeerManager:
     @ivar cache_dir: the directory to use for storing all files
     @type dht: L{interfaces.IDHT}
     @ivar dht: the DHT instance
+    @type stats: L{stats.StatsLogger}
+    @ivar stats: the statistics logger to record sent data to
     @type clients: C{dictionary}
     @ivar clients: the available peers that have been previously contacted
     """
 
-    def __init__(self, cache_dir, dht):
+    def __init__(self, cache_dir, dht, stats):
         """Initialize the instance."""
         self.cache_dir = cache_dir
         self.cache_dir.restat(False)
         if not self.cache_dir.exists():
             self.cache_dir.makedirs()
         self.dht = dht
+        self.stats = stats
         self.clients = {}
         
     def get(self, hash, mirror, peers = [], method="GET", modtime=None):
@@ -653,8 +658,7 @@ class PeerManager:
             assert parsed[0] == "http", "Only HTTP is supported, not '%s'" % parsed[0]
             site = splitHostPort(parsed[0], parsed[1])
             path = urlunparse(('', '') + parsed[2:])
-            peer = self.getPeer(site)
-            peer.mirror = True
+            peer = self.getPeer(site, mirror = True)
             return peer.get(path, method, modtime)
 #        elif len(peers) == 1:
 #            site = uncompact(peers[0]['c'])
@@ -666,14 +670,18 @@ class PeerManager:
             tmpfile = self.cache_dir.child(hash.hexexpected())
             return FileDownload(self, hash, mirror, peers, tmpfile).run()
         
-    def getPeer(self, site):
+    def getPeer(self, site, mirror = False):
         """Create a new peer if necessary and return it.
         
         @type site: (C{string}, C{int})
         @param site: the IP address and port of the peer
+        @param mirror: whether the peer is actually a mirror
+            (optional, defaults to False)
         """
         if site not in self.clients:
-            self.clients[site] = Peer(site[0], site[1])
+            self.clients[site] = Peer(site[0], site[1], self.stats)
+            if mirror:
+                self.clients[site].mirror = True
         return self.clients[site]
     
     def close(self):