]> git.mxchange.org Git - quix0rs-apt-p2p.git/commitdiff
only send the ID along in khashmir messages, don't send the host and
authorburris <burris>
Mon, 14 Jun 2004 04:53:26 +0000 (04:53 +0000)
committerburris <burris>
Mon, 14 Jun 2004 04:53:26 +0000 (04:53 +0000)
port, get that from the socket instead

actions.py
khashmir.py
knet.py [new file with mode: 0644]
knode.py
krpc.py

index 9e77196b7138355be0db415194a8e10b05e9f3cd..1afdeb146e0ef56f637ec77721cabac62b5c20c8 100644 (file)
@@ -46,8 +46,9 @@ class FindNode(ActionBase):
         _krpc_sender = dict['_krpc_sender']
         dict = dict['rsp']
         l = dict["nodes"]
         _krpc_sender = dict['_krpc_sender']
         dict = dict['rsp']
         l = dict["nodes"]
-        sender = dict["sender"]
+        sender = {'id' : dict["id"]}
         sender['port'] = _krpc_sender[1]        
         sender['port'] = _krpc_sender[1]        
+        sender['host'] = _krpc_sender[0]        
         sender = Node().initWithDict(sender)
         sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port))
         self.table.table.insertNode(sender)
         sender = Node().initWithDict(sender)
         sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port))
         self.table.table.insertNode(sender)
@@ -77,7 +78,7 @@ class FindNode(ActionBase):
                 return self.callback([node])
             if (not self.queried.has_key(node.id)) and node.id != self.table.node.id:
                 #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
                 return self.callback([node])
             if (not self.queried.has_key(node.id)) and node.id != self.table.node.id:
                 #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
-                df = node.findNode(self.target, self.table.node.senderDict())
+                df = node.findNode(self.target, self.table.node.id)
                 df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
                 self.outstanding = self.outstanding + 1
                 self.queried[node.id] = 1
                 df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
                 self.outstanding = self.outstanding + 1
                 self.queried[node.id] = 1
@@ -117,8 +118,9 @@ class GetValue(FindNode):
     def handleGotNodes(self, dict):
         _krpc_sender = dict['_krpc_sender']
         dict = dict['rsp']
     def handleGotNodes(self, dict):
         _krpc_sender = dict['_krpc_sender']
         dict = dict['rsp']
-        sender = dict["sender"]
-        sender['port'] = _krpc_sender[1]        
+        sender = {'id' : dict["id"]}
+        sender['port'] = _krpc_sender[1]
+        sender['host'] = _krpc_sender[0]                
         sender = Node().initWithDict(sender)
         sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port))
         self.table.table.insertNode(sender)
         sender = Node().initWithDict(sender)
         sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port))
         self.table.table.insertNode(sender)
@@ -158,7 +160,7 @@ class GetValue(FindNode):
         for node in l[:K]:
             if (not self.queried.has_key(node.id)) and node.id != self.table.node.id:
                 #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
         for node in l[:K]:
             if (not self.queried.has_key(node.id)) and node.id != self.table.node.id:
                 #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
-                df = node.findValue(self.target, self.table.node.senderDict())
+                df = node.findValue(self.target, self.table.node.id)
                 df.addCallback(self.handleGotNodes)
                 df.addErrback(self.makeMsgFailed(node))
                 self.outstanding = self.outstanding + 1
                 df.addCallback(self.handleGotNodes)
                 df.addErrback(self.makeMsgFailed(node))
                 self.outstanding = self.outstanding + 1
@@ -229,7 +231,7 @@ class StoreValue(ActionBase):
             else:
                 if not node.id == self.table.node.id:
                     self.outstanding += 1
             else:
                 if not node.id == self.table.node.id:
                     self.outstanding += 1
-                    df = node.storeValue(self.target, self.value, self.table.node.senderDict())
+                    df = node.storeValue(self.target, self.value, self.table.node.id)
                     df.addCallback(self.storedValue, node=node)
                     df.addErrback(self.storeFailed, node=node)
                     
                     df.addCallback(self.storedValue, node=node)
                     df.addErrback(self.storeFailed, node=node)
                     
index 7091f805f1069cda6ff47a282c84d2436e289299..92619fdb2e2c4627042a2df1cef0f3e93d5a33e6 100644 (file)
@@ -117,8 +117,7 @@ class Khashmir(protocol.Factory):
         c.execute("delete from nodes where id not NULL;")
         for bucket in self.table.buckets:
             for node in bucket.l:
         c.execute("delete from nodes where id not NULL;")
         for bucket in self.table.buckets:
             for node in bucket.l:
