+++ /dev/null
-## Airhook Protocol http://airhook.org/protocol.html
-## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
-# see LICENSE.txt for license information
-
-from random import uniform as rand
-from struct import pack, unpack
-from time import time
-import unittest
-from bisect import insort_left
-
-from twisted.internet import protocol
-from twisted.internet import abstract
-from twisted.internet import reactor
-from twisted.internet import app
-from twisted.internet import interfaces
-
-# flags
-FLAG_AIRHOOK = 128
-FLAG_OBSERVED = 16
-FLAG_SESSION = 8
-FLAG_MISSED = 4
-FLAG_NEXT = 2
-FLAG_INTERVAL = 1
-
-MAX_PACKET_SIZE = 1450
-
-pending = 0
-sent = 1
-confirmed = 2
-
-
-
-class Airhook:
- def __init__(self):
- self.noisy = None
- # this should be changed to storage that drops old entries
- self.connections = {}
-
- def datagramReceived(self, datagram, addr):
- #print `addr`, `datagram`
- #if addr != self.addr:
- self.connectionForAddr(addr).datagramReceived(datagram)
-
- def connectionForAddr(self, addr):
- if addr == self.addr:
- raise Exception
- if not self.connections.has_key(addr):
- conn = self.connection()
- conn.addr = addr
- conn.makeConnection(self.transport)
- conn.protocol = self.factory.buildProtocol(addr)
- conn.protocol.makeConnection(conn)
- self.connections[addr] = conn
- else:
- conn = self.connections[addr]
- return conn
- def makeConnection(self, transport):
- self.transport = transport
- addr = transport.getHost()
- self.addr = (addr.host, addr.port)
-
- def doStart(self):
- pass
- def doStop(self):
- pass
-
-class AirhookPacket:
- def __init__(self, msg):
- self.datagram = msg
- self.oseq = ord(msg[1])
- self.seq = unpack("!H", msg[2:4])[0]
- self.flags = ord(msg[0])
- self.session = None
- self.observed = None
- self.next = None
- self.missed = []
- self.msgs = []
- skip = 4
- if self.flags & FLAG_OBSERVED:
- self.observed = unpack("!L", msg[skip:skip+4])[0]
- skip += 4
- if self.flags & FLAG_SESSION:
- self.session = unpack("!L", msg[skip:skip+4])[0]
- skip += 4
- if self.flags & FLAG_NEXT:
- self.next = ord(msg[skip])
- skip += 1
- if self.flags & FLAG_MISSED:
- num = ord(msg[skip]) + 1
- skip += 1
- for i in range(num):
- self.missed.append( ord(msg[skip+i]))
- skip += num
- if self.flags & FLAG_NEXT:
- while len(msg) - skip > 0:
- n = ord(msg[skip]) + 1
- skip+=1
- self.msgs.append( msg[skip:skip+n])
- skip += n
-
-class AirhookConnection:
- def __init__(self):
- self.outSeq = 0 # highest sequence we have sent, can't be 255 more than obSeq
- self.observed = None # their session id
- self.sessionID = long(rand(0, 2**32)) # our session id
-
- self.lastTransmit = 0 # time we last sent a packet with messages
- self.lastReceived = 0 # time we last received a packet with messages
- self.lastTransmitSeq = -1 # last sequence we sent a packet
- self.state = pending # one of pending, sent, confirmed
-
- self.sendSession = None # send session/observed fields until obSeq > sendSession
- self.response = 0 # if we know we have a response now (like resending missed packets)
- self.noisy = 0
- self.resetConnection()
-
- def makeConnection(self, transport):
- self.transport = transport
-
- def resetConnection(self):
- self.weMissed = []
- self.outMsgs = [None] * 256 # outgoing messages (seq sent, message), index = message number
- self.inMsg = 0 # next incoming message number
- self.outMsgNums = [0] * 256 # outgoing message numbers i = outNum % 256
- self.next = 0 # next outgoing message number
- self.scheduled = 0 # a sendNext is scheduled, don't schedule another
- self.omsgq = [] # list of messages to go out
- self.imsgq = [] # list of messages coming in
- self.obSeq = 0 # highest sequence confirmed by remote
- self.inSeq = 0 # last received sequence
-
- def datagramReceived(self, datagram):
- if not datagram:
- return
- if self.noisy:
- print `datagram`
- p = AirhookPacket(datagram)
-
-
- # check for state change
- if self.state == pending:
- if p.observed != None and p.session != None:
- if p.observed == self.sessionID:
- self.observed = p.session
- self.state = confirmed
- self.response = 1
- else:
- self.observed = p.session
- self.response = 1
- elif p.session != None:
- self.observed = p.session
- self.response = 1
- else:
- self.response = 1
- elif self.state == sent:
- if p.observed != None and p.session != None:
- if p.observed == self.sessionID:
- self.observed = p.session
- self.sendSession = self.outSeq
- self.state = confirmed
- if p.session != None:
- if not self.observed:
- self.observed = p.session
- elif self.observed != p.session:
- self.state = pending
- self.observed = p.session
- self.resetConnection()
- self.response = 1
- if hasattr(self.protocol, "resetConnection") and callable(self.protocol.resetConnection):
- self.protocol.resetConnection()
- self.inSeq = p.seq
- self.schedule()
- return
- elif p.session == None and p.observed == None:
- self.response = 1
- self.schedule()
-
- elif self.state == confirmed:
- if p.session != None or p.observed != None :
- if (p.session != None and p.session != self.observed) or (p.observed != None and p.observed != self.sessionID):
- self.state = pending
- self.observed = p.session
- self.resetConnection()
- self.inSeq = p.seq
- if hasattr(self.protocol, "resetConnection") and callable(self.protocol.resetConnection):
- self.protocol.resetConnection()
- self.schedule()
- return
- # check to make sure sequence number isn't out of order
- if (p.seq - self.inSeq) % 2**16 >= 256:
- return
-
- if self.state == confirmed:
- msgs = []
- missed = []
-
- # see if they need us to resend anything
- for i in p.missed:
- if self.outMsgs[i] != None:
- self.omsgq.append(self.outMsgs[i])
- self.outMsgs[i] = None
-
- # see if we missed any messages
- if p.next != None:
- missed_count = (p.next - self.inMsg) % 256
- if missed_count:
- self.lastReceived = time()
- for i in range(missed_count):
- missed += [(self.outSeq, (self.inMsg + i) % 256)]
- self.weMissed += missed
- self.response = 1
- # record highest message number seen
- self.inMsg = (p.next + len(p.msgs)) % 256
-
- # append messages, update sequence
- self.imsgq += p.msgs
-
- if self.state == confirmed:
- # unpack the observed sequence
- tseq = unpack('!H', pack('!H', self.outSeq)[0] + chr(p.oseq))[0]
- if ((self.outSeq - tseq)) % 2**16 > 255:
- tseq = unpack('!H', chr(ord(pack('!H', self.outSeq)[0]) - 1) + chr(p.oseq))[0]
- self.obSeq = tseq
-
- self.inSeq = p.seq
-
- self.lastReceived = time()
- self.dataCameIn()
-
- self.schedule()
-
- def sendNext(self):
- flags = 0
- header = chr(self.inSeq & 255) + pack("!H", self.outSeq)
- ids = ""
- missed = ""
- msgs = ""
-
- # session / observed logic
- if self.state == pending:
- if self.observed != None:
- flags = flags | FLAG_OBSERVED
- ids += pack("!L", self.observed)
- flags = flags | FLAG_SESSION
- ids += pack("!L", self.sessionID)
- self.state = sent
- elif self.state == sent:
- if self.observed != None:
- flags = flags | FLAG_SESSION | FLAG_OBSERVED
- ids += pack("!LL", self.observed, self.sessionID)
- else:
- flags = flags | FLAG_SESSION
- ids += pack("!L", self.sessionID)
-
- else:
- if self.state == sent or self.sendSession != None:
- if self.state == confirmed and (self.obSeq - self.sendSession) % 2**16 < 256:
- self.sendSession = None
- else:
- flags = flags | FLAG_SESSION | FLAG_OBSERVED
- ids += pack("!LL", self.observed, self.sessionID)
-
- # missed header
- if self.obSeq >= 0:
- self.weMissed = filter(lambda a: a[0] > self.obSeq, self.weMissed)
-
- if len(self.weMissed) > 0:
- flags = flags | FLAG_MISSED
- missed += chr(len(self.weMissed) - 1)
- for i in self.weMissed:
- missed += chr(i[1])
-
- # append any outgoing messages
- if self.state == confirmed and self.omsgq:
- first = self.next
- outstanding = (256 + (((self.next - 1) % 256) - self.outMsgNums[self.obSeq % 256])) % 256
- while len(self.omsgq) and outstanding < 255 and len(self.omsgq[-1]) + len(msgs) + len(missed) + len(ids) + len(header) + 1 <= MAX_PACKET_SIZE:
- msg = self.omsgq.pop()
- msgs += chr(len(msg) - 1) + msg
- self.outMsgs[self.next] = msg
- self.next = (self.next + 1) % 256
- outstanding+=1
- # update outgoing message stat
- if msgs:
- flags = flags | FLAG_NEXT
- ids += chr(first)
- self.lastTransmitSeq = self.outSeq
- #self.outMsgNums[self.outSeq % 256] = first
- #else:
- self.outMsgNums[self.outSeq % 256] = (self.next - 1) % 256
-
- # do we need a NEXT flag despite not having sent any messages?
- if not flags & FLAG_NEXT and (256 + (((self.next - 1) % 256) - self.outMsgNums[self.obSeq % 256])) % 256 > 0:
- flags = flags | FLAG_NEXT
- ids += chr(self.next)
-
- # update stats and send packet
- packet = chr(flags) + header + ids + missed + msgs
- self.outSeq = (self.outSeq + 1) % 2**16
- self.lastTransmit = time()
- self.transport.write(packet, self.addr)
-
- self.scheduled = 0
- self.schedule()
-
- def timeToSend(self):
- if self.state == pending:
- return (1, 0)
-
- outstanding = (256 + (((self.next - 1) % 256) - self.outMsgNums[self.obSeq % 256])) % 256
- # any outstanding messages and are we not too far ahead of our counterparty?
- if len(self.omsgq) > 0 and self.state != sent and (self.next + 1) % 256 != self.outMsgNums[self.obSeq % 256] and (self.outSeq - self.obSeq) % 2**16 < 256:
- #if len(self.omsgq) > 0 and self.state != sent and (self.next + 1) % 256 != self.outMsgNums[self.obSeq % 256] and not outstanding:
- return (1, 0)
-
- # do we explicitly need to send a response?
- if self.response:
- self.response = 0
- return (1, 0)
- # have we not sent anything in a while?
- #elif time() - self.lastTransmit > 1.0:
- #return (1, 1)
-
- # nothing to send
- return (0, 0)
-
- def schedule(self):
- tts, t = self.timeToSend()
- if tts and not self.scheduled:
- self.scheduled = 1
- reactor.callLater(t, self.sendNext)
-
- def write(self, data):
- # micropackets can only be 255 bytes or less
- if len(data) <= 255:
- self.omsgq.insert(0, data)
- self.schedule()
-
- def dataCameIn(self):
- """
- called when we get a packet bearing messages
- """
- for msg in self.imsgq:
- self.protocol.dataReceived(msg)
- self.imsgq = []
-
-class ustr(str):
- """
- this subclass of string encapsulates each ordered message, caches it's sequence number,
- and has comparison functions to sort by sequence number
- """
- def getseq(self):
- if not hasattr(self, 'seq'):
- self.seq = unpack("!H", self[0:2])[0]
- return self.seq
- def __lt__(self, other):
- return (self.getseq() - other.getseq()) % 2**16 > 255
- def __le__(self, other):
- return (self.getseq() - other.getseq()) % 2**16 > 255 or self.__eq__(other)
- def __eq__(self, other):
- return self.getseq() == other.getseq()
- def __ne__(self, other):
- return self.getseq() != other.getseq()
- def __gt__(self, other):
- return (self.getseq() - other.getseq()) % 2**16 < 256 and not self.__eq__(other)
- def __ge__(self, other):
- return (self.getseq() - other.getseq()) % 2**16 < 256
-
-class StreamConnection(AirhookConnection):
- """
- this implements a simple protocol for a stream over airhook
- this is done for convenience, instead of making it a twisted.internet.protocol....
- the first two octets of each message are interpreted as a 16-bit sequence number
- 253 bytes are used for payload
-
- """
- def __init__(self):
- AirhookConnection.__init__(self)
- self.resetStream()
-
- def resetStream(self):
- self.oseq = 0
- self.iseq = 0
- self.q = []
-
- def resetConnection(self):
- AirhookConnection.resetConnection(self)
- self.resetStream()
-
- def loseConnection(self):
- pass
-
- def dataCameIn(self):
- # put 'em together
- for msg in self.imsgq:
- insort_left(self.q, ustr(msg))
- self.imsgq = []
- data = ""
- while self.q and self.iseq == self.q[0].getseq():
- data += self.q[0][2:]
- self.q = self.q[1:]
- self.iseq = (self.iseq + 1) % 2**16
- if data != '':
- self.protocol.dataReceived(data)
-
- def write(self, data):
- # chop it up and queue it up
- while data:
- p = pack("!H", self.oseq) + data[:253]
- self.omsgq.insert(0, p)
- data = data[253:]
- self.oseq = (self.oseq + 1) % 2**16
- self.schedule()
-
- def writeSequence(self, sequence):
- for data in sequence:
- self.write(data)
-
-
-def listenAirhook(port, factory):
- ah = Airhook()
- ah.connection = AirhookConnection
- ah.factory = factory
- reactor.listenUDP(port, ah)
- return ah
-
-def listenAirhookStream(port, factory):
- ah = Airhook()
- ah.connection = StreamConnection
- ah.factory = factory
- reactor.listenUDP(port, ah)
- return ah
-
-
+++ /dev/null
-## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
-# see LICENSE.txt for license information
-
-import unittest
-from airhook import *
-from random import uniform as rand
-from cStringIO import StringIO
-
-
-if __name__ =="__main__":
- tests = unittest.defaultTestLoader.loadTestsFromNames(['test_airhook'])
- result = unittest.TextTestRunner().run(tests)
-
-class Echo(protocol.Protocol):
- def dataReceived(self, data):
- self.transport.write(data)
-
-class Noisy(protocol.Protocol):
- def dataReceived(self, data):
- print `data`
-
-class Receiver(protocol.Protocol):
- def __init__(self):
- self.q = []
- def dataReceived(self, data):
- self.q.append(data)
-
-class StreamReceiver(protocol.Protocol):
- def __init__(self):
- self.buf = ""
- def dataReceived(self, data):
- self.buf += data
-
-def makeEcho(port):
- f = protocol.Factory(); f.protocol = Echo
- return listenAirhookStream(port, f)
-def makeNoisy(port):
- f = protocol.Factory(); f.protocol = Noisy
- return listenAirhookStream(port, f)
-def makeReceiver(port):
- f = protocol.Factory(); f.protocol = Receiver
- return listenAirhookStream(port, f)
-def makeStreamReceiver(port):
- f = protocol.Factory(); f.protocol = StreamReceiver
- return listenAirhookStream(port, f)
-
-class DummyTransport:
- def __init__(self):
- self.s = StringIO()
- def write(self, data, addr):
- self.s.write(data)
- def seek(self, num):
- return self.s.seek(num)
- def read(self):
- return self.s.read()
-
-def test_createStartPacket():
- flags = 0 | FLAG_AIRHOOK | FLAG_SESSION
- packet = chr(flags) + "\xff" + "\x00\x00" + pack("!L", long(rand(0, 2**32)))
- return packet
-
-def test_createReply(session, observed, obseq, seq):
- flags = 0 | FLAG_AIRHOOK | FLAG_SESSION | FLAG_OBSERVED
- packet = chr(flags) + pack("!H", seq)[1] + pack("!H", obseq + 1) + pack("!L", session) + pack("!L", observed)
- return packet
-
-def pscope(msg, noisy=0):
- # packet scope
- str = ""
- p = AirhookPacket(msg)
- str += "oseq: %s seq: %s " % (p.oseq, p.seq)
- if noisy:
- str += "packet: %s \n" % (`p.datagram`)
- flags = p.flags
- str += "flags: "
- if flags & FLAG_SESSION:
- str += "FLAG_SESSION "
- if flags & FLAG_OBSERVED:
- str += "FLAG_OBSERVED "
- if flags & FLAG_MISSED:
- str += "FLAG_MISSED "
- if flags & FLAG_NEXT:
- str += "FLAG_NEXT "
- str += "\n"
-
- if p.observed != None:
- str += "OBSERVED: %s\n" % p.observed
- if p.session != None:
- str += "SESSION: %s\n" % p.session
- if p.next != None:
- str += "NEXT: %s\n" % p.next
- if p.missed:
- if noisy:
- str += "MISSED: " + `p.missed`
- else:
- str += "MISSED: " + `len(p.missed)`
- str += "\n"
- if p.msgs:
- if noisy:
- str += "MSGS: " + `p.msgs` + "\n"
- else:
- str += "MSGS: <%s> " % len(p.msgs)
- str += "\n"
- return str
-
-# testing function
-def swap(a, dir="", noisy=0):
- msg = ""
- while not msg:
- a.transport.seek(0)
- msg= a.transport.read()
- a.transport = DummyTransport()
- if not msg:
- a.sendNext()
- if noisy:
- print 6*dir + " " + pscope(msg)
- return msg
-
-def runTillEmpty(a, b, prob=1.0, noisy=0):
- msga = ''
- msgb = ''
- while a.omsgq or b.omsgq or a.weMissed or b.weMissed or ord(msga[0]) & (FLAG_NEXT | FLAG_MISSED) or ord(msgb[0]) & (FLAG_NEXT | FLAG_MISSED):
- if rand(0,1) < prob:
- msga = swap(a, '>', noisy)
- b.datagramReceived(msga)
- else:
- msga = swap(a, '>', 0)
- if rand(0,1) < prob:
- msgb = swap(b, '<', noisy)
- a.datagramReceived(msgb)
- else:
- msgb = swap(b, '<', 0)
-
-class UstrTests(unittest.TestCase):
- def u(self, seq):
- return ustr("%s%s" % (pack("!H", seq), 'foobar'))
-
- def testLT(self):
- self.failUnless(self.u(0) < self.u(1))
- self.failUnless(self.u(1) < self.u(2))
- self.failUnless(self.u(2**16 - 1) < self.u(0))
- self.failUnless(self.u(2**16 - 1) < self.u(1))
-
- self.failIf(self.u(1) < self.u(0))
- self.failIf(self.u(2) < self.u(1))
- self.failIf(self.u(0) < self.u(2**16 - 1))
- self.failIf(self.u(1) < self.u(2**16 - 1))
-
- def testLTE(self):
- self.failUnless(self.u(0) <= self.u(1))
- self.failUnless(self.u(1) <= self.u(2))
- self.failUnless(self.u(2) <= self.u(2))
- self.failUnless(self.u(2**16 - 1) <= self.u(0))
- self.failUnless(self.u(2**16 - 1) <= self.u(1))
- self.failUnless(self.u(2**16 - 1) <= self.u(2**16))
-
- self.failIf(self.u(1) <= self.u(0))
- self.failIf(self.u(2) <= self.u(1))
- self.failIf(self.u(0) <= self.u(2**16 - 1))
- self.failIf(self.u(1) <= self.u(2**16 - 1))
-
- def testGT(self):
- self.failUnless(self.u(1) > self.u(0))
- self.failUnless(self.u(2) > self.u(1))
- self.failUnless(self.u(0) > self.u(2**16 - 1))
- self.failUnless(self.u(1) > self.u(2**16 - 1))
-
- self.failIf(self.u(0) > self.u(1))
- self.failIf(self.u(1) > self.u(2))
- self.failIf(self.u(2**16 - 1) > self.u(0))
- self.failIf(self.u(2**16 - 1) > self.u(1))
-
- def testGTE(self):
- self.failUnless(self.u(1) >= self.u(0))
- self.failUnless(self.u(2) >= self.u(1))
- self.failUnless(self.u(2) >= self.u(2))
- self.failUnless(self.u(0) >= self.u(0))
- self.failUnless(self.u(1) >= self.u(1))
- self.failUnless(self.u(2**16 - 1) >= self.u(2**16 - 1))
-
- self.failIf(self.u(0) >= self.u(1))
- self.failIf(self.u(1) >= self.u(2))
- self.failIf(self.u(2**16 - 1) >= self.u(0))
- self.failIf(self.u(2**16 - 1) >= self.u(1))
-
- def testEQ(self):
- self.failUnless(self.u(0) == self.u(0))
- self.failUnless(self.u(1) == self.u(1))
- self.failUnless(self.u(2**16 - 1) == self.u(2**16-1))
-
- self.failIf(self.u(0) == self.u(1))
- self.failIf(self.u(1) == self.u(0))
- self.failIf(self.u(2**16 - 1) == self.u(0))
-
- def testNEQ(self):
- self.failUnless(self.u(1) != self.u(0))
- self.failUnless(self.u(2) != self.u(1))
- self.failIf(self.u(2) != self.u(2))
- self.failIf(self.u(0) != self.u(0))
- self.failIf(self.u(1) != self.u(1))
- self.failIf(self.u(2**16 - 1) != self.u(2**16 - 1))
-
-
-class SimpleTest(unittest.TestCase):
- def setUp(self):
- self.noisy = 0
- self.a = AirhookConnection()
- self.a.makeConnection(DummyTransport())
- self.a.addr = ('127.0.0.1', 4444)
- self.b = AirhookConnection()
- self.b.makeConnection(DummyTransport())
- self.b.addr = ('127.0.0.1', 4444)
-
- def testReallySimple(self):
- # connect to eachother and send a few packets, observe sequence incrementing
- a = self.a
- b = self.b
- self.assertEqual(a.state, pending)
- self.assertEqual(b.state, pending)
- self.assertEqual(a.outSeq, 0)
- self.assertEqual(b.outSeq, 0)
- self.assertEqual(a.obSeq, 0)
- self.assertEqual(b.obSeq, 0)
-
- msg = swap(a, '>', self.noisy)
- self.assertEqual(a.state, sent)
- self.assertEqual(a.outSeq, 1)
- self.assertEqual(a.obSeq, 0)
-
- b.datagramReceived(msg)
- self.assertEqual(b.inSeq, 0)
- self.assertEqual(b.obSeq, 0)
- msg = swap(b, '<', self.noisy)
- self.assertEqual(b.state, sent)
- self.assertEqual(b.outSeq, 1)
-
- a.datagramReceived(msg)
- self.assertEqual(a.state, confirmed)
- self.assertEqual(a.obSeq, 0)
- self.assertEqual(a.inSeq, 0)
- msg = swap(a, '>', self.noisy)
- self.assertEqual(a.outSeq, 2)
-
- b.datagramReceived(msg)
- self.assertEqual(b.state, confirmed)
- self.assertEqual(b.obSeq, 0)
- self.assertEqual(b.inSeq, 1)
- msg = swap(b, '<', self.noisy)
- self.assertEqual(b.outSeq, 2)
-
- a.datagramReceived(msg)
- self.assertEqual(a.outSeq, 2)
- self.assertEqual(a.inSeq, 1)
- self.assertEqual(a.obSeq, 1)
-
-class BasicTests(unittest.TestCase):
- def setUp(self):
- self.noisy = 0
- self.a = AirhookConnection()
- self.a.makeConnection(DummyTransport())
- self.a.addr = ('127.0.0.1', 4444)
- self.b = AirhookConnection()
- self.b.makeConnection(DummyTransport())
- self.b.addr = ('127.0.0.1', 4444)
- self.a.protocol = Receiver()
- self.b.protocol = Receiver()
-
- def testSimple(self):
- a = self.a
- b = self.b
-
- TESTMSG = "Howdy, Y'All!"
- a.omsgq.append(TESTMSG)
- a.sendNext()
- msg = swap(a, '>', self.noisy)
-
- b.datagramReceived(msg)
- msg = swap(b, '<', self.noisy)
- a.datagramReceived(msg)
- msg = swap(a, '>', self.noisy)
- b.datagramReceived(msg)
-
- self.assertEqual(b.inMsg, 1)
- self.assertEqual(len(b.protocol.q), 1)
- self.assertEqual(b.protocol.q[0], TESTMSG)
-
- msg = swap(b, '<', self.noisy)
-
- a.datagramReceived(msg)
- msg = swap(a, '>', self.noisy)
- b.datagramReceived(msg)
-
- def testLostFirst(self):
- a = self.a
- b = self.b
-
- TESTMSG = "Howdy, Y'All!"
- TESTMSG2 = "Yee Haw"
-
- a.omsgq.append(TESTMSG)
- msg = swap(a, '>', self.noisy)
- b.datagramReceived(msg)
- msg = swap(b, '<', self.noisy)
- self.assertEqual(b.state, sent)
- a.datagramReceived(msg)
- msg = swap(a, '>', self.noisy)
-
- del(msg) # dropping first message
-
- a.omsgq.append(TESTMSG2)
- msg = swap(a, '>', self.noisy)
-
- b.datagramReceived(msg)
- self.assertEqual(b.state, confirmed)
- self.assertEqual(len(b.protocol.q), 1)
- self.assertEqual(b.protocol.q[0], TESTMSG2)
- self.assertEqual(b.weMissed, [(1, 0)])
- msg = swap(b, '<', self.noisy)
-
- a.datagramReceived(msg)
-
- msg = swap(a, '>', self.noisy)
-
- b.datagramReceived(msg)
- self.assertEqual(len(b.protocol.q), 2)
- b.protocol.q.sort()
- l = [TESTMSG2, TESTMSG]
- l.sort()
- self.assertEqual(b.protocol.q,l)
-
- msg = swap(b, '<', self.noisy)
-
- a.datagramReceived(msg)
- msg = swap(a, '>', self.noisy)
- b.datagramReceived(msg)
-
- msg = swap(b, '<', self.noisy)
- a.datagramReceived(msg)
- msg = swap(a, '>', self.noisy)
- b.datagramReceived(msg)
-
- msg = swap(b, '<', self.noisy)
- a.datagramReceived(msg)
- msg = swap(a, '>', self.noisy)
-
- self.assertEqual(len(b.protocol.q), 2)
- b.protocol.q.sort()
- l = [TESTMSG2, TESTMSG]
- l.sort()
- self.assertEqual(b.protocol.q,l)
-
- def testLostSecond(self):
- a = self.a
- b = self.b
-
- TESTMSG = "Howdy, Y'All!"
- TESTMSG2 = "Yee Haw"
-
- a.omsgq.append(TESTMSG)
- msg = swap(a, '>', self.noisy)
- b.datagramReceived(msg)
- msg = swap(b, '<', self.noisy)
- self.assertEqual(b.state, sent)
- a.datagramReceived(msg)
- msg = swap(a, '>', self.noisy)
-
- a.omsgq.append(TESTMSG2)
- msg2 = swap(a, '>', self.noisy)
- del(msg2) # dropping second message
-
- assert(a.outMsgs[1] != None)
-
- b.datagramReceived(msg)
- self.assertEqual(b.state, confirmed)
- self.assertEqual(len(b.protocol.q), 1)
- self.assertEqual(b.protocol.q[0], TESTMSG)
- self.assertEqual(b.inMsg, 1)
- self.assertEqual(b.weMissed, [])
- msg = swap(b, '<', self.noisy)
-
- a.datagramReceived(msg)
- assert(a.outMsgs[1] != None)
- msg = swap(a, '>', self.noisy)
-
- b.datagramReceived(msg)
- self.assertEqual(b.state, confirmed)
- self.assertEqual(len(b.protocol.q), 1)
- self.assertEqual(b.protocol.q[0], TESTMSG)
- self.assertEqual(b.weMissed, [(2, 1)])
- msg = swap(b, '<', self.noisy)
-
- a.datagramReceived(msg)
- msg = swap(a, '>', self.noisy)
-
- b.datagramReceived(msg)
- self.assertEqual(len(b.protocol.q), 2)
- b.protocol.q.sort()
- l = [TESTMSG2, TESTMSG]
- l.sort()
- self.assertEqual(b.protocol.q,l)
-
- msg = swap(b, '<', self.noisy)
-
- a.datagramReceived(msg)
- msg = swap(a, '>', self.noisy)
-
- b.datagramReceived(msg)
-
- msg = swap(b, '<', self.noisy)
-
- a.datagramReceived(msg)
- msg = swap(a, '>', self.noisy)
-
- b.datagramReceived(msg)
-
- msg = swap(b, '<', self.noisy)
-
- a.datagramReceived(msg)
-
-
- msg = swap(a, '>', self.noisy)
-
- self.assertEqual(len(b.protocol.q), 2)
- b.protocol.q.sort()
- l = [TESTMSG2, TESTMSG]
- l.sort()
- self.assertEqual(b.protocol.q,l)
-
- def testDoubleDouble(self):
- a = self.a
- b = self.b
-
- TESTMSGA = "Howdy, Y'All!"
- TESTMSGB = "Yee Haw"
- TESTMSGC = "FOO BAR"
- TESTMSGD = "WING WANG"
-
- a.omsgq.append(TESTMSGA)
- a.omsgq.append(TESTMSGB)
-
- b.omsgq.append(TESTMSGC)
- b.omsgq.append(TESTMSGD)
-
-
- msg = swap(a, '>', self.noisy)
-
-
- b.datagramReceived(msg)
-
- msg = swap(b, '<', self.noisy)
- self.assertEqual(b.state, sent)
- a.datagramReceived(msg)
-
- msg = swap(a, '>', self.noisy)
-
- b.datagramReceived(msg)
- self.assertEqual(len(b.protocol.q), 2)
- l = [TESTMSGA, TESTMSGB]
- l.sort();b.protocol.q.sort()
- self.assertEqual(b.protocol.q, l)
- self.assertEqual(b.inMsg, 2)
-
- msg = swap(b, '<', self.noisy)
- a.datagramReceived(msg)
-
- self.assertEqual(len(a.protocol.q), 2)
- l = [TESTMSGC, TESTMSGD]
- l.sort();a.protocol.q.sort()
- self.assertEqual(a.protocol.q, l)
- self.assertEqual(a.inMsg, 2)
-
- def testDoubleDoubleProb(self, prob=0.25):
- a = self.a
- b = self.b
-
- TESTMSGA = "Howdy, Y'All!"
- TESTMSGB = "Yee Haw"
- TESTMSGC = "FOO BAR"
- TESTMSGD = "WING WANG"
-
- a.omsgq.append(TESTMSGA)
- a.omsgq.append(TESTMSGB)
-
- b.omsgq.append(TESTMSGC)
- b.omsgq.append(TESTMSGD)
-
- runTillEmpty(a, b, prob, self.noisy)
-
- self.assertEqual(a.state, confirmed)
- self.assertEqual(b.state, confirmed)
- self.assertEqual(len(b.protocol.q), 2)
- l = [TESTMSGA, TESTMSGB]
- l.sort();b.protocol.q.sort()
- self.assertEqual(b.protocol.q, l)
-
- self.assertEqual(len(a.protocol.q), 2)
- l = [TESTMSGC, TESTMSGD]
- l.sort();a.protocol.q.sort()
- self.assertEqual(a.protocol.q, l)
-
- def testOneWayBlast(self, num = 2**12):
- a = self.a
- b = self.b
-
- import sha
-
-
- for i in xrange(num):
- a.omsgq.append(sha.sha(`i`).digest())
- runTillEmpty(a, b, noisy=self.noisy)
-
- self.assertEqual(len(b.protocol.q), num)
-
- def testTwoWayBlast(self, num = 2**12, prob=0.5):
- a = self.a
- b = self.b
-
- import sha
-
-
- for i in xrange(num):
- a.omsgq.append(sha.sha('a' + `i`).digest())
- b.omsgq.append(sha.sha('b' + `i`).digest())
-
- runTillEmpty(a, b, prob, self.noisy)
-
-
- self.assertEqual(len(a.protocol.q), num)
- self.assertEqual(len(b.protocol.q), num)
-
- def testLimitMessageNumbers(self):
- a = self.a
- b = self.b
- import sha
-
- msg = swap(a, noisy=self.noisy)
- b.datagramReceived(msg)
-
- msg = swap(b, noisy=self.noisy)
- a.datagramReceived(msg)
-
-
- for i in range(5000):
- a.omsgq.append(sha.sha('a' + 'i').digest())
-
- for i in range(5000 / 255):
- msg = swap(a, noisy=self.noisy)
- self.assertEqual(a.obSeq, 0)
- self.assertEqual(a.next, 255)
- self.assertEqual(a.outMsgNums[(a.outSeq-1) % 256], 254)
-
- def testConnectionReset(self):
- self.testTwoWayBlast()
- self.b.protocol.q = []
- a = self.a
- b = self.b
- msg = swap(a, noisy=self.noisy)
- b.datagramReceived(msg)
-
- msg = swap(b, noisy=self.noisy)
- a.datagramReceived(msg)
-
- a.omsgq.append("TESTING")
- msg = swap(a, noisy=self.noisy)
- b.datagramReceived(msg)
-
- msg = swap(b, noisy=self.noisy)
- a.datagramReceived(msg)
-
- self.assertEqual(b.protocol.q[0], "TESTING")
- self.assertEqual(b.state, confirmed)
-
- self.a = AirhookConnection()
- self.a.makeConnection(DummyTransport())
- self.a.addr = ('127.0.0.1', 4444)
- a = self.a
-
- a.omsgq.append("TESTING2")
- msg = swap(a, noisy=self.noisy)
- b.datagramReceived(msg)
-
- msg = swap(b, noisy=self.noisy)
- a.datagramReceived(msg)
-
- self.assertEqual(len(b.protocol.q), 1)
- msg = swap(a, noisy=self.noisy)
- b.datagramReceived(msg)
-
- msg = swap(b, noisy=self.noisy)
- a.datagramReceived(msg)
-
- self.assertEqual(len(b.protocol.q), 2)
- self.assertEqual(b.protocol.q[1], "TESTING2")
-
- def testRecipientReset(self):
- self.testTwoWayBlast()
- self.b.protocol.q = []
- self.noisy = 0
- a = self.a
- b = self.b
- msg = swap(a, noisy=self.noisy)
- b.datagramReceived(msg)
-
- msg = swap(b, noisy=self.noisy)
- a.datagramReceived(msg)
-
- a.omsgq.append("TESTING")
- msg = swap(a, noisy=self.noisy)
- b.datagramReceived(msg)
-
- msg = swap(b, noisy=self.noisy)
- a.datagramReceived(msg)
-
- self.assertEqual(b.protocol.q[0], "TESTING")
- self.assertEqual(b.state, confirmed)
-
- self.b = AirhookConnection()
- self.b.makeConnection(DummyTransport())
- self.b.protocol = Receiver()
- self.b.addr = ('127.0.0.1', 4444)
- b = self.b
-
- msg = swap(a, noisy=self.noisy)
- b.datagramReceived(msg)
-
- msg = swap(b, noisy=self.noisy)
- a.datagramReceived(msg)
-
- a.omsgq.append("TESTING2")
- self.assertEqual(len(b.protocol.q), 0)
- msg = swap(a, noisy=self.noisy)
- b.datagramReceived(msg)
-
- msg = swap(b, noisy=self.noisy)
- a.datagramReceived(msg)
-
- msg = swap(a, noisy=self.noisy)
- b.datagramReceived(msg)
-
- msg = swap(b, noisy=self.noisy)
- a.datagramReceived(msg)
-
- self.assertEqual(len(b.protocol.q), 1)
- self.assertEqual(b.protocol.q[0], "TESTING2")
-
-
-class StreamTests(unittest.TestCase):
- def setUp(self):
- self.noisy = 0
- self.a = StreamConnection()
- self.a.makeConnection(DummyTransport())
- self.a.addr = ('127.0.0.1', 4444)
- self.b = StreamConnection()
- self.b.makeConnection(DummyTransport())
- self.b.addr = ('127.0.0.1', 4444)
- self.a.protocol = StreamReceiver()
- self.b.protocol = StreamReceiver()
-
- def testStreamSimple(self, num = 2**12, prob=1.0):
- f = open('/dev/urandom', 'r')
- a = self.a
- b = self.b
-
- MSGA = f.read(num)
- MSGB = f.read(num)
- self.a.write(MSGA)
- self.b.write(MSGB)
-
- runTillEmpty(a, b, prob, self.noisy)
-
- self.assertEqual(len(a.protocol.buf), len(MSGB))
- self.assertEqual(len(b.protocol.buf), len(MSGA))
- self.assertEqual(a.protocol.buf, MSGB)
- self.assertEqual(b.protocol.buf, MSGA)
-
- def testStreamLossy(self, num = 2**12, prob=0.5):
- self.testStreamSimple(num, prob)
-
-class SimpleReactor(unittest.TestCase):
- def setUp(self):
- self.noisy = 0
- self.a = makeReceiver(2020)
- self.b = makeReceiver(2021)
- self.ac = self.a.connectionForAddr(('127.0.0.1', 2021))
- self.bc = self.b.connectionForAddr(('127.0.0.1', 2020))
- self.ac.noisy = self.noisy
- self.bc.noisy = self.noisy
- def testSimple(self):
- msg = "Testing 1, 2, 3"
- self.ac.write(msg)
- reactor.iterate()
- reactor.iterate()
- reactor.iterate()
- self.assertEqual(self.bc.state, confirmed)
- self.assertEqual(self.bc.protocol.q, [msg])
-
-class SimpleReactorEcho(unittest.TestCase):
- def setUp(self):
- self.noisy = 0
- self.a = makeReceiver(2022)
- self.b = makeEcho(2023)
- self.ac = self.a.connectionForAddr(('127.0.0.1', 2023))
- self.bc = self.b.connectionForAddr(('127.0.0.1', 2022))
- def testSimple(self):
- msg = "Testing 1, 2, 3"
- self.ac.write(msg)
- reactor.iterate()
- reactor.iterate()
- reactor.iterate()
- reactor.iterate()
- self.assertEqual(self.ac.protocol.q, [msg])
- reactor.iterate()
- reactor.iterate()
- reactor.iterate()
- self.assertEqual(self.ac.protocol.q, [msg])
-
-
-class SimpleReactorStream(unittest.TestCase):
- def setUp(self):
- self.noisy = 0
- self.a = makeStreamReceiver(2024)
- self.b = makeStreamReceiver(2025)
- self.ac = self.a.connectionForAddr(('127.0.0.1', 2025))
- self.bc = self.b.connectionForAddr(('127.0.0.1', 2024))
- def testSimple(self):
- msg = "Testing 1, 2, 3"
- self.ac.write(msg)
- reactor.iterate()
- reactor.iterate()
- reactor.iterate()
- self.assertEqual(self.bc.protocol.buf, msg)
-
-class SimpleReactorStreamBig(unittest.TestCase):
- def setUp(self):
- self.noisy = 0
- self.a = makeStreamReceiver(2026)
- self.b = makeStreamReceiver(2027)
- self.ac = self.a.connectionForAddr(('127.0.0.1', 2027))
- self.bc = self.b.connectionForAddr(('127.0.0.1', 2026))
- def testBig(self):
- msg = open('/dev/urandom').read(4096)
- self.ac.write(msg)
- reactor.iterate()
- reactor.iterate()
- reactor.iterate()
- reactor.iterate()
- reactor.iterate()
- reactor.iterate()
- reactor.iterate()
- self.assertEqual(self.bc.protocol.buf, msg)
-
-class EchoReactorStreamBig(unittest.TestCase):
- def setUp(self):
- self.noisy = 0
- self.a = makeStreamReceiver(2028)
- self.b = makeEcho(2029)
- self.ac = self.a.connectionForAddr(('127.0.0.1', 2029))
- def testBig(self):
- msg = open('/dev/urandom').read(256)
- self.ac.write(msg)
- reactor.iterate()
- reactor.iterate()
- reactor.iterate()
- reactor.iterate()
- reactor.iterate()
- self.assertEqual(self.ac.protocol.buf, msg)
-
-