"""Details of how to perform actions on remote peers."""
+from datetime import datetime
+
from twisted.internet import reactor, defer
from twisted.python import log
@ivar config: the configuration variables for the DHT
@type action: C{string}
@ivar action: the name of the action to call on remote nodes
+ @type stats: L{stats.StatsLogger}
+ @ivar stats: the statistics modules to report to
@type num: C{long}
@ivar num: the target key in integer form
@type queried: C{dictionary}
the requests that are currently outstanding
@type finished: C{boolean}
@ivar finished: whether the action is done
+ @type started: C{datetime.datetime}
+ @ivar started: the time the action was started at
@type sort: C{method}
@ivar sort: used to sort nodes by their proximity to the target
"""
self.target = target
self.config = config
self.action = action
- stats.startedAction(action)
+ self.stats = stats
+ self.stats.startedAction(action)
self.num = intify(target)
self.queried = {}
self.answered = {}
self.outstanding = 0
self.outstanding_results = 0
self.finished = False
+ self.started = datetime.now()
def sort(a, b, num=self.num):
"""Sort nodes relative to the ID we are looking for."""
#{ Main operation
def goWithNodes(self, nodes):
"""Start the action's process with a list of nodes to contact."""
+ self.started = datetime.now()
for node in nodes:
self.found[node.id] = node
self.sortNodes()
def schedule(self):
"""Schedule requests to be sent to remote nodes."""
+ if self.finished:
+ return
+
# Check if we are already done
if self.desired_results and ((len(self.results) >= abs(self.desired_results)) or
(self.desired_results < 0 and
self.finished = True
result = self.generateResult()
reactor.callLater(0, self.callback, *result)
+ return
- if self.finished or (self.desired_results and
- len(self.results) + self.outstanding_results >= abs(self.desired_results)):
+ # Check if we have enough outstanding results coming
+ if (self.desired_results and
+ len(self.results) + self.outstanding_results >= abs(self.desired_results)):
return
# Loop for each node that should be processed
def gotResponse(self, dict, node, expected_results, df):
"""Receive a response from a remote node."""
- self.caller.insertNode(node)
+ reactor.callLater(0, self.caller.insertNode, node)
if self.finished or self.answered.has_key(node.id):
# a day late and a dollar short
return
def actionFailed(self, err, node, expected_results, df):
"""Receive an error from a remote node."""
- log.msg("action %s failed (%s) %s/%s" % (self.action, self.config['PORT'], node.host, node.port))
- log.err(err)
+ log.msg("action %s failed on %s/%s: %s" % (self.action, node.host, node.port, err.getErrorMessage()))
self.caller.table.nodeFailed(node)
self.outstanding -= 1
self.outstanding_results -= expected_results
def generateResult(self, nodes):
"""Create the final result to return to the L{callback} function."""
+ self.stats.completedAction(self.action, self.started)
return []
def generateResult(self):
"""Result is the K closest nodes to the target."""
self.sortNodes()
+ self.stats.completedAction(self.action, self.started)
return (self.sorted_nodes[:K], )
def generateResult(self):
"""Result is the nodes that have values, sorted by proximity to the key."""
self.sortNodes()
+ self.stats.completedAction(self.action, self.started)
return ([node for node in self.sorted_nodes if node.num_values > 0], )
def generateResult(self):
"""Results have all been returned, now send the empty list to end the action."""
+ self.stats.completedAction(self.action, self.started)
return (self.target, [])
def generateResult(self):
"""Return all the response IDs received."""
+ self.stats.completedAction(self.action, self.started)
return (self.target, self.value, self.results.values())