-                d = node.senderDict()
-                c.execute("insert into nodes values (%s, %s, %s);", (sqlite.encode(d['id']), d['host'], d['port']))
+                c.execute("insert into nodes values (%s, %s, %s);", (sqlite.encode(node.id), node.host, node.port))
         self.store.commit()
         self.store.autocommit = 1;
         
         self.store.commit()
         self.store.autocommit = 1;
         
@@ -223,23 +222,22 @@ class Khashmir(protocol.Factory):
             def _notStaleNodeHandler(dict, old=old):
                 """ called when we get a pong from the old node """
                 dict = dict['rsp']
             def _notStaleNodeHandler(dict, old=old):
                 """ called when we get a pong from the old node """
                 dict = dict['rsp']
-                sender = dict['sender']
-                if sender['id'] == old.id:
+                if dict['id'] == old.id:
                     self.table.justSeenNode(old.id)
             
                     self.table.justSeenNode(old.id)
             
-            df = old.ping(self.node.senderDict())
+            df = old.ping(self.node.id)
             df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
 
     def sendPing(self, node, callback=None):
         """
             ping a node
         """
             df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
 
     def sendPing(self, node, callback=None):
         """
             ping a node
         """
-        df = node.ping(self.node.senderDict())
+        df = node.ping(self.node.id)
         ## these are the callbacks we use when we issue a PING
         def _pongHandler(dict, node=node, table=self.table, callback=callback):
             _krpc_sender = dict['_krpc_sender']
             dict = dict['rsp']
         ## these are the callbacks we use when we issue a PING
         def _pongHandler(dict, node=node, table=self.table, callback=callback):
             _krpc_sender = dict['_krpc_sender']
             dict = dict['rsp']
-            sender = dict['sender']
+            sender = {'id' : dict['id']}
             if node.id != const.NULL_ID and node.id != sender['id']:
                 # whoah, got response from different peer than we were expecting
                 self.table.invalidateNode(node)
             if node.id != const.NULL_ID and node.id != sender['id']:
                 # whoah, got response from different peer than we were expecting
                 self.table.invalidateNode(node)
@@ -293,29 +291,27 @@ class Khashmir(protocol.Factory):
     #####
     ##### INCOMING MESSAGE HANDLERS
     
     #####
     ##### INCOMING MESSAGE HANDLERS
     
-    def krpc_ping(self, sender, _krpc_sender):
-        """
-            takes sender dict = {'id', <id>, 'port', port} optional keys = 'ip'
-            returns sender dict
-        """
+    def krpc_ping(self, id, _krpc_sender):
+        sender = {'id' : id}
         sender['host'] = _krpc_sender[0]
         sender['port'] = _krpc_sender[1]        
         n = Node().initWithDict(sender)
         n.conn = self.udp.connectionForAddr((n.host, n.port))
         self.insertNode(n, contacted=0)
         sender['host'] = _krpc_sender[0]
         sender['port'] = _krpc_sender[1]        
         n = Node().initWithDict(sender)
         n.conn = self.udp.connectionForAddr((n.host, n.port))
         self.insertNode(n, contacted=0)
-        return {"sender" : self.node.senderDict()}
+        return {"id" : self.node.id}
         
         
-    def krpc_find_node(self, target, sender, _krpc_sender):
+    def krpc_find_node(self, target, id, _krpc_sender):
         nodes = self.table.findNodes(target)
         nodes = map(lambda node: node.senderDict(), nodes)
         nodes = self.table.findNodes(target)
         nodes = map(lambda node: node.senderDict(), nodes)
+        sender = {'id' : id}
         sender['host'] = _krpc_sender[0]
         sender['port'] = _krpc_sender[1]        
         n = Node().initWithDict(sender)
         n.conn = self.udp.connectionForAddr((n.host, n.port))
         self.insertNode(n, contacted=0)
         sender['host'] = _krpc_sender[0]
         sender['port'] = _krpc_sender[1]        
         n = Node().initWithDict(sender)
         n.conn = self.udp.connectionForAddr((n.host, n.port))
         self.insertNode(n, contacted=0)
-        return {"nodes" : nodes, "sender" : self.node.senderDict()}
+        return {"nodes" : nodes, "id" : self.node.id}
             
             
-    def krpc_store_value(self, key, value, sender, _krpc_sender):
+    def krpc_store_value(self, key, value, id, _krpc_sender):
         t = "%0.6f" % time.time()
         c = self.store.cursor()
         try:
         t = "%0.6f" % time.time()
         c = self.store.cursor()
         try:
