X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=apt_p2p_Khashmir%2Factions.py;h=b54632065002dbd2a0dc6c4439451ffa3c2d5ab0;hb=560d950422ec0610ee5ec855c1774bd73ccdd8cb;hp=870e37c80450220d84109077209e8e0905a4b976;hpb=022aebfacf60fa46cf2cef01028c502b811208a6;p=quix0rs-apt-p2p.git diff --git a/apt_p2p_Khashmir/actions.py b/apt_p2p_Khashmir/actions.py index 870e37c..b546320 100644 --- a/apt_p2p_Khashmir/actions.py +++ b/apt_p2p_Khashmir/actions.py @@ -3,7 +3,7 @@ """Details of how to perform actions on remote peers.""" -from twisted.internet import reactor +from twisted.internet import reactor, defer from twisted.python import log from khash import intify @@ -101,10 +101,7 @@ class ActionBase: def goWithNodes(self, nodes): """Start the action's process with a list of nodes to contact.""" for node in nodes: - if node.id == self.caller.node.id: - continue - else: - self.found[node.id] = node + self.found[node.id] = node self.sortNodes() self.schedule() @@ -125,28 +122,37 @@ class ActionBase: # Loop for each node that should be processed for node in self.getNodesToProcess(): # Don't send requests twice or to ourself - if node.id not in self.queried and node.id != self.caller.node.id: + if node.id not in self.queried: self.queried[node.id] = 1 # Get the action to call on the node - try: - f = getattr(node, self.action) - except AttributeError: - log.msg("%s doesn't have a %s method!" % (node, self.action)) + if node.id == self.caller.node.id: + try: + f = getattr(self.caller, 'krpc_' + self.action) + except AttributeError: + log.msg("%s doesn't have a %s method!" % (node, 'krpc_' + self.action)) + continue else: - # Get the arguments to the action's method try: - args, expected_results = self.generateArgs(node) - except ValueError: - pass - else: - # Call the action on the remote node - self.outstanding += 1 - self.outstanding_results += expected_results - df = f(self.caller.node.id, *args) - df.addCallbacks(self.gotResponse, self.actionFailed, - callbackArgs = (node, expected_results), - errbackArgs = (node, expected_results)) + f = getattr(node, self.action) + except AttributeError: + log.msg("%s doesn't have a %s method!" % (node, self.action)) + continue + + # Get the arguments to the action's method + try: + args, expected_results = self.generateArgs(node) + except ValueError: + continue + + # Call the action on the remote node + self.outstanding += 1 + self.outstanding_results += expected_results + df = defer.maybeDeferred(f, *args) + reactor.callLater(0, df.addCallbacks, + *(self.gotResponse, self.actionFailed), + **{'callbackArgs': (node, expected_results, df), + 'errbackArgs': (node, expected_results, df)}) # We might have to stop for now if (self.outstanding >= self.config['CONCURRENT_REQS'] or @@ -163,7 +169,7 @@ class ActionBase: result = self.generateResult() reactor.callLater(0, self.callback, *result) - def gotResponse(self, dict, node, expected_results): + def gotResponse(self, dict, node, expected_results, df): """Receive a response from a remote node.""" self.caller.insertNode(node) if self.finished or self.answered.has_key(node.id): @@ -172,10 +178,10 @@ class ActionBase: self.outstanding -= 1 self.outstanding_results -= expected_results self.answered[node.id] = 1 - self.processResponse(dict['rsp']) + self.processResponse(dict) self.schedule() - def actionFailed(self, err, node, expected_results): + def actionFailed(self, err, node, expected_results, df): """Receive an error from a remote node.""" log.msg("action %s failed (%s) %s/%s" % (self.action, self.config['PORT'], node.host, node.port)) log.err(err) @@ -217,12 +223,11 @@ class ActionBase: def generateArgs(self, node): """Generate the arguments to the node's action. - These arguments will be appended to our node ID when calling the action. Also return the number of results expected from this request. @raise ValueError: if the node should not be queried """ - return (self.target, ), 0 + return (self.caller.node.id, self.target), 0 def processResponse(self, dict): """Process the response dictionary received from the remote node.""" @@ -272,16 +277,8 @@ class FindValue(ActionBase): class GetValue(ActionBase): """Retrieve values from a list of nodes.""" - def __init__(self, caller, target, local_results, num_results, callback, config, stats, action="get_value"): - """Initialize the action with the locally available results. - - @type local_results: C{list} of C{string} - @param local_results: the values that were available in this node - """ + def __init__(self, caller, target, num_results, callback, config, stats, action="get_value"): ActionBase.__init__(self, caller, target, callback, config, stats, action, num_results) - if local_results: - for result in local_results: - self.results[result] = 1 def getNodesToProcess(self): """Nodes are never added, always return the same sorted node list.""" @@ -295,7 +292,7 @@ class GetValue(ActionBase): assert num_values > 0 if num_values > node.num_values: num_values = 0 - return (self.target, num_values), node.num_values + return (self.caller.node.id, self.target, num_values), node.num_values else: raise ValueError, "Don't try and get values from this node because it doesn't have any" @@ -337,7 +334,7 @@ class StoreValue(ActionBase): def generateArgs(self, node): """Args include the value to store and the node's token.""" if node.token: - return (self.target, self.value, node.token), 1 + return (self.caller.node.id, self.target, self.value, node.token), 1 else: raise ValueError, "Don't store at this node since we don't know it's token"