X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=apt_dht_Khashmir%2Factions.py;h=8da4431dcd1d110e05c9cf429ea58fc4ab3bb435;hb=d1a2027b26e3791e7e05f1247f00c279ce46ce99;hp=dc743de2e7658bcb2e598bc3ccb503964b702c6c;hpb=7dab2471d589ea439ccbfe794258863c2af858e9;p=quix0rs-apt-p2p.git diff --git a/apt_dht_Khashmir/actions.py b/apt_dht_Khashmir/actions.py index dc743de..8da4431 100644 --- a/apt_dht_Khashmir/actions.py +++ b/apt_dht_Khashmir/actions.py @@ -1,16 +1,14 @@ ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved # see LICENSE.txt for license information -from time import time - from twisted.internet import reactor from khash import intify class ActionBase: """ base class for some long running asynchronous proccesses like finding nodes or values """ - def __init__(self, table, target, callback, config): - self.table = table + def __init__(self, caller, target, callback, config): + self.caller = caller self.target = target self.config = config self.num = intify(target) @@ -43,21 +41,16 @@ class FindNode(ActionBase): def handleGotNodes(self, dict): _krpc_sender = dict['_krpc_sender'] dict = dict['rsp'] + n = self.caller.Node(dict["id"], _krpc_sender[0], _krpc_sender[1]) + self.caller.insertNode(n) l = dict["nodes"] - sender = {'id' : dict["id"]} - sender['port'] = _krpc_sender[1] - sender['host'] = _krpc_sender[0] - sender = self.table.Node().initWithDict(sender) - sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port)) - self.table.table.insertNode(sender) - if self.finished or self.answered.has_key(sender.id): + if self.finished or self.answered.has_key(dict["id"]): # a day late and a dollar short return self.outstanding = self.outstanding - 1 - self.answered[sender.id] = 1 + self.answered[dict["id"]] = 1 for node in l: - n = self.table.Node().initWithDict(node) - n.conn = self.table.udp.connectionForAddr((n.host, n.port)) + n = self.caller.Node(node) if not self.found.has_key(n.id): self.found[n.id] = n self.schedule() @@ -74,15 +67,15 @@ class FindNode(ActionBase): if node.id == self.target: self.finished=1 return self.callback([node]) - if (not self.queried.has_key(node.id)) and node.id != self.table.node.id: + if (not self.queried.has_key(node.id)) and node.id != self.caller.node.id: #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT - df = node.findNode(self.target, self.table.node.id) + df = node.findNode(self.target, self.caller.node.id) df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node)) self.outstanding = self.outstanding + 1 self.queried[node.id] = 1 if self.outstanding >= self.config['CONCURRENT_REQS']: break - assert(self.outstanding) >=0 + assert self.outstanding >=0 if self.outstanding == 0: ## all done!! self.finished=1 @@ -90,8 +83,8 @@ class FindNode(ActionBase): def makeMsgFailed(self, node): def defaultGotNodes(err, self=self, node=node): - print ">>> find failed %s/%s" % (node.host, node.port), err - self.table.table.nodeFailed(node) + print ">>> find failed (%s) %s/%s" % (self.config['PORT'], node.host, node.port), err + self.caller.table.nodeFailed(node) self.outstanding = self.outstanding - 1 self.schedule() return defaultGotNodes @@ -102,7 +95,7 @@ class FindNode(ActionBase): it's a transaction since we got called from the dispatcher """ for node in nodes: - if node.id == self.table.node.id: + if node.id == self.caller.node.id: continue else: self.found[node.id] = node @@ -112,31 +105,26 @@ class FindNode(ActionBase): get_value_timeout = 15 class GetValue(FindNode): - def __init__(self, table, target, callback, config, find="findValue"): - FindNode.__init__(self, table, target, callback, config) + def __init__(self, caller, target, callback, config, find="findValue"): + FindNode.__init__(self, caller, target, callback, config) self.findValue = find """ get value task """ def handleGotNodes(self, dict): _krpc_sender = dict['_krpc_sender'] dict = dict['rsp'] - sender = {'id' : dict["id"]} - sender['port'] = _krpc_sender[1] - sender['host'] = _krpc_sender[0] - sender = self.table.Node().initWithDict(sender) - sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port)) - self.table.table.insertNode(sender) - if self.finished or self.answered.has_key(sender.id): + n = self.caller.Node(dict["id"], _krpc_sender[0], _krpc_sender[1]) + self.caller.insertNode(n) + if self.finished or self.answered.has_key(dict["id"]): # a day late and a dollar short return self.outstanding = self.outstanding - 1 - self.answered[sender.id] = 1 + self.answered[dict["id"]] = 1 # go through nodes # if we have any closer than what we already got, query them if dict.has_key('nodes'): for node in dict['nodes']: - n = self.table.Node().initWithDict(node) - n.conn = self.table.udp.connectionForAddr((n.host, n.port)) + n = self.caller.Node(node) if not self.found.has_key(n.id): self.found[n.id] = n elif dict.has_key('values'): @@ -149,7 +137,7 @@ class GetValue(FindNode): z = len(dict['values']) v = filter(None, map(x, dict['values'])) if(len(v)): - reactor.callLater(0, self.callback, v) + reactor.callLater(0, self.callback, self.target, v) self.schedule() ## get value @@ -160,25 +148,25 @@ class GetValue(FindNode): l.sort(self.sort) for node in l[:self.config['K']]: - if (not self.queried.has_key(node.id)) and node.id != self.table.node.id: + if (not self.queried.has_key(node.id)) and node.id != self.caller.node.id: #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT try: f = getattr(node, self.findValue) except AttributeError: print ">>> findValue %s doesn't have a %s method!" % (node, self.findValue) else: - df = f(self.target, self.table.node.id) + df = f(self.target, self.caller.node.id) df.addCallback(self.handleGotNodes) df.addErrback(self.makeMsgFailed(node)) self.outstanding = self.outstanding + 1 self.queried[node.id] = 1 if self.outstanding >= self.config['CONCURRENT_REQS']: break - assert(self.outstanding) >=0 + assert self.outstanding >=0 if self.outstanding == 0: ## all done, didn't find it!! self.finished=1 - reactor.callLater(0, self.callback,[]) + reactor.callLater(0, self.callback, self.target, []) ## get value def goWithNodes(self, nodes, found=None): @@ -187,7 +175,7 @@ class GetValue(FindNode): for n in found: self.results[n] = 1 for node in nodes: - if node.id == self.table.node.id: + if node.id == self.caller.node.id: continue else: self.found[node.id] = node @@ -196,21 +184,22 @@ class GetValue(FindNode): class StoreValue(ActionBase): - def __init__(self, table, target, value, callback, config, store="storeValue"): - ActionBase.__init__(self, table, target, callback, config) + def __init__(self, caller, target, value, originated, callback, config, store="storeValue"): + ActionBase.__init__(self, caller, target, callback, config) self.value = value + self.originated = originated self.stored = [] self.store = store def storedValue(self, t, node): self.outstanding -= 1 - self.table.insertNode(node) + self.caller.insertNode(node) if self.finished: return self.stored.append(t) if len(self.stored) >= self.config['STORE_REDUNDANCY']: self.finished=1 - self.callback(self.stored) + self.callback(self.target, self.value, self.stored) else: if not len(self.stored) + self.outstanding >= self.config['STORE_REDUNDANCY']: self.schedule() @@ -218,7 +207,7 @@ class StoreValue(ActionBase): def storeFailed(self, t, node): print ">>> store failed %s/%s" % (node.host, node.port) - self.table.nodeFailed(node) + self.caller.nodeFailed(node) self.outstanding -= 1 if self.finished: return t @@ -237,16 +226,16 @@ class StoreValue(ActionBase): except IndexError: if self.outstanding == 0: self.finished = 1 - self.callback(self.stored) + self.callback(self.target, self.value, self.stored) else: - if not node.id == self.table.node.id: + if not node.id == self.caller.node.id: self.outstanding += 1 try: f = getattr(node, self.store) except AttributeError: print ">>> %s doesn't have a %s method!" % (node, self.store) else: - df = f(self.target, self.value, self.table.node.id) + df = f(self.target, self.value, self.originated, self.caller.node.id) df.addCallback(self.storedValue, node=node) df.addErrback(self.storeFailed, node=node) @@ -260,14 +249,14 @@ class KeyExpirer: def __init__(self, store, config): self.store = store self.config = config - reactor.callLater(self.config['KEINITIAL_DELAY'], self.doExpire) + self.next_expire = reactor.callLater(self.config['KEINITIAL_DELAY'], self.doExpire) def doExpire(self): - self.cut = "%0.6f" % (time() - self.config['KE_AGE']) - self._expire() - - def _expire(self): - c = self.store.cursor() - s = "delete from kv where time < '%s';" % self.cut - c.execute(s) - reactor.callLater(self.config['KE_DELAY'], self.doExpire) + self.store.expireValues(self.config['KE_AGE']) + self.next_expire = reactor.callLater(self.config['KE_DELAY'], self.doExpire) + + def shutdown(self): + try: + self.next_expire.cancel() + except: + pass