]> git.mxchange.org Git - quix0rs-apt-p2p.git/blobdiff - actions.py
more constants
[quix0rs-apt-p2p.git] / actions.py
index 0a6062f0ee136f21a35af19e29d0b07ed1bdbb93..13b386470a32658f7609a955a9ae01fd83055d87 100644 (file)
@@ -4,14 +4,12 @@ from pickle import loads, dumps
 from bsddb3 import db
 
 from const import reactor
+import const
 
 from hash import intify
 from knode import KNode as Node
 from ktable import KTable, K
 
-# concurrent FIND_NODE/VALUE requests!
-N = 3
-
 class ActionBase:
     """ base class for some long running asynchronous proccesses like finding nodes or values """
     def __init__(self, table, target, callback):
@@ -45,8 +43,11 @@ FIND_NODE_TIMEOUT = 15
 class FindNode(ActionBase):
     """ find node action merits it's own class as it is a long running stateful process """
     def handleGotNodes(self, args):
+       args, conn = args
        l, sender = args
+       sender['host'] = conn['host']
        sender = Node().initWithDict(sender)
+       self.table.table.insertNode(sender)
        if self.finished or self.answered.has_key(sender.id):
            # a day late and a dollar short
            return
@@ -56,7 +57,6 @@ class FindNode(ActionBase):
            n = Node().initWithDict(node)
            if not self.found.has_key(n.id):
                self.found[n.id] = n
-               self.table.insertNode(n)
        self.schedule()
                
     def schedule(self):
@@ -68,17 +68,17 @@ class FindNode(ActionBase):
        l = self.found.values()
        l.sort(self.sort)
 
-       for node in l[:K]:
+       for node in l:
            if node.id == self.target:
                self.finished=1
                return self.callback([node])
            if not self.queried.has_key(node.id) and node.id != self.table.node.id:
                #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
                df = node.findNode(self.target, self.table.node.senderDict())
-               df.addCallbacks(self.handleGotNodes, self.defaultGotNodes)
+               df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
                self.outstanding = self.outstanding + 1
                self.queried[node.id] = 1
-           if self.outstanding >= N:
+           if self.outstanding >= const.CONCURRENT_REQS:
                break
        assert(self.outstanding) >=0
        if self.outstanding == 0:
@@ -86,12 +86,14 @@ class FindNode(ActionBase):
            self.finished=1
            reactor.callFromThread(self.callback, l[:K])
        
-    def defaultGotNodes(self, t):
-       if self.finished:
-           return
-       self.outstanding = self.outstanding - 1
-       self.schedule()
-       
+    def makeMsgFailed(self, node):
+       def defaultGotNodes(err, self=self, node=node):
+           self.table.table.nodeFailed(node)
+           if self.finished:
+               return
+           self.outstanding = self.outstanding - 1
+           self.schedule()
+       return defaultGotNodes
        
     def goWithNodes(self, nodes):
        """
@@ -101,22 +103,21 @@ class FindNode(ActionBase):
        for node in nodes:
            if node.id == self.table.node.id:
                continue
-           self.found[node.id] = node
-           #xxx t.timeout = time.time() + FIND_NODE_TIMEOUT
-           df = node.findNode(self.target, self.table.node.senderDict())
-           df.addCallbacks(self.handleGotNodes, self.defaultGotNodes)
-           self.outstanding = self.outstanding + 1
-           self.queried[node.id] = 1
-       if self.outstanding == 0:
-           self.callback(nodes)
+           else:
+               self.found[node.id] = node
+           
+       self.schedule()
 
 
 GET_VALUE_TIMEOUT = 15
 class GetValue(FindNode):
     """ get value task """
     def handleGotNodes(self, args):
+       args, conn = args
        l, sender = args
+       sender['host'] = conn['host']
        sender = Node().initWithDict(sender)
+       self.table.table.insertNode(sender)
        if self.finished or self.answered.has_key(sender.id):
            # a day late and a dollar short
            return
@@ -129,13 +130,14 @@ class GetValue(FindNode):
                n = Node().initWithDict(node)
                if not self.found.has_key(n.id):
                    self.found[n.id] = n
-                   self.table.insertNode(n)
        elif l.has_key('values'):
            def x(y, z=self.results):
                y = y.data
                if not z.has_key(y):
                    z[y] = 1
                    return y
+               else:
+                   return None
            v = filter(None, map(x, l['values']))
            if(len(v)):
                reactor.callFromThread(self.callback, v)
@@ -148,15 +150,15 @@ class GetValue(FindNode):
        l = self.found.values()
        l.sort(self.sort)
 
-       for node in l[:K]:
+       for node in l:
            if not self.queried.has_key(node.id) and node.id != self.table.node.id:
                #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
                df = node.findValue(self.target, self.table.node.senderDict())
                df.addCallback(self.handleGotNodes)
-               df.addErrback(self.defaultGotNodes)
+               df.addErrback(self.makeMsgFailed(node))
                self.outstanding = self.outstanding + 1
                self.queried[node.id] = 1
-           if self.outstanding >= N:
+           if self.outstanding >= const.CONCURRENT_REQS:
                break
        assert(self.outstanding) >=0
        if self.outstanding == 0:
@@ -165,35 +167,30 @@ class GetValue(FindNode):
            reactor.callFromThread(self.callback,[])
     
     ## get value
-    def goWithNodes(self, nodes):
+    def goWithNodes(self, nodes, found=None):
        self.results = {}
+       if found:
+           for n in found:
+               self.results[n] = 1
        for node in nodes:
            if node.id == self.table.node.id:
                continue
-           self.found[node.id] = node
-           #xxx t.timeout = time.time() + FIND_NODE_TIMEOUT
-           df = node.findValue(self.target, self.table.node.senderDict())
-           df.addCallback(self.handleGotNodes)
-           df.addErrback(self.defaultGotNodes)
-           self.outstanding = self.outstanding + 1
-           self.queried[node.id] = 1
-       if self.outstanding == 0:
-           reactor.callFromThread(self.callback, [])
+           else:
+               self.found[node.id] = node
+           
+       self.schedule()
 
 
-KEINITIAL_DELAY = 60 * 60 * 24 # 24 hours
-KE_DELAY = 60 * 60 # 1 hour
-KE_AGE = KEINITIAL_DELAY
 
 class KeyExpirer:
     def __init__(self, store, itime, kw):
        self.store = store
        self.itime = itime
        self.kw = kw
-       reactor.callLater(KEINITIAL_DELAY, self.doExpire)
+       reactor.callLater(const.KEINITIAL_DELAY, self.doExpire)
        
     def doExpire(self):
-       self.cut = `time() - KE_AGE`
+       self.cut = `time() - const.KE_AGE`
        self._expire()
        
     def _expire(self):
@@ -231,7 +228,7 @@ class KeyExpirer:
                    break
            irec = f()
            
-       reactor.callLater(KE_DELAY, self.doExpire)
+       reactor.callLater(const.KE_DELAY, self.doExpire)
        if(i > 0):
            print ">>>KE: done expiring %d" % i
        
\ No newline at end of file