+from time import time
+from pickle import loads, dumps
+
+from bsddb3 import db
+
from const import reactor
from hash import intify
from knode import KNode as Node
from ktable import KTable, K
+
# concurrent FIND_NODE/VALUE requests!
N = 3
if self.outstanding == 0:
reactor.callFromThread(self.callback, [])
+
+KEINITIAL_DELAY = 60 # 1 minute
+KE_DELAY = 60 # 1 minute
+KE_AGE = 60 * 5
+class KeyExpirer:
+ def __init__(self, store, itime, kw):
+ self.store = store
+ self.itime = itime
+ self.kw = kw
+ reactor.callLater(KEINITIAL_DELAY, self.doExpire)
+
+ def doExpire(self):
+ self.cut = `time() - KE_AGE`
+ self._expire()
+
+ def _expire(self):
+ ic = self.itime.cursor()
+ sc = self.store.cursor()
+ kc = self.kw.cursor()
+ irec = None
+ try:
+ irec = ic.set_range(self.cut)
+ except db.DBNotFoundError:
+ # everything is expired
+ f = ic.prev
+ irec = f()
+ else:
+ f = ic.next
+ i = 0
+ while irec:
+ it, h = irec
+ try:
+ k, v, lt = loads(self.store[h])
+ except KeyError:
+ ic.delete()
+ else:
+ if lt < self.cut:
+ try:
+ kc.set_both(k, h)
+ except db.DBNotFoundError:
+ print "Database inconsistency! No key->value entry when a store entry was found!"
+ else:
+ kc.delete()
+ self.store.delete(h)
+ ic.delete()
+ i = i + 1
+ irec = f()
+
+ reactor.callLater(KE_DELAY, self.doExpire)
+ if(i > 0):
+ print ">>>KE: done expiring %d" % i
+
\ No newline at end of file
from hash import newID
-from actions import FindNode, GetValue
+from actions import FindNode, GetValue, KeyExpirer
from twisted.web import xmlrpc
from twisted.internet.defer import Deferred
from twisted.python import threadable
self.kw.set_flags(db.DB_DUP)
self.kw.open(None, None, db.DB_BTREE)
-
+ KeyExpirer(store=self.store, itime=self.itime, kw=self.kw)
+
def render(self, request):
"""
Override the built in render so we can have access to the request object!
reactor.callFromThread(state.goWithNodes, nodes)
+
## async, but in the current implementation there is no guarantee a store does anything so there is no callback right now
- def storeValueForKey(self, key, value):
+ def storeValueForKey(self, key, value, callback=None):
""" stores the value for key in the global table, returns immediately, no status
in this implementation, peers respond but don't indicate status to storing values
values are stored in peers on a first-come first-served basis
this will probably change so more than one value can be stored under a key
"""
- def _storeValueForKey(nodes, key=key, value=value, response= self._storedValueHandler, default= lambda t: "didn't respond"):
+ def _storeValueForKey(nodes, key=key, value=value, response=callback , default= lambda t: "didn't respond"):
+ if not callback:
+ # default callback - this will get called for each successful store value
+ def _storedValueHandler(sender):
+ pass
+ response=_storedValueHandler
for node in nodes:
if node.id != self.node.id:
df = node.storeValue(key, value, self.node.senderDict())
self.table.insertNode(old)
df = old.ping(self.node.senderDict())
- df.addCallbacks(_notStaleNodeHandler, self._staleNodeHandler)
+ df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
def sendPing(self, node):
self.insertNode(n)
if self.kw.has_key(key):
c = self.kw.cursor()
- tup = c.set_range(key)
+ tup = c.set(key)
l = []
while(tup):
h1 = tup[1]
nodes = map(lambda node: node.senderDict(), nodes)
return {'nodes' : nodes}, self.node.senderDict()
- ###
- ### message response callbacks
- # called when we get a response to store value
- def _storedValueHandler(self, sender):
- pass
-
-
#------ testing
-def test_build_net(quiet=0, peers=64, pause=1):
+def test_build_net(quiet=0, peers=8, pause=1):
from whrandom import randrange
import thread
port = 2001
if pause:
time.sleep(.5)
if pause:
- time.sleep(10)
+ time.sleep(2)
# for peer in l:
# peer.refreshTable()
return l
def test_find_value(l, quiet=0):
from whrandom import randrange
from sha import sha
+ from hash import newID
import time, threading, sys
fa = threading.Event()
c = l[randrange(0,n)]
d = l[randrange(0,n)]
- key = sha(`randrange(0,100000)`).digest()
- value = sha(`randrange(0,100000)`).digest()
+ key = newID()
+ value = newID()
if not quiet:
print "inserting value..."
sys.stdout.flush()