Make the downloader statistics work.
authorCameron Dale <camrdale@gmail.com>
Wed, 16 Apr 2008 03:36:39 +0000 (20:36 -0700)
committerCameron Dale <camrdale@gmail.com>
Wed, 16 Apr 2008 03:36:39 +0000 (20:36 -0700)
The previous (untested) commit was not working at all, this one now does,
though the TODO mentions some future work.

TODO
apt_p2p/HTTPDownloader.py
apt_p2p/HTTPServer.py
apt_p2p/PeerManager.py
apt_p2p/apt_p2p.py
apt_p2p/stats.py
apt_p2p/util.py
apt_p2p_Khashmir/util.py

diff --git a/TODO b/TODO
index dd9524a..7a586fe 100644 (file)
--- a/TODO
+++ b/TODO
@@ -15,6 +15,21 @@ distributions. They need to be dealt with properly by
 adding them to the tracking done by the AptPackages module.
 
 
+Improve the downloaded and uploaded data measurements.
+
+There are 2 places that this data is measured: for statistics, and for
+limiting the upload bandwidth. They both have deficiencies as they
+sometimes miss the headers or the requests sent out. The upload
+bandwidth calculation only considers the stream in the upload and not
+the headers sent, and it also doesn't consider the upload bandwidth
+from requesting downloads from peers (though that may be a good thing).
+The statistics calculations for downloads include the headers of
+downloaded files, but not the requests received from peers for upload
+files. The statistics for uploaded data only includes the files sent
+and not the headers, and also misses the requests for downloads sent to
+other peers.
+
+
 Consider storing deltas of packages.
 
 Instead of downloading full package files when a previous version of
index 13cb237..f1488e5 100644 (file)
@@ -17,6 +17,24 @@ from zope.interface import implements
 
 from apt_p2p_conf import version
 
+class LoggingHTTPClientProtocol(HTTPClientProtocol):
+    """A modified client protocol that logs the number of bytes received."""
+    
+    def __init__(self, factory, stats = None, mirror = False):
+        HTTPClientProtocol.__init__(self, factory)
+        self.stats = stats
+        self.mirror = mirror
+    
+    def lineReceived(self, line):
+        if self.stats:
+            self.stats.receivedBytes(len(line) + 2, self.mirror)
+        HTTPClientProtocol.lineReceived(self, line)
+
+    def rawDataReceived(self, data):
+        if self.stats:
+            self.stats.receivedBytes(len(data), self.mirror)
+        HTTPClientProtocol.rawDataReceived(self, data)
+
 class Peer(ClientFactory):
     """A manager for all HTTP requests to a single peer.
     
@@ -28,9 +46,10 @@ class Peer(ClientFactory):
 
     implements(IHTTPClientManager)
     
-    def __init__(self, host, port=80):
+    def __init__(self, host, port = 80, stats = None):
         self.host = host
         self.port = port
+        self.stats = stats
         self.mirror = False
         self.rank = 0.5
         self.busy = False
@@ -55,7 +74,8 @@ class Peer(ClientFactory):
         """Connect to the peer."""
         assert self.closed and not self.connecting
         self.connecting = True
-        d = protocol.ClientCreator(reactor, HTTPClientProtocol, self).connectTCP(self.host, self.port)
+        d = protocol.ClientCreator(reactor, LoggingHTTPClientProtocol, self,
+                                   stats = self.stats, mirror = self.mirror).connectTCP(self.host, self.port)
         d.addCallbacks(self.connected, self.connectionError)
 
     def connected(self, proto):
index 3d43fa0..87e235f 100644 (file)
@@ -159,16 +159,16 @@ class UploadThrottlingProtocol(ThrottlingProtocol):
     def write(self, data):
         if self.throttle:
             ThrottlingProtocol.write(self, data)
-            if stats:
-                stats.sentBytes(len(data))
+            if self.stats:
+                self.stats.sentBytes(len(data))
         else:
             ProtocolWrapper.write(self, data)
 
     def writeSequence(self, seq):
         if self.throttle:
             ThrottlingProtocol.writeSequence(self, seq)
-            if stats:
-                stats.sentBytes(reduce(operator.add, map(len, seq)))
+            if self.stats:
+                self.stats.sentBytes(reduce(operator.add, map(len, seq)))
         else:
             ProtocolWrapper.writeSequence(self, seq)
 
@@ -220,7 +220,8 @@ class TopLevel(resource.Resource):
                                                   'betweenRequestsTimeOut': 60})
             self.factory = ThrottlingFactory(self.factory, writeLimit = self.uploadLimit)
             self.factory.protocol = UploadThrottlingProtocol
-            self.factory.protocol.stats = self.manager.stats
+            if self.manager:
+                self.factory.protocol.stats = self.manager.stats
         return self.factory
 
     def render(self, ctx):
index 9108fde..7376697 100644 (file)
@@ -141,13 +141,11 @@ class StreamToFile:
     @ivar position: the current file position to write the next data to
     @type length: C{int}
     @ivar length: the position in the file to not write beyond
-    @type inform: C{method}
-    @ivar inform: a function to call with the length of data received
     @type doneDefer: L{twisted.internet.defer.Deferred}
     @ivar doneDefer: the deferred that will fire when done writing
     """
     
