only send the ID along in khashmir messages, don't send the host and
authorburris <burris>
Mon, 14 Jun 2004 04:53:26 +0000 (04:53 +0000)
committerburris <burris>
Mon, 14 Jun 2004 04:53:26 +0000 (04:53 +0000)
port, get that from the socket instead

actions.py
khashmir.py
knet.py [new file with mode: 0644]
knode.py
krpc.py

index 9e77196b7138355be0db415194a8e10b05e9f3cd..1afdeb146e0ef56f637ec77721cabac62b5c20c8 100644 (file)
@@ -46,8 +46,9 @@ class FindNode(ActionBase):
         _krpc_sender = dict['_krpc_sender']
         dict = dict['rsp']
         l = dict["nodes"]
-        sender = dict["sender"]
+        sender = {'id' : dict["id"]}
         sender['port'] = _krpc_sender[1]        
+        sender['host'] = _krpc_sender[0]        
         sender = Node().initWithDict(sender)
         sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port))
         self.table.table.insertNode(sender)
@@ -77,7 +78,7 @@ class FindNode(ActionBase):
                 return self.callback([node])
             if (not self.queried.has_key(node.id)) and node.id != self.table.node.id:
                 #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
-                df = node.findNode(self.target, self.table.node.senderDict())
+                df = node.findNode(self.target, self.table.node.id)
                 df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
                 self.outstanding = self.outstanding + 1
                 self.queried[node.id] = 1
@@ -117,8 +118,9 @@ class GetValue(FindNode):
     def handleGotNodes(self, dict):
         _krpc_sender = dict['_krpc_sender']
         dict = dict['rsp']
-        sender = dict["sender"]
-        sender['port'] = _krpc_sender[1]        
+        sender = {'id' : dict["id"]}
+        sender['port'] = _krpc_sender[1]
+        sender['host'] = _krpc_sender[0]                
         sender = Node().initWithDict(sender)
         sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port))
         self.table.table.insertNode(sender)
@@ -158,7 +160,7 @@ class GetValue(FindNode):
         for node in l[:K]:
             if (not self.queried.has_key(node.id)) and node.id != self.table.node.id:
                 #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
-                df = node.findValue(self.target, self.table.node.senderDict())
+                df = node.findValue(self.target, self.table.node.id)
                 df.addCallback(self.handleGotNodes)
                 df.addErrback(self.makeMsgFailed(node))
                 self.outstanding = self.outstanding + 1
@@ -229,7 +231,7 @@ class StoreValue(ActionBase):
             else:
                 if not node.id == self.table.node.id:
                     self.outstanding += 1
-                    df = node.storeValue(self.target, self.value, self.table.node.senderDict())
+                    df = node.storeValue(self.target, self.value, self.table.node.id)
                     df.addCallback(self.storedValue, node=node)
                     df.addErrback(self.storeFailed, node=node)
                     
index 7091f805f1069cda6ff47a282c84d2436e289299..92619fdb2e2c4627042a2df1cef0f3e93d5a33e6 100644 (file)
@@ -117,8 +117,7 @@ class Khashmir(protocol.Factory):
         c.execute("delete from nodes where id not NULL;")
         for bucket in self.table.buckets:
             for node in bucket.l:
-                d = node.senderDict()
-                c.execute("insert into nodes values (%s, %s, %s);", (sqlite.encode(d['id']), d['host'], d['port']))
+                c.execute("insert into nodes values (%s, %s, %s);", (sqlite.encode(node.id), node.host, node.port))
         self.store.commit()
         self.store.autocommit = 1;
         
@@ -223,23 +222,22 @@ class Khashmir(protocol.Factory):
             def _notStaleNodeHandler(dict, old=old):
                 """ called when we get a pong from the old node """
                 dict = dict['rsp']
-                sender = dict['sender']
-                if sender['id'] == old.id:
+                if dict['id'] == old.id:
                     self.table.justSeenNode(old.id)
             
-            df = old.ping(self.node.senderDict())
+            df = old.ping(self.node.id)
             df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
 
     def sendPing(self, node, callback=None):
         """
             ping a node
         """
-        df = node.ping(self.node.senderDict())
+        df = node.ping(self.node.id)
         ## these are the callbacks we use when we issue a PING
         def _pongHandler(dict, node=node, table=self.table, callback=callback):
             _krpc_sender = dict['_krpc_sender']
             dict = dict['rsp']
