_krpc_sender = dict['_krpc_sender']
dict = dict['rsp']
l = dict["nodes"]
- sender = dict["sender"]
+ sender = {'id' : dict["id"]}
sender['port'] = _krpc_sender[1]
+ sender['host'] = _krpc_sender[0]
sender = Node().initWithDict(sender)
sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port))
self.table.table.insertNode(sender)
return self.callback([node])
if (not self.queried.has_key(node.id)) and node.id != self.table.node.id:
#xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
- df = node.findNode(self.target, self.table.node.senderDict())
+ df = node.findNode(self.target, self.table.node.id)
df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
self.outstanding = self.outstanding + 1
self.queried[node.id] = 1
def handleGotNodes(self, dict):
_krpc_sender = dict['_krpc_sender']
dict = dict['rsp']
- sender = dict["sender"]
- sender['port'] = _krpc_sender[1]
+ sender = {'id' : dict["id"]}
+ sender['port'] = _krpc_sender[1]
+ sender['host'] = _krpc_sender[0]
sender = Node().initWithDict(sender)
sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port))
self.table.table.insertNode(sender)
for node in l[:K]:
if (not self.queried.has_key(node.id)) and node.id != self.table.node.id:
#xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
- df = node.findValue(self.target, self.table.node.senderDict())
+ df = node.findValue(self.target, self.table.node.id)
df.addCallback(self.handleGotNodes)
df.addErrback(self.makeMsgFailed(node))
self.outstanding = self.outstanding + 1
else:
if not node.id == self.table.node.id:
self.outstanding += 1
- df = node.storeValue(self.target, self.value, self.table.node.senderDict())
+ df = node.storeValue(self.target, self.value, self.table.node.id)
df.addCallback(self.storedValue, node=node)
df.addErrback(self.storeFailed, node=node)
c.execute("delete from nodes where id not NULL;")
for bucket in self.table.buckets:
for node in bucket.l:
- d = node.senderDict()
- c.execute("insert into nodes values (%s, %s, %s);", (sqlite.encode(d['id']), d['host'], d['port']))
+ c.execute("insert into nodes values (%s, %s, %s);", (sqlite.encode(node.id), node.host, node.port))
self.store.commit()
self.store.autocommit = 1;
def _notStaleNodeHandler(dict, old=old):
""" called when we get a pong from the old node """
dict = dict['rsp']
- sender = dict['sender']
- if sender['id'] == old.id:
+ if dict['id'] == old.id:
self.table.justSeenNode(old.id)
- df = old.ping(self.node.senderDict())
+ df = old.ping(self.node.id)
df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
def sendPing(self, node, callback=None):
"""
ping a node
"""
- df = node.ping(self.node.senderDict())
+ df = node.ping(self.node.id)
## these are the callbacks we use when we issue a PING
def _pongHandler(dict, node=node, table=self.table, callback=callback):
_krpc_sender = dict['_krpc_sender']
dict = dict['rsp']
- sender = dict['sender']
+ sender = {'id' : dict['id']}
if node.id != const.NULL_ID and node.id != sender['id']:
# whoah, got response from different peer than we were expecting
self.table.invalidateNode(node)
#####
##### INCOMING MESSAGE HANDLERS
- def krpc_ping(self, sender, _krpc_sender):
- """
- takes sender dict = {'id', <id>, 'port', port} optional keys = 'ip'
- returns sender dict
- """
+ def krpc_ping(self, id, _krpc_sender):
+ sender = {'id' : id}
sender['host'] = _krpc_sender[0]
sender['port'] = _krpc_sender[1]
n = Node().initWithDict(sender)
n.conn = self.udp.connectionForAddr((n.host, n.port))
self.insertNode(n, contacted=0)
- return {"sender" : self.node.senderDict()}
+ return {"id" : self.node.id}
- def krpc_find_node(self, target, sender, _krpc_sender):
+ def krpc_find_node(self, target, id, _krpc_sender):
nodes = self.table.findNodes(target)
nodes = map(lambda node: node.senderDict(), nodes)
+ sender = {'id' : id}
sender['host'] = _krpc_sender[0]
sender['port'] = _krpc_sender[1]
n = Node().initWithDict(sender)
n.conn = self.udp.connectionForAddr((n.host, n.port))
self.insertNode(n, contacted=0)
- return {"nodes" : nodes, "sender" : self.node.senderDict()}
+ return {"nodes" : nodes, "id" : self.node.id}
- def krpc_store_value(self, key, value, sender, _krpc_sender):
+ def krpc_store_value(self, key, value, id, _krpc_sender):
t = "%0.6f" % time.time()
c = self.store.cursor()
try:
except sqlite.IntegrityError, reason:
# update last insert time
c.execute("update kv set time = %s where key = %s and value = %s;", (t, sqlite.encode(key), sqlite.encode(value)))
+ sender = {'id' : id}
sender['host'] = _krpc_sender[0]
sender['port'] = _krpc_sender[1]
n = Node().initWithDict(sender)
n.conn = self.udp.connectionForAddr((n.host, n.port))
self.insertNode(n, contacted=0)
- return {"sender" : self.node.senderDict()}
+ return {"id" : self.node.id}
- def krpc_find_value(self, key, sender, _krpc_sender):
+ def krpc_find_value(self, key, id, _krpc_sender):
+ sender = {'id' : id}
sender['host'] = _krpc_sender[0]
sender['port'] = _krpc_sender[1]
n = Node().initWithDict(sender)
l = self.retrieveValues(key)
if len(l) > 0:
- return {'values' : l, "sender": self.node.senderDict()}
+ return {'values' : l, "id": self.node.id}
else:
nodes = self.table.findNodes(key)
nodes = map(lambda node: node.senderDict(), nodes)
- return {'nodes' : nodes, "sender": self.node.senderDict()}
+ return {'nodes' : nodes, "id": self.node.id}
--- /dev/null
+#
+# knet.py
+# UberTracker
+#
+# Created by andrew loewenstern on Sun Jun 13 2004.
+# Copyright (c) 2004 __MyCompanyName__. All rights reserved.
+#
+
+from khashmir.khashmir import Khashmir
+from twisted.internet import reactor
+from whrandom import randrange
+import sys, os
+
+class Network:
+ def __init__(self, size=0, startport=5555, localip='127.0.0.1'):
+ self.num = size
+ self.startport = startport
+ self.localip = localip
+
+ def _done(self, val):
+ self.done = 1
+
+ def setUp(self):
+ self.kfiles()
+ self.l = []
+ for i in range(self.num):
+ self.l.append(Khashmir('', self.startport + i, '/tmp/kh%s.db' % (self.startport + i)))
+ reactor.iterate()
+ reactor.iterate()
+
+ for i in self.l:
+ i.addContact(self.localip, self.l[randrange(0,self.num)].port)
+ i.addContact(self.localip, self.l[randrange(0,self.num)].port)
+ i.addContact(self.localip, self.l[randrange(0,self.num)].port)
+ reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
+
+ for i in self.l:
+ self.done = 0
+ i.findCloseNodes(self._done)
+ while not self.done:
+ reactor.iterate()
+ for i in self.l:
+ self.done = 0
+ i.findCloseNodes(self._done)
+ while not self.done:
+ reactor.iterate()
+
+ def tearDown(self):
+ for i in self.l:
+ i.listenport.stopListening()
+ self.kfiles()
+
+ def kfiles(self):
+ for i in range(self.startport, self.startport+self.num):
+ try:
+ os.unlink('/tmp/kh%s.db' % i)
+ except:
+ pass
+
+ reactor.iterate()
+
+if __name__ == "__main__":
+ n = Network(int(sys.argv[1]), int(sys.argv[2]), sys.argv[3])
+ n.setUp()
+ try:
+ reactor.run()
+ finally:
+ n.tearDown()
class KNode(Node):
def checkSender(self, dict):
try:
- senderid = dict['rsp']['sender']['id']
+ senderid = dict['rsp']['id']
except KeyError:
print ">>>> No peer id in response"
raise Exception, "No peer id in response."
print "Got response from different node than expected."
raise Exception, "Got response from different node than expected."
return dict
+
+ def errBack(self, err):
+ print ">>> ", err
+ return err
- def ping(self, sender):
- df = self.conn.sendRequest('ping', {"sender":sender})
+ def ping(self, id):
+ df = self.conn.sendRequest('ping', {"id":id})
+ df.addErrback(self.errBack)
df.addCallback(self.checkSender)
return df
- def findNode(self, target, sender):
- df = self.conn.sendRequest('find_node', {"target" : target, "sender": sender})
+ def findNode(self, target, id):
+ df = self.conn.sendRequest('find_node', {"target" : target, "id": id})
+ df.addErrback(self.errBack)
df.addCallback(self.checkSender)
return df
- def storeValue(self, key, value, sender):
- df = self.conn.sendRequest('store_value', {"key" : key, "value" : value, "sender": sender})
+ def storeValue(self, key, value, id):
+ df = self.conn.sendRequest('store_value', {"key" : key, "value" : value, "id": id})
+ df.addErrback(self.errBack)
df.addCallback(self.checkSender)
return df
- def findValue(self, key, sender):
- df = self.conn.sendRequest('find_value', {"key" : key, "sender" : sender})
+ def findValue(self, key, id):
+ df = self.conn.sendRequest('find_value', {"key" : key, "id" : id})
+ df.addErrback(self.errBack)
df.addCallback(self.checkSender)
return df
from twisted.internet import reactor
import time
+import sys
+from traceback import format_exception
+
import khash as hash
KRPC_TIMEOUT = 60
ret = apply(f, (), msg[ARG])
except Exception, e:
## send error
- out = bencode({TID:msg[TID], TYP:ERR, ERR :`e`})
+ out = bencode({TID:msg[TID], TYP:ERR, ERR :`format_exception(type(e), e, sys.exc_info()[2])`})
olen = len(out)
self.transport.write(out, addr)
else:
del(self.tids[msg[TID]])
df.callback({'rsp' : msg[RSP], '_krpc_sender': addr})
else:
- print 'timeout ' + `msg[RSP]['sender']`
+ print 'timeout ' + `msg[RSP]['id']`
# no tid, this transaction timed out already...
elif msg[TYP] == ERR:
# if error