@@ -323,14 +319,16 @@ class Khashmir(protocol.Factory):
         except sqlite.IntegrityError, reason:
             # update last insert time
             c.execute("update kv set time = %s where key = %s and value = %s;", (t, sqlite.encode(key), sqlite.encode(value)))
         except sqlite.IntegrityError, reason:
             # update last insert time
             c.execute("update kv set time = %s where key = %s and value = %s;", (t, sqlite.encode(key), sqlite.encode(value)))
+        sender = {'id' : id}
         sender['host'] = _krpc_sender[0]
         sender['port'] = _krpc_sender[1]        
         n = Node().initWithDict(sender)
         n.conn = self.udp.connectionForAddr((n.host, n.port))
         self.insertNode(n, contacted=0)
         sender['host'] = _krpc_sender[0]
         sender['port'] = _krpc_sender[1]        
         n = Node().initWithDict(sender)
         n.conn = self.udp.connectionForAddr((n.host, n.port))
         self.insertNode(n, contacted=0)
-        return {"sender" : self.node.senderDict()}
+        return {"id" : self.node.id}
     
     
-    def krpc_find_value(self, key, sender, _krpc_sender):
+    def krpc_find_value(self, key, id, _krpc_sender):
+        sender = {'id' : id}
         sender['host'] = _krpc_sender[0]
         sender['port'] = _krpc_sender[1]        
         n = Node().initWithDict(sender)
         sender['host'] = _krpc_sender[0]
         sender['port'] = _krpc_sender[1]        
         n = Node().initWithDict(sender)
@@ -339,9 +337,9 @@ class Khashmir(protocol.Factory):
     
         l = self.retrieveValues(key)
         if len(l) > 0:
     
         l = self.retrieveValues(key)
         if len(l) > 0:
-            return {'values' : l, "sender": self.node.senderDict()}
+            return {'values' : l, "id": self.node.id}
         else:
             nodes = self.table.findNodes(key)
             nodes = map(lambda node: node.senderDict(), nodes)
         else:
             nodes = self.table.findNodes(key)
             nodes = map(lambda node: node.senderDict(), nodes)
-            return {'nodes' : nodes, "sender": self.node.senderDict()}
+            return {'nodes' : nodes, "id": self.node.id}
 
 
diff --git a/knet.py b/knet.py
new file mode 100644 (file)
index 0000000..8a1536c
--- /dev/null
+++ b/knet.py
@@ -0,0 +1,70 @@
+#
+#  knet.py
+#  UberTracker
+#
+#  Created by andrew loewenstern on Sun Jun 13 2004.
+#  Copyright (c) 2004 __MyCompanyName__. All rights reserved.
+#
+
+from khashmir.khashmir import Khashmir
+from twisted.internet import reactor
+from whrandom import randrange
+import sys, os
+
+class Network:
+    def __init__(self, size=0, startport=5555, localip='127.0.0.1'):
+        self.num = size
+        self.startport = startport
+        self.localip = localip
+
+    def _done(self, val):
+        self.done = 1
+        
+    def setUp(self):
+        self.kfiles()
+        self.l = []
+        for i in range(self.num):
+            self.l.append(Khashmir('', self.startport + i, '/tmp/kh%s.db' % (self.startport + i)))
+        reactor.iterate()
+        reactor.iterate()
+        
+        for i in self.l:
+            i.addContact(self.localip, self.l[randrange(0,self.num)].port)
+            i.addContact(self.localip, self.l[randrange(0,self.num)].port)
+            i.addContact(self.localip, self.l[randrange(0,self.num)].port)
+            reactor.iterate()
+            reactor.iterate()
+            reactor.iterate() 
+            
+        for i in self.l:
+            self.done = 0
+            i.findCloseNodes(self._done)
+            while not self.done:
+                reactor.iterate()
+        for i in self.l:
+            self.done = 0
+            i.findCloseNodes(self._done)
+            while not self.done:
+                reactor.iterate()
+
+    def tearDown(self):
+        for i in self.l:
+            i.listenport.stopListening()
+        self.kfiles()
+        
+    def kfiles(self):
+        for i in range(self.startport, self.startport+self.num):
+            try:
+                os.unlink('/tmp/kh%s.db' % i)
+            except:
+                pass
+            
+        reactor.iterate()
+    
+if __name__ == "__main__":
+    n = Network(int(sys.argv[1]), int(sys.argv[2]), sys.argv[3])
+    n.setUp()
+    try:
+        reactor.run()
+    finally:
+        n.tearDown()
index 70069fae710339686bd978bfca8ecf802afe505a..791e725c4a616cd85b1f98e9af76a97755c7b32c 100644 (file)
--- a/knode.py
+++ b/knode.py
@@ -13,7 +13,7 @@ class IDChecker:
 class KNode(Node):
     def checkSender(self, dict):
         try:
 class KNode(Node):
     def checkSender(self, dict):
         try:
