]> git.mxchange.org Git - quix0rs-apt-p2p.git/blobdiff - actions.py
more constants
[quix0rs-apt-p2p.git] / actions.py
index a4d9514f0de6a8c833c8a1b42c788f6fd4ab2d41..13b386470a32658f7609a955a9ae01fd83055d87 100644 (file)
@@ -1,10 +1,14 @@
+from time import time
+from pickle import loads, dumps
+
+from bsddb3 import db
+
 from const import reactor
+import const
 
 from hash import intify
 from knode import KNode as Node
 from ktable import KTable, K
-# concurrent FIND_NODE/VALUE requests!
-N = 3
 
 class ActionBase:
     """ base class for some long running asynchronous proccesses like finding nodes or values """
@@ -39,17 +43,20 @@ FIND_NODE_TIMEOUT = 15
 class FindNode(ActionBase):
     """ find node action merits it's own class as it is a long running stateful process """
     def handleGotNodes(self, args):
+       args, conn = args
        l, sender = args
-       if self.finished or self.answered.has_key(sender['id']):
+       sender['host'] = conn['host']
+       sender = Node().initWithDict(sender)
+       self.table.table.insertNode(sender)
+       if self.finished or self.answered.has_key(sender.id):
            # a day late and a dollar short
            return
        self.outstanding = self.outstanding - 1
-       self.answered[sender['id']] = 1
+       self.answered[sender.id] = 1
        for node in l:
-           if not self.found.has_key(node['id']):
-               n = Node(node['id'], node['host'], node['port'])
+           n = Node().initWithDict(node)
+           if not self.found.has_key(n.id):
                self.found[n.id] = n
-               self.table.insertNode(n)
        self.schedule()
                
     def schedule(self):
@@ -61,17 +68,17 @@ class FindNode(ActionBase):
        l = self.found.values()
        l.sort(self.sort)
 
-       for node in l[:K]:
+       for node in l:
            if node.id == self.target:
                self.finished=1
                return self.callback([node])
            if not self.queried.has_key(node.id) and node.id != self.table.node.id:
                #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
                df = node.findNode(self.target, self.table.node.senderDict())
-               df.addCallbacks(self.handleGotNodes, self.defaultGotNodes)
+               df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
                self.outstanding = self.outstanding + 1
                self.queried[node.id] = 1
-           if self.outstanding >= N:
+           if self.outstanding >= const.CONCURRENT_REQS:
                break
        assert(self.outstanding) >=0
        if self.outstanding == 0:
@@ -79,12 +86,14 @@ class FindNode(ActionBase):
            self.finished=1
            reactor.callFromThread(self.callback, l[:K])
        
