DHT stats now include completed actions and various elapsed times for actions.
author Cameron Dale <camrdale@gmail.com>
Fri, 18 Apr 2008 21:03:26 +0000 (14:03 -0700)
committer Cameron Dale <camrdale@gmail.com>
Fri, 18 Apr 2008 21:03:26 +0000 (14:03 -0700)
apt_p2p/stats.py
apt_p2p_Khashmir/actions.py
apt_p2p_Khashmir/khashmir.py
apt_p2p_Khashmir/krpc.py
apt_p2p_Khashmir/stats.py

index 06e291b..b76e389 100644 (file)
@@ -95,24 +95,24 @@ class StatsLogger:
         out.write("<tr><td title='Since the program was last restarted'>This Session</td>")
         out.write("<td title='Amount downloaded from mirrors'>" + byte_format(self.mirrorDown) + '</td>')
         out.write("<td title='Amount downloaded from peers'>" + byte_format(self.peerDown) + '</td>')
-        out.write("<td title='Amount uploaded to peers'>" + byte_format(self.peerUp) + '</td></tr>')
+        out.write("<td title='Amount uploaded to peers'>" + byte_format(self.peerUp) + '</td></tr>\n')
         out.write("<tr><td title='Since the program was last restarted'>Session Ratio</td>")
         out.write("<td title='Percent of download from mirrors'>%0.2f%%</td>" %
                   (100.0 * float(self.mirrorDown) / float(max(self.mirrorDown + self.peerDown, 1)), ))
         out.write("<td title='Percent of download from peers'>%0.2f%%</td>" %
                   (100.0 * float(self.peerDown) / float(max(self.mirrorDown + self.peerDown, 1)), ))
