]> git.mxchange.org Git - quix0rs-apt-p2p.git/commitdiff
*** empty log message ***
authorburris <burris>
Sat, 22 Feb 2003 06:55:43 +0000 (06:55 +0000)
committerburris <burris>
Sat, 22 Feb 2003 06:55:43 +0000 (06:55 +0000)
actions.py
airhook.py
const.py
hash.py
khashmir.py
krpc.py
test.py
test_krpc.py

index 18b9cddf82ae1dd016468eaf6b5ff5479af3067d..f5d3a9fca6d1b7c12ed01ffa85c945791175d3aa 100644 (file)
@@ -88,7 +88,7 @@ class FindNode(ActionBase):
     
     def makeMsgFailed(self, node):
         def defaultGotNodes(err, self=self, node=node):
-            print ">>> find failed"
+            print ">>> find failed %s/%s" % (node.host, node.port)
             self.table.table.nodeFailed(node)
             self.outstanding = self.outstanding - 1
             self.schedule()
@@ -203,7 +203,7 @@ class StoreValue(ActionBase):
                 self.schedule()
             
     def storeFailed(self, t, node):
-        print ">>> store failed"
+        print ">>> store failed %s/%s" % (node.host, node.port)
         self.table.nodeFailed(node)
         self.outstanding -= 1
         if self.finished:
index d3ae57a40e8a17f88d8ed17036c52ffe26cf93fc..dc17dca9f13a230b2814a8dfaecf800e4d7c463a 100644 (file)
@@ -41,6 +41,8 @@ class Airhook(protocol.DatagramProtocol):
         self.connectionForAddr(addr).datagramReceived(datagram)
 
     def connectionForAddr(self, addr):
+        if addr == self.addr:
+            raise Exception
         if not self.connections.has_key(addr):
             conn = self.connection()
             conn.protocol = self.factory.buildProtocol(addr)
@@ -51,10 +53,10 @@ class Airhook(protocol.DatagramProtocol):
         else:
             conn = self.connections[addr]
         return conn
-#    def makeConnection(self, transport):
-#        protocol.DatagramProtocol.makeConnection(self, transport)
-#        tup = transport.getHost()
-#        self.addr = (tup[1], tup[2])
+    def makeConnection(self, transport):
+        protocol.DatagramProtocol.makeConnection(self, transport)
+        tup = transport.getHost()
+        self.addr = (tup[1], tup[2])
         
 class AirhookPacket:
     def __init__(self, msg):
@@ -93,18 +95,14 @@ class AirhookPacket:
 class AirhookConnection(protocol.ConnectedDatagramProtocol, interfaces.IUDPConnectedTransport):
     def __init__(self):        
         self.outSeq = 0  # highest sequence we have sent, can't be 255 more than obSeq
-        self.obSeq = 0   # highest sequence confirmed by remote
-        self.inSeq = 0   # last received sequence
         self.observed = None  # their session id
         self.sessionID = long(rand(0, 2**32))  # our session id
         
         self.lastTransmit = 0  # time we last sent a packet with messages
-        self.lastReceieved = 0 # time we last received a packet with messages
+        self.lastReceived = 0 # time we last received a packet with messages
         self.lastTransmitSeq = -1 # last sequence we sent a packet
         self.state = pending # one of pending, sent, confirmed
         
-        self.omsgq = [] # list of messages to go out
-        self.imsgq = [] # list of messages coming in
         self.sendSession = None  # send session/observed fields until obSeq > sendSession
         self.response = 0 # if we know we have a response now (like resending missed packets)
         self.noisy = 0
