X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=actions.py;h=013a9a7544dcd79189efb010a0afe3cab9c430d8;hb=cc191e1f1a6b78e15cbf13def1d933cc817a13e3;hp=945832970cf3a5467a50b3d1d4211cea3e7648c1;hpb=7a5a9a80d9f9f898ec5cf07a64227e2aec963bf6;p=quix0rs-apt-p2p.git diff --git a/actions.py b/actions.py index 9458329..013a9a7 100644 --- a/actions.py +++ b/actions.py @@ -1,240 +1,272 @@ -from time import time -from pickle import loads, dumps - -from bsddb3 import db +## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved +# see LICENSE.txt for license information -from const import reactor +from time import time -from hash import intify -from knode import KNode as Node -from ktable import KTable, K +from twisted.internet import reactor -# concurrent FIND_NODE/VALUE requests! -N = 3 +import const +from khash import intify class ActionBase: """ base class for some long running asynchronous proccesses like finding nodes or values """ def __init__(self, table, target, callback): - self.table = table - self.target = target - self.int = intify(target) - self.found = {} - self.queried = {} - self.answered = {} - self.callback = callback - self.outstanding = 0 - self.finished = 0 - - def sort(a, b, int=self.int): - """ this function is for sorting nodes relative to the ID we are looking for """ - x, y = int ^ a.int, int ^ b.int - if x > y: - return 1 - elif x < y: - return -1 - return 0 - self.sort = sort + self.table = table + self.target = target + self.num = intify(target) + self.found = {} + self.queried = {} + self.answered = {} + self.callback = callback + self.outstanding = 0 + self.finished = 0 + def sort(a, b, num=self.num): + """ this function is for sorting nodes relative to the ID we are looking for """ + x, y = num ^ a.num, num ^ b.num + if x > y: + return 1 + elif x < y: + return -1 + return 0 + self.sort = sort + def goWithNodes(self, t): - pass - - + pass + + FIND_NODE_TIMEOUT = 15 class FindNode(ActionBase): """ find node action merits it's own class as it is a long running stateful process """ - def handleGotNodes(self, args): - l, sender = args - sender = Node().initWithDict(sender) - if self.finished or self.answered.has_key(sender.id): - # a day late and a dollar short - return - self.outstanding = self.outstanding - 1 - self.answered[sender.id] = 1 - for node in l: - n = Node().initWithDict(node) - if not self.found.has_key(n.id): - self.found[n.id] = n - self.table.insertNode(n) - self.schedule() - + def handleGotNodes(self, dict): + _krpc_sender = dict['_krpc_sender'] + dict = dict['rsp'] + l = dict["nodes"] + sender = {'id' : dict["id"]} + sender['port'] = _krpc_sender[1] + sender['host'] = _krpc_sender[0] + sender = self.table.Node().initWithDict(sender) + sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port)) + self.table.table.insertNode(sender) + if self.finished or self.answered.has_key(sender.id): + # a day late and a dollar short + return + self.outstanding = self.outstanding - 1 + self.answered[sender.id] = 1 + for node in l: + n = self.table.Node().initWithDict(node) + n.conn = self.table.udp.connectionForAddr((n.host, n.port)) + if not self.found.has_key(n.id): + self.found[n.id] = n + self.schedule() + def schedule(self): - """ - send messages to new peers, if necessary - """ - if self.finished: - return - l = self.found.values() - l.sort(self.sort) - - for node in l[:K]: - if node.id == self.target: - self.finished=1 - return self.callback([node]) - if not self.queried.has_key(node.id) and node.id != self.table.node.id: - #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT - df = node.findNode(self.target, self.table.node.senderDict()) - df.addCallbacks(self.handleGotNodes, self.defaultGotNodes) - self.outstanding = self.outstanding + 1 - self.queried[node.id] = 1 - if self.outstanding >= N: - break - assert(self.outstanding) >=0 - if self.outstanding == 0: - ## all done!! - self.finished=1 - reactor.callFromThread(self.callback, l[:K]) - - def defaultGotNodes(self, t): - if self.finished: - return - self.outstanding = self.outstanding - 1 - self.schedule() - - + """ + send messages to new peers, if necessary + """ + if self.finished: + return + l = self.found.values() + l.sort(self.sort) + for node in l[:const.K]: + if node.id == self.target: + self.finished=1 + return self.callback([node]) + if (not self.queried.has_key(node.id)) and node.id != self.table.node.id: + #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT + df = node.findNode(self.target, self.table.node.id) + df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node)) + self.outstanding = self.outstanding + 1 + self.queried[node.id] = 1 + if self.outstanding >= const.CONCURRENT_REQS: + break + assert(self.outstanding) >=0 + if self.outstanding == 0: + ## all done!! + self.finished=1 + reactor.callLater(0, self.callback, l[:const.K]) + + def makeMsgFailed(self, node): + def defaultGotNodes(err, self=self, node=node): + print ">>> find failed %s/%s" % (node.host, node.port), err + self.table.table.nodeFailed(node) + self.outstanding = self.outstanding - 1 + self.schedule() + return defaultGotNodes + def goWithNodes(self, nodes): - """ - this starts the process, our argument is a transaction with t.extras being our list of nodes - it's a transaction since we got called from the dispatcher - """ - for node in nodes: - if node.id == self.table.node.id: - continue - self.found[node.id] = node - #xxx t.timeout = time.time() + FIND_NODE_TIMEOUT - df = node.findNode(self.target, self.table.node.senderDict()) - df.addCallbacks(self.handleGotNodes, self.defaultGotNodes) - self.outstanding = self.outstanding + 1 - self.queried[node.id] = 1 - if self.outstanding == 0: - self.callback(nodes) - + """ + this starts the process, our argument is a transaction with t.extras being our list of nodes + it's a transaction since we got called from the dispatcher + """ + for node in nodes: + if node.id == self.table.node.id: + continue + else: + self.found[node.id] = node + + self.schedule() + -GET_VALUE_TIMEOUT = 15 +get_value_timeout = 15 class GetValue(FindNode): + def __init__(self, table, target, callback, find="findValue"): + FindNode.__init__(self, table, target, callback) + self.findValue = find + """ get value task """ - def handleGotNodes(self, args): - l, sender = args - sender = Node().initWithDict(sender) - if self.finished or self.answered.has_key(sender.id): - # a day late and a dollar short - return - self.outstanding = self.outstanding - 1 - self.answered[sender.id] = 1 - # go through nodes - # if we have any closer than what we already got, query them - if l.has_key('nodes'): - for node in l['nodes']: - n = Node().initWithDict(node) - if not self.found.has_key(n.id): - self.found[n.id] = n - self.table.insertNode(n) - elif l.has_key('values'): - def x(y, z=self.results): - y = y.data - if not z.has_key(y): - z[y] = 1 - return y - v = filter(None, map(x, l['values'])) - if(len(v)): - reactor.callFromThread(self.callback, v) - self.schedule() - + def handleGotNodes(self, dict): + _krpc_sender = dict['_krpc_sender'] + dict = dict['rsp'] + sender = {'id' : dict["id"]} + sender['port'] = _krpc_sender[1] + sender['host'] = _krpc_sender[0] + sender = self.table.Node().initWithDict(sender) + sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port)) + self.table.table.insertNode(sender) + if self.finished or self.answered.has_key(sender.id): + # a day late and a dollar short + return + self.outstanding = self.outstanding - 1 + self.answered[sender.id] = 1 + # go through nodes + # if we have any closer than what we already got, query them + if dict.has_key('nodes'): + for node in dict['nodes']: + n = self.table.Node().initWithDict(node) + n.conn = self.table.udp.connectionForAddr((n.host, n.port)) + if not self.found.has_key(n.id): + self.found[n.id] = n + elif dict.has_key('values'): + def x(y, z=self.results): + if not z.has_key(y): + z[y] = 1 + return y + else: + return None + z = len(dict['values']) + v = filter(None, map(x, dict['values'])) + if(len(v)): + reactor.callLater(0, self.callback, v) + self.schedule() + ## get value def schedule(self): - if self.finished: - return - l = self.found.values() - l.sort(self.sort) + if self.finished: + return + l = self.found.values() + l.sort(self.sort) + + for node in l[:const.K]: + if (not self.queried.has_key(node.id)) and node.id != self.table.node.id: + #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT + try: + f = getattr(node, self.findValue) + except AttributeError: + print ">>> findValue %s doesn't have a %s method!" % (node, self.findValue) + else: + df = f(self.target, self.table.node.id) + df.addCallback(self.handleGotNodes) + df.addErrback(self.makeMsgFailed(node)) + self.outstanding = self.outstanding + 1 + self.queried[node.id] = 1 + if self.outstanding >= const.CONCURRENT_REQS: + break + assert(self.outstanding) >=0 + if self.outstanding == 0: + ## all done, didn't find it!! + self.finished=1 + reactor.callLater(0, self.callback,[]) - for node in l[:K]: - if not self.queried.has_key(node.id) and node.id != self.table.node.id: - #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT - df = node.findValue(self.target, self.table.node.senderDict()) - df.addCallback(self.handleGotNodes) - df.addErrback(self.defaultGotNodes) - self.outstanding = self.outstanding + 1 - self.queried[node.id] = 1 - if self.outstanding >= N: - break - assert(self.outstanding) >=0 - if self.outstanding == 0: - ## all done, didn't find it!! - self.finished=1 - reactor.callFromThread(self.callback,[]) - ## get value def goWithNodes(self, nodes, found=None): - self.results = {} - if found: - for n in found: - self.results[n] = 1 - for node in nodes: - if node.id == self.table.node.id: - continue - self.found[node.id] = node - #xxx t.timeout = time.time() + FIND_NODE_TIMEOUT - df = node.findValue(self.target, self.table.node.senderDict()) - df.addCallback(self.handleGotNodes) - df.addErrback(self.defaultGotNodes) - self.outstanding = self.outstanding + 1 - self.queried[node.id] = 1 - if self.outstanding == 0: - reactor.callFromThread(self.callback, []) + self.results = {} + if found: + for n in found: + self.results[n] = 1 + for node in nodes: + if node.id == self.table.node.id: + continue + else: + self.found[node.id] = node + + self.schedule() -KEINITIAL_DELAY = 60 * 60 * 24 # 24 hours -KE_DELAY = 60 * 60 # 1 hour -KE_AGE = KEINITIAL_DELAY +class StoreValue(ActionBase): + def __init__(self, table, target, value, callback, store="storeValue"): + ActionBase.__init__(self, table, target, callback) + self.value = value + self.stored = [] + self.store = store + + def storedValue(self, t, node): + self.outstanding -= 1 + self.table.insertNode(node) + if self.finished: + return + self.stored.append(t) + if len(self.stored) >= const.STORE_REDUNDANCY: + self.finished=1 + self.callback(self.stored) + else: + if not len(self.stored) + self.outstanding >= const.STORE_REDUNDANCY: + self.schedule() + return t + + def storeFailed(self, t, node): + print ">>> store failed %s/%s" % (node.host, node.port) + self.table.nodeFailed(node) + self.outstanding -= 1 + if self.finished: + return t + self.schedule() + return t + + def schedule(self): + if self.finished: + return + num = const.CONCURRENT_REQS - self.outstanding + if num > const.STORE_REDUNDANCY: + num = const.STORE_REDUNDANCY + for i in range(num): + try: + node = self.nodes.pop() + except IndexError: + if self.outstanding == 0: + self.finished = 1 + self.callback(self.stored) + else: + if not node.id == self.table.node.id: + self.outstanding += 1 + try: + f = getattr(node, self.store) + except AttributeError: + print ">>> %s doesn't have a %s method!" % (node, self.store) + else: + df = f(self.target, self.value, self.table.node.id) + df.addCallback(self.storedValue, node=node) + df.addErrback(self.storeFailed, node=node) + + def goWithNodes(self, nodes): + self.nodes = nodes + self.nodes.sort(self.sort) + self.schedule() + class KeyExpirer: - def __init__(self, store, itime, kw): - self.store = store - self.itime = itime - self.kw = kw - reactor.callLater(KEINITIAL_DELAY, self.doExpire) - + def __init__(self, store): + self.store = store + reactor.callLater(const.KEINITIAL_DELAY, self.doExpire) + def doExpire(self): - self.cut = `time() - KE_AGE` - self._expire() - + self.cut = "%0.6f" % (time() - const.KE_AGE) + self._expire() + def _expire(self): - ic = self.itime.cursor() - sc = self.store.cursor() - kc = self.kw.cursor() - irec = None - try: - irec = ic.set_range(self.cut) - except db.DBNotFoundError: - # everything is expired - f = ic.prev - irec = f() - else: - f = ic.next - i = 0 - while irec: - it, h = irec - try: - k, v, lt = loads(self.store[h]) - except KeyError: - ic.delete() - else: - if lt < self.cut: - try: - kc.set_both(k, h) - except db.DBNotFoundError: - print "Database inconsistency! No key->value entry when a store entry was found!" - else: - kc.delete() - self.store.delete(h) - ic.delete() - i = i + 1 - else: - break - irec = f() - - reactor.callLater(KE_DELAY, self.doExpire) - if(i > 0): - print ">>>KE: done expiring %d" % i - \ No newline at end of file + c = self.store.cursor() + s = "delete from kv where time < '%s';" % self.cut + c.execute(s) + reactor.callLater(const.KE_DELAY, self.doExpire)