from knode import KNode as Node
from ktable import KTable, K
-# concurrent FIND_NODE/VALUE requests!
-N = 3
-
class ActionBase:
""" base class for some long running asynchronous proccesses like finding nodes or values """
def __init__(self, table, target, callback):
class FindNode(ActionBase):
""" find node action merits it's own class as it is a long running stateful process """
def handleGotNodes(self, args):
+ args, conn = args
l, sender = args
+ sender['host'] = conn['host']
sender = Node().initWithDict(sender)
self.table.table.insertNode(sender)
if self.finished or self.answered.has_key(sender.id):
n = Node().initWithDict(node)
if not self.found.has_key(n.id):
self.found[n.id] = n
- self.table.insertNode(n)
self.schedule()
def schedule(self):
df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
self.outstanding = self.outstanding + 1
self.queried[node.id] = 1
- if self.outstanding >= N:
+ if self.outstanding >= const.CONCURRENT_REQS:
break
assert(self.outstanding) >=0
if self.outstanding == 0:
df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
self.outstanding = self.outstanding + 1
self.queried[node.id] = 1
+ if self.outstanding >= const.CONCURRENT_REQS:
+ break
+
if self.outstanding == 0:
self.callback(nodes)
class GetValue(FindNode):
""" get value task """
def handleGotNodes(self, args):
+ args, conn = args
l, sender = args
+ sender['host'] = conn['host']
sender = Node().initWithDict(sender)
self.table.table.insertNode(sender)
if self.finished or self.answered.has_key(sender.id):
n = Node().initWithDict(node)
if not self.found.has_key(n.id):
self.found[n.id] = n
- self.table.insertNode(n)
elif l.has_key('values'):
def x(y, z=self.results):
y = y.data
df.addErrback(self.makeMsgFailed(node))
self.outstanding = self.outstanding + 1
self.queried[node.id] = 1
- if self.outstanding >= N:
+ if self.outstanding >= const.CONCURRENT_REQS:
break
assert(self.outstanding) >=0
if self.outstanding == 0:
df.addErrback(self.makeMsgFailed(node))
self.outstanding = self.outstanding + 1
self.queried[node.id] = 1
+ if self.outstanding >= const.CONCURRENT_REQS:
+ break
+
if self.outstanding == 0:
reactor.callFromThread(self.callback, [])