+from time import time
+from pickle import loads, dumps
+
+from bsddb3 import db
+
from const import reactor
from hash import intify
from knode import KNode as Node
from ktable import KTable, K
+
# concurrent FIND_NODE/VALUE requests!
N = 3
""" find node action merits it's own class as it is a long running stateful process """
def handleGotNodes(self, args):
l, sender = args
- if self.finished or self.answered.has_key(sender['id']):
+ sender = Node().initWithDict(sender)
+ if self.finished or self.answered.has_key(sender.id):
# a day late and a dollar short
return
self.outstanding = self.outstanding - 1
- self.answered[sender['id']] = 1
+ self.answered[sender.id] = 1
for node in l:
- if not self.found.has_key(node['id']):
- n = Node(node['id'], node['host'], node['port'])
+ n = Node().initWithDict(node)
+ if not self.found.has_key(n.id):
self.found[n.id] = n
self.table.insertNode(n)
self.schedule()
""" get value task """
def handleGotNodes(self, args):
l, sender = args
- l = l[0]
- if self.finished or self.answered.has_key(sender['id']):
+ sender = Node().initWithDict(sender)
+ if self.finished or self.answered.has_key(sender.id):
# a day late and a dollar short
return
self.outstanding = self.outstanding - 1
- self.answered[sender['id']] = 1
+ self.answered[sender.id] = 1
# go through nodes
# if we have any closer than what we already got, query them
if l.has_key('nodes'):
for node in l['nodes']:
- if not self.found.has_key(node['id']):
- n = Node(node['id'], node['host'], node['port'])
+ n = Node().initWithDict(node)
+ if not self.found.has_key(n.id):
self.found[n.id] = n
self.table.insertNode(n)
elif l.has_key('values'):
- ## done
- self.finished = 1
- return self.callback(l['values'])
+ def x(y, z=self.results):
+ y = y.data
+ if not z.has_key(y):
+ z[y] = 1
+ return y
+ v = filter(None, map(x, l['values']))
+ if(len(v)):
+ reactor.callFromThread(self.callback, v)
self.schedule()
## get value
for node in l[:K]:
if not self.queried.has_key(node.id) and node.id != self.table.node.id:
#xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
- df = node.getValue(node, self.target)
- df.addCallbacks(self.handleGotNodes, self.defaultGotNodes)
+ df = node.findValue(self.target, self.table.node.senderDict())
+ df.addCallback(self.handleGotNodes)
+ df.addErrback(self.defaultGotNodes)
self.outstanding = self.outstanding + 1
self.queried[node.id] = 1
if self.outstanding >= N:
## get value
def goWithNodes(self, nodes):
+ self.results = {}
for node in nodes:
if node.id == self.table.node.id:
continue
self.found[node.id] = node
#xxx t.timeout = time.time() + FIND_NODE_TIMEOUT
- df = node.findNode(self.target, self.table.node.senderDict())
+ df = node.findValue(self.target, self.table.node.senderDict())
df.addCallbacks(self.handleGotNodes, self.defaultGotNodes)
self.outstanding = self.outstanding + 1
self.queried[node.id] = 1
if self.outstanding == 0:
reactor.callFromThread(self.callback, [])
+
+KEINITIAL_DELAY = 60 * 60 * 24 # 24 hours
+KE_DELAY = 60 * 60 # 1 hour
+KE_AGE = KEINITIAL_DELAY
+
+class KeyExpirer:
+ def __init__(self, store, itime, kw):
+ self.store = store
+ self.itime = itime
+ self.kw = kw
+ reactor.callLater(KEINITIAL_DELAY, self.doExpire)
+
+ def doExpire(self):
+ self.cut = `time() - KE_AGE`
+ self._expire()
+
+ def _expire(self):
+ ic = self.itime.cursor()
+ sc = self.store.cursor()
+ kc = self.kw.cursor()
+ irec = None
+ try:
+ irec = ic.set_range(self.cut)
+ except db.DBNotFoundError:
+ # everything is expired
+ f = ic.prev
+ irec = f()
+ else:
+ f = ic.next
+ i = 0
+ while irec:
+ it, h = irec
+ try:
+ k, v, lt = loads(self.store[h])
+ except KeyError:
+ ic.delete()
+ else:
+ if lt < self.cut:
+ try:
+ kc.set_both(k, h)
+ except db.DBNotFoundError:
+ print "Database inconsistency! No key->value entry when a store entry was found!"
+ else:
+ kc.delete()
+ self.store.delete(h)
+ ic.delete()
+ i = i + 1
+ irec = f()
+
+ reactor.callLater(KE_DELAY, self.doExpire)
+ if(i > 0):
+ print ">>>KE: done expiring %d" % i
+
\ No newline at end of file