]> git.mxchange.org Git - quix0rs-apt-p2p.git/commitdiff
now with auto-expiring of stored keys/values
authorburris <burris>
Sat, 7 Sep 2002 19:13:28 +0000 (19:13 +0000)
committerburris <burris>
Sat, 7 Sep 2002 19:13:28 +0000 (19:13 +0000)
lots of typo bug fixes

actions.py
khashmir.py

index a4d9514f0de6a8c833c8a1b42c788f6fd4ab2d41..c3cea32821a09eb49a1d680efd615695a2ff763f 100644 (file)
@@ -1,8 +1,14 @@
+from time import time
+from pickle import loads, dumps
+
+from bsddb3 import db
+
 from const import reactor
 
 from hash import intify
 from knode import KNode as Node
 from ktable import KTable, K
+
 # concurrent FIND_NODE/VALUE requests!
 N = 3
 
@@ -170,3 +176,55 @@ class GetValue(FindNode):
        if self.outstanding == 0:
            reactor.callFromThread(self.callback, [])
 
+
+KEINITIAL_DELAY = 60 # 1 minute
+KE_DELAY = 60 # 1 minute
+KE_AGE = 60 * 5
+class KeyExpirer:
+    def __init__(self, store, itime, kw):
+       self.store = store
+       self.itime = itime
+       self.kw = kw
+       reactor.callLater(KEINITIAL_DELAY, self.doExpire)
+       
+    def doExpire(self):
+       self.cut = `time() - KE_AGE`
+       self._expire()
+       
+    def _expire(self):
+       ic = self.itime.cursor()
+       sc = self.store.cursor()
+       kc = self.kw.cursor()
+       irec = None
+       try:
+           irec = ic.set_range(self.cut)
+       except db.DBNotFoundError:
+           # everything is expired
+           f = ic.prev
+           irec = f()
+       else:
+           f = ic.next
+       i = 0
+       while irec:
+           it, h = irec
+           try:
+               k, v, lt = loads(self.store[h])
+           except KeyError:
+               ic.delete()
+           else:
+               if lt < self.cut:
+                   try:
+                       kc.set_both(k, h)
+                   except db.DBNotFoundError:
+                       print "Database inconsistency!  No key->value entry when a store entry was found!"
+                   else:
+                       kc.delete()
+                   self.store.delete(h)
+                   ic.delete()
+                   i = i + 1
+           irec = f()
+           
+       reactor.callLater(KE_DELAY, self.doExpire)
+       if(i > 0):
+           print ">>>KE: done expiring %d" % i
+       
\ No newline at end of file
index 2b716e62937d7bf65a68ba0bea90ddba6fae95b0..9e5a26cc17e7c13e592081e00bdf5fa26f8942a9 100644 (file)
@@ -10,7 +10,7 @@ from knode import KNode as Node
 
 from hash import newID
 
-from actions import FindNode, GetValue
+from actions import FindNode, GetValue, KeyExpirer
 from twisted.web import xmlrpc
 from twisted.internet.defer import Deferred
 from twisted.python import threadable
@@ -50,7 +50,8 @@ class Khashmir(xmlrpc.XMLRPC):
        self.kw.set_flags(db.DB_DUP)
        self.kw.open(None, None, db.DB_BTREE)
 
-
+       KeyExpirer(store=self.store, itime=self.itime, kw=self.kw)
+       
     def render(self, request):
        """
            Override the built in render so we can have access to the request object!
@@ -94,14 +95,20 @@ class Khashmir(xmlrpc.XMLRPC):
        reactor.callFromThread(state.goWithNodes, nodes)
 
 
+
     ## async, but in the current implementation there is no guarantee a store does anything so there is no callback right now
-    def storeValueForKey(self, key, value):
+    def storeValueForKey(self, key, value, callback=None):
        """ stores the value for key in the global table, returns immediately, no status 
            in this implementation, peers respond but don't indicate status to storing values
            values are stored in peers on a first-come first-served basis
            this will probably change so more than one value can be stored under a key
        """
-       def _storeValueForKey(nodes, key=key, value=value, response= self._storedValueHandler, default= lambda t: "didn't respond"):
+       def _storeValueForKey(nodes, key=key, value=value, response=callback , default= lambda t: "didn't respond"):
+           if not callback:
+               # default callback - this will get called for each successful store value
+               def _storedValueHandler(sender):
+                   pass
+               response=_storedValueHandler
            for node in nodes:
                if node.id != self.node.id:
                    df = node.storeValue(key, value, self.node.senderDict())
@@ -134,7 +141,7 @@ class Khashmir(xmlrpc.XMLRPC):
                    self.table.insertNode(old)
 
            df = old.ping(self.node.senderDict())
-           df.addCallbacks(_notStaleNodeHandler, self._staleNodeHandler)
+           df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
 
 
     def sendPing(self, node):
@@ -229,7 +236,7 @@ class Khashmir(xmlrpc.XMLRPC):
        self.insertNode(n)
        if self.kw.has_key(key):
            c = self.kw.cursor()
-           tup = c.set_range(key)
+           tup = c.set(key)
            l = []
            while(tup):
                h1 = tup[1]
@@ -242,20 +249,13 @@ class Khashmir(xmlrpc.XMLRPC):
            nodes = map(lambda node: node.senderDict(), nodes)
            return {'nodes' : nodes}, self.node.senderDict()
 
-    ###
-    ### message response callbacks
-    # called when we get a response to store value
-    def _storedValueHandler(self, sender):
-       pass
-
-
 
 
 
 
 #------ testing
 
-def test_build_net(quiet=0, peers=64, pause=1):
+def test_build_net(quiet=0, peers=8, pause=1):
     from whrandom import randrange
     import thread
     port = 2001
@@ -295,7 +295,7 @@ def test_build_net(quiet=0, peers=64, pause=1):
        if pause:
            time.sleep(.5)
     if pause:
-           time.sleep(10)
+           time.sleep(2)
 #    for peer in l:
 #      peer.refreshTable()
     return l
@@ -322,6 +322,7 @@ def test_find_nodes(l, quiet=0):
 def test_find_value(l, quiet=0):
     from whrandom import randrange
     from sha import sha
+    from hash import newID
     import time, threading, sys
     
     fa = threading.Event()
@@ -334,8 +335,8 @@ def test_find_value(l, quiet=0):
     c = l[randrange(0,n)]
     d = l[randrange(0,n)]
 
-    key = sha(`randrange(0,100000)`).digest()
-    value = sha(`randrange(0,100000)`).digest()
+    key = newID()
+    value = newID()
     if not quiet:
        print "inserting value..."
        sys.stdout.flush()