self.assertEqual(a.outMsgNums[(a.outSeq-1) % 256], 254)
def testConnectionReset(self):
+ self.testTwoWayBlast()
+ self.b.protocol.q = []
a = self.a
b = self.b
msg = swap(a, noisy=self.noisy)
self.assertEqual(len(b.protocol.q), 2)
self.assertEqual(b.protocol.q[1], "TESTING2")
+ def testRecipientReset(self):
+ self.testTwoWayBlast()
+ self.b.protocol.q = []
+ self.noisy = 1
+ a = self.a
+ b = self.b
+ msg = swap(a, noisy=self.noisy)
+ b.datagramReceived(msg)
+
+ msg = swap(b, noisy=self.noisy)
+ a.datagramReceived(msg)
+
+ a.omsgq.append("TESTING")
+ msg = swap(a, noisy=self.noisy)
+ b.datagramReceived(msg)
+
+ msg = swap(b, noisy=self.noisy)
+ a.datagramReceived(msg)
+
+ self.assertEqual(b.protocol.q[0], "TESTING")
+ self.assertEqual(b.state, confirmed)
+
+ self.b = AirhookConnection()
+ self.b.makeConnection(DummyTransport())
+ self.b.protocol = Receiver()
+ self.b.addr = ('127.0.0.1', 4444)
+ b = self.b
+
+ msg = swap(a, noisy=self.noisy)
+ b.datagramReceived(msg)
+
+ msg = swap(b, noisy=self.noisy)
+ a.datagramReceived(msg)
+
+ a.omsgq.append("TESTING2")
+ self.assertEqual(len(b.protocol.q), 0)
+ msg = swap(a, noisy=self.noisy)
+ b.datagramReceived(msg)
+
+ msg = swap(b, noisy=self.noisy)
+ a.datagramReceived(msg)
+
+ msg = swap(a, noisy=self.noisy)
+ b.datagramReceived(msg)
+
+ msg = swap(b, noisy=self.noisy)
+ a.datagramReceived(msg)
+
+ self.assertEqual(len(b.protocol.q), 1)
+ self.assertEqual(b.protocol.q[0], "TESTING2")
+
class StreamTests(unittest.TestCase):
def setUp(self):
reactor.iterate()
self.assertEqual(self.ac.protocol.buf, msg)
-
\ No newline at end of file
+