From dc557e76d53d4cf82769ebf078aaa77fb3c6e74a Mon Sep 17 00:00:00 2001 From: Cameron Dale Date: Fri, 18 Apr 2008 14:03:26 -0700 Subject: [PATCH] DHT stats now include completed actions and various elapsed times for actions. --- apt_p2p/stats.py | 8 +++--- apt_p2p_Khashmir/actions.py | 16 +++++++++++- apt_p2p_Khashmir/khashmir.py | 12 ++++++--- apt_p2p_Khashmir/krpc.py | 3 ++- apt_p2p_Khashmir/stats.py | 49 +++++++++++++++++++++++++++--------- 5 files changed, 66 insertions(+), 22 deletions(-) diff --git a/apt_p2p/stats.py b/apt_p2p/stats.py index 06e291b..b76e389 100644 --- a/apt_p2p/stats.py +++ b/apt_p2p/stats.py @@ -95,24 +95,24 @@ class StatsLogger: out.write("This Session") out.write("" + byte_format(self.mirrorDown) + '') out.write("" + byte_format(self.peerDown) + '') - out.write("" + byte_format(self.peerUp) + '') + out.write("" + byte_format(self.peerUp) + '\n') out.write("Session Ratio") out.write("%0.2f%%" % (100.0 * float(self.mirrorDown) / float(max(self.mirrorDown + self.peerDown, 1)), )) out.write("%0.2f%%" % (100.0 * float(self.peerDown) / float(max(self.mirrorDown + self.peerDown, 1)), )) - out.write("%0.2f%%" % + out.write("%0.2f%%\n" % (100.0 * float(self.peerUp) / float(max(self.mirrorDown + self.peerDown, 1)), )) out.write("All-Time") out.write("" + byte_format(self.mirrorAllDown) + '') out.write("" + byte_format(self.peerAllDown) + '') - out.write("" + byte_format(self.peerAllUp) + '') + out.write("" + byte_format(self.peerAllUp) + '\n') out.write("All-Time Ratio") out.write("%0.2f%%" % (100.0 * float(self.mirrorAllDown) / float(max(self.mirrorAllDown + self.peerAllDown, 1)), )) out.write("%0.2f%%" % (100.0 * float(self.peerAllDown) / float(max(self.mirrorAllDown + self.peerAllDown, 1)), )) - out.write("%0.2f%%" % + out.write("%0.2f%%" % (100.0 * float(self.peerAllUp) / float(max(self.mirrorAllDown + self.peerAllDown, 1)), )) out.write("\n") out.write("\n") diff --git a/apt_p2p_Khashmir/actions.py b/apt_p2p_Khashmir/actions.py index 5254fe4..95aef72 100644 --- a/apt_p2p_Khashmir/actions.py +++ b/apt_p2p_Khashmir/actions.py @@ -3,6 +3,8 @@ """Details of how to perform actions on remote peers.""" +from datetime import datetime + from twisted.internet import reactor, defer from twisted.python import log @@ -21,6 +23,8 @@ class ActionBase: @ivar config: the configuration variables for the DHT @type action: C{string} @ivar action: the name of the action to call on remote nodes + @type stats: L{stats.StatsLogger} + @ivar stats: the statistics modules to report to @type num: C{long} @ivar num: the target key in integer form @type queried: C{dictionary} @@ -46,6 +50,8 @@ class ActionBase: the requests that are currently outstanding @type finished: C{boolean} @ivar finished: whether the action is done + @type started: C{datetime.datetime} + @ivar started: the time the action was started at @type sort: C{method} @ivar sort: used to sort nodes by their proximity to the target """ @@ -75,7 +81,8 @@ class ActionBase: self.target = target self.config = config self.action = action - stats.startedAction(action) + self.stats = stats + self.stats.startedAction(action) self.num = intify(target) self.queried = {} self.answered = {} @@ -87,6 +94,7 @@ class ActionBase: self.outstanding = 0 self.outstanding_results = 0 self.finished = False + self.started = datetime.now() def sort(a, b, num=self.num): """Sort nodes relative to the ID we are looking for.""" @@ -101,6 +109,7 @@ class ActionBase: #{ Main operation def goWithNodes(self, nodes): """Start the action's process with a list of nodes to contact.""" + self.started = datetime.now() for node in nodes: self.found[node.id] = node self.sortNodes() @@ -236,6 +245,7 @@ class ActionBase: def generateResult(self, nodes): """Create the final result to return to the L{callback} function.""" + self.stats.completedAction(self.action, self.started) return [] @@ -254,6 +264,7 @@ class FindNode(ActionBase): def generateResult(self): """Result is the K closest nodes to the target.""" self.sortNodes() + self.stats.completedAction(self.action, self.started) return (self.sorted_nodes[:K], ) @@ -272,6 +283,7 @@ class FindValue(ActionBase): def generateResult(self): """Result is the nodes that have values, sorted by proximity to the key.""" self.sortNodes() + self.stats.completedAction(self.action, self.started) return ([node for node in self.sorted_nodes if node.num_values > 0], ) @@ -313,6 +325,7 @@ class GetValue(ActionBase): def generateResult(self): """Results have all been returned, now send the empty list to end the action.""" + self.stats.completedAction(self.action, self.started) return (self.target, []) @@ -345,4 +358,5 @@ class StoreValue(ActionBase): def generateResult(self): """Return all the response IDs received.""" + self.stats.completedAction(self.action, self.started) return (self.target, self.value, self.results.values()) diff --git a/apt_p2p_Khashmir/khashmir.py b/apt_p2p_Khashmir/khashmir.py index c5b199a..580f0f0 100644 --- a/apt_p2p_Khashmir/khashmir.py +++ b/apt_p2p_Khashmir/khashmir.py @@ -198,14 +198,16 @@ class KhashmirBase(protocol.Factory): (datetime.now() - old.lastSeen) > timedelta(seconds=self.config['MIN_PING_INTERVAL'])): - def _staleNodeHandler(err, oldnode = old, newnode = node, self = self): + def _staleNodeHandler(err, oldnode = old, newnode = node, self = self, start = datetime.now()): """The pinged node never responded, so replace it.""" log.msg("ping failed (%s) %s/%s" % (self.config['PORT'], oldnode.host, oldnode.port)) log.err(err) + self.stats.completedAction('ping', start) self.table.replaceStaleNode(oldnode, newnode) - def _notStaleNodeHandler(dict, old=old, self=self): + def _notStaleNodeHandler(dict, old = old, self = self, start = datetime.now()): """Got a pong from the old node, so update it.""" + self.stats.completedAction('ping', start) if dict['id'] == old.id: self.table.justSeenNode(old.id) @@ -228,17 +230,19 @@ class KhashmirBase(protocol.Factory): (optional, defaults to calling the callback with None) """ - def _pongHandler(dict, node=node, self=self, callback=callback): + def _pongHandler(dict, node=node, self=self, callback=callback, start = datetime.now()): """Node responded properly, callback with response.""" n = self.Node(dict['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1]) + self.stats.completedAction('join', start) self.insertNode(n) if callback: callback((dict['ip_addr'], dict['port'])) - def _defaultPong(err, node=node, self=self, callback=callback, errback=errback): + def _defaultPong(err, node=node, self=self, callback=callback, errback=errback, start = datetime.now()): """Error occurred, fail node and errback or callback with error.""" log.msg("join failed (%s) %s/%s" % (self.config['PORT'], node.host, node.port)) log.err(err) + self.stats.completedAction('join', start) self.table.nodeFailed(node) if errback: errback() diff --git a/apt_p2p_Khashmir/krpc.py b/apt_p2p_Khashmir/krpc.py index dadf0f2..acc5fbf 100644 --- a/apt_p2p_Khashmir/krpc.py +++ b/apt_p2p_Khashmir/krpc.py @@ -523,7 +523,8 @@ class KRPC: # Save the conclusion of the action req.addCallbacks(self.stats.responseAction, self.stats.failedAction, - callbackArgs = (method, ), errbackArgs = (method, )) + callbackArgs = (method, datetime.now()), + errbackArgs = (method, datetime.now())) return req diff --git a/apt_p2p_Khashmir/stats.py b/apt_p2p_Khashmir/stats.py index 126099c..5ed7e6d 100644 --- a/apt_p2p_Khashmir/stats.py +++ b/apt_p2p_Khashmir/stats.py @@ -141,13 +141,24 @@ class StatsLogger: # Actions out.write("\n") - out.write("\n") + out.write("") + out.write("") + out.write("\n") actions = self.actions.keys() actions.sort() for action in actions: out.write("") - for i in xrange(6): + for i in xrange(7): out.write("") + for i in xrange(3): + count = self.actions[action][i+2] + if count > 0: + total_delay = self.actions[action][i+7] + avg_delay = total_delay / count + avg_delay_sec = avg_delay.days*86400.0 + avg_delay.seconds + avg_delay.microseconds/1000000.0 + else: + avg_delay_sec = 0.0 + out.write("" % avg_delay_sec) out.write('\n') out.write("

