Nodes are now initialized with their id, host and port
(optionally stored in a dict) instead of needing 2 steps.
All other classes now call the main khashmir Node() constructor
which adds the udp connection and table.
Previous actions that called insertNode on the table now
call it on the main khashmir class, so that if buckets are
full they can have a chance of being added.
class ActionBase:
""" base class for some long running asynchronous proccesses like finding nodes or values """
class ActionBase:
""" base class for some long running asynchronous proccesses like finding nodes or values """
- def __init__(self, table, target, callback, config):
- self.table = table
+ def __init__(self, caller, target, callback, config):
+ self.caller = caller
self.target = target
self.config = config
self.num = intify(target)
self.target = target
self.config = config
self.num = intify(target)
def handleGotNodes(self, dict):
_krpc_sender = dict['_krpc_sender']
dict = dict['rsp']
def handleGotNodes(self, dict):
_krpc_sender = dict['_krpc_sender']
dict = dict['rsp']
+ n = self.caller.Node(dict["id"], _krpc_sender[0], _krpc_sender[1])
+ self.caller.insertNode(n)
- sender = {'id' : dict["id"]}
- sender['port'] = _krpc_sender[1]
- sender['host'] = _krpc_sender[0]
- sender = self.table.Node().initWithDict(sender)
- sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port))
- self.table.table.insertNode(sender)
- if self.finished or self.answered.has_key(sender.id):
+ if self.finished or self.answered.has_key(dict["id"]):
# a day late and a dollar short
return
self.outstanding = self.outstanding - 1
# a day late and a dollar short
return
self.outstanding = self.outstanding - 1
- self.answered[sender.id] = 1
+ self.answered[dict["id"]] = 1
- n = self.table.Node().initWithDict(node)
- n.conn = self.table.udp.connectionForAddr((n.host, n.port))
+ n = self.caller.Node(node)
if not self.found.has_key(n.id):
self.found[n.id] = n
self.schedule()
if not self.found.has_key(n.id):
self.found[n.id] = n
self.schedule()
if node.id == self.target:
self.finished=1
return self.callback([node])
if node.id == self.target:
self.finished=1
return self.callback([node])
- if (not self.queried.has_key(node.id)) and node.id != self.table.node.id:
+ if (not self.queried.has_key(node.id)) and node.id != self.caller.node.id:
#xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
#xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
- df = node.findNode(self.target, self.table.node.id)
+ df = node.findNode(self.target, self.caller.node.id)
df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
self.outstanding = self.outstanding + 1
self.queried[node.id] = 1
df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
self.outstanding = self.outstanding + 1
self.queried[node.id] = 1
def makeMsgFailed(self, node):
def defaultGotNodes(err, self=self, node=node):
def makeMsgFailed(self, node):
def defaultGotNodes(err, self=self, node=node):
- print ">>> find failed %s/%s" % (node.host, node.port), err
- self.table.table.nodeFailed(node)
+ print ">>> find failed (%s) %s/%s" % (self.config['PORT'], node.host, node.port), err
+ self.caller.table.nodeFailed(node)
self.outstanding = self.outstanding - 1
self.schedule()
return defaultGotNodes
self.outstanding = self.outstanding - 1
self.schedule()
return defaultGotNodes
it's a transaction since we got called from the dispatcher
"""
for node in nodes:
it's a transaction since we got called from the dispatcher
"""
for node in nodes:
- if node.id == self.table.node.id:
+ if node.id == self.caller.node.id:
continue
else:
self.found[node.id] = node
continue
else:
self.found[node.id] = node
get_value_timeout = 15
class GetValue(FindNode):
get_value_timeout = 15
class GetValue(FindNode):
- def __init__(self, table, target, callback, config, find="findValue"):
- FindNode.__init__(self, table, target, callback, config)
+ def __init__(self, caller, target, callback, config, find="findValue"):
+ FindNode.__init__(self, caller, target, callback, config)
self.findValue = find
""" get value task """
def handleGotNodes(self, dict):
_krpc_sender = dict['_krpc_sender']
dict = dict['rsp']
self.findValue = find
""" get value task """
def handleGotNodes(self, dict):
_krpc_sender = dict['_krpc_sender']
dict = dict['rsp']
- sender = {'id' : dict["id"]}
- sender['port'] = _krpc_sender[1]
- sender['host'] = _krpc_sender[0]
- sender = self.table.Node().initWithDict(sender)
- sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port))
- self.table.table.insertNode(sender)
- if self.finished or self.answered.has_key(sender.id):
+ n = self.caller.Node(dict["id"], _krpc_sender[0], _krpc_sender[1])
+ self.caller.insertNode(n)
+ if self.finished or self.answered.has_key(dict["id"]):
# a day late and a dollar short
return
self.outstanding = self.outstanding - 1
# a day late and a dollar short
return
self.outstanding = self.outstanding - 1
- self.answered[sender.id] = 1
+ self.answered[dict["id"]] = 1
# go through nodes
# if we have any closer than what we already got, query them
if dict.has_key('nodes'):
for node in dict['nodes']:
# go through nodes
# if we have any closer than what we already got, query them
if dict.has_key('nodes'):
for node in dict['nodes']:
- n = self.table.Node().initWithDict(node)
- n.conn = self.table.udp.connectionForAddr((n.host, n.port))
+ n = self.caller.Node(node)
if not self.found.has_key(n.id):
self.found[n.id] = n
elif dict.has_key('values'):
if not self.found.has_key(n.id):
self.found[n.id] = n
elif dict.has_key('values'):
l.sort(self.sort)
for node in l[:self.config['K']]:
l.sort(self.sort)
for node in l[:self.config['K']]:
- if (not self.queried.has_key(node.id)) and node.id != self.table.node.id:
+ if (not self.queried.has_key(node.id)) and node.id != self.caller.node.id:
#xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
try:
f = getattr(node, self.findValue)
except AttributeError:
print ">>> findValue %s doesn't have a %s method!" % (node, self.findValue)
else:
#xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
try:
f = getattr(node, self.findValue)
except AttributeError:
print ">>> findValue %s doesn't have a %s method!" % (node, self.findValue)
else:
- df = f(self.target, self.table.node.id)
+ df = f(self.target, self.caller.node.id)
df.addCallback(self.handleGotNodes)
df.addErrback(self.makeMsgFailed(node))
self.outstanding = self.outstanding + 1
df.addCallback(self.handleGotNodes)
df.addErrback(self.makeMsgFailed(node))
self.outstanding = self.outstanding + 1
for n in found:
self.results[n] = 1
for node in nodes:
for n in found:
self.results[n] = 1
for node in nodes:
- if node.id == self.table.node.id:
+ if node.id == self.caller.node.id:
continue
else:
self.found[node.id] = node
continue
else:
self.found[node.id] = node
class StoreValue(ActionBase):
class StoreValue(ActionBase):
- def __init__(self, table, target, value, callback, config, store="storeValue"):
- ActionBase.__init__(self, table, target, callback, config)
+ def __init__(self, caller, target, value, callback, config, store="storeValue"):
+ ActionBase.__init__(self, caller, target, callback, config)
self.value = value
self.stored = []
self.store = store
def storedValue(self, t, node):
self.outstanding -= 1
self.value = value
self.stored = []
self.store = store
def storedValue(self, t, node):
self.outstanding -= 1
- self.table.insertNode(node)
+ self.caller.insertNode(node)
if self.finished:
return
self.stored.append(t)
if self.finished:
return
self.stored.append(t)
def storeFailed(self, t, node):
print ">>> store failed %s/%s" % (node.host, node.port)
def storeFailed(self, t, node):
print ">>> store failed %s/%s" % (node.host, node.port)
- self.table.nodeFailed(node)
+ self.caller.nodeFailed(node)
self.outstanding -= 1
if self.finished:
return t
self.outstanding -= 1
if self.finished:
return t
self.finished = 1
self.callback(self.target, self.value, self.stored)
else:
self.finished = 1
self.callback(self.target, self.value, self.stored)
else:
- if not node.id == self.table.node.id:
+ if not node.id == self.caller.node.id:
self.outstanding += 1
try:
f = getattr(node, self.store)
except AttributeError:
print ">>> %s doesn't have a %s method!" % (node, self.store)
else:
self.outstanding += 1
try:
f = getattr(node, self.store)
except AttributeError:
print ">>> %s doesn't have a %s method!" % (node, self.store)
else:
- df = f(self.target, self.value, self.table.node.id)
+ df = f(self.target, self.value, self.caller.node.id)
df.addCallback(self.storedValue, node=node)
df.addErrback(self.storeFailed, node=node)
df.addCallback(self.storedValue, node=node)
df.addErrback(self.storeFailed, node=node)
self.refreshTable(force=1)
self.next_checkpoint = reactor.callLater(60, self.checkpoint, (1,))
self.refreshTable(force=1)
self.next_checkpoint = reactor.callLater(60, self.checkpoint, (1,))
- def Node(self):
- n = self._Node()
+ def Node(self, id, host = None, port = None):
+ """Create a new node."""
+ n = self._Node(id, host, port)
+ n.conn = self.udp.connectionForAddr((n.host, n.port))
return n
def __del__(self):
return n
def __del__(self):
id = self.store.getSelfNode()
if not id:
id = newID()
id = self.store.getSelfNode()
if not id:
id = newID()
- return self._Node().init(id, host, port)
+ return self._Node(id, host, port)
def checkpoint(self, auto=0):
self.store.saveSelfNode(self.node.id)
def checkpoint(self, auto=0):
self.store.saveSelfNode(self.node.id)
"""
nodes = self.store.getRoutingTable()
for rec in nodes:
"""
nodes = self.store.getRoutingTable()
for rec in nodes:
- n = self.Node().initWithDict({'id':rec[0], 'host':rec[1], 'port':int(rec[2])})
- n.conn = self.udp.connectionForAddr((n.host, n.port))
+ n = self.Node(rec[0], rec[1], int(rec[2]))
self.table.insertNode(n, contacted=0)
self.table.insertNode(n, contacted=0)
- def _update_node(self, id, host, port):
- n = self.Node().init(id, host, port)
- n.conn = self.udp.connectionForAddr((host, port))
- self.insertNode(n, contacted=0)
-
#######
####### LOCAL INTERFACE - use these methods!
#######
####### LOCAL INTERFACE - use these methods!
"""
ping this node and add the contact info to the table on pong!
"""
"""
ping this node and add the contact info to the table on pong!
"""
- n =self.Node().init(NULL_ID, host, port)
- n.conn = self.udp.connectionForAddr((n.host, n.port))
+ n = self.Node(NULL_ID, host, port)
self.sendPing(n, callback=callback)
## this call is async!
self.sendPing(n, callback=callback)
## this call is async!
"""
df = node.ping(self.node.id)
## these are the callbacks we use when we issue a PING
"""
df = node.ping(self.node.id)
## these are the callbacks we use when we issue a PING
- def _pongHandler(dict, node=node, table=self.table, callback=callback):
- _krpc_sender = dict['_krpc_sender']
- dict = dict['rsp']
- sender = {'id' : dict['id']}
- sender['host'] = _krpc_sender[0]
- sender['port'] = _krpc_sender[1]
- n = self.Node().initWithDict(sender)
- n.conn = self.udp.connectionForAddr((n.host, n.port))
- table.insertNode(n)
+ def _pongHandler(dict, node=node, self=self, callback=callback):
+ n = self.Node(dict['rsp']['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
+ self.insertNode(n)
if callback:
callback()
def _defaultPong(err, node=node, table=self.table, callback=callback):
if callback:
callback()
def _defaultPong(err, node=node, table=self.table, callback=callback):
#### Remote Interface - called by remote nodes
def krpc_ping(self, id, _krpc_sender):
#### Remote Interface - called by remote nodes
def krpc_ping(self, id, _krpc_sender):
- self._update_node(id, _krpc_sender[0], _krpc_sender[1])
+ n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
+ self.insertNode(n, contacted=0)
return {"id" : self.node.id}
def krpc_find_node(self, target, id, _krpc_sender):
return {"id" : self.node.id}
def krpc_find_node(self, target, id, _krpc_sender):
- self._update_node(id, _krpc_sender[0], _krpc_sender[1])
+ n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
+ self.insertNode(n, contacted=0)
nodes = self.table.findNodes(target)
nodes = map(lambda node: node.senderDict(), nodes)
return {"nodes" : nodes, "id" : self.node.id}
nodes = self.table.findNodes(target)
nodes = map(lambda node: node.senderDict(), nodes)
return {"nodes" : nodes, "id" : self.node.id}
#### Remote Interface - called by remote nodes
def krpc_find_value(self, key, id, _krpc_sender):
#### Remote Interface - called by remote nodes
def krpc_find_value(self, key, id, _krpc_sender):
- self._update_node(id, _krpc_sender[0], _krpc_sender[1])
+ n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
+ self.insertNode(n, contacted=0)
l = self.store.retrieveValues(key)
if len(l) > 0:
l = self.store.retrieveValues(key)
if len(l) > 0:
#### Remote Interface - called by remote nodes
def krpc_store_value(self, key, value, id, _krpc_sender):
#### Remote Interface - called by remote nodes
def krpc_store_value(self, key, value, id, _krpc_sender):
- self._update_node(id, _krpc_sender[0], _krpc_sender[1])
+ n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
+ self.insertNode(n, contacted=0)
self.store.storeValue(key, value)
return {"id" : self.node.id}
self.store.storeValue(key, value)
return {"id" : self.node.id}
"""local routing table for a kademlia like distributed hash table"""
def __init__(self, node, config):
# this is the root node, a.k.a. US!
"""local routing table for a kademlia like distributed hash table"""
def __init__(self, node, config):
# this is the root node, a.k.a. US!
+ assert node.id != NULL_ID
self.node = node
self.config = config
self.buckets = [KBucket([], 0L, 2L**self.config['HASH_LENGTH'])]
self.node = node
self.config = config
self.buckets = [KBucket([], 0L, 2L**self.config['HASH_LENGTH'])]
def _bucketIndexForInt(self, num):
"""the index of the bucket that should hold int"""
def _bucketIndexForInt(self, num):
"""the index of the bucket that should hold int"""
class TestKTable(unittest.TestCase):
def setUp(self):
class TestKTable(unittest.TestCase):
def setUp(self):
- self.a = Node().init(khash.newID(), 'localhost', 2002)
+ self.a = Node(khash.newID(), 'localhost', 2002)
self.t = KTable(self.a, {'HASH_LENGTH': 160, 'K': 8, 'MAX_FAILURES': 3})
def testAddNode(self):
self.t = KTable(self.a, {'HASH_LENGTH': 160, 'K': 8, 'MAX_FAILURES': 3})
def testAddNode(self):
- self.b = Node().init(khash.newID(), 'localhost', 2003)
+ self.b = Node(khash.newID(), 'localhost', 2003)
self.t.insertNode(self.b)
self.assertEqual(len(self.t.buckets[0].l), 1)
self.assertEqual(self.t.buckets[0].l[0], self.b)
self.t.insertNode(self.b)
self.assertEqual(len(self.t.buckets[0].l), 1)
self.assertEqual(self.t.buckets[0].l[0], self.b)
class Node:
"""encapsulate contact info"""
class Node:
"""encapsulate contact info"""
+ def __init__(self, id, host = None, port = None):
self.fails = 0
self.lastSeen = datetime(MINYEAR, 1, 1)
self.fails = 0
self.lastSeen = datetime(MINYEAR, 1, 1)
- self.id = self.host = self.port = ''
-
- def init(self, id, host, port):
+
+ # Alternate method, init Node from dictionary
+ if isinstance(id, dict):
+ host = id['host']
+ port = id['port']
+ id = id['id']
+
+ assert(isinstance(id, str))
+ assert(isinstance(host, str))
self.id = id
self.num = khash.intify(id)
self.host = host
self.id = id
self.num = khash.intify(id)
self.host = host
self._senderDict = {'id': self.id, 'port' : self.port, 'host' : self.host}
self._senderDict = {'id': self.id, 'port' : self.port, 'host' : self.host}
- return self
-
- def initWithDict(self, dict):
- self._senderDict = dict
- self.id = dict['id']
- self.num = khash.intify(self.id)
- self.port = dict['port']
- self.host = dict['host']
- return self
def updateLastSeen(self):
self.lastSeen = datetime.now()
def updateLastSeen(self):
self.lastSeen = datetime.now()
class TestNode(unittest.TestCase):
def setUp(self):
class TestNode(unittest.TestCase):
def setUp(self):
- self.node = Node().init(khash.newID(), 'localhost', 2002)
+ self.node = Node(khash.newID(), 'localhost', 2002)
def testUpdateLastSeen(self):
t = self.node.lastSeen
self.node.updateLastSeen()
def testUpdateLastSeen(self):
t = self.node.lastSeen
self.node.updateLastSeen()