X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=apt_dht_Khashmir%2Factions.py;h=72637110860cd8fbcb6acbf1c630cae2783d8efa;hb=9400d298505b852ab82e78e1f9b9d82c3f6aeb34;hp=4214e4eb932580230aa0552a5270d86eba10b124;hpb=4bf5774eb039f6064b830c0a65a9d62a1e4bc62f;p=quix0rs-apt-p2p.git diff --git a/apt_dht_Khashmir/actions.py b/apt_dht_Khashmir/actions.py index 4214e4e..7263711 100644 --- a/apt_dht_Khashmir/actions.py +++ b/apt_dht_Khashmir/actions.py @@ -1,16 +1,15 @@ ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved # see LICENSE.txt for license information -from time import time - from twisted.internet import reactor +from twisted.python import log from khash import intify class ActionBase: """ base class for some long running asynchronous proccesses like finding nodes or values """ - def __init__(self, table, target, callback, config): - self.table = table + def __init__(self, caller, target, callback, config): + self.caller = caller self.target = target self.config = config self.num = intify(target) @@ -43,21 +42,16 @@ class FindNode(ActionBase): def handleGotNodes(self, dict): _krpc_sender = dict['_krpc_sender'] dict = dict['rsp'] + n = self.caller.Node(dict["id"], _krpc_sender[0], _krpc_sender[1]) + self.caller.insertNode(n) l = dict["nodes"] - sender = {'id' : dict["id"]} - sender['port'] = _krpc_sender[1] - sender['host'] = _krpc_sender[0] - sender = self.table.Node().initWithDict(sender) - sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port)) - self.table.table.insertNode(sender) - if self.finished or self.answered.has_key(sender.id): + if self.finished or self.answered.has_key(dict["id"]): # a day late and a dollar short return self.outstanding = self.outstanding - 1 - self.answered[sender.id] = 1 + self.answered[dict["id"]] = 1 for node in l: - n = self.table.Node().initWithDict(node) - n.conn = self.table.udp.connectionForAddr((n.host, n.port)) + n = self.caller.Node(node) if not self.found.has_key(n.id): self.found[n.id] = n self.schedule() @@ -74,15 +68,15 @@ class FindNode(ActionBase): if node.id == self.target: self.finished=1 return self.callback([node]) - if (not self.queried.has_key(node.id)) and node.id != self.table.node.id: + if (not self.queried.has_key(node.id)) and node.id != self.caller.node.id: #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT - df = node.findNode(self.target, self.table.node.id) + df = node.findNode(self.target, self.caller.node.id) df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node)) self.outstanding = self.outstanding + 1 self.queried[node.id] = 1 if self.outstanding >= self.config['CONCURRENT_REQS']: break - assert(self.outstanding) >=0 + assert self.outstanding >=0 if self.outstanding == 0: ## all done!! self.finished=1 @@ -90,8 +84,9 @@ class FindNode(ActionBase): def makeMsgFailed(self, node): def defaultGotNodes(err, self=self, node=node): - print ">>> find failed %s/%s" % (node.host, node.port), err - self.table.table.nodeFailed(node) + log.msg("find failed (%s) %s/%s" % (self.config['PORT'], node.host, node.port)) + log.err(err) + self.caller.table.nodeFailed(node) self.outstanding = self.outstanding - 1 self.schedule() return defaultGotNodes @@ -102,7 +97,7 @@ class FindNode(ActionBase): it's a transaction since we got called from the dispatcher """ for node in nodes: - if node.id == self.table.node.id: + if node.id == self.caller.node.id: continue else: self.found[node.id] = node @@ -112,31 +107,26 @@ class FindNode(ActionBase): get_value_timeout = 15 class GetValue(FindNode): - def __init__(self, table, target, callback, config, find="findValue"): - FindNode.__init__(self, table, target, callback, config) + def __init__(self, caller, target, callback, config, find="findValue"): + FindNode.__init__(self, caller, target, callback, config) self.findValue = find """ get value task """ def handleGotNodes(self, dict): _krpc_sender = dict['_krpc_sender'] dict = dict['rsp'] - sender = {'id' : dict["id"]} - sender['port'] = _krpc_sender[1] - sender['host'] = _krpc_sender[0] - sender = self.table.Node().initWithDict(sender) - sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port)) - self.table.table.insertNode(sender) - if self.finished or self.answered.has_key(sender.id): + n = self.caller.Node(dict["id"], _krpc_sender[0], _krpc_sender[1]) + self.caller.insertNode(n) + if self.finished or self.answered.has_key(dict["id"]): # a day late and a dollar short return self.outstanding = self.outstanding - 1 - self.answered[sender.id] = 1 + self.answered[dict["id"]] = 1 # go through nodes # if we have any closer than what we already got, query them if dict.has_key('nodes'): for node in dict['nodes']: - n = self.table.Node().initWithDict(node) - n.conn = self.table.udp.connectionForAddr((n.host, n.port)) + n = self.caller.Node(node) if not self.found.has_key(n.id): self.found[n.id] = n elif dict.has_key('values'): @@ -160,21 +150,21 @@ class GetValue(FindNode): l.sort(self.sort) for node in l[:self.config['K']]: - if (not self.queried.has_key(node.id)) and node.id != self.table.node.id: + if (not self.queried.has_key(node.id)) and node.id != self.caller.node.id: #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT try: f = getattr(node, self.findValue) except AttributeError: - print ">>> findValue %s doesn't have a %s method!" % (node, self.findValue) + log.msg("findValue %s doesn't have a %s method!" % (node, self.findValue)) else: - df = f(self.target, self.table.node.id) + df = f(self.target, self.caller.node.id) df.addCallback(self.handleGotNodes) df.addErrback(self.makeMsgFailed(node)) self.outstanding = self.outstanding + 1 self.queried[node.id] = 1 if self.outstanding >= self.config['CONCURRENT_REQS']: break - assert(self.outstanding) >=0 + assert self.outstanding >=0 if self.outstanding == 0: ## all done, didn't find it!! self.finished=1 @@ -187,7 +177,7 @@ class GetValue(FindNode): for n in found: self.results[n] = 1 for node in nodes: - if node.id == self.table.node.id: + if node.id == self.caller.node.id: continue else: self.found[node.id] = node @@ -196,15 +186,16 @@ class GetValue(FindNode): class StoreValue(ActionBase): - def __init__(self, table, target, value, callback, config, store="storeValue"): - ActionBase.__init__(self, table, target, callback, config) + def __init__(self, caller, target, value, originated, callback, config, store="storeValue"): + ActionBase.__init__(self, caller, target, callback, config) self.value = value + self.originated = originated self.stored = [] self.store = store def storedValue(self, t, node): self.outstanding -= 1 - self.table.insertNode(node) + self.caller.insertNode(node) if self.finished: return self.stored.append(t) @@ -217,8 +208,8 @@ class StoreValue(ActionBase): return t def storeFailed(self, t, node): - print ">>> store failed %s/%s" % (node.host, node.port) - self.table.nodeFailed(node) + log.msg("store failed %s/%s" % (node.host, node.port)) + self.caller.nodeFailed(node) self.outstanding -= 1 if self.finished: return t @@ -239,14 +230,14 @@ class StoreValue(ActionBase): self.finished = 1 self.callback(self.target, self.value, self.stored) else: - if not node.id == self.table.node.id: + if not node.id == self.caller.node.id: self.outstanding += 1 try: f = getattr(node, self.store) except AttributeError: - print ">>> %s doesn't have a %s method!" % (node, self.store) + log.msg("%s doesn't have a %s method!" % (node, self.store)) else: - df = f(self.target, self.value, self.table.node.id) + df = f(self.target, self.value, self.originated, self.caller.node.id) df.addCallback(self.storedValue, node=node) df.addErrback(self.storeFailed, node=node) @@ -263,7 +254,7 @@ class KeyExpirer: self.next_expire = reactor.callLater(self.config['KEINITIAL_DELAY'], self.doExpire) def doExpire(self): - self.store.expireValues(time() - self.config['KE_AGE']) + self.store.expireValues(self.config['KE_AGE']) self.next_expire = reactor.callLater(self.config['KE_DELAY'], self.doExpire) def shutdown(self):