import const
from khash import intify
-from knode import KNode as Node
from ktable import KTable, K
class ActionBase:
sender = {'id' : dict["id"]}
sender['port'] = _krpc_sender[1]
sender['host'] = _krpc_sender[0]
- sender = Node().initWithDict(sender)
+ sender = self.table.Node().initWithDict(sender)
sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port))
self.table.table.insertNode(sender)
if self.finished or self.answered.has_key(sender.id):
self.outstanding = self.outstanding - 1
self.answered[sender.id] = 1
for node in l:
- n = Node().initWithDict(node)
+ n = self.table.Node().initWithDict(node)
n.conn = self.table.udp.connectionForAddr((n.host, n.port))
if not self.found.has_key(n.id):
self.found[n.id] = n
def makeMsgFailed(self, node):
def defaultGotNodes(err, self=self, node=node):
- print ">>> find failed %s/%s" % (node.host, node.port)
+ print ">>> find failed %s/%s" % (node.host, node.port), err
self.table.table.nodeFailed(node)
self.outstanding = self.outstanding - 1
self.schedule()
self.schedule()
-GET_VALUE_TIMEOUT = 15
+get_value_timeout = 15
class GetValue(FindNode):
+ def __init__(self, table, target, callback, find="findValue"):
+ FindNode.__init__(self, table, target, callback)
+ self.findValue = find
+
""" get value task """
def handleGotNodes(self, dict):
_krpc_sender = dict['_krpc_sender']
sender = {'id' : dict["id"]}
sender['port'] = _krpc_sender[1]
sender['host'] = _krpc_sender[0]
- sender = Node().initWithDict(sender)
+ sender = self.table.Node().initWithDict(sender)
sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port))
self.table.table.insertNode(sender)
if self.finished or self.answered.has_key(sender.id):
# if we have any closer than what we already got, query them
if dict.has_key('nodes'):
for node in dict['nodes']:
- n = Node().initWithDict(node)
+ n = self.table.Node().initWithDict(node)
n.conn = self.table.udp.connectionForAddr((n.host, n.port))
if not self.found.has_key(n.id):
self.found[n.id] = n
for node in l[:K]:
if (not self.queried.has_key(node.id)) and node.id != self.table.node.id:
#xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
- df = node.findValue(self.target, self.table.node.id)
- df.addCallback(self.handleGotNodes)
- df.addErrback(self.makeMsgFailed(node))
- self.outstanding = self.outstanding + 1
- self.queried[node.id] = 1
+ try:
+ f = getattr(node, self.findValue)
+ except AttributeError:
+ print ">>> findValue %s doesn't have a %s method!" % (node, self.findValue)
+ else:
+ df = f(self.target, self.table.node.id)
+ df.addCallback(self.handleGotNodes)
+ df.addErrback(self.makeMsgFailed(node))
+ self.outstanding = self.outstanding + 1
+ self.queried[node.id] = 1
if self.outstanding >= const.CONCURRENT_REQS:
break
assert(self.outstanding) >=0
class StoreValue(ActionBase):
- def __init__(self, table, target, value, callback):
+ def __init__(self, table, target, value, callback, store="storeValue"):
ActionBase.__init__(self, table, target, callback)
self.value = value
self.stored = []
-
+ self.store = store
+
def storedValue(self, t, node):
self.outstanding -= 1
self.table.insertNode(node)
else:
if not len(self.stored) + self.outstanding >= const.STORE_REDUNDANCY:
self.schedule()
-
+ return t
+
def storeFailed(self, t, node):
print ">>> store failed %s/%s" % (node.host, node.port)
self.table.nodeFailed(node)
self.outstanding -= 1
if self.finished:
- return
+ return t
self.schedule()
-
+ return t
+
def schedule(self):
if self.finished:
return
else:
if not node.id == self.table.node.id:
self.outstanding += 1
- if type(self.value) == type([]):
- df = node.storeValues(self.target, self.value, self.table.node.id)
+ try:
+ f = getattr(node, self.store)
+ except AttributeError:
+ print ">>> %s doesn't have a %s method!" % (node, self.store)
else:
- df = node.storeValue(self.target, self.value, self.table.node.id)
-
- df.addCallback(self.storedValue, node=node)
- df.addErrback(self.storeFailed, node=node)
+ df = f(self.target, self.value, self.table.node.id)
+ df.addCallback(self.storedValue, node=node)
+ df.addErrback(self.storeFailed, node=node)
def goWithNodes(self, nodes):
self.nodes = nodes