- def actionFailed(self, err, node):
- log.msg("action %s failed (%s) %s/%s" % (self.__class__.__name__, self.config['PORT'], node.host, node.port))
- log.err(err)
- self.caller.table.nodeFailed(node)
- self.outstanding = self.outstanding - 1
- self.schedule()
-
- def goWithNodes(self, t):
- pass
-
-
-class FindNode(ActionBase):
- """ find node action merits it's own class as it is a long running stateful process """
- def handleGotNodes(self, dict):
- _krpc_sender = dict['_krpc_sender']
- dict = dict['rsp']
- n = self.caller.Node(dict["id"], _krpc_sender[0], _krpc_sender[1])
- self.caller.insertNode(n)
- if dict["id"] in self.found:
- self.found[dict["id"]].updateToken(dict.get('token', ''))
- l = dict["nodes"]
- if self.finished or self.answered.has_key(dict["id"]):
- # a day late and a dollar short
- return
- self.outstanding = self.outstanding - 1
- self.answered[dict["id"]] = 1
- for compact_node in l:
- node = uncompact(compact_node)
- n = self.caller.Node(node)
- if not self.found.has_key(n.id):
- self.found[n.id] = n
- self.schedule()
-
- def schedule(self):
- """
- send messages to new peers, if necessary
- """
- if self.finished:
- return
- l = self.found.values()
- l.sort(self.sort)
- for node in l[:self.config['K']]:
- if node.id == self.target:
- self.finished=1
- return self.callback([node])
- if (not self.queried.has_key(node.id)) and node.id != self.caller.node.id:
- #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
- df = node.findNode(self.target, self.caller.node.id)
- df.addCallbacks(self.handleGotNodes, self.actionFailed, errbackArgs = (node, ))
- self.outstanding = self.outstanding + 1
- self.queried[node.id] = 1
- if self.outstanding >= self.config['CONCURRENT_REQS']:
- break
- assert self.outstanding >=0
- if self.outstanding == 0:
- ## all done!!
- self.finished=1
- reactor.callLater(0, self.callback, l[:self.config['K']])
-