"""Details of how to perform actions on remote peers."""
-from twisted.internet import reactor
+from twisted.internet import reactor, defer
from twisted.python import log
from khash import intify
def goWithNodes(self, nodes):
"""Start the action's process with a list of nodes to contact."""
for node in nodes:
- if node.id == self.caller.node.id:
- continue
- else:
- self.found[node.id] = node
+ self.found[node.id] = node
self.sortNodes()
self.schedule()
# Loop for each node that should be processed
for node in self.getNodesToProcess():
# Don't send requests twice or to ourself
- if node.id not in self.queried and node.id != self.caller.node.id:
+ if node.id not in self.queried:
self.queried[node.id] = 1
# Get the action to call on the node
- try:
- f = getattr(node, self.action)
- except AttributeError:
- log.msg("%s doesn't have a %s method!" % (node, self.action))
+ if node.id == self.caller.node.id:
+ try:
+ f = getattr(self.caller, 'krpc_' + self.action)
+ except AttributeError:
+ log.msg("%s doesn't have a %s method!" % (node, 'krpc_' + self.action))
+ continue
else:
- # Get the arguments to the action's method
try:
- args, expected_results = self.generateArgs(node)
- except ValueError:
- pass
- else:
- # Call the action on the remote node
- self.outstanding += 1
- self.outstanding_results += expected_results
- df = f(self.caller.node.id, *args)
- df.addCallbacks(self.gotResponse, self.actionFailed,
- callbackArgs = (node, expected_results),
- errbackArgs = (node, expected_results))
+ f = getattr(node, self.action)
+ except AttributeError:
+ log.msg("%s doesn't have a %s method!" % (node, self.action))
+ continue
+
+ # Get the arguments to the action's method
+ try:
+ args, expected_results = self.generateArgs(node)
+ except ValueError:
+ continue
+
+ # Call the action on the remote node
+ self.outstanding += 1
+ self.outstanding_results += expected_results
+ df = defer.maybeDeferred(f, *args)
+ reactor.callLater(0, df.addCallbacks,
+ *(self.gotResponse, self.actionFailed),
+ **{'callbackArgs': (node, expected_results, df),
+ 'errbackArgs': (node, expected_results, df)})
# We might have to stop for now
if (self.outstanding >= self.config['CONCURRENT_REQS'] or
result = self.generateResult()
reactor.callLater(0, self.callback, *result)
- def gotResponse(self, dict, node, expected_results):
+ def gotResponse(self, dict, node, expected_results, df):
"""Receive a response from a remote node."""
self.caller.insertNode(node)
if self.finished or self.answered.has_key(node.id):
self.processResponse(dict)
self.schedule()
- def actionFailed(self, err, node, expected_results):
+ def actionFailed(self, err, node, expected_results, df):
"""Receive an error from a remote node."""
log.msg("action %s failed (%s) %s/%s" % (self.action, self.config['PORT'], node.host, node.port))
log.err(err)
def generateArgs(self, node):
"""Generate the arguments to the node's action.
- These arguments will be appended to our node ID when calling the action.
Also return the number of results expected from this request.
@raise ValueError: if the node should not be queried
"""
- return (self.target, ), 0
+ return (self.caller.node.id, self.target), 0
def processResponse(self, dict):
"""Process the response dictionary received from the remote node."""
class GetValue(ActionBase):
"""Retrieve values from a list of nodes."""
- def __init__(self, caller, target, local_results, num_results, callback, config, stats, action="get_value"):
- """Initialize the action with the locally available results.
-
- @type local_results: C{list} of C{string}
- @param local_results: the values that were available in this node
- """
+ def __init__(self, caller, target, num_results, callback, config, stats, action="get_value"):
ActionBase.__init__(self, caller, target, callback, config, stats, action, num_results)
- if local_results:
- for result in local_results:
- self.results[result] = 1
def getNodesToProcess(self):
"""Nodes are never added, always return the same sorted node list."""
assert num_values > 0
if num_values > node.num_values:
num_values = 0
- return (self.target, num_values), node.num_values
+ return (self.caller.node.id, self.target, num_values), node.num_values
else:
raise ValueError, "Don't try and get values from this node because it doesn't have any"
def generateArgs(self, node):
"""Args include the value to store and the node's token."""
if node.token:
- return (self.target, self.value, node.token), 1
+ return (self.caller.node.id, self.target, self.value, node.token), 1
else:
raise ValueError, "Don't store at this node since we don't know it's token"