Speed up the recursive DHT actions when timeouts occur.
[quix0rs-apt-p2p.git] / apt_p2p_Khashmir / actions.py
index 2d389b41a7b8c65b108eeb7986414b41dbe749e2..865561cc0ceb5e160598ec8f524afd7adf7d0bc7 100644 (file)
@@ -30,6 +30,8 @@ class ActionBase:
         keys are node IDs, values are the node itself
     @type answered: C{dictionary}
     @ivar answered: the nodes that have answered the queries
+    @type failed: C{dictionary}
+    @ivar failed: the nodes that have failed to answer the queries
     @type found: C{dictionary}
     @ivar found: nodes that have been found so far by the action
     @type sorted_nodes: C{list} of L{node.Node}
@@ -41,10 +43,11 @@ class ActionBase:
         before the action should stop
     @type callback: C{method}
     @ivar callback: the method to call with the results
-    @type outstanding: C{int}
-    @ivar outstanding: the number of requests currently outstanding
+    @type outstanding: C{dictionary}
+    @ivar outstanding: the nodes that have outstanding requests for this action,
+        keys are node IDs, values are the number of outstanding results from the node
     @type outstanding_results: C{int}
-    @ivar outstanding_results: the number of results that are expected from
+    @ivar outstanding_results: the total number of results that are expected from
         the requests that are currently outstanding
     @type finished: C{boolean}
     @ivar finished: whether the action is done
@@ -84,12 +87,13 @@ class ActionBase:
         self.num = intify(target)
         self.queried = {}
         self.answered = {}
+        self.failed = {}
         self.found = {}
         self.sorted_nodes = []
         self.results = {}
         self.desired_results = num_results
         self.callback = callback
-        self.outstanding = 0
+        self.outstanding = {}
         self.outstanding_results = 0
         self.finished = False
         self.started = datetime.now()
@@ -118,10 +122,14 @@ class ActionBase:
         if self.finished:
             return
         
+        # Get the nodes to be processed
+        nodes = self.getNodesToProcess()
+        
         # Check if we are already done
-        if self.desired_results and ((len(self.results) >= abs(self.desired_results)) or
-                                     (self.desired_results < 0 and
-                                      len(self.answered) >= self.config['STORE_REDUNDANCY'])):
+        if nodes is None or (self.desired_results and
+                             ((len(self.results) >= abs(self.desired_results)) or
+                              (self.desired_results < 0 and
+                               len(self.answered) >= self.config['STORE_REDUNDANCY']))):
             self.finished = True
             result = self.generateResult()
             reactor.callLater(0, self.callback, *result)
@@ -133,7 +141,7 @@ class ActionBase:
             return
         
         # Loop for each node that should be processed
-        for node in self.getNodesToProcess():
+        for node in nodes:
             # Don't send requests twice or to ourself
             if node.id not in self.queried:
                 self.queried[node.id] = 1
@@ -159,49 +167,51 @@ class ActionBase:
                     continue
 
                 # Call the action on the remote node
-                self.outstanding += 1
+                self.outstanding[node.id] = expected_results
                 self.outstanding_results += expected_results
                 df = defer.maybeDeferred(f, *args)
                 reactor.callLater(0, df.addCallbacks,
                                   *(self.gotResponse, self.actionFailed),
-                                  **{'callbackArgs': (node, expected_results, df),
-                                     'errbackArgs': (node, expected_results, df)})
+                                  **{'callbackArgs': (node, ),
+                                     'errbackArgs': (node, )})
                         
             # We might have to stop for now
-            if (self.outstanding >= self.config['CONCURRENT_REQS'] or
+            if (len(self.outstanding) >= self.config['CONCURRENT_REQS'] or
                 (self.desired_results and
                  len(self.results) + self.outstanding_results >= abs(self.desired_results))):
                 break
             
-        assert self.outstanding >= 0
         assert self.outstanding_results >= 0
 
         # If no requests are outstanding, then we are done
-        if self.outstanding == 0:
+        if len(self.outstanding) == 0:
             self.finished = True
             result = self.generateResult()
             reactor.callLater(0, self.callback, *result)
 
-    def gotResponse(self, dict, node, expected_results, df):
+    def gotResponse(self, dict, node):
         """Receive a response from a remote node."""
         if node.id != self.caller.node.id:
             reactor.callLater(0, self.caller.insertNode, node)
         if self.finished or self.answered.has_key(node.id):
             # a day late and a dollar short
             return
-        self.outstanding -= 1
-        self.outstanding_results -= expected_results
         self.answered[node.id] = 1
         self.processResponse(dict)
+        if self.outstanding.has_key(node.id):
+            self.outstanding_results -= self.outstanding[node.id]
+            del self.outstanding[node.id]
         self.schedule()
 
-    def actionFailed(self, err, node, expected_results, df):
+    def actionFailed(self, err, node):
         """Receive an error from a remote node."""
         log.msg("action %s failed on %s/%s: %s" % (self.action, node.host, node.port, err.getErrorMessage()))
         if node.id != self.caller.node.id:
             self.caller.table.nodeFailed(node)
-        self.outstanding -= 1
-        self.outstanding_results -= expected_results
+        self.failed[node.id] = 1
+        if self.outstanding.has_key(node.id):
+            self.outstanding_results -= self.outstanding[node.id]
+            del self.outstanding[node.id]
         self.schedule()
     
     def handleGotNodes(self, nodes):
@@ -230,9 +240,38 @@ class ActionBase:
         """Generate a list of nodes to process next.
         
         This implementation is suitable for a recurring search over all nodes.
+        It will stop the search when the closest K nodes have been queried.
+        It also prematurely drops requests to nodes that have fallen way behind.
+        
+        @return: sorted list of nodes to query, or None if we are done
         """
+        # Find the K closest nodes that haven't failed, count how many answered
         self.sortNodes()
-        return self.sorted_nodes[:K]
+        closest_K = []
+        ans = 0
+        for node in self.sorted_nodes:
+            if node.id not in self.failed:
+                closest_K.append(node)
+                if node.id in self.answered:
+                    ans += 1
+                if len(closest_K) >= K:
+                    break
+        
+        # If we have responses from the K closest nodes, then we are done
+        if ans >= K:
+            log.msg('Got the answers we need, aborting search')
+            return None
+        
+        # Check the oustanding requests to see if they are still closest
+        for id in self.outstanding.keys():
+            if self.found[id] not in closest_K:
+                # Request is not important, allow another to go
+                log.msg("Request to %s/%s is taking too long, moving on" %
+                        (self.found[id].host, self.found[id].port))
+                self.outstanding_results -= self.outstanding[id]
+                del self.outstanding[id]
+
+        return closest_K
     
     def generateArgs(self, node):
         """Generate the arguments to the node's action.
@@ -269,7 +308,13 @@ class FindNode(ActionBase):
         """Result is the K closest nodes to the target."""
         self.sortNodes()
         self.stats.completedAction(self.action, self.started)
-        return (self.sorted_nodes[:K], )
+        closest_K = []
+        for node in self.sorted_nodes:
+            if node.id not in self.failed:
+                closest_K.append(node)
+                if len(closest_K) >= K:
+                    break
+        return (closest_K, )
     
 
 class FindValue(ActionBase):