def schedule(self):
"""Schedule requests to be sent to remote nodes."""
# Check if we are already done
- if self.desired_results and len(self.results) >= self.desired_results:
+ if self.desired_results and ((len(self.results) >= abs(self.desired_results)) or
+ (self.desired_results < 0 and
+ len(self.answered) >= self.config['STORE_REDUNDANCY'])):
self.finished=1
result = self.generateResult()
reactor.callLater(0, self.callback, *result)
if self.finished or (self.desired_results and
- len(self.results) + self.outstanding_results >= self.desired_results):
+ len(self.results) + self.outstanding_results >= abs(self.desired_results)):
return
for node in self.getNodesToProcess():
# We might have to stop for now
if (self.outstanding >= self.config['CONCURRENT_REQS'] or
(self.desired_results and
- self.outstanding_results >= self.desired_results)):
+ len(self.results) + self.outstanding_results >= abs(self.desired_results))):
break
+ assert self.outstanding >= 0
+ assert self.outstanding_results >= 0
+
# If no requests are outstanding, then we are done
- assert self.outstanding >=0
if self.outstanding == 0:
self.finished = 1
result = self.generateResult()
log.msg("action %s failed (%s) %s/%s" % (self.action, self.config['PORT'], node.host, node.port))
log.err(err)
self.caller.table.nodeFailed(node)
- self.answered[node.id] = 1
self.outstanding -= 1
self.outstanding_results -= expected_results
self.schedule()
def processResponse(self, dict):
"""Process the response dictionary received from the remote node."""
- pass
+ self.handleGotNodes(dict['nodes'])
def generateResult(self, nodes):
"""Create the result to return to the callback function."""
def generateArgs(self, node):
"""Args include the number of values to request."""
if node.num_values > 0:
- return (self.target, 0), node.num_values
+ # Request all desired results from each node, just to be sure.
+ num_values = abs(self.desired_results) - len(self.results)
+ assert num_values > 0
+ if num_values > node.num_values:
+ num_values = 0
+ return (self.target, num_values), node.num_values
else:
raise ValueError, "Don't try and get values from this node because it doesn't have any"