-            sender = dict['sender']
+            sender = {'id' : dict['id']}
             if node.id != const.NULL_ID and node.id != sender['id']:
                 # whoah, got response from different peer than we were expecting
                 self.table.invalidateNode(node)
@@ -293,29 +291,27 @@ class Khashmir(protocol.Factory):
     #####
     ##### INCOMING MESSAGE HANDLERS
     
-    def krpc_ping(self, sender, _krpc_sender):
-        """
-            takes sender dict = {'id', <id>, 'port', port} optional keys = 'ip'
-            returns sender dict
-        """
+    def krpc_ping(self, id, _krpc_sender):
+        sender = {'id' : id}
         sender['host'] = _krpc_sender[0]
         sender['port'] = _krpc_sender[1]        
         n = Node().initWithDict(sender)
         n.conn = self.udp.connectionForAddr((n.host, n.port))
         self.insertNode(n, contacted=0)
-        return {"sender" : self.node.senderDict()}
+        return {"id" : self.node.id}
         
-    def krpc_find_node(self, target, sender, _krpc_sender):
+    def krpc_find_node(self, target, id, _krpc_sender):
         nodes = self.table.findNodes(target)
         nodes = map(lambda node: node.senderDict(), nodes)
+        sender = {'id' : id}
         sender['host'] = _krpc_sender[0]
         sender['port'] = _krpc_sender[1]        
         n = Node().initWithDict(sender)
         n.conn = self.udp.connectionForAddr((n.host, n.port))
         self.insertNode(n, contacted=0)
-        return {"nodes" : nodes, "sender" : self.node.senderDict()}
+        return {"nodes" : nodes, "id" : self.node.id}
             
-    def krpc_store_value(self, key, value, sender, _krpc_sender):
+    def krpc_store_value(self, key, value, id, _krpc_sender):
         t = "%0.6f" % time.time()
         c = self.store.cursor()
         try:
@@ -323,14 +319,16 @@ class Khashmir(protocol.Factory):
         except sqlite.IntegrityError, reason:
             # update last insert time
             c.execute("update kv set time = %s where key = %s and value = %s;", (t, sqlite.encode(key), sqlite.encode(value)))
+        sender = {'id' : id}
         sender['host'] = _krpc_sender[0]
         sender['port'] = _krpc_sender[1]        
         n = Node().initWithDict(sender)
         n.conn = self.udp.connectionForAddr((n.host, n.port))
         self.insertNode(n, contacted=0)
-        return {"sender" : self.node.senderDict()}
+        return {"id" : self.node.id}
     
-    def krpc_find_value(self, key, sender, _krpc_sender):
+    def krpc_find_value(self, key, id, _krpc_sender):
+        sender = {'id' : id}
         sender['host'] = _krpc_sender[0]
         sender['port'] = _krpc_sender[1]        
         n = Node().initWithDict(sender)
@@ -339,9 +337,9 @@ class Khashmir(protocol.Factory):
     
         l = self.retrieveValues(key)
         if len(l) > 0:
-            return {'values' : l, "sender": self.node.senderDict()}
+            return {'values' : l, "id": self.node.id}
         else:
             nodes = self.table.findNodes(key)
             nodes = map(lambda node: node.senderDict(), nodes)
-            return {'nodes' : nodes, "sender": self.node.senderDict()}
+            return {'nodes' : nodes, "id": self.node.id}
 
