- # go through nodes
- # if we have any closer than what we already got, query them
- if dict.has_key('nodes'):
- for compact_node in dict['nodes']:
- node = uncompact(compact_node)
- n = self.caller.Node(node)
- if not self.found.has_key(n.id):
- self.found[n.id] = n
- elif dict.has_key('values'):
+ for compact_node in l:
+ node = uncompact(compact_node)
+ n = self.caller.Node(node)
+ if not self.found.has_key(n.id):
+ self.found[n.id] = n
+ self.schedule()
+
+ def schedule(self):
+ """
+ send messages to new peers, if necessary
+ """
+ if self.finished:
+ return
+ l = self.found.values()
+ l.sort(self.sort)
+ for node in l[:self.config['K']]:
+ if (not self.queried.has_key(node.id)) and node.id != self.caller.node.id:
+ df = node.findValue(self.target, self.caller.node.id)
+ df.addCallbacks(self.handleGotNodes, self.actionFailed, errbackArgs = (node, ))
+ self.outstanding = self.outstanding + 1
+ self.queried[node.id] = 1
+ if self.outstanding >= self.config['CONCURRENT_REQS']:
+ break
+ assert self.outstanding >=0
+ if self.outstanding == 0:
+ ## all done!!
+ self.finished=1
+ l = [node for node in self.found.values() if node.num_values > 0]
+ reactor.callLater(0, self.callback, l)
+
+ def goWithNodes(self, nodes):
+ """
+ this starts the process, our argument is a transaction with t.extras being our list of nodes
+ it's a transaction since we got called from the dispatcher
+ """
+ for node in nodes:
+ if node.id == self.caller.node.id:
+ continue
+ else:
+ self.found[node.id] = node
+
+ self.schedule()
+
+
+class GetValue(ActionBase):
+ def __init__(self, caller, target, num, callback, config, action="getValue"):
+ ActionBase.__init__(self, caller, target, callback, config)
+ self.num_values = num
+ self.outstanding_gets = 0
+ self.action = action
+
+ def gotValues(self, dict, node):
+ dict = dict['rsp']
+ self.outstanding -= 1
+ self.caller.insertNode(node)
+ if self.finished:
+ return
+ if dict.has_key('values'):