Add statistics reporting to the main program (untested).
[quix0rs-apt-p2p.git] / apt_p2p / PeerManager.py
index 5df37fe4b11594c4eb272a287b73a090390d5d48..9108fde198128ad1531c9ba7861e2962255ab1ce 100644 (file)
@@ -141,11 +141,13 @@ class StreamToFile:
     @ivar position: the current file position to write the next data to
     @type length: C{int}
     @ivar length: the position in the file to not write beyond
+    @type inform: C{method}
+    @ivar inform: a function to call with the length of data received
     @type doneDefer: L{twisted.internet.defer.Deferred}
     @ivar doneDefer: the deferred that will fire when done writing
     """
     
-    def __init__(self, inputStream, outFile, start = 0, length = None):
+    def __init__(self, inputStream, outFile, start = 0, length = None, inform = None):
         """Initializes the file.
         
         @type inputStream: L{twisted.web2.stream.IByteStream}
@@ -158,6 +160,9 @@ class StreamToFile:
         @type length: C{int}
         @param length: the maximum amount of data to write to the file
             (optional, defaults to not limiting the writing to the file
+        @type inform: C{method}
+        @param inform: a function to call with the length of data received
+            (optional, defaults to calling nothing)
         """
         self.stream = inputStream
         self.outFile = outFile
@@ -166,6 +171,7 @@ class StreamToFile:
         self.length = None
         if length is not None:
             self.length = start + length
+        self.inform = inform
         self.doneDefer = None
         
     def run(self):
@@ -195,6 +201,7 @@ class StreamToFile:
         self.outFile.write(data)
         self.hash.update(data)
         self.position += len(data)
+        self.inform(len(data))
         
     def _done(self, result):
         """Return the result."""
@@ -564,12 +571,18 @@ class FileDownload:
         else:
             # Read the response stream to the file
             log.msg('Streaming piece %d from peer %r' % (piece, peer))
+            def statUpdate(bytes, stats = self.manager.stats, mirror = peer.mirror):
+                stats.receivedBytes(bytes, mirror)
             if response.code == 206:
-                df = StreamToFile(response.stream, self.file, piece*PIECE_SIZE, PIECE_SIZE).run()
+                df = StreamToFile(response.stream, self.file, piece*PIECE_SIZE,
+                                  PIECE_SIZE, statUpdate).run()
             else:
-                df = StreamToFile(response.stream, self.file).run()
-            df.addCallbacks(self._gotPiece, self._gotError,
-                            callbackArgs=(piece, peer), errbackArgs=(piece, peer))
+                df = StreamToFile(response.stream, self.file,
+                                  inform = statUpdate).run()
+            reactor.callLater(0, df.addCallbacks,
+                              *(self._gotPiece, self._gotError),
+                              **{'callbackArgs': (piece, peer),
+                                 'errbackArgs': (piece, peer)})
 
         self.outstanding -= 1
         self.peerlist.append(peer)
@@ -617,17 +630,20 @@ class PeerManager:
     @ivar cache_dir: the directory to use for storing all files
     @type dht: L{interfaces.IDHT}
     @ivar dht: the DHT instance
+    @type stats: L{stats.StatsLogger}
+    @ivar stats: the statistics logger to record sent data to
     @type clients: C{dictionary}
     @ivar clients: the available peers that have been previously contacted
     """
 
-    def __init__(self, cache_dir, dht):
+    def __init__(self, cache_dir, dht, stats):
         """Initialize the instance."""
         self.cache_dir = cache_dir
         self.cache_dir.restat(False)
         if not self.cache_dir.exists():
             self.cache_dir.makedirs()
         self.dht = dht
+        self.stats = stats
         self.clients = {}
         
     def get(self, hash, mirror, peers = [], method="GET", modtime=None):