]> git.mxchange.org Git - quix0rs-apt-p2p.git/blobdiff - actions.py
unresponsive peers are now purged from the routing table
[quix0rs-apt-p2p.git] / actions.py
index b870534121818c1adc806713fdc235a9cc0366d1..edaa655a89fab9322655e55edb7e2a7a5b84d5b2 100644 (file)
@@ -4,6 +4,7 @@ from pickle import loads, dumps
 from bsddb3 import db
 
 from const import reactor
+import const
 
 from hash import intify
 from knode import KNode as Node
@@ -47,6 +48,7 @@ class FindNode(ActionBase):
     def handleGotNodes(self, args):
        l, sender = args
        sender = Node().initWithDict(sender)
+       self.table.table.insertNode(sender)
        if self.finished or self.answered.has_key(sender.id):
            # a day late and a dollar short
            return
@@ -75,7 +77,7 @@ class FindNode(ActionBase):
            if not self.queried.has_key(node.id) and node.id != self.table.node.id:
                #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
                df = node.findNode(self.target, self.table.node.senderDict())
-               df.addCallbacks(self.handleGotNodes, self.defaultGotNodes)
+               df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
                self.outstanding = self.outstanding + 1
                self.queried[node.id] = 1
            if self.outstanding >= N:
@@ -86,12 +88,14 @@ class FindNode(ActionBase):
            self.finished=1
            reactor.callFromThread(self.callback, l[:K])
        
-    def defaultGotNodes(self, t):
-       if self.finished:
-           return
-       self.outstanding = self.outstanding - 1
-       self.schedule()
-       
+    def makeMsgFailed(self, node):
+       def defaultGotNodes(err, self=self, node=node):
+           self.table.table.nodeFailed(node)
+           if self.finished:
+               return
+           self.outstanding = self.outstanding - 1
+           self.schedule()
+       return defaultGotNodes
        
     def goWithNodes(self, nodes):
        """
@@ -104,7 +108,7 @@ class FindNode(ActionBase):
            self.found[node.id] = node
            #xxx t.timeout = time.time() + FIND_NODE_TIMEOUT
            df = node.findNode(self.target, self.table.node.senderDict())
-           df.addCallbacks(self.handleGotNodes, self.defaultGotNodes)
+           df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
            self.outstanding = self.outstanding + 1
            self.queried[node.id] = 1
        if self.outstanding == 0:
@@ -117,6 +121,7 @@ class GetValue(FindNode):
     def handleGotNodes(self, args):
        l, sender = args
        sender = Node().initWithDict(sender)
+       self.table.table.insertNode(sender)
        if self.finished or self.answered.has_key(sender.id):
            # a day late and a dollar short
            return
@@ -136,6 +141,8 @@ class GetValue(FindNode):
                if not z.has_key(y):
                    z[y] = 1
                    return y
+               else:
+                   return None
            v = filter(None, map(x, l['values']))
            if(len(v)):
                reactor.callFromThread(self.callback, v)
@@ -153,7 +160,7 @@ class GetValue(FindNode):
                #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
                df = node.findValue(self.target, self.table.node.senderDict())
                df.addCallback(self.handleGotNodes)
-               df.addErrback(self.defaultGotNodes)
+               df.addErrback(self.makeMsgFailed(node))
                self.outstanding = self.outstanding + 1
                self.queried[node.id] = 1
            if self.outstanding >= N:
@@ -165,33 +172,35 @@ class GetValue(FindNode):
            reactor.callFromThread(self.callback,[])
     
     ## get value
-    def goWithNodes(self, nodes):
+    def goWithNodes(self, nodes, found=None):
        self.results = {}
+       if found:
+           for n in found:
+               self.results[n] = 1
        for node in nodes:
            if node.id == self.table.node.id:
                continue
            self.found[node.id] = node
            #xxx t.timeout = time.time() + FIND_NODE_TIMEOUT
            df = node.findValue(self.target, self.table.node.senderDict())
-           df.addCallbacks(self.handleGotNodes, self.defaultGotNodes)
+           df.addCallback(self.handleGotNodes)
+           df.addErrback(self.makeMsgFailed(node))
            self.outstanding = self.outstanding + 1
            self.queried[node.id] = 1
        if self.outstanding == 0:
            reactor.callFromThread(self.callback, [])
 
 
-KEINITIAL_DELAY = 60 # 1 minute
-KE_DELAY = 60 # 1 minute
-KE_AGE = 60 * 5
+
 class KeyExpirer:
     def __init__(self, store, itime, kw):
        self.store = store
        self.itime = itime
        self.kw = kw
-       reactor.callLater(KEINITIAL_DELAY, self.doExpire)
+       reactor.callLater(const.KEINITIAL_DELAY, self.doExpire)
        
     def doExpire(self):
-       self.cut = `time() - KE_AGE`
+       self.cut = `time() - const.KE_AGE`
        self._expire()
        
     def _expire(self):
@@ -225,9 +234,11 @@ class KeyExpirer:
                    self.store.delete(h)
                    ic.delete()
                    i = i + 1
+               else:
+                   break
            irec = f()
            
-       reactor.callLater(KE_DELAY, self.doExpire)
+       reactor.callLater(const.KE_DELAY, self.doExpire)
        if(i > 0):
            print ">>>KE: done expiring %d" % i
        
\ No newline at end of file