from twisted.internet import reactor, defer
from twisted.python import log
from khash import intify
from twisted.internet import reactor, defer
from twisted.python import log
from khash import intify
@ivar config: the configuration variables for the DHT
@type action: C{string}
@ivar action: the name of the action to call on remote nodes
@ivar config: the configuration variables for the DHT
@type action: C{string}
@ivar action: the name of the action to call on remote nodes
the requests that are currently outstanding
@type finished: C{boolean}
@ivar finished: whether the action is done
the requests that are currently outstanding
@type finished: C{boolean}
@ivar finished: whether the action is done
def sort(a, b, num=self.num):
"""Sort nodes relative to the ID we are looking for."""
def sort(a, b, num=self.num):
"""Sort nodes relative to the ID we are looking for."""
#{ Main operation
def goWithNodes(self, nodes):
"""Start the action's process with a list of nodes to contact."""
#{ Main operation
def goWithNodes(self, nodes):
"""Start the action's process with a list of nodes to contact."""
# Check if we are already done
if self.desired_results and ((len(self.results) >= abs(self.desired_results)) or
(self.desired_results < 0 and
# Check if we are already done
if self.desired_results and ((len(self.results) >= abs(self.desired_results)) or
(self.desired_results < 0 and
self.finished = True
result = self.generateResult()
reactor.callLater(0, self.callback, *result)
self.finished = True
result = self.generateResult()
reactor.callLater(0, self.callback, *result)
- if self.finished or (self.desired_results and
- len(self.results) + self.outstanding_results >= abs(self.desired_results)):
+ # Check if we have enough outstanding results coming
+ if (self.desired_results and
+ len(self.results) + self.outstanding_results >= abs(self.desired_results)):
def gotResponse(self, dict, node, expected_results, df):
"""Receive a response from a remote node."""
def gotResponse(self, dict, node, expected_results, df):
"""Receive a response from a remote node."""
def actionFailed(self, err, node, expected_results, df):
"""Receive an error from a remote node."""
def actionFailed(self, err, node, expected_results, df):
"""Receive an error from a remote node."""
- log.msg("action %s failed (%s) %s/%s" % (self.action, self.config['PORT'], node.host, node.port))
- log.err(err)
+ log.msg("action %s failed on %s/%s: %s" % (self.action, node.host, node.port, err.getErrorMessage()))
def generateArgs(self, node):
"""Generate the arguments to the node's action.
def generateArgs(self, node):
"""Generate the arguments to the node's action.
def generateResult(self, nodes):
"""Create the final result to return to the L{callback} function."""
def generateResult(self, nodes):
"""Create the final result to return to the L{callback} function."""
- return (self.sorted_nodes[:self.config['K']], )
+ self.stats.completedAction(self.action, self.started)
+ return (self.sorted_nodes[:K], )
def generateResult(self):
"""Result is the nodes that have values, sorted by proximity to the key."""
self.sortNodes()
def generateResult(self):
"""Result is the nodes that have values, sorted by proximity to the key."""
self.sortNodes()
def generateResult(self):
"""Results have all been returned, now send the empty list to end the action."""
def generateResult(self):
"""Results have all been returned, now send the empty list to end the action."""