X-Git-Url: https://git.mxchange.org/?p=quix0rs-apt-p2p.git;a=blobdiff_plain;f=apt_p2p_Khashmir%2Factions.py;h=865561cc0ceb5e160598ec8f524afd7adf7d0bc7;hp=95aef72aaa9a1055fe1bc74664f50aa27ce589b6;hb=f8dcf10a0d4dac1f40651758cb5d6b42a3b8e61e;hpb=dc557e76d53d4cf82769ebf078aaa77fb3c6e74a diff --git a/apt_p2p_Khashmir/actions.py b/apt_p2p_Khashmir/actions.py index 95aef72..865561c 100644 --- a/apt_p2p_Khashmir/actions.py +++ b/apt_p2p_Khashmir/actions.py @@ -1,5 +1,3 @@ -## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved -# see LICENSE.txt for license information """Details of how to perform actions on remote peers.""" @@ -32,6 +30,8 @@ class ActionBase: keys are node IDs, values are the node itself @type answered: C{dictionary} @ivar answered: the nodes that have answered the queries + @type failed: C{dictionary} + @ivar failed: the nodes that have failed to answer the queries @type found: C{dictionary} @ivar found: nodes that have been found so far by the action @type sorted_nodes: C{list} of L{node.Node} @@ -43,10 +43,11 @@ class ActionBase: before the action should stop @type callback: C{method} @ivar callback: the method to call with the results - @type outstanding: C{int} - @ivar outstanding: the number of requests currently outstanding + @type outstanding: C{dictionary} + @ivar outstanding: the nodes that have outstanding requests for this action, + keys are node IDs, values are the number of outstanding results from the node @type outstanding_results: C{int} - @ivar outstanding_results: the number of results that are expected from + @ivar outstanding_results: the total number of results that are expected from the requests that are currently outstanding @type finished: C{boolean} @ivar finished: whether the action is done @@ -86,12 +87,13 @@ class ActionBase: self.num = intify(target) self.queried = {} self.answered = {} + self.failed = {} self.found = {} self.sorted_nodes = [] self.results = {} self.desired_results = num_results self.callback = callback - self.outstanding = 0 + self.outstanding = {} self.outstanding_results = 0 self.finished = False self.started = datetime.now() @@ -117,20 +119,29 @@ class ActionBase: def schedule(self): """Schedule requests to be sent to remote nodes.""" + if self.finished: + return + + # Get the nodes to be processed + nodes = self.getNodesToProcess() + # Check if we are already done - if self.desired_results and ((len(self.results) >= abs(self.desired_results)) or - (self.desired_results < 0 and - len(self.answered) >= self.config['STORE_REDUNDANCY'])): + if nodes is None or (self.desired_results and + ((len(self.results) >= abs(self.desired_results)) or + (self.desired_results < 0 and + len(self.answered) >= self.config['STORE_REDUNDANCY']))): self.finished = True result = self.generateResult() reactor.callLater(0, self.callback, *result) + return - if self.finished or (self.desired_results and - len(self.results) + self.outstanding_results >= abs(self.desired_results)): + # Check if we have enough outstanding results coming + if (self.desired_results and + len(self.results) + self.outstanding_results >= abs(self.desired_results)): return # Loop for each node that should be processed - for node in self.getNodesToProcess(): + for node in nodes: # Don't send requests twice or to ourself if node.id not in self.queried: self.queried[node.id] = 1 @@ -156,48 +167,51 @@ class ActionBase: continue # Call the action on the remote node - self.outstanding += 1 + self.outstanding[node.id] = expected_results self.outstanding_results += expected_results df = defer.maybeDeferred(f, *args) reactor.callLater(0, df.addCallbacks, *(self.gotResponse, self.actionFailed), - **{'callbackArgs': (node, expected_results, df), - 'errbackArgs': (node, expected_results, df)}) + **{'callbackArgs': (node, ), + 'errbackArgs': (node, )}) # We might have to stop for now - if (self.outstanding >= self.config['CONCURRENT_REQS'] or + if (len(self.outstanding) >= self.config['CONCURRENT_REQS'] or (self.desired_results and len(self.results) + self.outstanding_results >= abs(self.desired_results))): break - assert self.outstanding >= 0 assert self.outstanding_results >= 0 # If no requests are outstanding, then we are done - if self.outstanding == 0: + if len(self.outstanding) == 0: self.finished = True result = self.generateResult() reactor.callLater(0, self.callback, *result) - def gotResponse(self, dict, node, expected_results, df): + def gotResponse(self, dict, node): """Receive a response from a remote node.""" - self.caller.insertNode(node) + if node.id != self.caller.node.id: + reactor.callLater(0, self.caller.insertNode, node) if self.finished or self.answered.has_key(node.id): # a day late and a dollar short return - self.outstanding -= 1 - self.outstanding_results -= expected_results self.answered[node.id] = 1 self.processResponse(dict) + if self.outstanding.has_key(node.id): + self.outstanding_results -= self.outstanding[node.id] + del self.outstanding[node.id] self.schedule() - def actionFailed(self, err, node, expected_results, df): + def actionFailed(self, err, node): """Receive an error from a remote node.""" - log.msg("action %s failed (%s) %s/%s" % (self.action, self.config['PORT'], node.host, node.port)) - log.err(err) - self.caller.table.nodeFailed(node) - self.outstanding -= 1 - self.outstanding_results -= expected_results + log.msg("action %s failed on %s/%s: %s" % (self.action, node.host, node.port, err.getErrorMessage())) + if node.id != self.caller.node.id: + self.caller.table.nodeFailed(node) + self.failed[node.id] = 1 + if self.outstanding.has_key(node.id): + self.outstanding_results -= self.outstanding[node.id] + del self.outstanding[node.id] self.schedule() def handleGotNodes(self, nodes): @@ -226,9 +240,38 @@ class ActionBase: """Generate a list of nodes to process next. This implementation is suitable for a recurring search over all nodes. + It will stop the search when the closest K nodes have been queried. + It also prematurely drops requests to nodes that have fallen way behind. + + @return: sorted list of nodes to query, or None if we are done """ + # Find the K closest nodes that haven't failed, count how many answered self.sortNodes() - return self.sorted_nodes[:K] + closest_K = [] + ans = 0 + for node in self.sorted_nodes: + if node.id not in self.failed: + closest_K.append(node) + if node.id in self.answered: + ans += 1 + if len(closest_K) >= K: + break + + # If we have responses from the K closest nodes, then we are done + if ans >= K: + log.msg('Got the answers we need, aborting search') + return None + + # Check the oustanding requests to see if they are still closest + for id in self.outstanding.keys(): + if self.found[id] not in closest_K: + # Request is not important, allow another to go + log.msg("Request to %s/%s is taking too long, moving on" % + (self.found[id].host, self.found[id].port)) + self.outstanding_results -= self.outstanding[id] + del self.outstanding[id] + + return closest_K def generateArgs(self, node): """Generate the arguments to the node's action. @@ -265,7 +308,13 @@ class FindNode(ActionBase): """Result is the K closest nodes to the target.""" self.sortNodes() self.stats.completedAction(self.action, self.started) - return (self.sorted_nodes[:K], ) + closest_K = [] + for node in self.sorted_nodes: + if node.id not in self.failed: + closest_K.append(node) + if len(closest_K) >= K: + break + return (closest_K, ) class FindValue(ActionBase):