-    def defaultGotNodes(self, t):
-       if self.finished:
-           return
-       self.outstanding = self.outstanding - 1
-       self.schedule()
-       
+    def makeMsgFailed(self, node):
+       def defaultGotNodes(err, self=self, node=node):
+           self.table.table.nodeFailed(node)
+           if self.finished:
+               return
+           self.outstanding = self.outstanding - 1
+           self.schedule()
+       return defaultGotNodes
        
     def goWithNodes(self, nodes):
        """
@@ -94,39 +103,41 @@ class FindNode(ActionBase):
        for node in nodes:
            if node.id == self.table.node.id:
                continue
-           self.found[node.id] = node
-           #xxx t.timeout = time.time() + FIND_NODE_TIMEOUT
-           df = node.findNode(self.target, self.table.node.senderDict())
-           df.addCallbacks(self.handleGotNodes, self.defaultGotNodes)
-           self.outstanding = self.outstanding + 1
-           self.queried[node.id] = 1
-       if self.outstanding == 0:
-           self.callback(nodes)
+           else:
+               self.found[node.id] = node
+           
+       self.schedule()
 
 
 GET_VALUE_TIMEOUT = 15
 class GetValue(FindNode):
     """ get value task """
     def handleGotNodes(self, args):
+       args, conn = args
        l, sender = args
-       if self.finished or self.answered.has_key(sender['id']):
+       sender['host'] = conn['host']
+       sender = Node().initWithDict(sender)
+       self.table.table.insertNode(sender)
+       if self.finished or self.answered.has_key(sender.id):
            # a day late and a dollar short
            return
        self.outstanding = self.outstanding - 1
-       self.answered[sender['id']] = 1
+       self.answered[sender.id] = 1
        # go through nodes
        # if we have any closer than what we already got, query them
        if l.has_key('nodes'):
            for node in l['nodes']:
-               if not self.found.has_key(node['id']):
-                   n = Node(node['id'], node['host'], node['port'])
+               n = Node().initWithDict(node)
+               if not self.found.has_key(n.id):
                    self.found[n.id] = n
-                   self.table.insertNode(n)
        elif l.has_key('values'):
            def x(y, z=self.results):
+               y = y.data
                if not z.has_key(y):
                    z[y] = 1
                    return y
+               else:
+                   return None
            v = filter(None, map(x, l['values']))
            if(len(v)):
                reactor.callFromThread(self.callback, v)
@@ -139,15 +150,15 @@ class GetValue(FindNode):
        l = self.found.values()
        l.sort(self.sort)
 
-       for node in l[:K]:
+       for node in l:
            if not self.queried.has_key(node.id) and node.id != self.table.node.id:
                #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
                df = node.findValue(self.target, self.table.node.senderDict())
                df.addCallback(self.handleGotNodes)
-               df.addErrback(self.defaultGotNodes)
+               df.addErrback(self.makeMsgFailed(node))
                self.outstanding = self.outstanding + 1
                self.queried[node.id] = 1
-           if self.outstanding >= N:
+           if self.outstanding >= const.CONCURRENT_REQS:
                break
        assert(self.outstanding) >=0
        if self.outstanding == 0:
@@ -156,17 +167,68 @@ class GetValue(FindNode):
            reactor.callFromThread(self.callback,[])
     
     ## get value
-    def goWithNodes(self, nodes):
+    def goWithNodes(self, nodes, found=None):
        self.results = {}
+       if found:
+           for n in found:
+               self.results[n] = 1
        for node in nodes:
            if node.id == self.table.node.id:
                continue
-           self.found[node.id] = node
-           #xxx t.timeout = time.time() + FIND_NODE_TIMEOUT
-           df = node.findValue(self.target, self.table.node.senderDict())
-           df.addCallbacks(self.handleGotNodes, self.defaultGotNodes)
-           self.outstanding = self.outstanding + 1
-           self.queried[node.id] = 1
-       if self.outstanding == 0:
-           reactor.callFromThread(self.callback, [])
+           else:
+               self.found[node.id] = node
+           
+       self.schedule()
 
+
+
+class KeyExpirer:
+    def __init__(self, store, itime, kw):
+       self.store = store
+       self.itime = itime
+       self.kw = kw
+       reactor.callLater(const.KEINITIAL_DELAY, self.doExpire)
+       
+    def doExpire(self):
+       self.cut = `time() - const.KE_AGE`
+       self._expire()
+       
+    def _expire(self):
+       ic = self.itime.cursor()
+       sc = self.store.cursor()
+       kc = self.kw.cursor()
+       irec = None
+       try:
+           irec = ic.set_range(self.cut)
+       except db.DBNotFoundError:
+           # everything is expired
+           f = ic.prev
+           irec = f()
+       else:
+           f = ic.next
+       i = 0
+       while irec:
+           it, h = irec
+           try:
+               k, v, lt = loads(self.store[h])
+           except KeyError:
+               ic.delete()
+           else:
+               if lt < self.cut:
+                   try:
+                       kc.set_both(k, h)
+                   except db.DBNotFoundError:
+                       print "Database inconsistency!  No key->value entry when a store entry was found!"
+                   else:
+                       kc.delete()
+                   self.store.delete(h)
+                   ic.delete()
+                   i = i + 1
+               else:
+                   break
+           irec = f()
+           
+       reactor.callLater(const.KE_DELAY, self.doExpire)
+       if(i > 0):
+           print ">>>KE: done expiring %d" % i
+       
\ No newline at end of file