callbacks now return a dict now that has the _krpc_sender connection information
authorburris <burris>
Thu, 30 Jan 2003 04:42:00 +0000 (04:42 +0000)
committerburris <burris>
Thu, 30 Jan 2003 04:42:00 +0000 (04:42 +0000)
fixed connection resets in airhook, resets are propogated to the protocol via resetConnection()

actions.py
airhook.py
const.py
knode.py
krpc.py
test_airhook.py
test_krpc.py

index 7a3a5b937f67004332a0e7b8e2180e2449421d6d..18b9cddf82ae1dd016468eaf6b5ff5479af3067d 100644 (file)
@@ -40,8 +40,11 @@ FIND_NODE_TIMEOUT = 15
 class FindNode(ActionBase):
     """ find node action merits it's own class as it is a long running stateful process """
     def handleGotNodes(self, dict):
+        _krpc_sender = dict['_krpc_sender']
+        dict = dict['rsp']
         l = dict["nodes"]
         sender = dict["sender"]
+        sender['port'] = _krpc_sender[1]        
         sender = Node().initWithDict(sender)
         sender.conn = self.table.airhook.connectionForAddr((sender.host, sender.port))
         self.table.table.insertNode(sender)
@@ -109,7 +112,10 @@ GET_VALUE_TIMEOUT = 15
 class GetValue(FindNode):
     """ get value task """
     def handleGotNodes(self, dict):
+        _krpc_sender = dict['_krpc_sender']
+        dict = dict['rsp']
         sender = dict["sender"]
+        sender['port'] = _krpc_sender[1]        
         sender = Node().initWithDict(sender)
         sender.conn = self.table.airhook.connectionForAddr((sender.host, sender.port))
         self.table.table.insertNode(sender)
@@ -133,6 +139,7 @@ class GetValue(FindNode):
                     return y
                 else:
                     return None
+            z = len(dict['values'])
             v = filter(None, map(x, dict['values']))
             if(len(v)):
                 reactor.callFromThread(self.callback, v)
@@ -243,4 +250,3 @@ class KeyExpirer:
         s = "delete from kv where time < '%s';" % self.cut
         c.execute(s)
         reactor.callLater(const.KE_DELAY, self.doExpire)
