X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=actions.py;h=13b386470a32658f7609a955a9ae01fd83055d87;hb=c1b5a9ab09dcbbae0e99caeb26809429a5d17ebd;hp=0a6062f0ee136f21a35af19e29d0b07ed1bdbb93;hpb=b2de61b864a5ee74afc9b6eafb4a64ff31ba1ba0;p=quix0rs-apt-p2p.git diff --git a/actions.py b/actions.py index 0a6062f..13b3864 100644 --- a/actions.py +++ b/actions.py @@ -4,14 +4,12 @@ from pickle import loads, dumps from bsddb3 import db from const import reactor +import const from hash import intify from knode import KNode as Node from ktable import KTable, K -# concurrent FIND_NODE/VALUE requests! -N = 3 - class ActionBase: """ base class for some long running asynchronous proccesses like finding nodes or values """ def __init__(self, table, target, callback): @@ -45,8 +43,11 @@ FIND_NODE_TIMEOUT = 15 class FindNode(ActionBase): """ find node action merits it's own class as it is a long running stateful process """ def handleGotNodes(self, args): + args, conn = args l, sender = args + sender['host'] = conn['host'] sender = Node().initWithDict(sender) + self.table.table.insertNode(sender) if self.finished or self.answered.has_key(sender.id): # a day late and a dollar short return @@ -56,7 +57,6 @@ class FindNode(ActionBase): n = Node().initWithDict(node) if not self.found.has_key(n.id): self.found[n.id] = n - self.table.insertNode(n) self.schedule() def schedule(self): @@ -68,17 +68,17 @@ class FindNode(ActionBase): l = self.found.values() l.sort(self.sort) - for node in l[:K]: + for node in l: if node.id == self.target: self.finished=1 return self.callback([node]) if not self.queried.has_key(node.id) and node.id != self.table.node.id: #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT df = node.findNode(self.target, self.table.node.senderDict()) - df.addCallbacks(self.handleGotNodes, self.defaultGotNodes) + df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node)) self.outstanding = self.outstanding + 1 self.queried[node.id] = 1 - if self.outstanding >= N: + if self.outstanding >= const.CONCURRENT_REQS: break assert(self.outstanding) >=0 if self.outstanding == 0: @@ -86,12 +86,14 @@ class FindNode(ActionBase): self.finished=1 reactor.callFromThread(self.callback, l[:K]) - def defaultGotNodes(self, t): - if self.finished: - return - self.outstanding = self.outstanding - 1 - self.schedule() - + def makeMsgFailed(self, node): + def defaultGotNodes(err, self=self, node=node): + self.table.table.nodeFailed(node) + if self.finished: + return + self.outstanding = self.outstanding - 1 + self.schedule() + return defaultGotNodes def goWithNodes(self, nodes): """ @@ -101,22 +103,21 @@ class FindNode(ActionBase): for node in nodes: if node.id == self.table.node.id: continue - self.found[node.id] = node - #xxx t.timeout = time.time() + FIND_NODE_TIMEOUT - df = node.findNode(self.target, self.table.node.senderDict()) - df.addCallbacks(self.handleGotNodes, self.defaultGotNodes) - self.outstanding = self.outstanding + 1 - self.queried[node.id] = 1 - if self.outstanding == 0: - self.callback(nodes) + else: + self.found[node.id] = node + + self.schedule() GET_VALUE_TIMEOUT = 15 class GetValue(FindNode): """ get value task """ def handleGotNodes(self, args): + args, conn = args l, sender = args + sender['host'] = conn['host'] sender = Node().initWithDict(sender) + self.table.table.insertNode(sender) if self.finished or self.answered.has_key(sender.id): # a day late and a dollar short return @@ -129,13 +130,14 @@ class GetValue(FindNode): n = Node().initWithDict(node) if not self.found.has_key(n.id): self.found[n.id] = n - self.table.insertNode(n) elif l.has_key('values'): def x(y, z=self.results): y = y.data if not z.has_key(y): z[y] = 1 return y + else: + return None v = filter(None, map(x, l['values'])) if(len(v)): reactor.callFromThread(self.callback, v) @@ -148,15 +150,15 @@ class GetValue(FindNode): l = self.found.values() l.sort(self.sort) - for node in l[:K]: + for node in l: if not self.queried.has_key(node.id) and node.id != self.table.node.id: #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT df = node.findValue(self.target, self.table.node.senderDict()) df.addCallback(self.handleGotNodes) - df.addErrback(self.defaultGotNodes) + df.addErrback(self.makeMsgFailed(node)) self.outstanding = self.outstanding + 1 self.queried[node.id] = 1 - if self.outstanding >= N: + if self.outstanding >= const.CONCURRENT_REQS: break assert(self.outstanding) >=0 if self.outstanding == 0: @@ -165,35 +167,30 @@ class GetValue(FindNode): reactor.callFromThread(self.callback,[]) ## get value - def goWithNodes(self, nodes): + def goWithNodes(self, nodes, found=None): self.results = {} + if found: + for n in found: + self.results[n] = 1 for node in nodes: if node.id == self.table.node.id: continue - self.found[node.id] = node - #xxx t.timeout = time.time() + FIND_NODE_TIMEOUT - df = node.findValue(self.target, self.table.node.senderDict()) - df.addCallback(self.handleGotNodes) - df.addErrback(self.defaultGotNodes) - self.outstanding = self.outstanding + 1 - self.queried[node.id] = 1 - if self.outstanding == 0: - reactor.callFromThread(self.callback, []) + else: + self.found[node.id] = node + + self.schedule() -KEINITIAL_DELAY = 60 * 60 * 24 # 24 hours -KE_DELAY = 60 * 60 # 1 hour -KE_AGE = KEINITIAL_DELAY class KeyExpirer: def __init__(self, store, itime, kw): self.store = store self.itime = itime self.kw = kw - reactor.callLater(KEINITIAL_DELAY, self.doExpire) + reactor.callLater(const.KEINITIAL_DELAY, self.doExpire) def doExpire(self): - self.cut = `time() - KE_AGE` + self.cut = `time() - const.KE_AGE` self._expire() def _expire(self): @@ -231,7 +228,7 @@ class KeyExpirer: break irec = f() - reactor.callLater(KE_DELAY, self.doExpire) + reactor.callLater(const.KE_DELAY, self.doExpire) if(i > 0): print ">>>KE: done expiring %d" % i \ No newline at end of file