X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=apt_p2p_Khashmir%2Factions.py;h=865561cc0ceb5e160598ec8f524afd7adf7d0bc7;hb=f453f48bb025970f13d1ec10c98b95b9e7f338bc;hp=fc8746cc86b2d572d5a49e11aed79a2266eeb1ff;hpb=394d446ab8d4bdb7f010a81e97b7ff7898a99e4a;p=quix0rs-apt-p2p.git diff --git a/apt_p2p_Khashmir/actions.py b/apt_p2p_Khashmir/actions.py index fc8746c..865561c 100644 --- a/apt_p2p_Khashmir/actions.py +++ b/apt_p2p_Khashmir/actions.py @@ -30,6 +30,8 @@ class ActionBase: keys are node IDs, values are the node itself @type answered: C{dictionary} @ivar answered: the nodes that have answered the queries + @type failed: C{dictionary} + @ivar failed: the nodes that have failed to answer the queries @type found: C{dictionary} @ivar found: nodes that have been found so far by the action @type sorted_nodes: C{list} of L{node.Node} @@ -41,10 +43,11 @@ class ActionBase: before the action should stop @type callback: C{method} @ivar callback: the method to call with the results - @type outstanding: C{int} - @ivar outstanding: the number of requests currently outstanding + @type outstanding: C{dictionary} + @ivar outstanding: the nodes that have outstanding requests for this action, + keys are node IDs, values are the number of outstanding results from the node @type outstanding_results: C{int} - @ivar outstanding_results: the number of results that are expected from + @ivar outstanding_results: the total number of results that are expected from the requests that are currently outstanding @type finished: C{boolean} @ivar finished: whether the action is done @@ -84,12 +87,13 @@ class ActionBase: self.num = intify(target) self.queried = {} self.answered = {} + self.failed = {} self.found = {} self.sorted_nodes = [] self.results = {} self.desired_results = num_results self.callback = callback - self.outstanding = 0 + self.outstanding = {} self.outstanding_results = 0 self.finished = False self.started = datetime.now() @@ -118,10 +122,14 @@ class ActionBase: if self.finished: return + # Get the nodes to be processed + nodes = self.getNodesToProcess() + # Check if we are already done - if self.desired_results and ((len(self.results) >= abs(self.desired_results)) or - (self.desired_results < 0 and - len(self.answered) >= self.config['STORE_REDUNDANCY'])): + if nodes is None or (self.desired_results and + ((len(self.results) >= abs(self.desired_results)) or + (self.desired_results < 0 and + len(self.answered) >= self.config['STORE_REDUNDANCY']))): self.finished = True result = self.generateResult() reactor.callLater(0, self.callback, *result) @@ -133,7 +141,7 @@ class ActionBase: return # Loop for each node that should be processed - for node in self.getNodesToProcess(): + for node in nodes: # Don't send requests twice or to ourself if node.id not in self.queried: self.queried[node.id] = 1 @@ -159,47 +167,51 @@ class ActionBase: continue # Call the action on the remote node - self.outstanding += 1 + self.outstanding[node.id] = expected_results self.outstanding_results += expected_results df = defer.maybeDeferred(f, *args) reactor.callLater(0, df.addCallbacks, *(self.gotResponse, self.actionFailed), - **{'callbackArgs': (node, expected_results, df), - 'errbackArgs': (node, expected_results, df)}) + **{'callbackArgs': (node, ), + 'errbackArgs': (node, )}) # We might have to stop for now - if (self.outstanding >= self.config['CONCURRENT_REQS'] or + if (len(self.outstanding) >= self.config['CONCURRENT_REQS'] or (self.desired_results and len(self.results) + self.outstanding_results >= abs(self.desired_results))): break - assert self.outstanding >= 0 assert self.outstanding_results >= 0 # If no requests are outstanding, then we are done - if self.outstanding == 0: + if len(self.outstanding) == 0: self.finished = True result = self.generateResult() reactor.callLater(0, self.callback, *result) - def gotResponse(self, dict, node, expected_results, df): + def gotResponse(self, dict, node): """Receive a response from a remote node.""" - reactor.callLater(0, self.caller.insertNode, node) + if node.id != self.caller.node.id: + reactor.callLater(0, self.caller.insertNode, node) if self.finished or self.answered.has_key(node.id): # a day late and a dollar short return - self.outstanding -= 1 - self.outstanding_results -= expected_results self.answered[node.id] = 1 self.processResponse(dict) + if self.outstanding.has_key(node.id): + self.outstanding_results -= self.outstanding[node.id] + del self.outstanding[node.id] self.schedule() - def actionFailed(self, err, node, expected_results, df): + def actionFailed(self, err, node): """Receive an error from a remote node.""" log.msg("action %s failed on %s/%s: %s" % (self.action, node.host, node.port, err.getErrorMessage())) - self.caller.table.nodeFailed(node) - self.outstanding -= 1 - self.outstanding_results -= expected_results + if node.id != self.caller.node.id: + self.caller.table.nodeFailed(node) + self.failed[node.id] = 1 + if self.outstanding.has_key(node.id): + self.outstanding_results -= self.outstanding[node.id] + del self.outstanding[node.id] self.schedule() def handleGotNodes(self, nodes): @@ -228,9 +240,38 @@ class ActionBase: """Generate a list of nodes to process next. This implementation is suitable for a recurring search over all nodes. + It will stop the search when the closest K nodes have been queried. + It also prematurely drops requests to nodes that have fallen way behind. + + @return: sorted list of nodes to query, or None if we are done """ + # Find the K closest nodes that haven't failed, count how many answered self.sortNodes() - return self.sorted_nodes[:K] + closest_K = [] + ans = 0 + for node in self.sorted_nodes: + if node.id not in self.failed: + closest_K.append(node) + if node.id in self.answered: + ans += 1 + if len(closest_K) >= K: + break + + # If we have responses from the K closest nodes, then we are done + if ans >= K: + log.msg('Got the answers we need, aborting search') + return None + + # Check the oustanding requests to see if they are still closest + for id in self.outstanding.keys(): + if self.found[id] not in closest_K: + # Request is not important, allow another to go + log.msg("Request to %s/%s is taking too long, moving on" % + (self.found[id].host, self.found[id].port)) + self.outstanding_results -= self.outstanding[id] + del self.outstanding[id] + + return closest_K def generateArgs(self, node): """Generate the arguments to the node's action. @@ -267,7 +308,13 @@ class FindNode(ActionBase): """Result is the K closest nodes to the target.""" self.sortNodes() self.stats.completedAction(self.action, self.started) - return (self.sorted_nodes[:K], ) + closest_K = [] + for node in self.sorted_nodes: + if node.id not in self.failed: + closest_K.append(node) + if len(closest_K) >= K: + break + return (closest_K, ) class FindValue(ActionBase):