-    
\ No newline at end of file
index 60078bd184dbca881a435d6cd08865b31cbf0fc5..d3ae57a40e8a17f88d8ed17036c52ffe26cf93fc 100644 (file)
@@ -103,20 +103,20 @@ class AirhookConnection(protocol.ConnectedDatagramProtocol, interfaces.IUDPConne
         self.lastTransmitSeq = -1 # last sequence we sent a packet
         self.state = pending # one of pending, sent, confirmed
         
-        self.outMsgs = [None] * 256  # outgoing messages  (seq sent, message), index = message number
         self.omsgq = [] # list of messages to go out
         self.imsgq = [] # list of messages coming in
         self.sendSession = None  # send session/observed fields until obSeq > sendSession
         self.response = 0 # if we know we have a response now (like resending missed packets)
         self.noisy = 0
-        self.scheduled = 0 # a sendNext is scheduled, don't schedule another
-        self.resetMessages()
+        self.resetConnection()
     
-    def resetMessages(self):
+    def resetConnection(self):
         self.weMissed = []
+        self.outMsgs = [None] * 256  # outgoing messages  (seq sent, message), index = message number
         self.inMsg = 0   # next incoming message number
         self.outMsgNums = [0] * 256 # outgoing message numbers i = outNum % 256
         self.next = 0  # next outgoing message number
+        self.scheduled = 0 # a sendNext is scheduled, don't schedule another
 
     def datagramReceived(self, datagram):
         if not datagram:
@@ -149,15 +149,17 @@ class AirhookConnection(protocol.ConnectedDatagramProtocol, interfaces.IUDPConne
                     self.observed = p.session
                 elif self.observed != p.session:
                     self.state = pending
-                    self.resetMessages()
+                    self.resetConnection()
                     self.inSeq = p.seq
         elif self.state == confirmed:
             if p.session != None or p.observed != None :
                 if (p.session != None and p.session != self.observed) or (p.observed != None and p.observed != self.sessionID):
                     self.state = pending
                     self.observed = p.session
-                    self.resetMessages()
+                    self.resetConnection()
                     self.inSeq = p.seq
+                    if hasattr(self.protocol, "resetConnection") and callable(self.protocol.resetConnection):
+                        self.protocol.resetConnection()
 
         # check to make sure sequence number isn't out of order
         if (p.seq - self.inSeq) % 2**16 >= 256:
@@ -345,10 +347,17 @@ class StreamConnection(AirhookConnection):
     """
     def __init__(self):
         AirhookConnection.__init__(self)
+        self.resetStream()
+        
+    def resetStream(self):
         self.oseq = 0
         self.iseq = 0
         self.q = []
 
+    def resetConnection(self):
+        AirhookConnection.resetConnection(self)
+        self.resetStream()
+        
     def dataCameIn(self):
         # put 'em together
         for msg in self.imsgq:
index bade7c418cb458302cf69dc795f022a238d7d9c0..797b2d0ce84adea331ca512b7a82d57d66c45159 100644 (file)
--- a/const.py
+++ b/const.py
@@ -5,12 +5,13 @@ reactor = SelectReactor(installSignalHandlers=0)
 from twisted.internet import main
 main.installReactor(reactor)
 
+
 try:
     import twisted.names.client
     reactor.installResolver(twisted.names.client.theResolver)
 except IOError:
     print "no resolv.conf!"
-    
+
 # magic id to use before we know a peer's id
 NULL_ID =  20 * '\0'
 
index f952795c6d647634dd156a85487115aa424e83f7..8453942ba0c3ca847a06969375cc3cb330b1b078 100644 (file)
--- a/knode.py
+++ b/knode.py
@@ -10,11 +10,13 @@ class IDChecker:
 class KNode(Node):
     def checkSender(self, dict):
         try:
-            senderid = dict['sender']['id']
+            senderid = dict['rsp']['sender']['id']
         except KeyError:
+            print ">>>> No peer id in response"
             raise Exception, "No peer id in response."
         else:
             if self.id != NULL_ID and senderid != self.id:
+                print "Got response from different node than expected."
                 raise Exception, "Got response from different node than expected."
         return dict
         
diff --git a/krpc.py b/krpc.py
index 40ed862ce69d340006f0162bf2dd4bbf2cfabeb8..b2700e14b9cd74e418ed1c9c5e5518f23b36bd27 100644 (file)
--- a/krpc.py
+++ b/krpc.py
@@ -19,12 +19,18 @@ class KRPC(basic.NetstringReceiver):
     def __init__(self):
         self.tids = {}
 
+    def resetConnection(self):
+        self.brokenPeer = 0
+        self._readerState = basic.LENGTH
+        self._readerLength = 0
+
     def stringReceived(self, str):
         # bdecode
         try:
             msg = bdecode(str)
         except Exception, e:
-            print "response decode error: " + `e`
+            if self.naisy:
+                print "response decode error: " + `e`
             self.d.errback()
         else:
             # look at msg type
@@ -47,12 +53,14 @@ class KRPC(basic.NetstringReceiver):
                             #  make response
                             str = bencode({'tid' : msg['tid'], 'typ' : 'rsp', 'rsp' : ret})
                         else:
-                            str = bencode({'tid' : msg['tid'], 'typ' : 'rsp', 'rsp' : []})
+                            str = bencode({'tid' : msg['tid'], 'typ' : 'rsp', 'rsp' : {}})
                         #      send response
                         olen = len(str)
                         self.sendString(str)
 
                 else:
+                    if self.noisy:
+                        print "don't know about method %s" % msg['req']
                     # unknown method
                     str = bencode({'tid':msg['tid'], 'typ':'err', 'err' : KRPC_ERROR_METHOD_UNKNOWN})
                     olen = len(str)
@@ -66,9 +74,9 @@ class KRPC(basic.NetstringReceiver):
                 if self.tids.has_key(msg['tid']):
                     df = self.tids[msg['tid']]
                     #  callback
-                    df.callback(msg['rsp'])
                     del(self.tids[msg['tid']])
-                # no tid, perhaps this transaction timed out already...
+                    df.callback({'rsp' : msg['rsp'], '_krpc_sender': self.transport.addr})
+                # no tid, this transaction timed out already...
             elif msg['typ'] == 'err':
                 # if error
                 #      lookup tid
@@ -88,15 +96,15 @@ class KRPC(basic.NetstringReceiver):
         # send it
         msg = {'tid' : hash.newID(), 'typ' : 'req',  'req' : method, 'arg' : args}
         str = bencode(msg)
-        self.sendString(str)
         d = Deferred()
         self.tids[msg['tid']] = d
-        
         def timeOut(tids = self.tids, id = msg['tid']):
             if tids.has_key(id):
                 df = tids[id]
                 del(tids[id])
+                print ">>>>>> KRPC_ERROR_TIMEOUT"
                 df.errback(KRPC_ERROR_TIMEOUT)
         reactor.callLater(KRPC_TIMEOUT, timeOut)
+        self.sendString(str)
         return d
\ No newline at end of file
index d6cfb13b7f7cfd173557cbc98e1c80f51b0beb3f..407cf1fe5f141ae3ee454055c3c7fa2419f7efc8 100644 (file)
@@ -547,6 +547,8 @@ class BasicTests(unittest.TestCase):
         self.assertEqual(a.outMsgNums[(a.outSeq-1) % 256], 254)
 
     def testConnectionReset(self):
+        self.testTwoWayBlast()
+        self.b.protocol.q = []
         a = self.a
         b = self.b
         msg = swap(a, noisy=self.noisy)
@@ -587,6 +589,57 @@ class BasicTests(unittest.TestCase):
         self.assertEqual(len(b.protocol.q), 2)
         self.assertEqual(b.protocol.q[1], "TESTING2")
 
+    def testRecipientReset(self):
+        self.testTwoWayBlast()
+        self.b.protocol.q = []
+        self.noisy = 1
+        a = self.a
+        b = self.b
+        msg = swap(a, noisy=self.noisy)
+        b.datagramReceived(msg)
+
+        msg = swap(b, noisy=self.noisy)
+        a.datagramReceived(msg)
+
+        a.omsgq.append("TESTING")
+        msg = swap(a, noisy=self.noisy)
+        b.datagramReceived(msg)
+
+        msg = swap(b, noisy=self.noisy)
+        a.datagramReceived(msg)
+
+        self.assertEqual(b.protocol.q[0], "TESTING")
+        self.assertEqual(b.state, confirmed)
+        
+        self.b = AirhookConnection()
+        self.b.makeConnection(DummyTransport())
+        self.b.protocol = Receiver()
+        self.b.addr = ('127.0.0.1', 4444)
+        b = self.b
+        
+        msg = swap(a, noisy=self.noisy)
+        b.datagramReceived(msg)
+
+        msg = swap(b, noisy=self.noisy)
+        a.datagramReceived(msg)
+        
+        a.omsgq.append("TESTING2")
+        self.assertEqual(len(b.protocol.q), 0)
+        msg = swap(a, noisy=self.noisy)
+        b.datagramReceived(msg)
+
+        msg = swap(b, noisy=self.noisy)
+        a.datagramReceived(msg)
+
+        msg = swap(a, noisy=self.noisy)
+        b.datagramReceived(msg)
+
+        msg = swap(b, noisy=self.noisy)
+        a.datagramReceived(msg)
+
+        self.assertEqual(len(b.protocol.q), 1)
+        self.assertEqual(b.protocol.q[0], "TESTING2")
+
         
 class StreamTests(unittest.TestCase):
     def setUp(self):
@@ -709,4 +762,4 @@ class EchoReactorStreamBig(unittest.TestCase):
         reactor.iterate()
         self.assertEqual(self.ac.protocol.buf, msg)
 
-        
\ No newline at end of file
+        
index 42b7b161c6b2272a816d04bde83c904b4b4903c9..40c6f8dd4b3c5eca64b8f25e8d41f99ae29ad5a1 100644 (file)
@@ -50,13 +50,36 @@ class SimpleTest(TestCase):
         self.b = listenAirhookStream(4051, self.bf)
         
     def testSimpleMessage(self):
-        self.noisy = 1
+        self.noisy = 0
         self.a.connectionForAddr(('127.0.0.1', 4051)).protocol.sendRequest('store', {'msg' : "This is a test."})
         reactor.iterate()
         reactor.iterate()
         reactor.iterate()
         self.assertEqual(self.bf.buf, ["This is a test."])
 
+class BlastTest(TestCase):
+    def setUp(self):
+        self.noisy = 0
+        
+        self.af = Receiver()
+        self.bf = Receiver()        
+        self.a = listenAirhookStream(4060, self.af)
+        self.b = listenAirhookStream(4061, self.bf)
+
+    def testMessageBlast(self):
+        self.a.connectionForAddr(('127.0.0.1', 4061)).protocol.sendRequest('store', {'msg' : "This is a test."})
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        self.assertEqual(self.bf.buf, ["This is a test."])
+        self.bf.buf = []
+        
+        for i in range(100):
+            self.a.connectionForAddr(('127.0.0.1', 4061)).protocol.sendRequest('store', {'msg' : "This is a test."})
+            reactor.iterate()
+            #self.bf.buf = []
+        self.assertEqual(self.bf.buf, ["This is a test."] * 100)
+
 class EchoTest(TestCase):
     def setUp(self):
         self.noisy = 0
@@ -77,7 +100,9 @@ class EchoTest(TestCase):
         reactor.iterate()
         self.assertEqual(self.msg, "This is a test.")
 
-    def gotMsg(self, msg):
+    def gotMsg(self, dict):
+        _krpc_sender = dict['_krpc_sender']
+        msg = dict['rsp']
         self.msg = msg
 
 class MultiEchoTest(TestCase):
@@ -116,7 +141,50 @@ class MultiEchoTest(TestCase):
         reactor.iterate()
         self.assertEqual(self.msg, "This is yet another test.")
 
-    def gotMsg(self, msg):
+    def gotMsg(self, dict):
+        _krpc_sender = dict['_krpc_sender']
+        msg = dict['rsp']
+        self.msg = msg
+
+class EchoResetTest(TestCase):
+    def setUp(self):
+        self.noisy = 0
+        self.msg = None
+        
+        self.af = Receiver()
+        self.bf = Receiver()        
+        self.a = listenAirhookStream(4078, self.af)
+        self.b = listenAirhookStream(4079, self.bf)
+        
+    def testEchoReset(self):
+        self.noisy = 1
+        df = self.a.connectionForAddr(('127.0.0.1', 4079)).protocol.sendRequest('echo', {'msg' : "This is a test."})
+        df.addCallback(self.gotMsg)
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        self.assertEqual(self.msg, "This is a test.")
+
+        df = self.a.connectionForAddr(('127.0.0.1', 4079)).protocol.sendRequest('echo', {'msg' : "This is another test."})
+        df.addCallback(self.gotMsg)
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        self.assertEqual(self.msg, "This is another test.")
+
+        df = self.a.connectionForAddr(('127.0.0.1', 4079)).protocol.sendRequest('echo', {'msg' : "This is yet another test."})
+        df.addCallback(self.gotMsg)
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        self.assertEqual(self.msg, "This is yet another test.")
+
+    def gotMsg(self, dict):
+        _krpc_sender = dict['_krpc_sender']
+        msg = dict['rsp']
         self.msg = msg
 
 class UnknownMethErrTest(TestCase):
@@ -140,3 +208,4 @@ class UnknownMethErrTest(TestCase):
 
     def gotErr(self, err):
         self.err = err.value
+