@@ -117,6 +115,10 @@ class AirhookConnection(protocol.ConnectedDatagramProtocol, interfaces.IUDPConne
         self.outMsgNums = [0] * 256 # outgoing message numbers i = outNum % 256
         self.next = 0  # next outgoing message number
         self.scheduled = 0 # a sendNext is scheduled, don't schedule another
+        self.omsgq = [] # list of messages to go out
+        self.imsgq = [] # list of messages coming in
+        self.obSeq = 0   # highest sequence confirmed by remote
+        self.inSeq = 0   # last received sequence
 
     def datagramReceived(self, datagram):
         if not datagram:
@@ -132,12 +134,15 @@ class AirhookConnection(protocol.ConnectedDatagramProtocol, interfaces.IUDPConne
                 if p.observed == self.sessionID:
                     self.observed = p.session
                     self.state = confirmed
+                    self.response = 1
                 else:
-                    # bogus packet!
-                    return
+                    self.observed = p.session
+                    self.response = 1
             elif p.session != None:
                 self.observed = p.session
                 self.response = 1
+            else:
+                self.response = 1
         elif self.state == sent:
             if p.observed != None and p.session != None:
                 if p.observed == self.sessionID:
@@ -149,8 +154,18 @@ class AirhookConnection(protocol.ConnectedDatagramProtocol, interfaces.IUDPConne
                     self.observed = p.session
                 elif self.observed != p.session:
                     self.state = pending
+                    self.observed = p.session
                     self.resetConnection()
+                    self.response = 1
+                    if hasattr(self.protocol, "resetConnection") and callable(self.protocol.resetConnection):
+                        self.protocol.resetConnection()
                     self.inSeq = p.seq
+                    self.schedule()
+                    return
+            elif p.session == None and p.observed == None:
+                self.response = 1
+                self.schedule()
+    
         elif self.state == confirmed:
             if p.session != None or p.observed != None :
                 if (p.session != None and p.session != self.observed) or (p.observed != None and p.observed != self.sessionID):
@@ -160,7 +175,8 @@ class AirhookConnection(protocol.ConnectedDatagramProtocol, interfaces.IUDPConne
                     self.inSeq = p.seq
                     if hasattr(self.protocol, "resetConnection") and callable(self.protocol.resetConnection):
                         self.protocol.resetConnection()
-
+                    self.schedule()
+                    return
         # check to make sure sequence number isn't out of order
         if (p.seq - self.inSeq) % 2**16 >= 256:
             return
@@ -357,7 +373,10 @@ class StreamConnection(AirhookConnection):
     def resetConnection(self):
         AirhookConnection.resetConnection(self)
         self.resetStream()
-        
+
+    def loseConnection(self):
+        pass
+    
     def dataCameIn(self):
         # put 'em together
         for msg in self.imsgq:
index 797b2d0ce84adea331ca512b7a82d57d66c45159..1faab28925f49f2c9a4186e0cca7d32136a28e9b 100644 (file)
--- a/const.py
+++ b/const.py
@@ -16,7 +16,7 @@ except IOError:
 NULL_ID =  20 * '\0'
 
 # Kademlia "K" constant, this should be an even number
-K = 20
+K = 8
 
 # SHA1 is 160 bits long
 HASH_LENGTH = 160
diff --git a/hash.py b/hash.py
index 2a312461155d21137407223553251f1b59e04162..6ac97cb66048b04ba1d47c45eceb336392c35e15 100644 (file)
--- a/hash.py
+++ b/hash.py
@@ -3,7 +3,6 @@
 from sha import sha
 import whrandom
 
-random = open('/dev/urandom', 'r') # sucks for windoze
 
 def intify(hstr):
     """20 bit hash, big-endian -> long python integer"""
@@ -28,7 +27,8 @@ def distance(a, b):
 def newID():
     """returns a new pseudorandom globally unique ID string"""
     h = sha()
-    h.update(random.read(20))
+    for i in range(20):
+        h.update(chr(whrandom.randint(0,255)))
     return h.digest()
 
 def newIDInRange(min, max):
index a738a97fad8048e2296aeee876a9d8a20bddfe24..b98785677d2b718d1bd229317d12bdd1cc0011d7 100644 (file)
@@ -353,11 +353,11 @@ from sha import sha
 from hash import newID
 
 
-def test_net(peers=24, startport=2001, dbprefix='/tmp/test'):
+def test_net(host='127.0.0.1', peers=24, startport=2001, dbprefix='/tmp/test'):
     import thread
     l = []
     for i in xrange(peers):
-        a = Khashmir('127.0.0.1', startport + i, db = dbprefix+`i`)
+        a = Khashmir(host, startport + i, db = dbprefix+`i`)
         l.append(a)
     thread.start_new_thread(l[0].app.run, ())
     for peer in l[1:]:
@@ -479,17 +479,15 @@ def test_find_value(l, quiet=0):
             self.found = 0
             self.port = port
         def callback(self, values):
-            try:
                 if(len(values) == 0):
                     if not self.found:
-                        print "find   %s             NOT FOUND" % self.port
+                        print "find   %s           NOT FOUND" % self.port
                     else:
                         print "find   %s           FOUND" % self.port
+                    self.flag.set()
                 else:
                     if self.val in values:
                         self.found = 1
-            finally:
-                self.flag.set()
     
     b.valueForKey(key, cb(fa, port=b.port).callback, searchlocal=0)
     fa.wait()
diff --git a/krpc.py b/krpc.py
index b2700e14b9cd74e418ed1c9c5e5518f23b36bd27..12fb4d399dfa281a8e12b40aa1e4da756c1d71b3 100644 (file)
--- a/krpc.py
+++ b/krpc.py
@@ -7,7 +7,7 @@ import time
 
 import hash
 
-KRPC_TIMEOUT = 30
+KRPC_TIMEOUT = 60
 
 KRPC_ERROR = 1
 KRPC_ERROR_METHOD_UNKNOWN = 2
@@ -19,6 +19,12 @@ class KRPC(basic.NetstringReceiver):
     def __init__(self):
         self.tids = {}
 
+
+    def dataRecieved(self, data):
+        basic.NetstringReceiver(self, data)
+        if self.brokenPeer:
+            self.resetConnection()
+            
     def resetConnection(self):
         self.brokenPeer = 0
         self._readerState = basic.LENGTH
@@ -45,26 +51,26 @@ class KRPC(basic.NetstringReceiver):
                         ret = apply(f, (), msg['arg'])
                     except Exception, e:
                         ## send error
-                        str = bencode({'tid':msg['tid'], 'typ':'err', 'err' :`e`})
-                        olen = len(str)
-                        self.sendString(str)
+                        out = bencode({'tid':msg['tid'], 'typ':'err', 'err' :`e`})
+                        olen = len(out)
+                        self.sendString(out)
                     else:
                         if ret:
                             #  make response
-                            str = bencode({'tid' : msg['tid'], 'typ' : 'rsp', 'rsp' : ret})
+                            out = bencode({'tid' : msg['tid'], 'typ' : 'rsp', 'rsp' : ret})
                         else:
-                            str = bencode({'tid' : msg['tid'], 'typ' : 'rsp', 'rsp' : {}})
+                            out = bencode({'tid' : msg['tid'], 'typ' : 'rsp', 'rsp' : {}})
                         #      send response
-                        olen = len(str)
-                        self.sendString(str)
+                        olen = len(out)
+                        self.sendString(out)
 
                 else:
                     if self.noisy:
                         print "don't know about method %s" % msg['req']
                     # unknown method
-                    str = bencode({'tid':msg['tid'], 'typ':'err', 'err' : KRPC_ERROR_METHOD_UNKNOWN})
-                    olen = len(str)
-                    self.sendString(str)
+                    out = bencode({'tid':msg['tid'], 'typ':'err', 'err' : KRPC_ERROR_METHOD_UNKNOWN})
+                    olen = len(out)
+                    self.sendString(out)
                 if self.noisy:
                     print "%s %s >>> %s - %s %s %s" % (time.asctime(), self.transport.addr, self.factory.node.port, 
                                                     ilen, msg['req'], olen)
@@ -76,7 +82,9 @@ class KRPC(basic.NetstringReceiver):
                     #  callback
                     del(self.tids[msg['tid']])
                     df.callback({'rsp' : msg['rsp'], '_krpc_sender': self.transport.addr})
-                # no tid, this transaction timed out already...
+                else:
+                    print 'timeout ' + `msg['rsp']['sender']`
+                    # no tid, this transaction timed out already...
             elif msg['typ'] == 'err':
                 # if error
                 #      lookup tid
@@ -85,6 +93,7 @@ class KRPC(basic.NetstringReceiver):
                 df.errback(msg['err'])
                 del(self.tids[msg['tid']])
             else:
+                print "unknown message type " + `msg`
                 # unknown message type
                 df = self.tids[msg['tid']]
                 #      callback
diff --git a/test.py b/test.py
index 0dd3f419b2037ea9df867a5da253b32cabe53fac..4cb499e2ec39a89b576e158743bbcbc3af86e11c 100644 (file)
--- a/test.py
+++ b/test.py
@@ -5,6 +5,6 @@ import hash, node, knode
 import actions
 import btemplate
 import test_airhook
-
-tests = unittest.defaultTestLoader.loadTestsFromNames(['hash', 'node', 'knode', 'actions',  'ktable', 'test_airhook'])
+import test_krpc
+tests = unittest.defaultTestLoader.loadTestsFromNames(['hash', 'node', 'knode', 'actions',  'ktable', 'test_airhook', 'test_krpc'])
 result = unittest.TextTestRunner().run(tests)
index 40c6f8dd4b3c5eca64b8f25e8d41f99ae29ad5a1..147b8bb75a64ab69892b20fb7f7f7969131e597d 100644 (file)
@@ -91,7 +91,6 @@ class EchoTest(TestCase):
         self.b = listenAirhookStream(4043, self.bf)
         
     def testEcho(self):
-        self.noisy = 1
         df = self.a.connectionForAddr(('127.0.0.1', 4043)).protocol.sendRequest('echo', {'msg' : "This is a test."})
         df.addCallback(self.gotMsg)
         reactor.iterate()
@@ -105,6 +104,39 @@ class EchoTest(TestCase):
         msg = dict['rsp']
         self.msg = msg
 
+class ManyEchoTest(TestCase):
+    def setUp(self):
+        self.noisy = 0
+        self.msg = None
+        
+        self.af = Receiver()
+        self.bf = Receiver()        
+        self.a = listenAirhookStream(4588, self.af)
+        self.b = listenAirhookStream(4589, self.bf)
+        
+    def testManyEcho(self):
+        df = self.a.connectionForAddr(('127.0.0.1', 4589)).protocol.sendRequest('echo', {'msg' : "This is a test."})
+        df.addCallback(self.gotMsg)
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        self.assertEqual(self.msg, "This is a test.")
+        for i in xrange(100):
+            self.msg = None
+            df = self.a.connectionForAddr(('127.0.0.1', 4589)).protocol.sendRequest('echo', {'msg' : "This is a test."})
+            df.addCallback(self.gotMsg)
+            reactor.iterate()
+            reactor.iterate()
+            reactor.iterate()
+            reactor.iterate()
+            self.assertEqual(self.msg, "This is a test.")
+            
+    def gotMsg(self, dict):
+        _krpc_sender = dict['_krpc_sender']
+        msg = dict['rsp']
+        self.msg = msg
+
 class MultiEchoTest(TestCase):
     def setUp(self):
         self.noisy = 0
@@ -174,6 +206,7 @@ class EchoResetTest(TestCase):
         reactor.iterate()
         self.assertEqual(self.msg, "This is another test.")
 
+        del(self.a.connections[('127.0.0.1', 4079)])
         df = self.a.connectionForAddr(('127.0.0.1', 4079)).protocol.sendRequest('echo', {'msg' : "This is yet another test."})
         df.addCallback(self.gotMsg)
         reactor.iterate()
@@ -182,6 +215,9 @@ class EchoResetTest(TestCase):
         reactor.iterate()
         self.assertEqual(self.msg, "This is yet another test.")
 
+    def testLotsofEchoReset(self):
+        for i in range(100):
+            self.testEchoReset()
     def gotMsg(self, dict):
         _krpc_sender = dict['_krpc_sender']
         msg = dict['rsp']