Actions

StartedSentOKFailedReceivedError

Actions

StartedSentSuccessfulFailedCompletedReceivedErrorSuccessful DelayFailed DelayTotal Delay
" + action + "" + str(self.actions[action][i]) + "%0.2f
\n") out.write("\n") @@ -161,7 +172,7 @@ class StatsLogger: @param action: the name of the action """ - act = self.actions.setdefault(action, [0, 0, 0, 0, 0, 0]) + act = self.actions.setdefault(action, [0, 0, 0, 0, 0, 0, 0, timedelta(), timedelta(), timedelta()]) act[0] += 1 #{ Called by the transport @@ -170,47 +181,61 @@ class StatsLogger: @param action: the name of the action """ - act = self.actions.setdefault(action, [0, 0, 0, 0, 0, 0]) + act = self.actions.setdefault(action, [0, 0, 0, 0, 0, 0, 0, timedelta(), timedelta(), timedelta()]) act[1] += 1 - def responseAction(self, response, action): + def responseAction(self, response, action, start): """Record that a response to an action was received. @param response: the response @param action: the name of the action + @param start: the time the action was started @return: the response (for use in deferreds) """ - act = self.actions.setdefault(action, [0, 0, 0, 0, 0, 0]) + act = self.actions.setdefault(action, [0, 0, 0, 0, 0, 0, 0, timedelta(), timedelta(), timedelta()]) act[2] += 1 + act[7] += datetime.now() - start return response - def failedAction(self, response, action): + def failedAction(self, response, action, start): """Record that a failed response to an action was received. @param response: the response @param action: the name of the action + @param start: the time the action was started @return: the response (for use in deferreds) """ - act = self.actions.setdefault(action, [0, 0, 0, 0, 0, 0]) + act = self.actions.setdefault(action, [0, 0, 0, 0, 0, 0, 0, timedelta(), timedelta(), timedelta()]) act[3] += 1 + act[8] += datetime.now() - start return response + def completedAction(self, action, start): + """Record that an action was completed. + + @param action: the name of the action + @param start: the time the action was started + """ + act = self.actions.setdefault(action, [0, 0, 0, 0, 0, 0, 0, timedelta(), timedelta(), timedelta()]) + act[4] += 1 + act[9] += datetime.now() - start + def receivedAction(self, action): """Record that an action was received. @param action: the name of the action """ self.reachable = True - act = self.actions.setdefault(action, [0, 0, 0, 0, 0, 0]) - act[4] += 1 + act = self.actions.setdefault(action, [0, 0, 0, 0, 0, 0, 0, timedelta(), timedelta(), timedelta()]) + act[5] += 1 def errorAction(self, action): """Record that a received action resulted in an error. @param action: the name of the action """ - act = self.actions.setdefault(action, [0, 0, 0, 0, 0, 0]) - act[5] += 1 + act = self.actions.setdefault(action, [0, 0, 0, 0, 0, 0, 0, timedelta(), timedelta(), timedelta()]) + act[6] += 1 def sentBytes(self, bytes): """Record that a single packet of some bytes was sent. -- 2.39.2