X-Git-Url: https://git.mxchange.org/?p=quix0rs-apt-p2p.git;a=blobdiff_plain;f=apt_p2p_Khashmir%2Factions.py;h=bd1cd2f6f1905672717b1b90e0358040afbdf8e9;hp=5254fe49bd6f5c51db135a620eb69bb9d3f4dbc3;hb=c5ae5c9ad8eb68a8ec01cfa1b19d81cbad49617a;hpb=2f3c17ab7b51cbad0abf11fa66f6cafcb5a41c5e diff --git a/apt_p2p_Khashmir/actions.py b/apt_p2p_Khashmir/actions.py index 5254fe4..bd1cd2f 100644 --- a/apt_p2p_Khashmir/actions.py +++ b/apt_p2p_Khashmir/actions.py @@ -3,6 +3,8 @@ """Details of how to perform actions on remote peers.""" +from datetime import datetime + from twisted.internet import reactor, defer from twisted.python import log @@ -21,6 +23,8 @@ class ActionBase: @ivar config: the configuration variables for the DHT @type action: C{string} @ivar action: the name of the action to call on remote nodes + @type stats: L{stats.StatsLogger} + @ivar stats: the statistics modules to report to @type num: C{long} @ivar num: the target key in integer form @type queried: C{dictionary} @@ -46,6 +50,8 @@ class ActionBase: the requests that are currently outstanding @type finished: C{boolean} @ivar finished: whether the action is done + @type started: C{datetime.datetime} + @ivar started: the time the action was started at @type sort: C{method} @ivar sort: used to sort nodes by their proximity to the target """ @@ -75,7 +81,8 @@ class ActionBase: self.target = target self.config = config self.action = action - stats.startedAction(action) + self.stats = stats + self.stats.startedAction(action) self.num = intify(target) self.queried = {} self.answered = {} @@ -87,6 +94,7 @@ class ActionBase: self.outstanding = 0 self.outstanding_results = 0 self.finished = False + self.started = datetime.now() def sort(a, b, num=self.num): """Sort nodes relative to the ID we are looking for.""" @@ -101,6 +109,7 @@ class ActionBase: #{ Main operation def goWithNodes(self, nodes): """Start the action's process with a list of nodes to contact.""" + self.started = datetime.now() for node in nodes: self.found[node.id] = node self.sortNodes() @@ -108,6 +117,9 @@ class ActionBase: def schedule(self): """Schedule requests to be sent to remote nodes.""" + if self.finished: + return + # Check if we are already done if self.desired_results and ((len(self.results) >= abs(self.desired_results)) or (self.desired_results < 0 and @@ -115,9 +127,11 @@ class ActionBase: self.finished = True result = self.generateResult() reactor.callLater(0, self.callback, *result) + return - if self.finished or (self.desired_results and - len(self.results) + self.outstanding_results >= abs(self.desired_results)): + # Check if we have enough outstanding results coming + if (self.desired_results and + len(self.results) + self.outstanding_results >= abs(self.desired_results)): return # Loop for each node that should be processed @@ -172,7 +186,7 @@ class ActionBase: def gotResponse(self, dict, node, expected_results, df): """Receive a response from a remote node.""" - self.caller.insertNode(node) + reactor.callLater(0, self.caller.insertNode, node) if self.finished or self.answered.has_key(node.id): # a day late and a dollar short return @@ -184,8 +198,7 @@ class ActionBase: def actionFailed(self, err, node, expected_results, df): """Receive an error from a remote node.""" - log.msg("action %s failed (%s) %s/%s" % (self.action, self.config['PORT'], node.host, node.port)) - log.err(err) + log.msg("action %s failed on %s/%s: %s" % (self.action, node.host, node.port, err.getErrorMessage())) self.caller.table.nodeFailed(node) self.outstanding -= 1 self.outstanding_results -= expected_results @@ -236,6 +249,7 @@ class ActionBase: def generateResult(self, nodes): """Create the final result to return to the L{callback} function.""" + self.stats.completedAction(self.action, self.started) return [] @@ -254,6 +268,7 @@ class FindNode(ActionBase): def generateResult(self): """Result is the K closest nodes to the target.""" self.sortNodes() + self.stats.completedAction(self.action, self.started) return (self.sorted_nodes[:K], ) @@ -272,6 +287,7 @@ class FindValue(ActionBase): def generateResult(self): """Result is the nodes that have values, sorted by proximity to the key.""" self.sortNodes() + self.stats.completedAction(self.action, self.started) return ([node for node in self.sorted_nodes if node.num_values > 0], ) @@ -313,6 +329,7 @@ class GetValue(ActionBase): def generateResult(self): """Results have all been returned, now send the empty list to end the action.""" + self.stats.completedAction(self.action, self.started) return (self.target, []) @@ -345,4 +362,5 @@ class StoreValue(ActionBase): def generateResult(self): """Return all the response IDs received.""" + self.stats.completedAction(self.action, self.started) return (self.target, self.value, self.results.values())