-        out.write("<td title='Percent uploaded to peers compared with all downloaded'>%0.2f%%</td></tr>" %
+        out.write("<td title='Percent uploaded to peers compared with all downloaded'>%0.2f%%</td></tr>\n" %
                   (100.0 * float(self.peerUp) / float(max(self.mirrorDown + self.peerDown, 1)), ))
         out.write("<tr><td title='Since the program was installed'>All-Time</td>")
         out.write("<td title='Amount downloaded from mirrors'>" + byte_format(self.mirrorAllDown) + '</td>')
         out.write("<td title='Amount downloaded from peers'>" + byte_format(self.peerAllDown) + '</td>')
-        out.write("<td title='Amount uploaded to peers'>" + byte_format(self.peerAllUp) + '</td></tr>')
+        out.write("<td title='Amount uploaded to peers'>" + byte_format(self.peerAllUp) + '</td></tr>\n')
         out.write("<tr><td title='Since the program was installed'>All-Time Ratio</td>")
         out.write("<td title='Percent of download from mirrors'>%0.2f%%</td>" %
                   (100.0 * float(self.mirrorAllDown) / float(max(self.mirrorAllDown + self.peerAllDown, 1)), ))
         out.write("<td title='Percent of download from peers'>%0.2f%%</td>" %
                   (100.0 * float(self.peerAllDown) / float(max(self.mirrorAllDown + self.peerAllDown, 1)), ))
-        out.write("<td title='Percent uploaded to peers compared with all downloaded'>%0.2f%%</td></tr>" %
+        out.write("<td title='Percent uploaded to peers compared with all downloaded'>%0.2f%%</td></tr>\n" %
                   (100.0 * float(self.peerAllUp) / float(max(self.mirrorAllDown + self.peerAllDown, 1)), ))
         out.write("</table>\n")
         out.write("</td></tr>\n")
index 5254fe4..95aef72 100644 (file)
@@ -3,6 +3,8 @@
 
 """Details of how to perform actions on remote peers."""
 
+from datetime import datetime
+
 from twisted.internet import reactor, defer
 from twisted.python import log
 
@@ -21,6 +23,8 @@ class ActionBase:
     @ivar config: the configuration variables for the DHT
     @type action: C{string}
     @ivar action: the name of the action to call on remote nodes
+    @type stats: L{stats.StatsLogger}
+    @ivar stats: the statistics module to report to
     @type num: C{long}
     @ivar num: the target key in integer form
     @type queried: C{dictionary}
@@ -46,6 +50,8 @@ class ActionBase:
         the requests that are currently outstanding
     @type finished: C{boolean}
     @ivar finished: whether the action is done
+    @type started: C{datetime.datetime}
+    @ivar started: the time the action was started at
     @type sort: C{method}
     @ivar sort: used to sort nodes by their proximity to the target
     """
@@ -75,7 +81,8 @@ class ActionBase:
         self.target = target
         self.config = config
         self.action = action
-        stats.startedAction(action)
+        self.stats = stats
+        self.stats.startedAction(action)
         self.num = intify(target)
         self.queried = {}
         self.answered = {}
@@ -87,6 +94,7 @@ class ActionBase:
         self.outstanding = 0
         self.outstanding_results = 0
         self.finished = False
+        self.started = datetime.now()
     
         def sort(a, b, num=self.num):
             """Sort nodes relative to the ID we are looking for."""
@@ -101,6 +109,7 @@ class ActionBase:
     #{ Main operation
     def goWithNodes(self, nodes):
         """Start the action's process with a list of nodes to contact."""
+        self.started = datetime.now()
         for node in nodes:
             self.found[node.id] = node
         self.sortNodes()
@@ -236,6 +245,7 @@ class ActionBase:
 
     def generateResult(self, nodes):
         """Create the final result to return to the L{callback} function."""
+        self.stats.completedAction(self.action, self.started)
         return []
         
 
@@ -254,6 +264,7 @@ class FindNode(ActionBase):
     def generateResult(self):
         """Result is the K closest nodes to the target."""
         self.sortNodes()
+        self.stats.completedAction(self.action, self.started)
         return (self.sorted_nodes[:K], )
     
 
@@ -272,6 +283,7 @@ class FindValue(ActionBase):
     def generateResult(self):
         """Result is the nodes that have values, sorted by proximity to the key."""
         self.sortNodes()
+        self.stats.completedAction(self.action, self.started)
         return ([node for node in self.sorted_nodes if node.num_values > 0], )
     
 
@@ -313,6 +325,7 @@ class GetValue(ActionBase):
 
     def generateResult(self):
         """Results have all been returned, now send the empty list to end the action."""
+        self.stats.completedAction(self.action, self.started)
         return (self.target, [])
         
 
@@ -345,4 +358,5 @@ class StoreValue(ActionBase):
     
     def generateResult(self):
         """Return all the response IDs received."""
+        self.stats.completedAction(self.action, self.started)
         return (self.target, self.value, self.results.values())
index c5b199a..580f0f0 100644 (file)
@@ -198,14 +198,16 @@ class KhashmirBase(protocol.Factory):
             (datetime.now() - old.lastSeen) > 
              timedelta(seconds=self.config['MIN_PING_INTERVAL'])):
             
-            def _staleNodeHandler(err, oldnode = old, newnode = node, self = self):
+            def _staleNodeHandler(err, oldnode = old, newnode = node, self = self, start = datetime.now()):
                 """The pinged node never responded, so replace it."""
                 log.msg("ping failed (%s) %s/%s" % (self.config['PORT'], oldnode.host, oldnode.port))
                 log.err(err)
+                self.stats.completedAction('ping', start)
                 self.table.replaceStaleNode(oldnode, newnode)
             
-            def _notStaleNodeHandler(dict, old=old, self=self):
+            def _notStaleNodeHandler(dict, old = old, self = self, start = datetime.now()):
                 """Got a pong from the old node, so update it."""
+                self.stats.completedAction('ping', start)
                 if dict['id'] == old.id:
                     self.table.justSeenNode(old.id)
             
@@ -228,17 +230,19 @@ class KhashmirBase(protocol.Factory):
             (optional, defaults to calling the callback with None)
         """
 
-        def _pongHandler(dict, node=node, self=self, callback=callback):
+        def _pongHandler(dict, node=node, self=self, callback=callback, start = datetime.now()):
             """Node responded properly, callback with response."""
             n = self.Node(dict['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
+            self.stats.completedAction('join', start)
             self.insertNode(n)
             if callback:
                 callback((dict['ip_addr'], dict['port']))
 
-        def _defaultPong(err, node=node, self=self, callback=callback, errback=errback):
+        def _defaultPong(err, node=node, self=self, callback=callback, errback=errback, start = datetime.now()):
             """Error occurred, fail node and errback or callback with error."""
             log.msg("join failed (%s) %s/%s" % (self.config['PORT'], node.host, node.port))
             log.err(err)
+            self.stats.completedAction('join', start)
             self.table.nodeFailed(node)
             if errback:
                 errback()
index dadf0f2..acc5fbf 100644 (file)
@@ -523,7 +523,8 @@ class KRPC:
         
         # Save the conclusion of the action
         req.addCallbacks(self.stats.responseAction, self.stats.failedAction,
-                         callbackArgs = (method, ), errbackArgs = (method, ))
+                         callbackArgs = (method, datetime.now()),
+                         errbackArgs = (method, datetime.now()))
 
         return req
     
index 126099c..5ed7e6d 100644 (file)
@@ -141,13 +141,24 @@ class StatsLogger:
         
         # Actions
         out.write("<table border='1' cellpadding='4px'>\n")
-        out.write("<tr><th><h3>Actions</h3></th><th>Started</th><th>Sent</th><th>OK</th><th>Failed</th><th>Received</th><th>Error</th></tr>\n")
+        out.write("<tr><th><h3>Actions</h3></th><th>Started</th><th>Sent</th>")
+        out.write("<th>Successful</th><th>Failed</th><th>Completed</th><th>Received</th><th>Error</th>")
+        out.write("<th>Successful Delay</th><th>Failed Delay</th><th>Total Delay</th></tr>\n")
         actions = self.actions.keys()
         actions.sort()
         for action in actions:
             out.write("<tr><td>" + action + "</td>")
-            for i in xrange(6):
+            for i in xrange(7):
                 out.write("<td>" + str(self.actions[action][i]) + "</td>")
+            for i in xrange(3):
+                count = self.actions[action][i+2]
+                if count > 0:
+                    total_delay = self.actions[action][i+7]
+                    avg_delay = total_delay / count
+                    avg_delay_sec = avg_delay.days*86400.0 + avg_delay.seconds + avg_delay.microseconds/1000000.0
+                else:
+                    avg_delay_sec = 0.0
+                out.write("<td>%0.2f</td>" % avg_delay_sec)
             out.write('</tr>\n')
         out.write("</table>\n")
         out.write("</td></tr>\n")
@@ -161,7 +172,7 @@ class StatsLogger:
         
         @param action: the name of the action
         """
-        act = self.actions.setdefault(action, [0, 0, 0, 0, 0, 0])
+        act = self.actions.setdefault(action, [0, 0, 0, 0, 0, 0, 0, timedelta(), timedelta(), timedelta()])
         act[0] += 1
     
     #{ Called by the transport
@@ -170,47 +181,61 @@ class StatsLogger:
         
         @param action: the name of the action
         """
-        act = self.actions.setdefault(action, [0, 0, 0, 0, 0, 0])
+        act = self.actions.setdefault(action, [0, 0, 0, 0, 0, 0, 0, timedelta(), timedelta(), timedelta()])
         act[1] += 1
         
-    def responseAction(self, response, action):
+    def responseAction(self, response, action, start):
         """Record that a response to an action was received.
         
         @param response: the response
         @param action: the name of the action
+        @param start: the time the action was started
         @return: the response (for use in deferreds)
         """
-        act = self.actions.setdefault(action, [0, 0, 0, 0, 0, 0])
+        act = self.actions.setdefault(action, [0, 0, 0, 0, 0, 0, 0, timedelta(), timedelta(), timedelta()])
         act[2] += 1
+        act[7] += datetime.now() - start
         return response
         
-    def failedAction(self, response, action):
+    def failedAction(self, response, action, start):
         """Record that a failed response to an action was received.
         
         @param response: the response
         @param action: the name of the action
+        @param start: the time the action was started
         @return: the response (for use in deferreds)
         """
-        act = self.actions.setdefault(action, [0, 0, 0, 0, 0, 0])
+        act = self.actions.setdefault(action, [0, 0, 0, 0, 0, 0, 0, timedelta(), timedelta(), timedelta()])
         act[3] += 1
+        act[8] += datetime.now() - start
         return response
         
+    def completedAction(self, action, start):
+        """Record that an action was completed.
+        
+        @param action: the name of the action
+        @param start: the time the action was started
+        """
+        act = self.actions.setdefault(action, [0, 0, 0, 0, 0, 0, 0, timedelta(), timedelta(), timedelta()])
+        act[4] += 1
+        act[9] += datetime.now() - start
+        
     def receivedAction(self, action):
         """Record that an action was received.
         
         @param action: the name of the action
         """
         self.reachable = True
-        act = self.actions.setdefault(action, [0, 0, 0, 0, 0, 0])
-        act[4] += 1
+        act = self.actions.setdefault(action, [0, 0, 0, 0, 0, 0, 0, timedelta(), timedelta(), timedelta()])
+        act[5] += 1
     
     def errorAction(self, action):
         """Record that a received action resulted in an error.
         
         @param action: the name of the action
         """
-        act = self.actions.setdefault(action, [0, 0, 0, 0, 0, 0])
-        act[5] += 1
+        act = self.actions.setdefault(action, [0, 0, 0, 0, 0, 0, 0, timedelta(), timedelta(), timedelta()])
+        act[6] += 1
     
     def sentBytes(self, bytes):
         """Record that a single packet of some bytes was sent.