from bsddb3 import db
from const import reactor
+import const
from hash import intify
from knode import KNode as Node
from ktable import KTable, K
-# concurrent FIND_NODE/VALUE requests!
-N = 3
-
class ActionBase:
""" base class for some long running asynchronous proccesses like finding nodes or values """
def __init__(self, table, target, callback):
class FindNode(ActionBase):
""" find node action merits it's own class as it is a long running stateful process """
def handleGotNodes(self, args):
+ args, conn = args
l, sender = args
+ sender['host'] = conn['host']
sender = Node().initWithDict(sender)
+ self.table.table.insertNode(sender)
if self.finished or self.answered.has_key(sender.id):
# a day late and a dollar short
return
n = Node().initWithDict(node)
if not self.found.has_key(n.id):
self.found[n.id] = n
- self.table.insertNode(n)
self.schedule()
def schedule(self):
l = self.found.values()
l.sort(self.sort)
- for node in l[:K]:
+ for node in l:
if node.id == self.target:
self.finished=1
return self.callback([node])
if not self.queried.has_key(node.id) and node.id != self.table.node.id:
#xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
df = node.findNode(self.target, self.table.node.senderDict())
- df.addCallbacks(self.handleGotNodes, self.defaultGotNodes)
+ df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
self.outstanding = self.outstanding + 1
self.queried[node.id] = 1
- if self.outstanding >= N:
+ if self.outstanding >= const.CONCURRENT_REQS:
break
assert(self.outstanding) >=0
if self.outstanding == 0:
self.finished=1
reactor.callFromThread(self.callback, l[:K])
- def defaultGotNodes(self, t):
- if self.finished:
- return
- self.outstanding = self.outstanding - 1
- self.schedule()
-
+ def makeMsgFailed(self, node):
+ def defaultGotNodes(err, self=self, node=node):
+ self.table.table.nodeFailed(node)
+ if self.finished:
+ return
+ self.outstanding = self.outstanding - 1
+ self.schedule()
+ return defaultGotNodes
def goWithNodes(self, nodes):
"""
for node in nodes:
if node.id == self.table.node.id:
continue
- self.found[node.id] = node
- #xxx t.timeout = time.time() + FIND_NODE_TIMEOUT
- df = node.findNode(self.target, self.table.node.senderDict())
- df.addCallbacks(self.handleGotNodes, self.defaultGotNodes)
- self.outstanding = self.outstanding + 1
- self.queried[node.id] = 1
- if self.outstanding == 0:
- self.callback(nodes)
+ else:
+ self.found[node.id] = node
+
+ self.schedule()
GET_VALUE_TIMEOUT = 15
class GetValue(FindNode):
""" get value task """
def handleGotNodes(self, args):
+ args, conn = args
l, sender = args
+ sender['host'] = conn['host']
sender = Node().initWithDict(sender)
+ self.table.table.insertNode(sender)
if self.finished or self.answered.has_key(sender.id):
# a day late and a dollar short
return
n = Node().initWithDict(node)
if not self.found.has_key(n.id):
self.found[n.id] = n
- self.table.insertNode(n)
elif l.has_key('values'):
def x(y, z=self.results):
y = y.data
if not z.has_key(y):
z[y] = 1
return y
+ else:
+ return None
v = filter(None, map(x, l['values']))
if(len(v)):
reactor.callFromThread(self.callback, v)
l = self.found.values()
l.sort(self.sort)
- for node in l[:K]:
+ for node in l:
if not self.queried.has_key(node.id) and node.id != self.table.node.id:
#xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
df = node.findValue(self.target, self.table.node.senderDict())
df.addCallback(self.handleGotNodes)
- df.addErrback(self.defaultGotNodes)
+ df.addErrback(self.makeMsgFailed(node))
self.outstanding = self.outstanding + 1
self.queried[node.id] = 1
- if self.outstanding >= N:
+ if self.outstanding >= const.CONCURRENT_REQS:
break
assert(self.outstanding) >=0
if self.outstanding == 0:
reactor.callFromThread(self.callback,[])
## get value
- def goWithNodes(self, nodes):
+ def goWithNodes(self, nodes, found=None):
self.results = {}
+ if found:
+ for n in found:
+ self.results[n] = 1
for node in nodes:
if node.id == self.table.node.id:
continue
- self.found[node.id] = node
- #xxx t.timeout = time.time() + FIND_NODE_TIMEOUT
- df = node.findValue(self.target, self.table.node.senderDict())
- df.addCallbacks(self.handleGotNodes, self.defaultGotNodes)
- self.outstanding = self.outstanding + 1
- self.queried[node.id] = 1
- if self.outstanding == 0:
- reactor.callFromThread(self.callback, [])
+ else:
+ self.found[node.id] = node
+
+ self.schedule()
+
-KEINITIAL_DELAY = 60 # 1 minute
-KE_DELAY = 60 # 1 minute
-KE_AGE = 60 * 5
class KeyExpirer:
def __init__(self, store, itime, kw):
self.store = store
self.itime = itime
self.kw = kw
- reactor.callLater(KEINITIAL_DELAY, self.doExpire)
+ reactor.callLater(const.KEINITIAL_DELAY, self.doExpire)
def doExpire(self):
- self.cut = `time() - KE_AGE`
+ self.cut = `time() - const.KE_AGE`
self._expire()
def _expire(self):
self.store.delete(h)
ic.delete()
i = i + 1
+ else:
+ break
irec = f()
- reactor.callLater(KE_DELAY, self.doExpire)
+ reactor.callLater(const.KE_DELAY, self.doExpire)
if(i > 0):
print ">>>KE: done expiring %d" % i
\ No newline at end of file