From: Cameron Dale Date: Thu, 27 Mar 2008 23:55:08 +0000 (-0700) Subject: Allow the actions to call the local node's remote interface. X-Git-Url: https://git.mxchange.org/?p=quix0rs-apt-p2p.git;a=commitdiff_plain;h=81a89f3b32dde69a97426daab2b672ba2a9b3d47 Allow the actions to call the local node's remote interface. The find_* actions now start with only the local node instead of the nodes from the routing table. --- diff --git a/apt_p2p_Khashmir/actions.py b/apt_p2p_Khashmir/actions.py index dfb0852..b546320 100644 --- a/apt_p2p_Khashmir/actions.py +++ b/apt_p2p_Khashmir/actions.py @@ -3,7 +3,7 @@ """Details of how to perform actions on remote peers.""" -from twisted.internet import reactor +from twisted.internet import reactor, defer from twisted.python import log from khash import intify @@ -101,10 +101,7 @@ class ActionBase: def goWithNodes(self, nodes): """Start the action's process with a list of nodes to contact.""" for node in nodes: - if node.id == self.caller.node.id: - continue - else: - self.found[node.id] = node + self.found[node.id] = node self.sortNodes() self.schedule() @@ -125,28 +122,37 @@ class ActionBase: # Loop for each node that should be processed for node in self.getNodesToProcess(): # Don't send requests twice or to ourself - if node.id not in self.queried and node.id != self.caller.node.id: + if node.id not in self.queried: self.queried[node.id] = 1 # Get the action to call on the node - try: - f = getattr(node, self.action) - except AttributeError: - log.msg("%s doesn't have a %s method!" % (node, self.action)) + if node.id == self.caller.node.id: + try: + f = getattr(self.caller, 'krpc_' + self.action) + except AttributeError: + log.msg("%s doesn't have a %s method!" % (node, 'krpc_' + self.action)) + continue else: - # Get the arguments to the action's method try: - args, expected_results = self.generateArgs(node) - except ValueError: - pass - else: - # Call the action on the remote node - self.outstanding += 1 - self.outstanding_results += expected_results - df = f(self.caller.node.id, *args) - df.addCallbacks(self.gotResponse, self.actionFailed, - callbackArgs = (node, expected_results), - errbackArgs = (node, expected_results)) + f = getattr(node, self.action) + except AttributeError: + log.msg("%s doesn't have a %s method!" % (node, self.action)) + continue + + # Get the arguments to the action's method + try: + args, expected_results = self.generateArgs(node) + except ValueError: + continue + + # Call the action on the remote node + self.outstanding += 1 + self.outstanding_results += expected_results + df = defer.maybeDeferred(f, *args) + reactor.callLater(0, df.addCallbacks, + *(self.gotResponse, self.actionFailed), + **{'callbackArgs': (node, expected_results, df), + 'errbackArgs': (node, expected_results, df)}) # We might have to stop for now if (self.outstanding >= self.config['CONCURRENT_REQS'] or @@ -163,7 +169,7 @@ class ActionBase: result = self.generateResult() reactor.callLater(0, self.callback, *result) - def gotResponse(self, dict, node, expected_results): + def gotResponse(self, dict, node, expected_results, df): """Receive a response from a remote node.""" self.caller.insertNode(node) if self.finished or self.answered.has_key(node.id): @@ -175,7 +181,7 @@ class ActionBase: self.processResponse(dict) self.schedule() - def actionFailed(self, err, node, expected_results): + def actionFailed(self, err, node, expected_results, df): """Receive an error from a remote node.""" log.msg("action %s failed (%s) %s/%s" % (self.action, self.config['PORT'], node.host, node.port)) log.err(err) @@ -217,12 +223,11 @@ class ActionBase: def generateArgs(self, node): """Generate the arguments to the node's action. - These arguments will be appended to our node ID when calling the action. Also return the number of results expected from this request. @raise ValueError: if the node should not be queried """ - return (self.target, ), 0 + return (self.caller.node.id, self.target), 0 def processResponse(self, dict): """Process the response dictionary received from the remote node.""" @@ -272,16 +277,8 @@ class FindValue(ActionBase): class GetValue(ActionBase): """Retrieve values from a list of nodes.""" - def __init__(self, caller, target, local_results, num_results, callback, config, stats, action="get_value"): - """Initialize the action with the locally available results. - - @type local_results: C{list} of C{string} - @param local_results: the values that were available in this node - """ + def __init__(self, caller, target, num_results, callback, config, stats, action="get_value"): ActionBase.__init__(self, caller, target, callback, config, stats, action, num_results) - if local_results: - for result in local_results: - self.results[result] = 1 def getNodesToProcess(self): """Nodes are never added, always return the same sorted node list.""" @@ -295,7 +292,7 @@ class GetValue(ActionBase): assert num_values > 0 if num_values > node.num_values: num_values = 0 - return (self.target, num_values), node.num_values + return (self.caller.node.id, self.target, num_values), node.num_values else: raise ValueError, "Don't try and get values from this node because it doesn't have any" @@ -337,7 +334,7 @@ class StoreValue(ActionBase): def generateArgs(self, node): """Args include the value to store and the node's token.""" if node.token: - return (self.target, self.value, node.token), 1 + return (self.caller.node.id, self.target, self.value, node.token), 1 else: raise ValueError, "Don't store at this node since we don't know it's token" diff --git a/apt_p2p_Khashmir/khashmir.py b/apt_p2p_Khashmir/khashmir.py index 554608f..7946523 100644 --- a/apt_p2p_Khashmir/khashmir.py +++ b/apt_p2p_Khashmir/khashmir.py @@ -174,22 +174,18 @@ class KhashmirBase(protocol.Factory): @param errback: the method to call if an error occurs (optional, defaults to doing nothing when an error occurs) """ - # Get K nodes out of local table/cache - nodes = self.table.findNodes(id) - nodes = [copy(node) for node in nodes] + # Start with our node + nodes = [copy(self.node)] + d = Deferred() if errback: d.addCallbacks(callback, errback) else: d.addCallback(callback) - # If the target ID was found - if len(nodes) == 1 and nodes[0].id == id: - d.callback(nodes) - else: - # Start the finding nodes action - state = FindNode(self, id, d.callback, self.config, self.stats) - reactor.callLater(0, state.goWithNodes, nodes) + # Start the finding nodes action + state = FindNode(self, id, d.callback, self.config, self.stats) + reactor.callLater(0, state.goWithNodes, nodes) def insertNode(self, node, contacted = True): """Try to insert a node in our local table, pinging oldest contact if necessary. @@ -380,9 +376,9 @@ class KhashmirRead(KhashmirBase): @param errback: the method to call if an error occurs (optional, defaults to doing nothing when an error occurs) """ - # Get K nodes out of local table/cache - nodes = self.table.findNodes(key) - nodes = [copy(node) for node in nodes] + # Start with ourself + nodes = [copy(self.node)] + d = Deferred() if errback: d.addCallbacks(callback, errback) @@ -407,17 +403,18 @@ class KhashmirRead(KhashmirBase): @type searchlocal: C{boolean} @param searchlocal: whether to also look for any local values """ - # Get any local values - if searchlocal: - l = self.store.retrieveValues(key) - if len(l) > 0: - reactor.callLater(0, callback, key, l) - else: - l = [] - def _getValueForKey(nodes, key=key, local_values=l, response=callback, self=self): + def _getValueForKey(nodes, key=key, response=callback, self=self, searchlocal=searchlocal): """Use the found nodes to send requests for values to.""" - state = GetValue(self, key, local_values, self.config['RETRIEVE_VALUES'], response, self.config, self.stats) + # Get any local values + if searchlocal: + l = self.store.retrieveValues(key) + if len(l) > 0: + node = copy(self.node) + node.updateNumValues(len(l)) + nodes = nodes + [node] + + state = GetValue(self, key, self.config['RETRIEVE_VALUES'], response, self.config, self.stats) reactor.callLater(0, state.goWithNodes, nodes) # First lookup nodes that have values for the key