X-Git-Url: https://git.mxchange.org/?p=quix0rs-apt-p2p.git;a=blobdiff_plain;f=apt_dht_Khashmir%2Factions.py;fp=apt_dht_Khashmir%2Factions.py;h=e244c49399a76f1a0598fe27c8499cadbfe4972f;hp=c80591a4dcfbb87e1889734955be6c7ba33f7351;hb=6df4eb289b9d83bd19dad8de3ca3de7283af7833;hpb=d8b63cce3887dfd61f3a8321d0c45327c4a1808b diff --git a/apt_dht_Khashmir/actions.py b/apt_dht_Khashmir/actions.py index c80591a..e244c49 100644 --- a/apt_dht_Khashmir/actions.py +++ b/apt_dht_Khashmir/actions.py @@ -50,13 +50,15 @@ class ActionBase: def schedule(self): """Schedule requests to be sent to remote nodes.""" # Check if we are already done - if self.desired_results and len(self.results) >= self.desired_results: + if self.desired_results and ((len(self.results) >= abs(self.desired_results)) or + (self.desired_results < 0 and + len(self.answered) >= self.config['STORE_REDUNDANCY'])): self.finished=1 result = self.generateResult() reactor.callLater(0, self.callback, *result) if self.finished or (self.desired_results and - len(self.results) + self.outstanding_results >= self.desired_results): + len(self.results) + self.outstanding_results >= abs(self.desired_results)): return for node in self.getNodesToProcess(): @@ -86,11 +88,13 @@ class ActionBase: # We might have to stop for now if (self.outstanding >= self.config['CONCURRENT_REQS'] or (self.desired_results and - self.outstanding_results >= self.desired_results)): + len(self.results) + self.outstanding_results >= abs(self.desired_results))): break + assert self.outstanding >= 0 + assert self.outstanding_results >= 0 + # If no requests are outstanding, then we are done - assert self.outstanding >=0 if self.outstanding == 0: self.finished = 1 result = self.generateResult() @@ -113,7 +117,6 @@ class ActionBase: log.msg("action %s failed (%s) %s/%s" % (self.action, self.config['PORT'], node.host, node.port)) log.err(err) self.caller.table.nodeFailed(node) - self.answered[node.id] = 1 self.outstanding -= 1 self.outstanding_results -= expected_results self.schedule() @@ -156,7 +159,7 @@ class ActionBase: def processResponse(self, dict): """Process the response dictionary received from the remote node.""" - pass + self.handleGotNodes(dict['nodes']) def generateResult(self, nodes): """Create the result to return to the callback function.""" @@ -213,7 +216,12 @@ class GetValue(ActionBase): def generateArgs(self, node): """Args include the number of values to request.""" if node.num_values > 0: - return (self.target, 0), node.num_values + # Request all desired results from each node, just to be sure. + num_values = abs(self.desired_results) - len(self.results) + assert num_values > 0 + if num_values > node.num_values: + num_values = 0 + return (self.target, num_values), node.num_values else: raise ValueError, "Don't try and get values from this node because it doesn't have any"