"""Details of how to perform actions on remote peers."""
-from twisted.internet import reactor
+from twisted.internet import reactor, defer
from twisted.python import log
from khash import intify
def goWithNodes(self, nodes):
"""Start the action's process with a list of nodes to contact."""
for node in nodes:
- if node.id == self.caller.node.id:
- continue
- else:
- self.found[node.id] = node
+ self.found[node.id] = node
self.sortNodes()
self.schedule()
# Loop for each node that should be processed
for node in self.getNodesToProcess():
# Don't send requests twice or to ourself
- if node.id not in self.queried and node.id != self.caller.node.id:
+ if node.id not in self.queried:
self.queried[node.id] = 1
# Get the action to call on the node
- try:
- f = getattr(node, self.action)
- except AttributeError:
- log.msg("%s doesn't have a %s method!" % (node, self.action))
+ if node.id == self.caller.node.id:
+ try:
+ f = getattr(self.caller, 'krpc_' + self.action)
+ except AttributeError:
+ log.msg("%s doesn't have a %s method!" % (node, 'krpc_' + self.action))
+ continue
else:
- # Get the arguments to the action's method
try:
- args, expected_results = self.generateArgs(node)
- except ValueError:
- pass
- else:
- # Call the action on the remote node
- self.outstanding += 1
- self.outstanding_results += expected_results
- df = f(self.caller.node.id, *args)
- df.addCallbacks(self.gotResponse, self.actionFailed,
- callbackArgs = (node, expected_results),
- errbackArgs = (node, expected_results))
+ f = getattr(node, self.action)
+ except AttributeError:
+ log.msg("%s doesn't have a %s method!" % (node, self.action))
+ continue
+
+ # Get the arguments to the action's method
+ try:
+ args, expected_results = self.generateArgs(node)
+ except ValueError:
+ continue
+
+ # Call the action on the remote node
+ self.outstanding += 1
+ self.outstanding_results += expected_results
+ df = defer.maybeDeferred(f, *args)
+ reactor.callLater(0, df.addCallbacks,
+ *(self.gotResponse, self.actionFailed),
+ **{'callbackArgs': (node, expected_results, df),
+ 'errbackArgs': (node, expected_results, df)})
# We might have to stop for now
if (self.outstanding >= self.config['CONCURRENT_REQS'] or
result = self.generateResult()
reactor.callLater(0, self.callback, *result)
- def gotResponse(self, dict, node, expected_results):
+ def gotResponse(self, dict, node, expected_results, df):
"""Receive a response from a remote node."""
self.caller.insertNode(node)
if self.finished or self.answered.has_key(node.id):
self.processResponse(dict)
self.schedule()
- def actionFailed(self, err, node, expected_results):
+ def actionFailed(self, err, node, expected_results, df):
"""Receive an error from a remote node."""
log.msg("action %s failed (%s) %s/%s" % (self.action, self.config['PORT'], node.host, node.port))
log.err(err)
def generateArgs(self, node):
"""Generate the arguments to the node's action.
- These arguments will be appended to our node ID when calling the action.
Also return the number of results expected from this request.
@raise ValueError: if the node should not be queried
"""
- return (self.target, ), 0
+ return (self.caller.node.id, self.target), 0
def processResponse(self, dict):
"""Process the response dictionary received from the remote node."""
class GetValue(ActionBase):
"""Retrieve values from a list of nodes."""
- def __init__(self, caller, target, local_results, num_results, callback, config, stats, action="get_value"):
- """Initialize the action with the locally available results.
-
- @type local_results: C{list} of C{string}
- @param local_results: the values that were available in this node
- """
+ def __init__(self, caller, target, num_results, callback, config, stats, action="get_value"):
ActionBase.__init__(self, caller, target, callback, config, stats, action, num_results)
- if local_results:
- for result in local_results:
- self.results[result] = 1
def getNodesToProcess(self):
"""Nodes are never added, always return the same sorted node list."""
assert num_values > 0
if num_values > node.num_values:
num_values = 0
- return (self.target, num_values), node.num_values
+ return (self.caller.node.id, self.target, num_values), node.num_values
else:
raise ValueError, "Don't try and get values from this node because it doesn't have any"
def generateArgs(self, node):
"""Args include the value to store and the node's token."""
if node.token:
- return (self.target, self.value, node.token), 1
+ return (self.caller.node.id, self.target, self.value, node.token), 1
else:
raise ValueError, "Don't store at this node since we don't know it's token"
@param errback: the method to call if an error occurs
(optional, defaults to doing nothing when an error occurs)
"""
- # Get K nodes out of local table/cache
- nodes = self.table.findNodes(id)
- nodes = [copy(node) for node in nodes]
+ # Start with our node
+ nodes = [copy(self.node)]
+
d = Deferred()
if errback:
d.addCallbacks(callback, errback)
else:
d.addCallback(callback)
- # If the target ID was found
- if len(nodes) == 1 and nodes[0].id == id:
- d.callback(nodes)
- else:
- # Start the finding nodes action
- state = FindNode(self, id, d.callback, self.config, self.stats)
- reactor.callLater(0, state.goWithNodes, nodes)
+ # Start the finding nodes action
+ state = FindNode(self, id, d.callback, self.config, self.stats)
+ reactor.callLater(0, state.goWithNodes, nodes)
def insertNode(self, node, contacted = True):
"""Try to insert a node in our local table, pinging oldest contact if necessary.
@param errback: the method to call if an error occurs
(optional, defaults to doing nothing when an error occurs)
"""
- # Get K nodes out of local table/cache
- nodes = self.table.findNodes(key)
- nodes = [copy(node) for node in nodes]
+ # Start with ourself
+ nodes = [copy(self.node)]
+
d = Deferred()
if errback:
d.addCallbacks(callback, errback)
@type searchlocal: C{boolean}
@param searchlocal: whether to also look for any local values
"""
- # Get any local values
- if searchlocal:
- l = self.store.retrieveValues(key)
- if len(l) > 0:
- reactor.callLater(0, callback, key, l)
- else:
- l = []
- def _getValueForKey(nodes, key=key, local_values=l, response=callback, self=self):
+ def _getValueForKey(nodes, key=key, response=callback, self=self, searchlocal=searchlocal):
"""Use the found nodes to send requests for values to."""
- state = GetValue(self, key, local_values, self.config['RETRIEVE_VALUES'], response, self.config, self.stats)
+ # Get any local values
+ if searchlocal:
+ l = self.store.retrieveValues(key)
+ if len(l) > 0:
+ node = copy(self.node)
+ node.updateNumValues(len(l))
+ nodes = nodes + [node]
+
+ state = GetValue(self, key, self.config['RETRIEVE_VALUES'], response, self.config, self.stats)
reactor.callLater(0, state.goWithNodes, nodes)
# First lookup nodes that have values for the key