From 4310c7b8d0f5ba7795b3b4ed6f918329f9b710c0 Mon Sep 17 00:00:00 2001 From: burris Date: Thu, 30 Jan 2003 04:42:00 +0000 Subject: [PATCH] callbacks now return a dict now that has the _krpc_sender connection information fixed connection resets in airhook, resets are propogated to the protocol via resetConnection() --- actions.py | 8 +++++- airhook.py | 21 ++++++++++---- const.py | 3 +- knode.py | 4 ++- krpc.py | 22 ++++++++++----- test_airhook.py | 55 +++++++++++++++++++++++++++++++++++- test_krpc.py | 75 +++++++++++++++++++++++++++++++++++++++++++++++-- 7 files changed, 168 insertions(+), 20 deletions(-) diff --git a/actions.py b/actions.py index 7a3a5b9..18b9cdd 100644 --- a/actions.py +++ b/actions.py @@ -40,8 +40,11 @@ FIND_NODE_TIMEOUT = 15 class FindNode(ActionBase): """ find node action merits it's own class as it is a long running stateful process """ def handleGotNodes(self, dict): + _krpc_sender = dict['_krpc_sender'] + dict = dict['rsp'] l = dict["nodes"] sender = dict["sender"] + sender['port'] = _krpc_sender[1] sender = Node().initWithDict(sender) sender.conn = self.table.airhook.connectionForAddr((sender.host, sender.port)) self.table.table.insertNode(sender) @@ -109,7 +112,10 @@ GET_VALUE_TIMEOUT = 15 class GetValue(FindNode): """ get value task """ def handleGotNodes(self, dict): + _krpc_sender = dict['_krpc_sender'] + dict = dict['rsp'] sender = dict["sender"] + sender['port'] = _krpc_sender[1] sender = Node().initWithDict(sender) sender.conn = self.table.airhook.connectionForAddr((sender.host, sender.port)) self.table.table.insertNode(sender) @@ -133,6 +139,7 @@ class GetValue(FindNode): return y else: return None + z = len(dict['values']) v = filter(None, map(x, dict['values'])) if(len(v)): reactor.callFromThread(self.callback, v) @@ -243,4 +250,3 @@ class KeyExpirer: s = "delete from kv where time < '%s';" % self.cut c.execute(s) reactor.callLater(const.KE_DELAY, self.doExpire) - \ No newline at end of file diff --git a/airhook.py b/airhook.py index 60078bd..d3ae57a 100644 --- a/airhook.py +++ b/airhook.py @@ -103,20 +103,20 @@ class AirhookConnection(protocol.ConnectedDatagramProtocol, interfaces.IUDPConne self.lastTransmitSeq = -1 # last sequence we sent a packet self.state = pending # one of pending, sent, confirmed - self.outMsgs = [None] * 256 # outgoing messages (seq sent, message), index = message number self.omsgq = [] # list of messages to go out self.imsgq = [] # list of messages coming in self.sendSession = None # send session/observed fields until obSeq > sendSession self.response = 0 # if we know we have a response now (like resending missed packets) self.noisy = 0 - self.scheduled = 0 # a sendNext is scheduled, don't schedule another - self.resetMessages() + self.resetConnection() - def resetMessages(self): + def resetConnection(self): self.weMissed = [] + self.outMsgs = [None] * 256 # outgoing messages (seq sent, message), index = message number self.inMsg = 0 # next incoming message number self.outMsgNums = [0] * 256 # outgoing message numbers i = outNum % 256 self.next = 0 # next outgoing message number + self.scheduled = 0 # a sendNext is scheduled, don't schedule another def datagramReceived(self, datagram): if not datagram: @@ -149,15 +149,17 @@ class AirhookConnection(protocol.ConnectedDatagramProtocol, interfaces.IUDPConne self.observed = p.session elif self.observed != p.session: self.state = pending - self.resetMessages() + self.resetConnection() self.inSeq = p.seq elif self.state == confirmed: if p.session != None or p.observed != None : if (p.session != None and p.session != self.observed) or (p.observed != None and p.observed != self.sessionID): self.state = pending self.observed = p.session - self.resetMessages() + self.resetConnection() self.inSeq = p.seq + if hasattr(self.protocol, "resetConnection") and callable(self.protocol.resetConnection): + self.protocol.resetConnection() # check to make sure sequence number isn't out of order if (p.seq - self.inSeq) % 2**16 >= 256: @@ -345,10 +347,17 @@ class StreamConnection(AirhookConnection): """ def __init__(self): AirhookConnection.__init__(self) + self.resetStream() + + def resetStream(self): self.oseq = 0 self.iseq = 0 self.q = [] + def resetConnection(self): + AirhookConnection.resetConnection(self) + self.resetStream() + def dataCameIn(self): # put 'em together for msg in self.imsgq: diff --git a/const.py b/const.py index bade7c4..797b2d0 100644 --- a/const.py +++ b/const.py @@ -5,12 +5,13 @@ reactor = SelectReactor(installSignalHandlers=0) from twisted.internet import main main.installReactor(reactor) + try: import twisted.names.client reactor.installResolver(twisted.names.client.theResolver) except IOError: print "no resolv.conf!" - + # magic id to use before we know a peer's id NULL_ID = 20 * '\0' diff --git a/knode.py b/knode.py index f952795..8453942 100644 --- a/knode.py +++ b/knode.py @@ -10,11 +10,13 @@ class IDChecker: class KNode(Node): def checkSender(self, dict): try: - senderid = dict['sender']['id'] + senderid = dict['rsp']['sender']['id'] except KeyError: + print ">>>> No peer id in response" raise Exception, "No peer id in response." else: if self.id != NULL_ID and senderid != self.id: + print "Got response from different node than expected." raise Exception, "Got response from different node than expected." return dict diff --git a/krpc.py b/krpc.py index 40ed862..b2700e1 100644 --- a/krpc.py +++ b/krpc.py @@ -19,12 +19,18 @@ class KRPC(basic.NetstringReceiver): def __init__(self): self.tids = {} + def resetConnection(self): + self.brokenPeer = 0 + self._readerState = basic.LENGTH + self._readerLength = 0 + def stringReceived(self, str): # bdecode try: msg = bdecode(str) except Exception, e: - print "response decode error: " + `e` + if self.naisy: + print "response decode error: " + `e` self.d.errback() else: # look at msg type @@ -47,12 +53,14 @@ class KRPC(basic.NetstringReceiver): # make response str = bencode({'tid' : msg['tid'], 'typ' : 'rsp', 'rsp' : ret}) else: - str = bencode({'tid' : msg['tid'], 'typ' : 'rsp', 'rsp' : []}) + str = bencode({'tid' : msg['tid'], 'typ' : 'rsp', 'rsp' : {}}) # send response olen = len(str) self.sendString(str) else: + if self.noisy: + print "don't know about method %s" % msg['req'] # unknown method str = bencode({'tid':msg['tid'], 'typ':'err', 'err' : KRPC_ERROR_METHOD_UNKNOWN}) olen = len(str) @@ -66,9 +74,9 @@ class KRPC(basic.NetstringReceiver): if self.tids.has_key(msg['tid']): df = self.tids[msg['tid']] # callback - df.callback(msg['rsp']) del(self.tids[msg['tid']]) - # no tid, perhaps this transaction timed out already... + df.callback({'rsp' : msg['rsp'], '_krpc_sender': self.transport.addr}) + # no tid, this transaction timed out already... elif msg['typ'] == 'err': # if error # lookup tid @@ -88,15 +96,15 @@ class KRPC(basic.NetstringReceiver): # send it msg = {'tid' : hash.newID(), 'typ' : 'req', 'req' : method, 'arg' : args} str = bencode(msg) - self.sendString(str) d = Deferred() self.tids[msg['tid']] = d - def timeOut(tids = self.tids, id = msg['tid']): if tids.has_key(id): df = tids[id] del(tids[id]) + print ">>>>>> KRPC_ERROR_TIMEOUT" df.errback(KRPC_ERROR_TIMEOUT) reactor.callLater(KRPC_TIMEOUT, timeOut) + self.sendString(str) return d - \ No newline at end of file + diff --git a/test_airhook.py b/test_airhook.py index d6cfb13..407cf1f 100644 --- a/test_airhook.py +++ b/test_airhook.py @@ -547,6 +547,8 @@ class BasicTests(unittest.TestCase): self.assertEqual(a.outMsgNums[(a.outSeq-1) % 256], 254) def testConnectionReset(self): + self.testTwoWayBlast() + self.b.protocol.q = [] a = self.a b = self.b msg = swap(a, noisy=self.noisy) @@ -587,6 +589,57 @@ class BasicTests(unittest.TestCase): self.assertEqual(len(b.protocol.q), 2) self.assertEqual(b.protocol.q[1], "TESTING2") + def testRecipientReset(self): + self.testTwoWayBlast() + self.b.protocol.q = [] + self.noisy = 1 + a = self.a + b = self.b + msg = swap(a, noisy=self.noisy) + b.datagramReceived(msg) + + msg = swap(b, noisy=self.noisy) + a.datagramReceived(msg) + + a.omsgq.append("TESTING") + msg = swap(a, noisy=self.noisy) + b.datagramReceived(msg) + + msg = swap(b, noisy=self.noisy) + a.datagramReceived(msg) + + self.assertEqual(b.protocol.q[0], "TESTING") + self.assertEqual(b.state, confirmed) + + self.b = AirhookConnection() + self.b.makeConnection(DummyTransport()) + self.b.protocol = Receiver() + self.b.addr = ('127.0.0.1', 4444) + b = self.b + + msg = swap(a, noisy=self.noisy) + b.datagramReceived(msg) + + msg = swap(b, noisy=self.noisy) + a.datagramReceived(msg) + + a.omsgq.append("TESTING2") + self.assertEqual(len(b.protocol.q), 0) + msg = swap(a, noisy=self.noisy) + b.datagramReceived(msg) + + msg = swap(b, noisy=self.noisy) + a.datagramReceived(msg) + + msg = swap(a, noisy=self.noisy) + b.datagramReceived(msg) + + msg = swap(b, noisy=self.noisy) + a.datagramReceived(msg) + + self.assertEqual(len(b.protocol.q), 1) + self.assertEqual(b.protocol.q[0], "TESTING2") + class StreamTests(unittest.TestCase): def setUp(self): @@ -709,4 +762,4 @@ class EchoReactorStreamBig(unittest.TestCase): reactor.iterate() self.assertEqual(self.ac.protocol.buf, msg) - \ No newline at end of file + diff --git a/test_krpc.py b/test_krpc.py index 42b7b16..40c6f8d 100644 --- a/test_krpc.py +++ b/test_krpc.py @@ -50,13 +50,36 @@ class SimpleTest(TestCase): self.b = listenAirhookStream(4051, self.bf) def testSimpleMessage(self): - self.noisy = 1 + self.noisy = 0 self.a.connectionForAddr(('127.0.0.1', 4051)).protocol.sendRequest('store', {'msg' : "This is a test."}) reactor.iterate() reactor.iterate() reactor.iterate() self.assertEqual(self.bf.buf, ["This is a test."]) +class BlastTest(TestCase): + def setUp(self): + self.noisy = 0 + + self.af = Receiver() + self.bf = Receiver() + self.a = listenAirhookStream(4060, self.af) + self.b = listenAirhookStream(4061, self.bf) + + def testMessageBlast(self): + self.a.connectionForAddr(('127.0.0.1', 4061)).protocol.sendRequest('store', {'msg' : "This is a test."}) + reactor.iterate() + reactor.iterate() + reactor.iterate() + self.assertEqual(self.bf.buf, ["This is a test."]) + self.bf.buf = [] + + for i in range(100): + self.a.connectionForAddr(('127.0.0.1', 4061)).protocol.sendRequest('store', {'msg' : "This is a test."}) + reactor.iterate() + #self.bf.buf = [] + self.assertEqual(self.bf.buf, ["This is a test."] * 100) + class EchoTest(TestCase): def setUp(self): self.noisy = 0 @@ -77,7 +100,9 @@ class EchoTest(TestCase): reactor.iterate() self.assertEqual(self.msg, "This is a test.") - def gotMsg(self, msg): + def gotMsg(self, dict): + _krpc_sender = dict['_krpc_sender'] + msg = dict['rsp'] self.msg = msg class MultiEchoTest(TestCase): @@ -116,7 +141,50 @@ class MultiEchoTest(TestCase): reactor.iterate() self.assertEqual(self.msg, "This is yet another test.") - def gotMsg(self, msg): + def gotMsg(self, dict): + _krpc_sender = dict['_krpc_sender'] + msg = dict['rsp'] + self.msg = msg + +class EchoResetTest(TestCase): + def setUp(self): + self.noisy = 0 + self.msg = None + + self.af = Receiver() + self.bf = Receiver() + self.a = listenAirhookStream(4078, self.af) + self.b = listenAirhookStream(4079, self.bf) + + def testEchoReset(self): + self.noisy = 1 + df = self.a.connectionForAddr(('127.0.0.1', 4079)).protocol.sendRequest('echo', {'msg' : "This is a test."}) + df.addCallback(self.gotMsg) + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + self.assertEqual(self.msg, "This is a test.") + + df = self.a.connectionForAddr(('127.0.0.1', 4079)).protocol.sendRequest('echo', {'msg' : "This is another test."}) + df.addCallback(self.gotMsg) + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + self.assertEqual(self.msg, "This is another test.") + + df = self.a.connectionForAddr(('127.0.0.1', 4079)).protocol.sendRequest('echo', {'msg' : "This is yet another test."}) + df.addCallback(self.gotMsg) + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + self.assertEqual(self.msg, "This is yet another test.") + + def gotMsg(self, dict): + _krpc_sender = dict['_krpc_sender'] + msg = dict['rsp'] self.msg = msg class UnknownMethErrTest(TestCase): @@ -140,3 +208,4 @@ class UnknownMethErrTest(TestCase): def gotErr(self, err): self.err = err.value + -- 2.39.5