X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=actions.py;h=13b386470a32658f7609a955a9ae01fd83055d87;hb=c1b5a9ab09dcbbae0e99caeb26809429a5d17ebd;hp=a4d9514f0de6a8c833c8a1b42c788f6fd4ab2d41;hpb=e0d99ef1d31cef0cf5aec945030bb348ad1ab3d7;p=quix0rs-apt-p2p.git diff --git a/actions.py b/actions.py index a4d9514..13b3864 100644 --- a/actions.py +++ b/actions.py @@ -1,10 +1,14 @@ +from time import time +from pickle import loads, dumps + +from bsddb3 import db + from const import reactor +import const from hash import intify from knode import KNode as Node from ktable import KTable, K -# concurrent FIND_NODE/VALUE requests! -N = 3 class ActionBase: """ base class for some long running asynchronous proccesses like finding nodes or values """ @@ -39,17 +43,20 @@ FIND_NODE_TIMEOUT = 15 class FindNode(ActionBase): """ find node action merits it's own class as it is a long running stateful process """ def handleGotNodes(self, args): + args, conn = args l, sender = args - if self.finished or self.answered.has_key(sender['id']): + sender['host'] = conn['host'] + sender = Node().initWithDict(sender) + self.table.table.insertNode(sender) + if self.finished or self.answered.has_key(sender.id): # a day late and a dollar short return self.outstanding = self.outstanding - 1 - self.answered[sender['id']] = 1 + self.answered[sender.id] = 1 for node in l: - if not self.found.has_key(node['id']): - n = Node(node['id'], node['host'], node['port']) + n = Node().initWithDict(node) + if not self.found.has_key(n.id): self.found[n.id] = n - self.table.insertNode(n) self.schedule() def schedule(self): @@ -61,17 +68,17 @@ class FindNode(ActionBase): l = self.found.values() l.sort(self.sort) - for node in l[:K]: + for node in l: if node.id == self.target: self.finished=1 return self.callback([node]) if not self.queried.has_key(node.id) and node.id != self.table.node.id: #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT df = node.findNode(self.target, self.table.node.senderDict()) - df.addCallbacks(self.handleGotNodes, self.defaultGotNodes) + df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node)) self.outstanding = self.outstanding + 1 self.queried[node.id] = 1 - if self.outstanding >= N: + if self.outstanding >= const.CONCURRENT_REQS: break assert(self.outstanding) >=0 if self.outstanding == 0: @@ -79,12 +86,14 @@ class FindNode(ActionBase): self.finished=1 reactor.callFromThread(self.callback, l[:K]) - def defaultGotNodes(self, t): - if self.finished: - return - self.outstanding = self.outstanding - 1 - self.schedule() - + def makeMsgFailed(self, node): + def defaultGotNodes(err, self=self, node=node): + self.table.table.nodeFailed(node) + if self.finished: + return + self.outstanding = self.outstanding - 1 + self.schedule() + return defaultGotNodes def goWithNodes(self, nodes): """ @@ -94,39 +103,41 @@ class FindNode(ActionBase): for node in nodes: if node.id == self.table.node.id: continue - self.found[node.id] = node - #xxx t.timeout = time.time() + FIND_NODE_TIMEOUT - df = node.findNode(self.target, self.table.node.senderDict()) - df.addCallbacks(self.handleGotNodes, self.defaultGotNodes) - self.outstanding = self.outstanding + 1 - self.queried[node.id] = 1 - if self.outstanding == 0: - self.callback(nodes) + else: + self.found[node.id] = node + + self.schedule() GET_VALUE_TIMEOUT = 15 class GetValue(FindNode): """ get value task """ def handleGotNodes(self, args): + args, conn = args l, sender = args - if self.finished or self.answered.has_key(sender['id']): + sender['host'] = conn['host'] + sender = Node().initWithDict(sender) + self.table.table.insertNode(sender) + if self.finished or self.answered.has_key(sender.id): # a day late and a dollar short return self.outstanding = self.outstanding - 1 - self.answered[sender['id']] = 1 + self.answered[sender.id] = 1 # go through nodes # if we have any closer than what we already got, query them if l.has_key('nodes'): for node in l['nodes']: - if not self.found.has_key(node['id']): - n = Node(node['id'], node['host'], node['port']) + n = Node().initWithDict(node) + if not self.found.has_key(n.id): self.found[n.id] = n - self.table.insertNode(n) elif l.has_key('values'): def x(y, z=self.results): + y = y.data if not z.has_key(y): z[y] = 1 return y + else: + return None v = filter(None, map(x, l['values'])) if(len(v)): reactor.callFromThread(self.callback, v) @@ -139,15 +150,15 @@ class GetValue(FindNode): l = self.found.values() l.sort(self.sort) - for node in l[:K]: + for node in l: if not self.queried.has_key(node.id) and node.id != self.table.node.id: #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT df = node.findValue(self.target, self.table.node.senderDict()) df.addCallback(self.handleGotNodes) - df.addErrback(self.defaultGotNodes) + df.addErrback(self.makeMsgFailed(node)) self.outstanding = self.outstanding + 1 self.queried[node.id] = 1 - if self.outstanding >= N: + if self.outstanding >= const.CONCURRENT_REQS: break assert(self.outstanding) >=0 if self.outstanding == 0: @@ -156,17 +167,68 @@ class GetValue(FindNode): reactor.callFromThread(self.callback,[]) ## get value - def goWithNodes(self, nodes): + def goWithNodes(self, nodes, found=None): self.results = {} + if found: + for n in found: + self.results[n] = 1 for node in nodes: if node.id == self.table.node.id: continue - self.found[node.id] = node - #xxx t.timeout = time.time() + FIND_NODE_TIMEOUT - df = node.findValue(self.target, self.table.node.senderDict()) - df.addCallbacks(self.handleGotNodes, self.defaultGotNodes) - self.outstanding = self.outstanding + 1 - self.queried[node.id] = 1 - if self.outstanding == 0: - reactor.callFromThread(self.callback, []) + else: + self.found[node.id] = node + + self.schedule() + + +class KeyExpirer: + def __init__(self, store, itime, kw): + self.store = store + self.itime = itime + self.kw = kw + reactor.callLater(const.KEINITIAL_DELAY, self.doExpire) + + def doExpire(self): + self.cut = `time() - const.KE_AGE` + self._expire() + + def _expire(self): + ic = self.itime.cursor() + sc = self.store.cursor() + kc = self.kw.cursor() + irec = None + try: + irec = ic.set_range(self.cut) + except db.DBNotFoundError: + # everything is expired + f = ic.prev + irec = f() + else: + f = ic.next + i = 0 + while irec: + it, h = irec + try: + k, v, lt = loads(self.store[h]) + except KeyError: + ic.delete() + else: + if lt < self.cut: + try: + kc.set_both(k, h) + except db.DBNotFoundError: + print "Database inconsistency! No key->value entry when a store entry was found!" + else: + kc.delete() + self.store.delete(h) + ic.delete() + i = i + 1 + else: + break + irec = f() + + reactor.callLater(const.KE_DELAY, self.doExpire) + if(i > 0): + print ">>>KE: done expiring %d" % i + \ No newline at end of file