X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;ds=sidebyside;f=apt_dht_Khashmir%2Factions.py;fp=apt_dht_Khashmir%2Factions.py;h=7822579733b55a9ec1e0234767dcf1d422daf74b;hb=63c013ac1c397bfc19cbed986a09113efada0eeb;hp=6766cd9c6169dd8301cd6ff6c052d663175b0541;hpb=8f102eb1964db2ac18d4bac9e399c069a4cb616e;p=quix0rs-apt-p2p.git diff --git a/apt_dht_Khashmir/actions.py b/apt_dht_Khashmir/actions.py index 6766cd9..7822579 100644 --- a/apt_dht_Khashmir/actions.py +++ b/apt_dht_Khashmir/actions.py @@ -31,6 +31,13 @@ class ActionBase: return 0 self.sort = sort + def actionFailed(self, err, node): + log.msg("action %s failed (%s) %s/%s" % (self.__class__.__name__, self.config['PORT'], node.host, node.port)) + log.err(err) + self.caller.table.nodeFailed(node) + self.outstanding = self.outstanding - 1 + self.schedule() + def goWithNodes(self, t): pass @@ -75,7 +82,7 @@ class FindNode(ActionBase): if (not self.queried.has_key(node.id)) and node.id != self.caller.node.id: #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT df = node.findNode(self.target, self.caller.node.id) - df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node)) + df.addCallbacks(self.handleGotNodes, self.actionFailed, errbackArgs = (node, )) self.outstanding = self.outstanding + 1 self.queried[node.id] = 1 if self.outstanding >= self.config['CONCURRENT_REQS']: @@ -86,15 +93,6 @@ class FindNode(ActionBase): self.finished=1 reactor.callLater(0, self.callback, l[:self.config['K']]) - def makeMsgFailed(self, node): - def defaultGotNodes(err, self=self, node=node): - log.msg("find failed (%s) %s/%s" % (self.config['PORT'], node.host, node.port)) - log.err(err) - self.caller.table.nodeFailed(node) - self.outstanding = self.outstanding - 1 - self.schedule() - return defaultGotNodes - def goWithNodes(self, nodes): """ this starts the process, our argument is a transaction with t.extras being our list of nodes @@ -163,8 +161,7 @@ class GetValue(FindNode): log.msg("findValue %s doesn't have a %s method!" % (node, self.findValue)) else: df = f(self.target, self.caller.node.id) - df.addCallback(self.handleGotNodes) - df.addErrback(self.makeMsgFailed(node)) + df.addCallbacks(self.handleGotNodes, self.actionFailed, errbackArgs = (node, )) self.outstanding = self.outstanding + 1 self.queried[node.id] = 1 if self.outstanding >= self.config['CONCURRENT_REQS']: @@ -211,15 +208,6 @@ class StoreValue(ActionBase): self.schedule() return t - def storeFailed(self, t, node): - log.msg("store failed %s/%s" % (node.host, node.port)) - self.caller.nodeFailed(node) - self.outstanding -= 1 - if self.finished: - return t - self.schedule() - return t - def schedule(self): if self.finished: return @@ -242,8 +230,7 @@ class StoreValue(ActionBase): log.msg("%s doesn't have a %s method!" % (node, self.store)) else: df = f(self.target, self.value, node.token, self.caller.node.id) - df.addCallback(self.storedValue, node=node) - df.addErrback(self.storeFailed, node=node) + df.addCallbacks(self.storedValue, self.actionFailed, callbackArgs = (node, ), errbackArgs = (node, )) def goWithNodes(self, nodes): self.nodes = nodes