- def makeMsgFailed(self, node):
- def defaultGotNodes(err, self=self, node=node):
- self.table.table.nodeFailed(node)
- if self.finished:
- return
- self.outstanding = self.outstanding - 1
- self.schedule()
- return defaultGotNodes
+ for node in l[:K]:
+ if node.id == self.target:
+ self.finished=1
+ return self.callback([node])
+ if (not self.queried.has_key(node.id)) and node.id != self.table.node.id:
+ #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
+ df = node.findNode(self.target, self.table.node.senderDict())
+ df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
+ self.outstanding = self.outstanding + 1
+ self.queried[node.id] = 1
+ if self.outstanding >= const.CONCURRENT_REQS:
+ break
+ assert(self.outstanding) >=0
+ if self.outstanding == 0:
+ ## all done!!
+ self.finished=1
+ reactor.callFromThread(self.callback, l[:K])
+
+ def makeMsgFailed(self, node):
+ def defaultGotNodes(err, self=self, node=node):
+ self.table.table.nodeFailed(node)
+ self.outstanding = self.outstanding - 1
+ self.schedule()
+ return defaultGotNodes
+
+ def goWithNodes(self, nodes):
+ """
+ this starts the process, our argument is a transaction with t.extras being our list of nodes
+ it's a transaction since we got called from the dispatcher
+ """
+ for node in nodes:
+ if node.id == self.table.node.id:
+ continue
+ else:
+ self.found[node.id] = node
+
+ self.schedule()