diff --git a/knet.py b/knet.py
new file mode 100644 (file)
index 0000000..8a1536c
--- /dev/null
+++ b/knet.py
@@ -0,0 +1,70 @@
+#
+#  knet.py
+#  UberTracker
+#
+#  Created by andrew loewenstern on Sun Jun 13 2004.
+#  Copyright (c) 2004 __MyCompanyName__. All rights reserved.
+#
+
+from khashmir.khashmir import Khashmir
+from twisted.internet import reactor
+from whrandom import randrange
+import sys, os
+
+class Network:
+    def __init__(self, size=0, startport=5555, localip='127.0.0.1'):
+        self.num = size
+        self.startport = startport
+        self.localip = localip
+
+    def _done(self, val):
+        self.done = 1
+        
+    def setUp(self):
+        self.kfiles()
+        self.l = []
+        for i in range(self.num):
+            self.l.append(Khashmir('', self.startport + i, '/tmp/kh%s.db' % (self.startport + i)))
+        reactor.iterate()
+        reactor.iterate()
+        
+        for i in self.l:
+            i.addContact(self.localip, self.l[randrange(0,self.num)].port)
+            i.addContact(self.localip, self.l[randrange(0,self.num)].port)
+            i.addContact(self.localip, self.l[randrange(0,self.num)].port)
+            reactor.iterate()
+            reactor.iterate()
+            reactor.iterate() 
+            
+        for i in self.l:
+            self.done = 0
+            i.findCloseNodes(self._done)
+            while not self.done:
+                reactor.iterate()
+        for i in self.l:
+            self.done = 0
+            i.findCloseNodes(self._done)
+            while not self.done:
+                reactor.iterate()
+
+    def tearDown(self):
+        for i in self.l:
+            i.listenport.stopListening()
+        self.kfiles()
+        
+    def kfiles(self):
+        for i in range(self.startport, self.startport+self.num):
+            try:
+                os.unlink('/tmp/kh%s.db' % i)
+            except:
+                pass
+            
+        reactor.iterate()
+    
+if __name__ == "__main__":
+    n = Network(int(sys.argv[1]), int(sys.argv[2]), sys.argv[3])
+    n.setUp()
+    try:
+        reactor.run()
+    finally:
+        n.tearDown()
index 70069fae710339686bd978bfca8ecf802afe505a..791e725c4a616cd85b1f98e9af76a97755c7b32c 100644 (file)
--- a/knode.py
+++ b/knode.py
@@ -13,7 +13,7 @@ class IDChecker:
 class KNode(Node):
     def checkSender(self, dict):
         try:
-            senderid = dict['rsp']['sender']['id']
+            senderid = dict['rsp']['id']
         except KeyError:
             print ">>>> No peer id in response"
             raise Exception, "No peer id in response."
@@ -22,20 +22,28 @@ class KNode(Node):
                 print "Got response from different node than expected."
                 raise Exception, "Got response from different node than expected."
         return dict
+
+    def errBack(self, err):
+        print ">>> ", err
+        return err
         
-    def ping(self, sender):
-        df = self.conn.sendRequest('ping', {"sender":sender})
+    def ping(self, id):
+        df = self.conn.sendRequest('ping', {"id":id})
+        df.addErrback(self.errBack)
         df.addCallback(self.checkSender)
         return df
-    def findNode(self, target, sender):
-        df = self.conn.sendRequest('find_node', {"target" : target, "sender": sender})
+    def findNode(self, target, id):
+        df = self.conn.sendRequest('find_node', {"target" : target, "id": id})
+        df.addErrback(self.errBack)
         df.addCallback(self.checkSender)
         return df
-    def storeValue(self, key, value, sender):
-        df = self.conn.sendRequest('store_value', {"key" : key, "value" : value, "sender": sender})
+    def storeValue(self, key, value, id):
+        df = self.conn.sendRequest('store_value', {"key" : key, "value" : value, "id": id})
+        df.addErrback(self.errBack)
         df.addCallback(self.checkSender)
         return df
-    def findValue(self, key, sender):
-        df =  self.conn.sendRequest('find_value', {"key" : key, "sender" : sender})
+    def findValue(self, key, id):
+        df =  self.conn.sendRequest('find_value', {"key" : key, "id" : id})
+        df.addErrback(self.errBack)
         df.addCallback(self.checkSender)
         return df
diff --git a/krpc.py b/krpc.py
index 7e64544f4103b26e8b0ed925d6144e5c40a8561b..1cf33d5478fe52df203c53e246521e2f44836ffe 100644 (file)
--- a/krpc.py
+++ b/krpc.py
@@ -10,6 +10,9 @@ from twisted.internet import protocol
 from twisted.internet import reactor
 import time
 
+import sys
+from traceback import format_exception
+
 import khash as hash
 
 KRPC_TIMEOUT = 60
@@ -89,7 +92,7 @@ class KRPC:
                         ret = apply(f, (), msg[ARG])
                     except Exception, e:
                         ## send error
-                        out = bencode({TID:msg[TID], TYP:ERR, ERR :`e`})
+                        out = bencode({TID:msg[TID], TYP:ERR, ERR :`format_exception(type(e), e, sys.exc_info()[2])`})
                         olen = len(out)
                         self.transport.write(out, addr)
                     else:
@@ -121,7 +124,7 @@ class KRPC:
                     del(self.tids[msg[TID]])
                     df.callback({'rsp' : msg[RSP], '_krpc_sender': addr})
                 else:
-                    print 'timeout ' + `msg[RSP]['sender']`
+                    print 'timeout ' + `msg[RSP]['id']`
                     # no tid, this transaction timed out already...
             elif msg[TYP] == ERR:
                 # if error