X-Git-Url: https://git.mxchange.org/?p=quix0rs-apt-p2p.git;a=blobdiff_plain;f=apt_p2p_Khashmir%2Factions.py;h=883bccbd29ee5c126be3d10f2c607d4429936fd7;hp=dfb085241df0dac73e8e83eee74610a3572b8dbf;hb=96ea4dd2eaf55c5e94f1a9f13ec36fac10387158;hpb=ff8f24ee57f4c415191660c381c27159b1342357 diff --git a/apt_p2p_Khashmir/actions.py b/apt_p2p_Khashmir/actions.py index dfb0852..883bccb 100644 --- a/apt_p2p_Khashmir/actions.py +++ b/apt_p2p_Khashmir/actions.py @@ -1,12 +1,13 @@ -## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved -# see LICENSE.txt for license information """Details of how to perform actions on remote peers.""" -from twisted.internet import reactor +from datetime import datetime + +from twisted.internet import reactor, defer from twisted.python import log from khash import intify +from ktable import K from util import uncompact class ActionBase: @@ -20,6 +21,8 @@ class ActionBase: @ivar config: the configuration variables for the DHT @type action: C{string} @ivar action: the name of the action to call on remote nodes + @type stats: L{stats.StatsLogger} + @ivar stats: the statistics modules to report to @type num: C{long} @ivar num: the target key in integer form @type queried: C{dictionary} @@ -27,6 +30,8 @@ class ActionBase: keys are node IDs, values are the node itself @type answered: C{dictionary} @ivar answered: the nodes that have answered the queries + @type failed: C{dictionary} + @ivar failed: the nodes that have failed to answer the queries @type found: C{dictionary} @ivar found: nodes that have been found so far by the action @type sorted_nodes: C{list} of L{node.Node} @@ -38,13 +43,16 @@ class ActionBase: before the action should stop @type callback: C{method} @ivar callback: the method to call with the results - @type outstanding: C{int} - @ivar outstanding: the number of requests currently outstanding + @type outstanding: C{dictionary} + @ivar outstanding: the nodes that have outstanding requests for this action, + keys are node IDs, values are the number of outstanding results from the node @type outstanding_results: C{int} - @ivar outstanding_results: the number of results that are expected from + @ivar outstanding_results: the total number of results that are expected from the requests that are currently outstanding @type finished: C{boolean} @ivar finished: whether the action is done + @type started: C{datetime.datetime} + @ivar started: the time the action was started at @type sort: C{method} @ivar sort: used to sort nodes by their proximity to the target """ @@ -74,18 +82,21 @@ class ActionBase: self.target = target self.config = config self.action = action - stats.startedAction(action) + self.stats = stats + self.stats.startedAction(action) self.num = intify(target) self.queried = {} self.answered = {} + self.failed = {} self.found = {} self.sorted_nodes = [] self.results = {} self.desired_results = num_results self.callback = callback - self.outstanding = 0 + self.outstanding = {} self.outstanding_results = 0 self.finished = False + self.started = datetime.now() def sort(a, b, num=self.num): """Sort nodes relative to the ID we are looking for.""" @@ -100,88 +111,116 @@ class ActionBase: #{ Main operation def goWithNodes(self, nodes): """Start the action's process with a list of nodes to contact.""" + self.started = datetime.now() for node in nodes: - if node.id == self.caller.node.id: - continue - else: - self.found[node.id] = node + self.found[node.id] = node self.sortNodes() self.schedule() def schedule(self): """Schedule requests to be sent to remote nodes.""" + if self.finished: + return + + # Get the nodes to be processed + nodes = self.getNodesToProcess() + # Check if we are already done - if self.desired_results and ((len(self.results) >= abs(self.desired_results)) or - (self.desired_results < 0 and - len(self.answered) >= self.config['STORE_REDUNDANCY'])): + if nodes is None or (self.desired_results and + ((len(self.results) >= abs(self.desired_results)) or + (self.desired_results < 0 and + len(self.answered) >= self.config['STORE_REDUNDANCY']))): self.finished = True result = self.generateResult() reactor.callLater(0, self.callback, *result) + return - if self.finished or (self.desired_results and - len(self.results) + self.outstanding_results >= abs(self.desired_results)): + # Check if we have enough outstanding results coming + if (self.desired_results and + len(self.results) + self.outstanding_results >= abs(self.desired_results)): return # Loop for each node that should be processed - for node in self.getNodesToProcess(): + for node in nodes: # Don't send requests twice or to ourself - if node.id not in self.queried and node.id != self.caller.node.id: + if node.id not in self.queried: self.queried[node.id] = 1 # Get the action to call on the node - try: - f = getattr(node, self.action) - except AttributeError: - log.msg("%s doesn't have a %s method!" % (node, self.action)) + if node.id == self.caller.node.id: + try: + f = getattr(self.caller, 'krpc_' + self.action) + except AttributeError: + log.msg("%s doesn't have a %s method!" % (node, 'krpc_' + self.action)) + continue else: - # Get the arguments to the action's method try: - args, expected_results = self.generateArgs(node) - except ValueError: - pass - else: - # Call the action on the remote node - self.outstanding += 1 - self.outstanding_results += expected_results - df = f(self.caller.node.id, *args) - df.addCallbacks(self.gotResponse, self.actionFailed, - callbackArgs = (node, expected_results), - errbackArgs = (node, expected_results)) + f = getattr(node, self.action) + except AttributeError: + log.msg("%s doesn't have a %s method!" % (node, self.action)) + continue + + # Get the arguments to the action's method + try: + args, expected_results = self.generateArgs(node) + except ValueError: + continue + + # Call the action on the remote node + self.outstanding[node.id] = expected_results + self.outstanding_results += expected_results + df = defer.maybeDeferred(f, *args) + reactor.callLater(0, df.addCallbacks, + *(self.gotResponse, self.actionFailed), + **{'callbackArgs': (node, ), + 'errbackArgs': (node, )}) # We might have to stop for now - if (self.outstanding >= self.config['CONCURRENT_REQS'] or + if (len(self.outstanding) >= self.config['CONCURRENT_REQS'] or (self.desired_results and len(self.results) + self.outstanding_results >= abs(self.desired_results))): break - assert self.outstanding >= 0 assert self.outstanding_results >= 0 # If no requests are outstanding, then we are done - if self.outstanding == 0: + if len(self.outstanding) == 0: self.finished = True result = self.generateResult() reactor.callLater(0, self.callback, *result) - def gotResponse(self, dict, node, expected_results): + def gotResponse(self, dict, node): """Receive a response from a remote node.""" - self.caller.insertNode(node) - if self.finished or self.answered.has_key(node.id): + if self.finished or self.answered.has_key(node.id) or self.failed.has_key(node.id): # a day late and a dollar short return - self.outstanding -= 1 - self.outstanding_results -= expected_results - self.answered[node.id] = 1 - self.processResponse(dict) + try: + # Process the response + self.processResponse(dict) + except Exception, e: + # Unexpected error with the response + log.msg("action %s failed on %s/%s: %r" % (self.action, node.host, node.port, e)) + if node.id != self.caller.node.id: + self.caller.nodeFailed(node) + self.failed[node.id] = 1 + else: + self.answered[node.id] = 1 + if node.id != self.caller.node.id: + reactor.callLater(0, self.caller.insertNode, node) + if self.outstanding.has_key(node.id): + self.outstanding_results -= self.outstanding[node.id] + del self.outstanding[node.id] self.schedule() - def actionFailed(self, err, node, expected_results): + def actionFailed(self, err, node): """Receive an error from a remote node.""" - log.msg("action %s failed (%s) %s/%s" % (self.action, self.config['PORT'], node.host, node.port)) - log.err(err) - self.caller.table.nodeFailed(node) - self.outstanding -= 1 - self.outstanding_results -= expected_results + log.msg("action %s failed on %s/%s: %s" % (self.action, node.host, node.port, err.getErrorMessage())) + if node.id != self.caller.node.id: + self.caller.nodeFailed(node) + self.failed[node.id] = 1 + if self.outstanding.has_key(node.id): + self.outstanding_results -= self.outstanding[node.id] + del self.outstanding[node.id] self.schedule() def handleGotNodes(self, nodes): @@ -190,6 +229,8 @@ class ActionBase: Not called by default, but suitable for being called by L{processResponse} in a recursive node search. """ + if nodes and type(nodes) != list: + raise ValueError, "got a malformed response, from bittorrent perhaps" for compact_node in nodes: node_contact = uncompact(compact_node) node = self.caller.Node(node_contact) @@ -210,19 +251,47 @@ class ActionBase: """Generate a list of nodes to process next. This implementation is suitable for a recurring search over all nodes. + It will stop the search when the closest K nodes have been queried. + It also prematurely drops requests to nodes that have fallen way behind. + + @return: sorted list of nodes to query, or None if we are done """ + # Find the K closest nodes that haven't failed, count how many answered self.sortNodes() - return self.sorted_nodes[:self.config['K']] + closest_K = [] + ans = 0 + for node in self.sorted_nodes: + if node.id not in self.failed: + closest_K.append(node) + if node.id in self.answered: + ans += 1 + if len(closest_K) >= K: + break + + # If we have responses from the K closest nodes, then we are done + if ans >= K: + log.msg('Got the answers we need, aborting search') + return None + + # Check the oustanding requests to see if they are still closest + for id in self.outstanding.keys(): + if self.found[id] not in closest_K: + # Request is not important, allow another to go + log.msg("Request to %s/%s is taking too long, moving on" % + (self.found[id].host, self.found[id].port)) + self.outstanding_results -= self.outstanding[id] + del self.outstanding[id] + + return closest_K def generateArgs(self, node): """Generate the arguments to the node's action. - These arguments will be appended to our node ID when calling the action. Also return the number of results expected from this request. @raise ValueError: if the node should not be queried """ - return (self.target, ), 0 + return (self.caller.node.id, self.target), 0 def processResponse(self, dict): """Process the response dictionary received from the remote node.""" @@ -230,6 +299,7 @@ class ActionBase: def generateResult(self, nodes): """Create the final result to return to the L{callback} function.""" + self.stats.completedAction(self.action, self.started) return [] @@ -248,7 +318,14 @@ class FindNode(ActionBase): def generateResult(self): """Result is the K closest nodes to the target.""" self.sortNodes() - return (self.sorted_nodes[:self.config['K']], ) + self.stats.completedAction(self.action, self.started) + closest_K = [] + for node in self.sorted_nodes: + if node.id not in self.failed: + closest_K.append(node) + if len(closest_K) >= K: + break + return (closest_K, ) class FindValue(ActionBase): @@ -266,22 +343,15 @@ class FindValue(ActionBase): def generateResult(self): """Result is the nodes that have values, sorted by proximity to the key.""" self.sortNodes() + self.stats.completedAction(self.action, self.started) return ([node for node in self.sorted_nodes if node.num_values > 0], ) class GetValue(ActionBase): """Retrieve values from a list of nodes.""" - def __init__(self, caller, target, local_results, num_results, callback, config, stats, action="get_value"): - """Initialize the action with the locally available results. - - @type local_results: C{list} of C{string} - @param local_results: the values that were available in this node - """ + def __init__(self, caller, target, num_results, callback, config, stats, action="get_value"): ActionBase.__init__(self, caller, target, callback, config, stats, action, num_results) - if local_results: - for result in local_results: - self.results[result] = 1 def getNodesToProcess(self): """Nodes are never added, always return the same sorted node list.""" @@ -295,7 +365,7 @@ class GetValue(ActionBase): assert num_values > 0 if num_values > node.num_values: num_values = 0 - return (self.target, num_values), node.num_values + return (self.caller.node.id, self.target, num_values), node.num_values else: raise ValueError, "Don't try and get values from this node because it doesn't have any" @@ -315,6 +385,7 @@ class GetValue(ActionBase): def generateResult(self): """Results have all been returned, now send the empty list to end the action.""" + self.stats.completedAction(self.action, self.started) return (self.target, []) @@ -337,7 +408,7 @@ class StoreValue(ActionBase): def generateArgs(self, node): """Args include the value to store and the node's token.""" if node.token: - return (self.target, self.value, node.token), 1 + return (self.caller.node.id, self.target, self.value, node.token), 1 else: raise ValueError, "Don't store at this node since we don't know it's token" @@ -347,4 +418,5 @@ class StoreValue(ActionBase): def generateResult(self): """Return all the response IDs received.""" + self.stats.completedAction(self.action, self.started) return (self.target, self.value, self.results.values())