]> git.mxchange.org Git - quix0rs-apt-p2p.git/blobdiff - test_airhook.py
airhook reliable datagram protocol
[quix0rs-apt-p2p.git] / test_airhook.py
diff --git a/test_airhook.py b/test_airhook.py
new file mode 100644 (file)
index 0000000..b00681e
--- /dev/null
@@ -0,0 +1,433 @@
+import unittest
+from airhook import *
+from random import uniform as rand
+
+if __name__ =="__main__":
+       tests = unittest.defaultTestLoader.loadTestsFromNames(['test_airhook'])
+       result = unittest.TextTestRunner().run(tests)
+
+
+
+def test_createStartPacket():
+       flags = 0 | FLAG_AIRHOOK | FLAG_SESSION 
+       packet = chr(flags) + "\xff" + "\x00\x00" + pack("!L", long(rand(0, 2**32)))
+       return packet
+
+def test_createReply(session, observed, obseq, seq):
+       flags = 0 | FLAG_AIRHOOK | FLAG_SESSION | FLAG_OBSERVED
+       packet = chr(flags) + pack("!H", seq)[1] + pack("!H", obseq + 1) + pack("!L", session) + pack("!L", observed)
+       return packet
+
+
+def pscope(msg, noisy=0):
+       # packet scope
+       str = ""
+       p = AirhookPacket(msg)
+       str += "oseq: %s  seq: %s " %  (p.oseq, p.seq)
+       if noisy:
+               str += "packet: %s  \n" % (`p.datagram`)
+       flags = p.flags
+       str += "flags: "
+       if flags & FLAG_SESSION:
+               str += "FLAG_SESSION "
+       if flags & FLAG_OBSERVED:
+               str += "FLAG_OBSERVED "
+       if flags & FLAG_MISSED:
+               str += "FLAG_MISSED "
+       if flags & FLAG_NEXT:
+               str += "FLAG_NEXT "
+       str += "\n"
+       
+       if p.observed != None:
+               str += "OBSERVED: %s\n" % p.observed
+       if p.session != None:
+               str += "SESSION: %s\n" % p.session
+       if p.next != None:
+               str += "NEXT: %s\n" % p.next
+       if p.missed:
+               if noisy:
+                       str += "MISSED: " + `p.missed`
+               else:
+                       str += "MISSED: " + `len(p.missed)`
+               str += "\n"
+       if p.msgs:
+               if noisy:
+                       str += "MSGS: " + `p.msgs` + "\n"
+               else:
+                       str += "MSGS: <%s> " % len(p.msgs)
+               str += "\n"
+       return str
+                       
+# testing function
+def swap(a, dir="", noisy=0):
+       msg = ""
+       while not msg:
+               a.transport.seek(0)
+               msg= a.transport.read()
+               a.transport = StringIO()
+               if not msg:
+                       a.sendNext()
+       if noisy:
+                               print 6*dir + " " + pscope(msg)
+       return msg
+       
+class SimpleTest(unittest.TestCase):
+       def setUp(self):
+               self.noisy = 0
+               self.a = AirhookConnection(StringIO(), (None, 'localhost', 4040))
+               self.b = AirhookConnection(StringIO(), (None, 'localhost', 4040))
+       def testReallySimple(self):
+               # connect to eachother and send a few packets, observe sequence incrementing
+               self.noisy = 0
+               a = self.a
+               b = self.b
+               self.assertEqual(a.state, pending)
+               self.assertEqual(b.state, pending)
+               self.assertEqual(a.outSeq, 0)
+               self.assertEqual(b.outSeq, 0)
+               self.assertEqual(a.obSeq, -1)
+               self.assertEqual(b.obSeq, -1)
+
+               msg = swap(a, '>', self.noisy)          
+               self.assertEqual(a.state, sent)
+               self.assertEqual(a.outSeq, 1)
+               self.assertEqual(a.obSeq, -1)
+
+               b.datagramReceived(msg)
+               self.assertEqual(b.state, sent)
+               self.assertEqual(b.inSeq, 0)
+               self.assertEqual(b.obSeq, -1)
+               msg = swap(b, '<', self.noisy)          
+               self.assertEqual(b.outSeq, 1)
+
+               a.datagramReceived(msg)
+               self.assertEqual(a.state, confirmed)
+               self.assertEqual(a.obSeq, 0)
+               self.assertEqual(a.inSeq, 0)
+               msg = swap(a, '>', self.noisy)          
+               self.assertEqual(a.outSeq, 2)
+
+               b.datagramReceived(msg)
+               self.assertEqual(b.state, confirmed)
+               self.assertEqual(b.obSeq, 0)
+               self.assertEqual(b.inSeq, 1)
+               msg = swap(b, '<', self.noisy)          
+               self.assertEqual(b.outSeq, 2)
+
+               a.datagramReceived(msg)
+               self.assertEqual(a.outSeq, 2)
+               self.assertEqual(a.inSeq, 1)
+               self.assertEqual(a.obSeq, 1)
+
+class BasicTests(unittest.TestCase):
+       def setUp(self):
+               self.a = AirhookConnection(StringIO(), (None, 'localhost', 4040))
+               self.b = AirhookConnection(StringIO(), (None, 'localhost', 4040))
+               self.noisy = 0
+       def testSimple(self):
+               a = self.a
+               b = self.b
+               
+               TESTMSG = "Howdy, Y'All!"
+               a.omsgq.append(TESTMSG)
+               a.sendNext()
+               msg = swap(a, '>', self.noisy)
+               
+               b.datagramReceived(msg)
+               msg = swap(b, '<', self.noisy)
+               a.datagramReceived(msg)
+               msg = swap(a, '>', self.noisy)
+               b.datagramReceived(msg)
+               
+               self.assertEqual(b.inMsg, 1)
+               self.assertEqual(len(b.imsgq), 1)
+               self.assertEqual(b.imsgq[0], TESTMSG)
+               
+               msg = swap(b, '<', self.noisy)
+               
+               a.datagramReceived(msg)
+               msg = swap(a, '>', self.noisy)
+               b.datagramReceived(msg)
+               
+       def testLostFirst(self):
+               a = self.a
+               b = self.b
+               
+               TESTMSG = "Howdy, Y'All!"
+               TESTMSG2 = "Yee Haw"
+               
+               a.omsgq.append(TESTMSG)
+               msg = swap(a, '>', self.noisy)
+               b.datagramReceived(msg)
+               msg = swap(b, '<', self.noisy)
+               self.assertEqual(b.state, sent)
+               a.datagramReceived(msg)
+               msg = swap(a, '>', self.noisy)
+
+               del(msg) # dropping first message
+               
+               a.omsgq.append(TESTMSG2)
+               msg = swap(a, '>', self.noisy)
+       
+               b.datagramReceived(msg)
+               self.assertEqual(b.state, confirmed)
+               self.assertEqual(len(b.imsgq), 1)
+               self.assertEqual(b.imsgq[0], TESTMSG2)
+               self.assertEqual(b.weMissed, [(1, 0)])
+               msg = swap(b, '<', self.noisy)
+               
+               a.datagramReceived(msg)
+                                                               
+               msg = swap(a, '>', self.noisy)
+               
+               b.datagramReceived(msg)
+               self.assertEqual(len(b.imsgq), 2)
+               b.imsgq.sort()
+               l = [TESTMSG2, TESTMSG]
+               l.sort()
+               self.assertEqual(b.imsgq,l)
+               
+               msg = swap(b, '<', self.noisy)
+               
+               a.datagramReceived(msg)
+               msg = swap(a, '>', self.noisy)
+               b.datagramReceived(msg)
+               
+               msg = swap(b, '<', self.noisy)
+               a.datagramReceived(msg)
+               msg = swap(a, '>', self.noisy)
+               b.datagramReceived(msg)
+
+               msg = swap(b, '<', self.noisy)
+               a.datagramReceived(msg)
+               msg = swap(a, '>', self.noisy)
+
+               self.assertEqual(len(b.imsgq), 2)
+               b.imsgq.sort()
+               l = [TESTMSG2, TESTMSG]
+               l.sort()
+               self.assertEqual(b.imsgq,l)
+
+       def testLostSecond(self):
+               a = self.a
+               b = self.b
+               
+               TESTMSG = "Howdy, Y'All!"
+               TESTMSG2 = "Yee Haw"
+               
+               a.omsgq.append(TESTMSG)
+               msg = swap(a, '>', self.noisy)
+               b.datagramReceived(msg)
+               msg = swap(b, '<', self.noisy)
+               self.assertEqual(b.state, sent)
+               a.datagramReceived(msg)
+               msg = swap(a, '>', self.noisy)
+
+               a.omsgq.append(TESTMSG2)
+               msg2 = swap(a, '>', self.noisy)
+               del(msg2) # dropping second message
+
+               assert(a.outMsgs[1] != None)
+
+               b.datagramReceived(msg)
+               self.assertEqual(b.state, confirmed)
+               self.assertEqual(len(b.imsgq), 1)
+               self.assertEqual(b.imsgq[0], TESTMSG)
+               self.assertEqual(b.inMsg, 1)
+               self.assertEqual(b.weMissed, [])
+               msg = swap(b, '<', self.noisy)
+               
+               a.datagramReceived(msg)
+               assert(a.outMsgs[1] != None)
+               msg = swap(a, '>', self.noisy)
+
+               b.datagramReceived(msg)
+               self.assertEqual(b.state, confirmed)
+               self.assertEqual(len(b.imsgq), 1)
+               self.assertEqual(b.imsgq[0], TESTMSG)
+               self.assertEqual(b.weMissed, [(2, 1)])
+               msg = swap(b, '<', self.noisy)
+
+               a.datagramReceived(msg)
+               msg = swap(a, '>', self.noisy)
+
+               b.datagramReceived(msg)
+               self.assertEqual(len(b.imsgq), 2)
+               b.imsgq.sort()
+               l = [TESTMSG2, TESTMSG]
+               l.sort()
+               self.assertEqual(b.imsgq,l)
+               
+               msg = swap(b, '<', self.noisy)
+
+               a.datagramReceived(msg)
+               msg = swap(a, '>', self.noisy)
+
+               b.datagramReceived(msg)
+               
+               msg = swap(b, '<', self.noisy)
+
+               a.datagramReceived(msg)
+               msg = swap(a, '>', self.noisy)
+
+               b.datagramReceived(msg)
+
+               msg = swap(b, '<', self.noisy)
+
+               a.datagramReceived(msg)
+
+
+               msg = swap(a, '>', self.noisy)
+
+               self.assertEqual(len(b.imsgq), 2)
+               b.imsgq.sort()
+               l = [TESTMSG2, TESTMSG]
+               l.sort()
+               self.assertEqual(b.imsgq,l)
+
+       def testDoubleDouble(self):
+               a = self.a
+               b = self.b
+               
+               TESTMSGA = "Howdy, Y'All!"
+               TESTMSGB = "Yee Haw"
+               TESTMSGC = "FOO BAR"
+               TESTMSGD = "WING WANG"
+               
+               a.omsgq.append(TESTMSGA)
+               a.omsgq.append(TESTMSGB)
+
+               b.omsgq.append(TESTMSGC)
+               b.omsgq.append(TESTMSGD)
+               
+               
+               msg = swap(a, '>', self.noisy)
+                       
+
+               b.datagramReceived(msg)
+               self.assertEqual(b.state, sent)
+               
+               msg = swap(b, '<', self.noisy)
+               a.datagramReceived(msg)
+
+               msg = swap(a, '>', self.noisy)
+
+               b.datagramReceived(msg)
+               self.assertEqual(len(b.imsgq), 2)
+               l = [TESTMSGA, TESTMSGB]
+               l.sort();b.imsgq.sort()
+               self.assertEqual(b.imsgq, l)
+               self.assertEqual(b.inMsg, 2)
+
+               msg = swap(b, '<', self.noisy)
+               a.datagramReceived(msg)
+               
+               self.assertEqual(len(a.imsgq), 2)
+               l = [TESTMSGC, TESTMSGD]
+               l.sort();a.imsgq.sort()
+               self.assertEqual(a.imsgq, l)
+               self.assertEqual(a.inMsg, 2)
+
+       def testDoubleDoubleProb(self, prob=0.25):
+               a = self.a
+               b = self.b
+               TESTMSGA = "Howdy, Y'All!"
+               TESTMSGB = "Yee Haw"
+               TESTMSGC = "FOO BAR"
+               TESTMSGD = "WING WANG"
+               
+               a.omsgq.append(TESTMSGA)
+               a.omsgq.append(TESTMSGB)
+
+               b.omsgq.append(TESTMSGC)
+               b.omsgq.append(TESTMSGD)
+               
+               while a.state != confirmed or b.state != confirmed or ord(msga[0]) & FLAG_NEXT or ord(msgb[0]) & FLAG_NEXT :
+                       msga = swap(a, '>', self.noisy)
+       
+                       if rand(0,1) < prob:
+                               b.datagramReceived(msga)
+                       
+                       msgb = swap(b, '<', self.noisy)
+
+                       if rand(0,1) < prob:
+                               a.datagramReceived(msgb)
+
+               self.assertEqual(a.state, confirmed)
+               self.assertEqual(b.state, confirmed)
+               self.assertEqual(len(b.imsgq), 2)
+               l = [TESTMSGA, TESTMSGB]
+               l.sort();b.imsgq.sort()
+               self.assertEqual(b.imsgq, l)
+                               
+               self.assertEqual(len(a.imsgq), 2)
+               l = [TESTMSGC, TESTMSGD]
+               l.sort();a.imsgq.sort()
+               self.assertEqual(a.imsgq, l)
+
+       def testOneWayBlast(self, num = 2**8):
+               a = self.a
+               b = self.b
+               import sha
+               
+               
+               for i in xrange(num):
+                       a.omsgq.append(sha.sha(`i`).digest())
+               msg = swap(a, '>', self.noisy)
+               while a.state != confirmed or ord(msg[0]) & FLAG_NEXT:
+                       b.datagramReceived(msg)
+                       msg = swap(b, '<', self.noisy)
+
+                       a.datagramReceived(msg)
+                       msg = swap(a, '>', self.noisy)
+
+               self.assertEqual(len(b.imsgq), num)
+               
+       def testTwoWayBlast(self, num = 2**15, prob=0.5):
+               a = self.a
+               b = self.b
+               import sha
+               
+               
+               for i in xrange(num):
+                       a.omsgq.append(sha.sha('a' + `i`).digest())
+                       b.omsgq.append(sha.sha('b' + `i`).digest())
+                       
+               while a.omsgq or b.omsgq or a.weMissed or b.weMissed or ord(msga[0]) & (FLAG_NEXT | FLAG_MISSED) or ord(msgb[0]) & (FLAG_NEXT | FLAG_MISSED):
+                       if rand(0,1) < prob:
+                               msga = swap(a, '>', self.noisy)
+                               b.datagramReceived(msga)
+                       else:
+                               msga = swap(a, '>', 0)
+                       if rand(0,1) < prob:
+                               msgb = swap(b, '<', self.noisy)
+                               a.datagramReceived(msgb)
+                       else:
+                               msgb = swap(b, '<', 0)
+                                       
+
+
+               self.assertEqual(len(a.imsgq), num)
+               self.assertEqual(len(b.imsgq), num)
+               
+       def testLimitMessageNumbers(self):
+               a = self.a
+               b = self.b
+               import sha
+
+               msg = swap(a, noisy=self.noisy)
+               b.datagramReceived(msg)
+
+               msg = swap(b, noisy=self.noisy)
+               a.datagramReceived(msg)
+               
+               
+               for i in range(5000):
+                       a.omsgq.append(sha.sha('a' + 'i').digest())
+               
+               for i in range(31):
+                       msg = swap(a, noisy=self.noisy)
+                       self.assertEqual(a.obSeq, 0)
+                       self.assertEqual(a.outMsgNums[a.obSeq], 0)
+               self.assertEqual(a.next, 254)
+               self.assertEqual(a.outMsgNums[19], 254)