-            senderid = dict['rsp']['sender']['id']
+            senderid = dict['rsp']['id']
         except KeyError:
             print ">>>> No peer id in response"
             raise Exception, "No peer id in response."
         except KeyError:
             print ">>>> No peer id in response"
             raise Exception, "No peer id in response."
@@ -22,20 +22,28 @@ class KNode(Node):
                 print "Got response from different node than expected."
                 raise Exception, "Got response from different node than expected."
         return dict
                 print "Got response from different node than expected."
                 raise Exception, "Got response from different node than expected."
         return dict
+
+    def errBack(self, err):
+        print ">>> ", err
+        return err
         
         
-    def ping(self, sender):
-        df = self.conn.sendRequest('ping', {"sender":sender})
+    def ping(self, id):
+        df = self.conn.sendRequest('ping', {"id":id})
+        df.addErrback(self.errBack)
         df.addCallback(self.checkSender)
         return df
         df.addCallback(self.checkSender)
         return df
-    def findNode(self, target, sender):
-        df = self.conn.sendRequest('find_node', {"target" : target, "sender": sender})
+    def findNode(self, target, id):
+        df = self.conn.sendRequest('find_node', {"target" : target, "id": id})
+        df.addErrback(self.errBack)
         df.addCallback(self.checkSender)
         return df
         df.addCallback(self.checkSender)
         return df
-    def storeValue(self, key, value, sender):
-        df = self.conn.sendRequest('store_value', {"key" : key, "value" : value, "sender": sender})
+    def storeValue(self, key, value, id):
+        df = self.conn.sendRequest('store_value', {"key" : key, "value" : value, "id": id})
+        df.addErrback(self.errBack)
         df.addCallback(self.checkSender)
         return df
         df.addCallback(self.checkSender)
         return df
-    def findValue(self, key, sender):
-        df =  self.conn.sendRequest('find_value', {"key" : key, "sender" : sender})
+    def findValue(self, key, id):
+        df =  self.conn.sendRequest('find_value', {"key" : key, "id" : id})
+        df.addErrback(self.errBack)
         df.addCallback(self.checkSender)
         return df
         df.addCallback(self.checkSender)
         return df
diff --git a/krpc.py b/krpc.py
index 7e64544f4103b26e8b0ed925d6144e5c40a8561b..1cf33d5478fe52df203c53e246521e2f44836ffe 100644 (file)
--- a/krpc.py
+++ b/krpc.py
@@ -10,6 +10,9 @@ from twisted.internet import protocol
 from twisted.internet import reactor
 import time
 
 from twisted.internet import reactor
 import time
 
+import sys
+from traceback import format_exception
+
 import khash as hash
 
 KRPC_TIMEOUT = 60
 import khash as hash
 
 KRPC_TIMEOUT = 60
@@ -89,7 +92,7 @@ class KRPC:
                         ret = apply(f, (), msg[ARG])
                     except Exception, e:
                         ## send error
                         ret = apply(f, (), msg[ARG])
                     except Exception, e:
                         ## send error
-                        out = bencode({TID:msg[TID], TYP:ERR, ERR :`e`})
+                        out = bencode({TID:msg[TID], TYP:ERR, ERR :`format_exception(type(e), e, sys.exc_info()[2])`})
                         olen = len(out)
                         self.transport.write(out, addr)
                     else:
                         olen = len(out)
                         self.transport.write(out, addr)
                     else:
@@ -121,7 +124,7 @@ class KRPC:
                     del(self.tids[msg[TID]])
                     df.callback({'rsp' : msg[RSP], '_krpc_sender': addr})
                 else:
                     del(self.tids[msg[TID]])
                     df.callback({'rsp' : msg[RSP], '_krpc_sender': addr})
                 else:
-                    print 'timeout ' + `msg[RSP]['sender']`
+                    print 'timeout ' + `msg[RSP]['id']`
                     # no tid, this transaction timed out already...
             elif msg[TYP] == ERR:
                 # if error
                     # no tid, this transaction timed out already...
             elif msg[TYP] == ERR:
                 # if error