From 3a8164f9c878f1c8af2958bc932ddfdb9f799a09 Mon Sep 17 00:00:00 2001 From: burris Date: Mon, 14 Jun 2004 04:53:26 +0000 Subject: [PATCH] only send the ID along in khashmir messages, don't send the host and port, get that from the socket instead --- actions.py | 14 ++++++----- khashmir.py | 38 ++++++++++++++--------------- knet.py | 70 +++++++++++++++++++++++++++++++++++++++++++++++++++++ knode.py | 26 +++++++++++++------- krpc.py | 7 ++++-- 5 files changed, 118 insertions(+), 37 deletions(-) create mode 100644 knet.py diff --git a/actions.py b/actions.py index 9e77196..1afdeb1 100644 --- a/actions.py +++ b/actions.py @@ -46,8 +46,9 @@ class FindNode(ActionBase): _krpc_sender = dict['_krpc_sender'] dict = dict['rsp'] l = dict["nodes"] - sender = dict["sender"] + sender = {'id' : dict["id"]} sender['port'] = _krpc_sender[1] + sender['host'] = _krpc_sender[0] sender = Node().initWithDict(sender) sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port)) self.table.table.insertNode(sender) @@ -77,7 +78,7 @@ class FindNode(ActionBase): return self.callback([node]) if (not self.queried.has_key(node.id)) and node.id != self.table.node.id: #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT - df = node.findNode(self.target, self.table.node.senderDict()) + df = node.findNode(self.target, self.table.node.id) df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node)) self.outstanding = self.outstanding + 1 self.queried[node.id] = 1 @@ -117,8 +118,9 @@ class GetValue(FindNode): def handleGotNodes(self, dict): _krpc_sender = dict['_krpc_sender'] dict = dict['rsp'] - sender = dict["sender"] - sender['port'] = _krpc_sender[1] + sender = {'id' : dict["id"]} + sender['port'] = _krpc_sender[1] + sender['host'] = _krpc_sender[0] sender = Node().initWithDict(sender) sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port)) self.table.table.insertNode(sender) @@ -158,7 +160,7 @@ class GetValue(FindNode): for node in l[:K]: if (not self.queried.has_key(node.id)) and node.id != self.table.node.id: #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT - df = node.findValue(self.target, self.table.node.senderDict()) + df = node.findValue(self.target, self.table.node.id) df.addCallback(self.handleGotNodes) df.addErrback(self.makeMsgFailed(node)) self.outstanding = self.outstanding + 1 @@ -229,7 +231,7 @@ class StoreValue(ActionBase): else: if not node.id == self.table.node.id: self.outstanding += 1 - df = node.storeValue(self.target, self.value, self.table.node.senderDict()) + df = node.storeValue(self.target, self.value, self.table.node.id) df.addCallback(self.storedValue, node=node) df.addErrback(self.storeFailed, node=node) diff --git a/khashmir.py b/khashmir.py index 7091f80..92619fd 100644 --- a/khashmir.py +++ b/khashmir.py @@ -117,8 +117,7 @@ class Khashmir(protocol.Factory): c.execute("delete from nodes where id not NULL;") for bucket in self.table.buckets: for node in bucket.l: - d = node.senderDict() - c.execute("insert into nodes values (%s, %s, %s);", (sqlite.encode(d['id']), d['host'], d['port'])) + c.execute("insert into nodes values (%s, %s, %s);", (sqlite.encode(node.id), node.host, node.port)) self.store.commit() self.store.autocommit = 1; @@ -223,23 +222,22 @@ class Khashmir(protocol.Factory): def _notStaleNodeHandler(dict, old=old): """ called when we get a pong from the old node """ dict = dict['rsp'] - sender = dict['sender'] - if sender['id'] == old.id: + if dict['id'] == old.id: self.table.justSeenNode(old.id) - df = old.ping(self.node.senderDict()) + df = old.ping(self.node.id) df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler) def sendPing(self, node, callback=None): """ ping a node """ - df = node.ping(self.node.senderDict()) + df = node.ping(self.node.id) ## these are the callbacks we use when we issue a PING def _pongHandler(dict, node=node, table=self.table, callback=callback): _krpc_sender = dict['_krpc_sender'] dict = dict['rsp'] - sender = dict['sender'] + sender = {'id' : dict['id']} if node.id != const.NULL_ID and node.id != sender['id']: # whoah, got response from different peer than we were expecting self.table.invalidateNode(node) @@ -293,29 +291,27 @@ class Khashmir(protocol.Factory): ##### ##### INCOMING MESSAGE HANDLERS - def krpc_ping(self, sender, _krpc_sender): - """ - takes sender dict = {'id', , 'port', port} optional keys = 'ip' - returns sender dict - """ + def krpc_ping(self, id, _krpc_sender): + sender = {'id' : id} sender['host'] = _krpc_sender[0] sender['port'] = _krpc_sender[1] n = Node().initWithDict(sender) n.conn = self.udp.connectionForAddr((n.host, n.port)) self.insertNode(n, contacted=0) - return {"sender" : self.node.senderDict()} + return {"id" : self.node.id} - def krpc_find_node(self, target, sender, _krpc_sender): + def krpc_find_node(self, target, id, _krpc_sender): nodes = self.table.findNodes(target) nodes = map(lambda node: node.senderDict(), nodes) + sender = {'id' : id} sender['host'] = _krpc_sender[0] sender['port'] = _krpc_sender[1] n = Node().initWithDict(sender) n.conn = self.udp.connectionForAddr((n.host, n.port)) self.insertNode(n, contacted=0) - return {"nodes" : nodes, "sender" : self.node.senderDict()} + return {"nodes" : nodes, "id" : self.node.id} - def krpc_store_value(self, key, value, sender, _krpc_sender): + def krpc_store_value(self, key, value, id, _krpc_sender): t = "%0.6f" % time.time() c = self.store.cursor() try: @@ -323,14 +319,16 @@ class Khashmir(protocol.Factory): except sqlite.IntegrityError, reason: # update last insert time c.execute("update kv set time = %s where key = %s and value = %s;", (t, sqlite.encode(key), sqlite.encode(value))) + sender = {'id' : id} sender['host'] = _krpc_sender[0] sender['port'] = _krpc_sender[1] n = Node().initWithDict(sender) n.conn = self.udp.connectionForAddr((n.host, n.port)) self.insertNode(n, contacted=0) - return {"sender" : self.node.senderDict()} + return {"id" : self.node.id} - def krpc_find_value(self, key, sender, _krpc_sender): + def krpc_find_value(self, key, id, _krpc_sender): + sender = {'id' : id} sender['host'] = _krpc_sender[0] sender['port'] = _krpc_sender[1] n = Node().initWithDict(sender) @@ -339,9 +337,9 @@ class Khashmir(protocol.Factory): l = self.retrieveValues(key) if len(l) > 0: - return {'values' : l, "sender": self.node.senderDict()} + return {'values' : l, "id": self.node.id} else: nodes = self.table.findNodes(key) nodes = map(lambda node: node.senderDict(), nodes) - return {'nodes' : nodes, "sender": self.node.senderDict()} + return {'nodes' : nodes, "id": self.node.id} diff --git a/knet.py b/knet.py new file mode 100644 index 0000000..8a1536c --- /dev/null +++ b/knet.py @@ -0,0 +1,70 @@ +# +# knet.py +# UberTracker +# +# Created by andrew loewenstern on Sun Jun 13 2004. +# Copyright (c) 2004 __MyCompanyName__. All rights reserved. +# + +from khashmir.khashmir import Khashmir +from twisted.internet import reactor +from whrandom import randrange +import sys, os + +class Network: + def __init__(self, size=0, startport=5555, localip='127.0.0.1'): + self.num = size + self.startport = startport + self.localip = localip + + def _done(self, val): + self.done = 1 + + def setUp(self): + self.kfiles() + self.l = [] + for i in range(self.num): + self.l.append(Khashmir('', self.startport + i, '/tmp/kh%s.db' % (self.startport + i))) + reactor.iterate() + reactor.iterate() + + for i in self.l: + i.addContact(self.localip, self.l[randrange(0,self.num)].port) + i.addContact(self.localip, self.l[randrange(0,self.num)].port) + i.addContact(self.localip, self.l[randrange(0,self.num)].port) + reactor.iterate() + reactor.iterate() + reactor.iterate() + + for i in self.l: + self.done = 0 + i.findCloseNodes(self._done) + while not self.done: + reactor.iterate() + for i in self.l: + self.done = 0 + i.findCloseNodes(self._done) + while not self.done: + reactor.iterate() + + def tearDown(self): + for i in self.l: + i.listenport.stopListening() + self.kfiles() + + def kfiles(self): + for i in range(self.startport, self.startport+self.num): + try: + os.unlink('/tmp/kh%s.db' % i) + except: + pass + + reactor.iterate() + +if __name__ == "__main__": + n = Network(int(sys.argv[1]), int(sys.argv[2]), sys.argv[3]) + n.setUp() + try: + reactor.run() + finally: + n.tearDown() diff --git a/knode.py b/knode.py index 70069fa..791e725 100644 --- a/knode.py +++ b/knode.py @@ -13,7 +13,7 @@ class IDChecker: class KNode(Node): def checkSender(self, dict): try: - senderid = dict['rsp']['sender']['id'] + senderid = dict['rsp']['id'] except KeyError: print ">>>> No peer id in response" raise Exception, "No peer id in response." @@ -22,20 +22,28 @@ class KNode(Node): print "Got response from different node than expected." raise Exception, "Got response from different node than expected." return dict + + def errBack(self, err): + print ">>> ", err + return err - def ping(self, sender): - df = self.conn.sendRequest('ping', {"sender":sender}) + def ping(self, id): + df = self.conn.sendRequest('ping', {"id":id}) + df.addErrback(self.errBack) df.addCallback(self.checkSender) return df - def findNode(self, target, sender): - df = self.conn.sendRequest('find_node', {"target" : target, "sender": sender}) + def findNode(self, target, id): + df = self.conn.sendRequest('find_node', {"target" : target, "id": id}) + df.addErrback(self.errBack) df.addCallback(self.checkSender) return df - def storeValue(self, key, value, sender): - df = self.conn.sendRequest('store_value', {"key" : key, "value" : value, "sender": sender}) + def storeValue(self, key, value, id): + df = self.conn.sendRequest('store_value', {"key" : key, "value" : value, "id": id}) + df.addErrback(self.errBack) df.addCallback(self.checkSender) return df - def findValue(self, key, sender): - df = self.conn.sendRequest('find_value', {"key" : key, "sender" : sender}) + def findValue(self, key, id): + df = self.conn.sendRequest('find_value', {"key" : key, "id" : id}) + df.addErrback(self.errBack) df.addCallback(self.checkSender) return df diff --git a/krpc.py b/krpc.py index 7e64544..1cf33d5 100644 --- a/krpc.py +++ b/krpc.py @@ -10,6 +10,9 @@ from twisted.internet import protocol from twisted.internet import reactor import time +import sys +from traceback import format_exception + import khash as hash KRPC_TIMEOUT = 60 @@ -89,7 +92,7 @@ class KRPC: ret = apply(f, (), msg[ARG]) except Exception, e: ## send error - out = bencode({TID:msg[TID], TYP:ERR, ERR :`e`}) + out = bencode({TID:msg[TID], TYP:ERR, ERR :`format_exception(type(e), e, sys.exc_info()[2])`}) olen = len(out) self.transport.write(out, addr) else: @@ -121,7 +124,7 @@ class KRPC: del(self.tids[msg[TID]]) df.callback({'rsp' : msg[RSP], '_krpc_sender': addr}) else: - print 'timeout ' + `msg[RSP]['sender']` + print 'timeout ' + `msg[RSP]['id']` # no tid, this transaction timed out already... elif msg[TYP] == ERR: # if error -- 2.39.2