+## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
+# see LICENSE.txt for license information
+
from time import time
from const import reactor
import const
-from hash import intify
+from khash import intify
from knode import KNode as Node
from ktable import KTable, K
class FindNode(ActionBase):
""" find node action merits it's own class as it is a long running stateful process """
def handleGotNodes(self, dict):
+ _krpc_sender = dict['_krpc_sender']
+ dict = dict['rsp']
l = dict["nodes"]
- sender = dict["sender"]
+ sender = {'id' : dict["id"]}
+ sender['port'] = _krpc_sender[1]
+ sender['host'] = _krpc_sender[0]
sender = Node().initWithDict(sender)
- sender.conn = self.table.airhook.connectionForAddr((sender.host, sender.port))
+ sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port))
self.table.table.insertNode(sender)
if self.finished or self.answered.has_key(sender.id):
# a day late and a dollar short
self.answered[sender.id] = 1
for node in l:
n = Node().initWithDict(node)
- n.conn = self.table.airhook.connectionForAddr((n.host, n.port))
+ n.conn = self.table.udp.connectionForAddr((n.host, n.port))
if not self.found.has_key(n.id):
self.found[n.id] = n
self.schedule()
return self.callback([node])
if (not self.queried.has_key(node.id)) and node.id != self.table.node.id:
#xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
- df = node.findNode(self.target, self.table.node.senderDict())
+ df = node.findNode(self.target, self.table.node.id)
df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
self.outstanding = self.outstanding + 1
self.queried[node.id] = 1
def makeMsgFailed(self, node):
def defaultGotNodes(err, self=self, node=node):
+ print ">>> find failed %s/%s" % (node.host, node.port)
self.table.table.nodeFailed(node)
self.outstanding = self.outstanding - 1
self.schedule()
class GetValue(FindNode):
""" get value task """
def handleGotNodes(self, dict):
- sender = dict["sender"]
+ _krpc_sender = dict['_krpc_sender']
+ dict = dict['rsp']
+ sender = {'id' : dict["id"]}
+ sender['port'] = _krpc_sender[1]
+ sender['host'] = _krpc_sender[0]
sender = Node().initWithDict(sender)
- sender.conn = self.table.airhook.connectionForAddr((sender.host, sender.port))
+ sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port))
self.table.table.insertNode(sender)
if self.finished or self.answered.has_key(sender.id):
# a day late and a dollar short
if dict.has_key('nodes'):
for node in dict['nodes']:
n = Node().initWithDict(node)
- n.conn = self.table.airhook.connectionForAddr((n.host, n.port))
+ n.conn = self.table.udp.connectionForAddr((n.host, n.port))
if not self.found.has_key(n.id):
self.found[n.id] = n
elif dict.has_key('values'):
return y
else:
return None
+ z = len(dict['values'])
v = filter(None, map(x, dict['values']))
if(len(v)):
reactor.callFromThread(self.callback, v)
for node in l[:K]:
if (not self.queried.has_key(node.id)) and node.id != self.table.node.id:
#xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
- df = node.findValue(self.target, self.table.node.senderDict())
+ df = node.findValue(self.target, self.table.node.id)
df.addCallback(self.handleGotNodes)
df.addErrback(self.makeMsgFailed(node))
self.outstanding = self.outstanding + 1
self.schedule()
def storeFailed(self, t, node):
+ print ">>> store failed %s/%s" % (node.host, node.port)
self.table.nodeFailed(node)
self.outstanding -= 1
if self.finished:
else:
if not node.id == self.table.node.id:
self.outstanding += 1
- df = node.storeValue(self.target, self.value, self.table.node.senderDict())
+ df = node.storeValue(self.target, self.value, self.table.node.id)
df.addCallback(self.storedValue, node=node)
df.addErrback(self.storeFailed, node=node)
s = "delete from kv where time < '%s';" % self.cut
c.execute(s)
reactor.callLater(const.KE_DELAY, self.doExpire)
-
\ No newline at end of file