]> git.mxchange.org Git - quix0rs-apt-p2p.git/blobdiff - actions.py
type fix, sorry
[quix0rs-apt-p2p.git] / actions.py
index 052f6a4b26f6a3f913b93bc49f200c2f1b5d1e9c..77160d7669b3e5e1a2b0bd97ce26a29c47dbb38b 100644 (file)
@@ -1,8 +1,14 @@
+from time import time
+from pickle import loads, dumps
+
+from bsddb3 import db
+
 from const import reactor
 
 from hash import intify
 from knode import KNode as Node
 from ktable import KTable, K
+
 # concurrent FIND_NODE/VALUE requests!
 N = 3
 
@@ -40,14 +46,15 @@ class FindNode(ActionBase):
     """ find node action merits it's own class as it is a long running stateful process """
     def handleGotNodes(self, args):
        l, sender = args
-       if self.finished or self.answered.has_key(sender['id']):
+       sender = Node().initWithDict(sender)
+       if self.finished or self.answered.has_key(sender.id):
            # a day late and a dollar short
            return
        self.outstanding = self.outstanding - 1
-       self.answered[sender['id']] = 1
+       self.answered[sender.id] = 1
        for node in l:
-           if not self.found.has_key(node['id']):
-               n = Node(node['id'], node['host'], node['port'])
+           n = Node().initWithDict(node)
+           if not self.found.has_key(n.id):
                self.found[n.id] = n
                self.table.insertNode(n)
        self.schedule()
@@ -109,24 +116,29 @@ class GetValue(FindNode):
     """ get value task """
     def handleGotNodes(self, args):
        l, sender = args
-       l = l[0]
-       if self.finished or self.answered.has_key(sender['id']):
+       sender = Node().initWithDict(sender)
+       if self.finished or self.answered.has_key(sender.id):
            # a day late and a dollar short
            return
        self.outstanding = self.outstanding - 1
-       self.answered[sender['id']] = 1
+       self.answered[sender.id] = 1
        # go through nodes
        # if we have any closer than what we already got, query them
        if l.has_key('nodes'):
            for node in l['nodes']:
-               if not self.found.has_key(node['id']):
-                   n = Node(node['id'], node['host'], node['port'])
+               n = Node().initWithDict(node)
+               if not self.found.has_key(n.id):
                    self.found[n.id] = n
                    self.table.insertNode(n)
        elif l.has_key('values'):
-           ## done
-           self.finished = 1
-           return self.callback(l['values'])
+           def x(y, z=self.results):
+               y = y.data
+               if not z.has_key(y):
+                   z[y] = 1
+                   return y
+           v = filter(None, map(x, l['values']))
+           if(len(v)):
+               reactor.callFromThread(self.callback, v)
        self.schedule()
                
     ## get value
@@ -139,8 +151,9 @@ class GetValue(FindNode):
        for node in l[:K]:
            if not self.queried.has_key(node.id) and node.id != self.table.node.id:
                #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
-               df = node.getValue(node, self.target)
-               df.addCallbacks(self.handleGotNodes, self.defaultGotNodes)
+               df = node.findValue(self.target, self.table.node.senderDict())
+               df.addCallback(self.handleGotNodes)
+               df.addErrback(self.defaultGotNodes)
                self.outstanding = self.outstanding + 1
                self.queried[node.id] = 1
            if self.outstanding >= N:
@@ -153,15 +166,69 @@ class GetValue(FindNode):
     
     ## get value
     def goWithNodes(self, nodes):
+       self.results = {}
        for node in nodes:
            if node.id == self.table.node.id:
                continue
            self.found[node.id] = node
            #xxx t.timeout = time.time() + FIND_NODE_TIMEOUT
-           df = node.findNode(self.target, self.table.node.senderDict())
+           df = node.findValue(self.target, self.table.node.senderDict())
            df.addCallbacks(self.handleGotNodes, self.defaultGotNodes)
            self.outstanding = self.outstanding + 1
            self.queried[node.id] = 1
        if self.outstanding == 0:
            reactor.callFromThread(self.callback, [])
 
+
+KEINITIAL_DELAY = 60 * 60 * 24 # 24 hours
+KE_DELAY = 60 * 60 # 1 hour
+KE_AGE = KEINITIAL_DELAY
+
+class KeyExpirer:
+    def __init__(self, store, itime, kw):
+       self.store = store
+       self.itime = itime
+       self.kw = kw
+       reactor.callLater(KEINITIAL_DELAY, self.doExpire)
+       
+    def doExpire(self):
+       self.cut = `time() - KE_AGE`
+       self._expire()
+       
+    def _expire(self):
+       ic = self.itime.cursor()
+       sc = self.store.cursor()
+       kc = self.kw.cursor()
+       irec = None
+       try:
+           irec = ic.set_range(self.cut)
+       except db.DBNotFoundError:
+           # everything is expired
+           f = ic.prev
+           irec = f()
+       else:
+           f = ic.next
+       i = 0
+       while irec:
+           it, h = irec
+           try:
+               k, v, lt = loads(self.store[h])
+           except KeyError:
+               ic.delete()
+           else:
+               if lt < self.cut:
+                   try:
+                       kc.set_both(k, h)
+                   except db.DBNotFoundError:
+                       print "Database inconsistency!  No key->value entry when a store entry was found!"
+                   else:
+                       kc.delete()
+                   self.store.delete(h)
+                   ic.delete()
+                   i = i + 1
+           irec = f()
+           
+       reactor.callLater(KE_DELAY, self.doExpire)
+       if(i > 0):
+           print ">>>KE: done expiring %d" % i
+       
\ No newline at end of file