X-Git-Url: https://git.mxchange.org/?p=quix0rs-apt-p2p.git;a=blobdiff_plain;f=apt_dht_Khashmir%2Factions.py;h=a99b7ea7d9c5810e7cf443f4e9673a3ea1e936f7;hp=013a9a7544dcd79189efb010a0afe3cab9c430d8;hb=74e8a91349d0b55447de62ce72db692c177bcec7;hpb=dd75e47b4d4ee40dae492753a226d5a42ac73c1c diff --git a/apt_dht_Khashmir/actions.py b/apt_dht_Khashmir/actions.py index 013a9a7..a99b7ea 100644 --- a/apt_dht_Khashmir/actions.py +++ b/apt_dht_Khashmir/actions.py @@ -1,18 +1,16 @@ ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved # see LICENSE.txt for license information -from time import time - from twisted.internet import reactor -import const from khash import intify class ActionBase: """ base class for some long running asynchronous proccesses like finding nodes or values """ - def __init__(self, table, target, callback): - self.table = table + def __init__(self, caller, target, callback, config): + self.caller = caller self.target = target + self.config = config self.num = intify(target) self.found = {} self.queried = {} @@ -43,21 +41,16 @@ class FindNode(ActionBase): def handleGotNodes(self, dict): _krpc_sender = dict['_krpc_sender'] dict = dict['rsp'] + n = self.caller.Node(dict["id"], _krpc_sender[0], _krpc_sender[1]) + self.caller.insertNode(n) l = dict["nodes"] - sender = {'id' : dict["id"]} - sender['port'] = _krpc_sender[1] - sender['host'] = _krpc_sender[0] - sender = self.table.Node().initWithDict(sender) - sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port)) - self.table.table.insertNode(sender) - if self.finished or self.answered.has_key(sender.id): + if self.finished or self.answered.has_key(dict["id"]): # a day late and a dollar short return self.outstanding = self.outstanding - 1 - self.answered[sender.id] = 1 + self.answered[dict["id"]] = 1 for node in l: - n = self.table.Node().initWithDict(node) - n.conn = self.table.udp.connectionForAddr((n.host, n.port)) + n = self.caller.Node(node) if not self.found.has_key(n.id): self.found[n.id] = n self.schedule() @@ -70,28 +63,28 @@ class FindNode(ActionBase): return l = self.found.values() l.sort(self.sort) - for node in l[:const.K]: + for node in l[:self.config['K']]: if node.id == self.target: self.finished=1 return self.callback([node]) - if (not self.queried.has_key(node.id)) and node.id != self.table.node.id: + if (not self.queried.has_key(node.id)) and node.id != self.caller.node.id: #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT - df = node.findNode(self.target, self.table.node.id) + df = node.findNode(self.target, self.caller.node.id) df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node)) self.outstanding = self.outstanding + 1 self.queried[node.id] = 1 - if self.outstanding >= const.CONCURRENT_REQS: + if self.outstanding >= self.config['CONCURRENT_REQS']: break assert(self.outstanding) >=0 if self.outstanding == 0: ## all done!! self.finished=1 - reactor.callLater(0, self.callback, l[:const.K]) + reactor.callLater(0, self.callback, l[:self.config['K']]) def makeMsgFailed(self, node): def defaultGotNodes(err, self=self, node=node): - print ">>> find failed %s/%s" % (node.host, node.port), err - self.table.table.nodeFailed(node) + print ">>> find failed (%s) %s/%s" % (self.config['PORT'], node.host, node.port), err + self.caller.table.nodeFailed(node) self.outstanding = self.outstanding - 1 self.schedule() return defaultGotNodes @@ -102,7 +95,7 @@ class FindNode(ActionBase): it's a transaction since we got called from the dispatcher """ for node in nodes: - if node.id == self.table.node.id: + if node.id == self.caller.node.id: continue else: self.found[node.id] = node @@ -112,31 +105,26 @@ class FindNode(ActionBase): get_value_timeout = 15 class GetValue(FindNode): - def __init__(self, table, target, callback, find="findValue"): - FindNode.__init__(self, table, target, callback) + def __init__(self, caller, target, callback, config, find="findValue"): + FindNode.__init__(self, caller, target, callback, config) self.findValue = find """ get value task """ def handleGotNodes(self, dict): _krpc_sender = dict['_krpc_sender'] dict = dict['rsp'] - sender = {'id' : dict["id"]} - sender['port'] = _krpc_sender[1] - sender['host'] = _krpc_sender[0] - sender = self.table.Node().initWithDict(sender) - sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port)) - self.table.table.insertNode(sender) - if self.finished or self.answered.has_key(sender.id): + n = self.caller.Node(dict["id"], _krpc_sender[0], _krpc_sender[1]) + self.caller.insertNode(n) + if self.finished or self.answered.has_key(dict["id"]): # a day late and a dollar short return self.outstanding = self.outstanding - 1 - self.answered[sender.id] = 1 + self.answered[dict["id"]] = 1 # go through nodes # if we have any closer than what we already got, query them if dict.has_key('nodes'): for node in dict['nodes']: - n = self.table.Node().initWithDict(node) - n.conn = self.table.udp.connectionForAddr((n.host, n.port)) + n = self.caller.Node(node) if not self.found.has_key(n.id): self.found[n.id] = n elif dict.has_key('values'): @@ -149,7 +137,7 @@ class GetValue(FindNode): z = len(dict['values']) v = filter(None, map(x, dict['values'])) if(len(v)): - reactor.callLater(0, self.callback, v) + reactor.callLater(0, self.callback, self.target, v) self.schedule() ## get value @@ -159,26 +147,26 @@ class GetValue(FindNode): l = self.found.values() l.sort(self.sort) - for node in l[:const.K]: - if (not self.queried.has_key(node.id)) and node.id != self.table.node.id: + for node in l[:self.config['K']]: + if (not self.queried.has_key(node.id)) and node.id != self.caller.node.id: #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT try: f = getattr(node, self.findValue) except AttributeError: print ">>> findValue %s doesn't have a %s method!" % (node, self.findValue) else: - df = f(self.target, self.table.node.id) + df = f(self.target, self.caller.node.id) df.addCallback(self.handleGotNodes) df.addErrback(self.makeMsgFailed(node)) self.outstanding = self.outstanding + 1 self.queried[node.id] = 1 - if self.outstanding >= const.CONCURRENT_REQS: + if self.outstanding >= self.config['CONCURRENT_REQS']: break assert(self.outstanding) >=0 if self.outstanding == 0: ## all done, didn't find it!! self.finished=1 - reactor.callLater(0, self.callback,[]) + reactor.callLater(0, self.callback, self.target, []) ## get value def goWithNodes(self, nodes, found=None): @@ -187,7 +175,7 @@ class GetValue(FindNode): for n in found: self.results[n] = 1 for node in nodes: - if node.id == self.table.node.id: + if node.id == self.caller.node.id: continue else: self.found[node.id] = node @@ -196,29 +184,29 @@ class GetValue(FindNode): class StoreValue(ActionBase): - def __init__(self, table, target, value, callback, store="storeValue"): - ActionBase.__init__(self, table, target, callback) + def __init__(self, caller, target, value, callback, config, store="storeValue"): + ActionBase.__init__(self, caller, target, callback, config) self.value = value self.stored = [] self.store = store def storedValue(self, t, node): self.outstanding -= 1 - self.table.insertNode(node) + self.caller.insertNode(node) if self.finished: return self.stored.append(t) - if len(self.stored) >= const.STORE_REDUNDANCY: + if len(self.stored) >= self.config['STORE_REDUNDANCY']: self.finished=1 - self.callback(self.stored) + self.callback(self.target, self.value, self.stored) else: - if not len(self.stored) + self.outstanding >= const.STORE_REDUNDANCY: + if not len(self.stored) + self.outstanding >= self.config['STORE_REDUNDANCY']: self.schedule() return t def storeFailed(self, t, node): print ">>> store failed %s/%s" % (node.host, node.port) - self.table.nodeFailed(node) + self.caller.nodeFailed(node) self.outstanding -= 1 if self.finished: return t @@ -228,25 +216,25 @@ class StoreValue(ActionBase): def schedule(self): if self.finished: return - num = const.CONCURRENT_REQS - self.outstanding - if num > const.STORE_REDUNDANCY: - num = const.STORE_REDUNDANCY + num = self.config['CONCURRENT_REQS'] - self.outstanding + if num > self.config['STORE_REDUNDANCY']: + num = self.config['STORE_REDUNDANCY'] for i in range(num): try: node = self.nodes.pop() except IndexError: if self.outstanding == 0: self.finished = 1 - self.callback(self.stored) + self.callback(self.target, self.value, self.stored) else: - if not node.id == self.table.node.id: + if not node.id == self.caller.node.id: self.outstanding += 1 try: f = getattr(node, self.store) except AttributeError: print ">>> %s doesn't have a %s method!" % (node, self.store) else: - df = f(self.target, self.value, self.table.node.id) + df = f(self.target, self.value, self.caller.node.id) df.addCallback(self.storedValue, node=node) df.addErrback(self.storeFailed, node=node) @@ -257,16 +245,17 @@ class StoreValue(ActionBase): class KeyExpirer: - def __init__(self, store): + def __init__(self, store, config): self.store = store - reactor.callLater(const.KEINITIAL_DELAY, self.doExpire) + self.config = config + self.next_expire = reactor.callLater(self.config['KEINITIAL_DELAY'], self.doExpire) def doExpire(self): - self.cut = "%0.6f" % (time() - const.KE_AGE) - self._expire() - - def _expire(self): - c = self.store.cursor() - s = "delete from kv where time < '%s';" % self.cut - c.execute(s) - reactor.callLater(const.KE_DELAY, self.doExpire) + self.store.expireValues(self.config['KE_AGE']) + self.next_expire = reactor.callLater(self.config['KE_DELAY'], self.doExpire) + + def shutdown(self): + try: + self.next_expire.cancel() + except: + pass