Standardize the number of values retrieved from the DHT.
[quix0rs-apt-p2p.git] / apt_dht_Khashmir / actions.py
index c80591a4dcfbb87e1889734955be6c7ba33f7351..e244c49399a76f1a0598fe27c8499cadbfe4972f 100644 (file)
@@ -50,13 +50,15 @@ class ActionBase:
     def schedule(self):
         """Schedule requests to be sent to remote nodes."""
         # Check if we are already done
-        if self.desired_results and len(self.results) >= self.desired_results:
+        if self.desired_results and ((len(self.results) >= abs(self.desired_results)) or
+                                     (self.desired_results < 0 and
+                                      len(self.answered) >= self.config['STORE_REDUNDANCY'])):
             self.finished=1
             result = self.generateResult()
             reactor.callLater(0, self.callback, *result)
 
         if self.finished or (self.desired_results and 
-                             len(self.results) + self.outstanding_results >= self.desired_results):
+                             len(self.results) + self.outstanding_results >= abs(self.desired_results)):
             return
         
         for node in self.getNodesToProcess():
@@ -86,11 +88,13 @@ class ActionBase:
             # We might have to stop for now
             if (self.outstanding >= self.config['CONCURRENT_REQS'] or
                 (self.desired_results and
-                 self.outstanding_results >= self.desired_results)):
+                 len(self.results) + self.outstanding_results >= abs(self.desired_results))):
                 break
             
+        assert self.outstanding >= 0
+        assert self.outstanding_results >= 0
+
         # If no requests are outstanding, then we are done
-        assert self.outstanding >=0
         if self.outstanding == 0:
             self.finished = 1
             result = self.generateResult()
@@ -113,7 +117,6 @@ class ActionBase:
         log.msg("action %s failed (%s) %s/%s" % (self.action, self.config['PORT'], node.host, node.port))
         log.err(err)
         self.caller.table.nodeFailed(node)
-        self.answered[node.id] = 1
         self.outstanding -= 1
         self.outstanding_results -= expected_results
         self.schedule()
@@ -156,7 +159,7 @@ class ActionBase:
     
     def processResponse(self, dict):
         """Process the response dictionary received from the remote node."""
-        pass
+        self.handleGotNodes(dict['nodes'])
 
     def generateResult(self, nodes):
         """Create the result to return to the callback function."""
@@ -213,7 +216,12 @@ class GetValue(ActionBase):
     def generateArgs(self, node):
         """Args include the number of values to request."""
         if node.num_values > 0:
-            return (self.target, 0), node.num_values
+            # Request all desired results from each node, just to be sure.
+            num_values = abs(self.desired_results) - len(self.results)
+            assert num_values > 0
+            if num_values > node.num_values:
+                num_values = 0
+            return (self.target, num_values), node.num_values
         else:
             raise ValueError, "Don't try and get values from this node because it doesn't have any"