-    def __init__(self, inputStream, outFile, start = 0, length = None, inform = None):
+    def __init__(self, inputStream, outFile, start = 0, length = None):
         """Initializes the file.
         
         @type inputStream: L{twisted.web2.stream.IByteStream}
@@ -160,9 +158,6 @@ class StreamToFile:
         @type length: C{int}
         @param length: the maximum amount of data to write to the file
             (optional, defaults to not limiting the writing to the file
-        @type inform: C{method}
-        @param inform: a function to call with the length of data received
-            (optional, defaults to calling nothing)
         """
         self.stream = inputStream
         self.outFile = outFile
@@ -171,7 +166,6 @@ class StreamToFile:
         self.length = None
         if length is not None:
             self.length = start + length
-        self.inform = inform
         self.doneDefer = None
         
     def run(self):
@@ -201,7 +195,6 @@ class StreamToFile:
         self.outFile.write(data)
         self.hash.update(data)
         self.position += len(data)
-        self.inform(len(data))
         
     def _done(self, result):
         """Return the result."""
@@ -505,8 +498,7 @@ class FileDownload:
             if parsed[0] == "http":
                 site = splitHostPort(parsed[0], parsed[1])
                 self.mirror_path = urlunparse(('', '') + parsed[2:])
-                peer = self.manager.getPeer(site)
-                peer.mirror = True
+                peer = self.manager.getPeer(site, mirror = True)
                 self.peerlist.append(peer)
         
         # Special case if there's only one good peer left
@@ -571,14 +563,11 @@ class FileDownload:
         else:
             # Read the response stream to the file
             log.msg('Streaming piece %d from peer %r' % (piece, peer))
-            def statUpdate(bytes, stats = self.manager.stats, mirror = peer.mirror):
-                stats.receivedBytes(bytes, mirror)
             if response.code == 206:
                 df = StreamToFile(response.stream, self.file, piece*PIECE_SIZE,
-                                  PIECE_SIZE, statUpdate).run()
+                                  PIECE_SIZE).run()
             else:
-                df = StreamToFile(response.stream, self.file,
-                                  inform = statUpdate).run()
+                df = StreamToFile(response.stream, self.file).run()
             reactor.callLater(0, df.addCallbacks,
                               *(self._gotPiece, self._gotError),
                               **{'callbackArgs': (piece, peer),
@@ -669,8 +658,7 @@ class PeerManager:
             assert parsed[0] == "http", "Only HTTP is supported, not '%s'" % parsed[0]
             site = splitHostPort(parsed[0], parsed[1])
             path = urlunparse(('', '') + parsed[2:])
-            peer = self.getPeer(site)
-            peer.mirror = True
+            peer = self.getPeer(site, mirror = True)
             return peer.get(path, method, modtime)
 #        elif len(peers) == 1:
 #            site = uncompact(peers[0]['c'])
@@ -682,14 +670,18 @@ class PeerManager:
             tmpfile = self.cache_dir.child(hash.hexexpected())
             return FileDownload(self, hash, mirror, peers, tmpfile).run()
         
-    def getPeer(self, site):
+    def getPeer(self, site, mirror = False):
         """Create a new peer if necessary and return it.
         
         @type site: (C{string}, C{int})
         @param site: the IP address and port of the peer
+        @param mirror: whether the peer is actually a mirror
+            (optional, defaults to False)
         """
         if site not in self.clients:
-            self.clients[site] = Peer(site[0], site[1])
+            self.clients[site] = Peer(site[0], site[1], self.stats)
+            if mirror:
+                self.clients[site].mirror = True
         return self.clients[site]
     
     def close(self):
index 889ebd9..f88fd79 100644 (file)
@@ -88,8 +88,13 @@ class AptP2P:
         self.mirrors = MirrorManager(self.cache_dir)
         self.cache = CacheManager(self.cache_dir.child(download_dir), self.db, self)
         self.my_contact = None
+        reactor.addSystemEventTrigger('before', 'shutdown', self.shutdown)
 
-    #{ DHT maintenance
+    #{ Maintenance
+    def shutdown(self):
+        self.stats.save()
+        self.db.close()
+        
     def joinComplete(self, result):
         """Complete the DHT join process and determine our download information.
         
index 640c57f..06e291b 100644 (file)
@@ -4,7 +4,7 @@
 from datetime import datetime, timedelta
 from StringIO import StringIO
 
-from util import byte_format
+from util import uncompact, byte_format
 
 class StatsLogger:
     """Store the statistics for the Khashmir DHT.
@@ -39,8 +39,7 @@ class StatsLogger:
         """
         # Database
         self.db = db
-        self.lastDBUpdate = datetime.now()
-        self.hashes, self.files = self.db.dbStats()
+        self.hashes, self.files = 0, 0
         
         # Transport
         self.mirrorDown = 0L
@@ -61,17 +60,6 @@ class StatsLogger:
                  }
         self.db.saveStats(stats)
     
-    def dbStats(self):
-        """Collect some statistics about the database.
-        
-        @rtype: (C{int}, C{int})
-        @return: the number of keys and values in the database
-        """
-        if datetime.now() - self.lastDBUpdate > timedelta(minutes = 1):
-            self.lastDBUpdate = datetime.now()
-            self.hashes, self.files = self.db.keyStats()
-        return (self.hashes, self.files)
-    
     def formatHTML(self, contactAddress):
         """Gather statistics for the DHT and format them for display in a browser.
         
@@ -79,7 +67,7 @@ class StatsLogger:
         @rtype: C{string}
         @return: the stats, formatted for display in the body of an HTML page
         """
-        self.dbStats()
+        self.hashes, self.files = self.db.dbStats()
 
         out = StringIO()
         out.write('<h2>Downloader Statistics</h2>\n')
@@ -89,7 +77,7 @@ class StatsLogger:
         # General
         out.write("<table border='1' cellpadding='4px'>\n")
         out.write("<tr><th><h3>General</h3></th><th>Value</th></tr>\n")
-        out.write("<tr title='Contact address for this peer'><td>Contact</td><td>" + str(contactAdress) + '</td></tr>\n')
+        out.write("<tr title='Contact address for this peer'><td>Contact</td><td>" + str(uncompact(contactAddress)) + '</td></tr>\n')
         out.write("</table>\n")
         out.write('</td><td>\n')
         
@@ -110,22 +98,22 @@ class StatsLogger:
         out.write("<td title='Amount uploaded to peers'>" + byte_format(self.peerUp) + '</td></tr>')
         out.write("<tr><td title='Since the program was last restarted'>Session Ratio</td>")
         out.write("<td title='Percent of download from mirrors'>%0.2f%%</td>" %
-                  (float(self.mirrorDown) / float(self.mirrorDown + self.peerDown), ))
+                  (100.0 * float(self.mirrorDown) / float(max(self.mirrorDown + self.peerDown, 1)), ))
         out.write("<td title='Percent of download from peers'>%0.2f%%</td>" %
-                  (float(self.peerDown) / float(self.mirrorDown + self.peerDown), ))
-        out.write("<td title='Percent uploaded to peers compared with downloaded from peers'>%0.2f%%</td></tr>" %
-                  (float(self.peerUp) / float(self.peerDown), ))
+                  (100.0 * float(self.peerDown) / float(max(self.mirrorDown + self.peerDown, 1)), ))
+        out.write("<td title='Percent uploaded to peers compared with all downloaded'>%0.2f%%</td></tr>" %
+                  (100.0 * float(self.peerUp) / float(max(self.mirrorDown + self.peerDown, 1)), ))
         out.write("<tr><td title='Since the program was installed'>All-Time</td>")
         out.write("<td title='Amount downloaded from mirrors'>" + byte_format(self.mirrorAllDown) + '</td>')
         out.write("<td title='Amount downloaded from peers'>" + byte_format(self.peerAllDown) + '</td>')
         out.write("<td title='Amount uploaded to peers'>" + byte_format(self.peerAllUp) + '</td></tr>')
         out.write("<tr><td title='Since the program was installed'>All-Time Ratio</td>")
         out.write("<td title='Percent of download from mirrors'>%0.2f%%</td>" %
-                  (float(self.mirrorAllDown) / float(self.mirrorAllDown + self.peerAllDown), ))
+                  (100.0 * float(self.mirrorAllDown) / float(max(self.mirrorAllDown + self.peerAllDown, 1)), ))
         out.write("<td title='Percent of download from peers'>%0.2f%%</td>" %
-                  (float(self.peerAllDown) / float(self.mirrorAllDown + self.peerAllDown), ))
-        out.write("<td title='Percent uploaded to peers compared with downloaded from peers'>%0.2f%%</td></tr>" %
-                  (float(self.peerAllUp) / float(self.peerAllDown), ))
+                  (100.0 * float(self.peerAllDown) / float(max(self.mirrorAllDown + self.peerAllDown, 1)), ))
+        out.write("<td title='Percent uploaded to peers compared with all downloaded'>%0.2f%%</td></tr>" %
+                  (100.0 * float(self.peerAllUp) / float(max(self.mirrorAllDown + self.peerAllDown, 1)), ))
         out.write("</table>\n")
         out.write("</td></tr>\n")
         out.write("</table>\n")
index 3d1a50e..ec0287c 100644 (file)
@@ -160,11 +160,15 @@ def byte_format(s):
     @param s: the number of bytes
     @rtype: C{string}
     @return: the formatted size with appropriate units
-    
     """
-    
-    if (s < 1024):
-        r = str(s) + 'B'
+    if (s < 1):
+        r = str(int(s*1000.0)/1000.0) + 'B'
+    elif (s < 10):
+        r = str(int(s*100.0)/100.0) + 'B'
+    elif (s < 102):
+        r = str(int(s*10.0)/10.0) + 'B'
+    elif (s < 1024):
+        r = str(int(s)) + 'B'
     elif (s < 10485):
         r = str(int((s/1024.0)*100.0)/100.0) + 'KiB'
     elif (s < 104857):
index 39b4ce0..55bd45a 100644 (file)
@@ -69,11 +69,15 @@ def byte_format(s):
     @param s: the number of bytes
     @rtype: C{string}
     @return: the formatted size with appropriate units
-    
     """
-    
-    if (s < 1024):
-        r = str(s) + 'B'
+    if (s < 1):
+        r = str(int(s*1000.0)/1000.0) + 'B'
+    elif (s < 10):
+        r = str(int(s*100.0)/100.0) + 'B'
+    elif (s < 102):
+        r = str(int(s*10.0)/10.0) + 'B'
+    elif (s < 1024):
+        r = str(int(s)) + 'B'
     elif (s < 10485):
         r = str(int((s/1024.0)*100.0)/100.0) + 'KiB'
     elif (s < 104857):