X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=actions.py;h=edaa655a89fab9322655e55edb7e2a7a5b84d5b2;hb=6533b37c36b8437d4d1fc43714c0cdbc1767eca7;hp=b870534121818c1adc806713fdc235a9cc0366d1;hpb=517b039ed7c53375d72da94bc6c4a45fdbaa2d48;p=quix0rs-apt-p2p.git diff --git a/actions.py b/actions.py index b870534..edaa655 100644 --- a/actions.py +++ b/actions.py @@ -4,6 +4,7 @@ from pickle import loads, dumps from bsddb3 import db from const import reactor +import const from hash import intify from knode import KNode as Node @@ -47,6 +48,7 @@ class FindNode(ActionBase): def handleGotNodes(self, args): l, sender = args sender = Node().initWithDict(sender) + self.table.table.insertNode(sender) if self.finished or self.answered.has_key(sender.id): # a day late and a dollar short return @@ -75,7 +77,7 @@ class FindNode(ActionBase): if not self.queried.has_key(node.id) and node.id != self.table.node.id: #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT df = node.findNode(self.target, self.table.node.senderDict()) - df.addCallbacks(self.handleGotNodes, self.defaultGotNodes) + df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node)) self.outstanding = self.outstanding + 1 self.queried[node.id] = 1 if self.outstanding >= N: @@ -86,12 +88,14 @@ class FindNode(ActionBase): self.finished=1 reactor.callFromThread(self.callback, l[:K]) - def defaultGotNodes(self, t): - if self.finished: - return - self.outstanding = self.outstanding - 1 - self.schedule() - + def makeMsgFailed(self, node): + def defaultGotNodes(err, self=self, node=node): + self.table.table.nodeFailed(node) + if self.finished: + return + self.outstanding = self.outstanding - 1 + self.schedule() + return defaultGotNodes def goWithNodes(self, nodes): """ @@ -104,7 +108,7 @@ class FindNode(ActionBase): self.found[node.id] = node #xxx t.timeout = time.time() + FIND_NODE_TIMEOUT df = node.findNode(self.target, self.table.node.senderDict()) - df.addCallbacks(self.handleGotNodes, self.defaultGotNodes) + df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node)) self.outstanding = self.outstanding + 1 self.queried[node.id] = 1 if self.outstanding == 0: @@ -117,6 +121,7 @@ class GetValue(FindNode): def handleGotNodes(self, args): l, sender = args sender = Node().initWithDict(sender) + self.table.table.insertNode(sender) if self.finished or self.answered.has_key(sender.id): # a day late and a dollar short return @@ -136,6 +141,8 @@ class GetValue(FindNode): if not z.has_key(y): z[y] = 1 return y + else: + return None v = filter(None, map(x, l['values'])) if(len(v)): reactor.callFromThread(self.callback, v) @@ -153,7 +160,7 @@ class GetValue(FindNode): #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT df = node.findValue(self.target, self.table.node.senderDict()) df.addCallback(self.handleGotNodes) - df.addErrback(self.defaultGotNodes) + df.addErrback(self.makeMsgFailed(node)) self.outstanding = self.outstanding + 1 self.queried[node.id] = 1 if self.outstanding >= N: @@ -165,33 +172,35 @@ class GetValue(FindNode): reactor.callFromThread(self.callback,[]) ## get value - def goWithNodes(self, nodes): + def goWithNodes(self, nodes, found=None): self.results = {} + if found: + for n in found: + self.results[n] = 1 for node in nodes: if node.id == self.table.node.id: continue self.found[node.id] = node #xxx t.timeout = time.time() + FIND_NODE_TIMEOUT df = node.findValue(self.target, self.table.node.senderDict()) - df.addCallbacks(self.handleGotNodes, self.defaultGotNodes) + df.addCallback(self.handleGotNodes) + df.addErrback(self.makeMsgFailed(node)) self.outstanding = self.outstanding + 1 self.queried[node.id] = 1 if self.outstanding == 0: reactor.callFromThread(self.callback, []) -KEINITIAL_DELAY = 60 # 1 minute -KE_DELAY = 60 # 1 minute -KE_AGE = 60 * 5 + class KeyExpirer: def __init__(self, store, itime, kw): self.store = store self.itime = itime self.kw = kw - reactor.callLater(KEINITIAL_DELAY, self.doExpire) + reactor.callLater(const.KEINITIAL_DELAY, self.doExpire) def doExpire(self): - self.cut = `time() - KE_AGE` + self.cut = `time() - const.KE_AGE` self._expire() def _expire(self): @@ -225,9 +234,11 @@ class KeyExpirer: self.store.delete(h) ic.delete() i = i + 1 + else: + break irec = f() - reactor.callLater(KE_DELAY, self.doExpire) + reactor.callLater(const.KE_DELAY, self.doExpire) if(i > 0): print ">>>KE: done expiring %d" % i \ No newline at end of file