## Airhook Protocol http://airhook.org/protocol.html
-## Copyright 2002, Andrew Loewenstern, All Rights Reserved
+## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
+# see LICENSE.txt for license information
from random import uniform as rand
from struct import pack, unpack
-class Airhook(protocol.DatagramProtocol):
+class Airhook:
def __init__(self):
self.noisy = None
# this should be changed to storage that drops old entries
self.connections = {}
def datagramReceived(self, datagram, addr):
+ #print `addr`, `datagram`
+ #if addr != self.addr:
self.connectionForAddr(addr).datagramReceived(datagram)
def connectionForAddr(self, addr):
+ if addr == self.addr:
+ raise Exception
if not self.connections.has_key(addr):
conn = self.connection()
+ conn.addr = addr
+ conn.makeConnection(self.transport)
conn.protocol = self.factory.buildProtocol(addr)
conn.protocol.makeConnection(conn)
- conn.makeConnection(self.transport)
- conn.addr = addr
self.connections[addr] = conn
else:
conn = self.connections[addr]
return conn
+ def makeConnection(self, transport):
+ self.transport = transport
+ addr = transport.getHost()
+ self.addr = (addr.host, addr.port)
+
+ def doStart(self):
+ pass
+ def doStop(self):
+ pass
class AirhookPacket:
def __init__(self, msg):
self.msgs.append( msg[skip:skip+n])
skip += n
-class AirhookConnection(protocol.ConnectedDatagramProtocol, interfaces.IUDPConnectedTransport):
+class AirhookConnection:
def __init__(self):
self.outSeq = 0 # highest sequence we have sent, can't be 255 more than obSeq
- self.obSeq = 0 # highest sequence confirmed by remote
- self.inSeq = 0 # last received sequence
self.observed = None # their session id
self.sessionID = long(rand(0, 2**32)) # our session id
- self.lastTransmit = -1 # time we last sent a packet with messages
- self.lastReceieved = 0 # time we last received a packet with messages
+ self.lastTransmit = 0 # time we last sent a packet with messages
+ self.lastReceived = 0 # time we last received a packet with messages
self.lastTransmitSeq = -1 # last sequence we sent a packet
- self.state = pending
+ self.state = pending # one of pending, sent, confirmed
- self.outMsgs = [None] * 256 # outgoing messages (seq sent, message), index = message number
- self.omsgq = [] # list of messages to go out
- self.imsgq = [] # list of messages coming in
self.sendSession = None # send session/observed fields until obSeq > sendSession
self.response = 0 # if we know we have a response now (like resending missed packets)
self.noisy = 0
- self.resetMessages()
-
- def resetMessages(self):
+ self.resetConnection()
+
+ def makeConnection(self, transport):
+ self.transport = transport
+
+ def resetConnection(self):
self.weMissed = []
+ self.outMsgs = [None] * 256 # outgoing messages (seq sent, message), index = message number
self.inMsg = 0 # next incoming message number
- self.outMsgNums = [None] * 256 # outgoing message numbers i = outNum % 256
+ self.outMsgNums = [0] * 256 # outgoing message numbers i = outNum % 256
self.next = 0 # next outgoing message number
+ self.scheduled = 0 # a sendNext is scheduled, don't schedule another
+ self.omsgq = [] # list of messages to go out
+ self.imsgq = [] # list of messages coming in
+ self.obSeq = 0 # highest sequence confirmed by remote
+ self.inSeq = 0 # last received sequence
def datagramReceived(self, datagram):
if not datagram:
return
+ if self.noisy:
+ print `datagram`
p = AirhookPacket(datagram)
- # check to make sure sequence number isn't out of order
- if (p.seq - self.inSeq) % 2**16 >= 256:
- return
# check for state change
if self.state == pending:
if p.observed == self.sessionID:
self.observed = p.session
self.state = confirmed
+ self.response = 1
else:
- # bogus packet!
- return
+ self.observed = p.session
+ self.response = 1
elif p.session != None:
self.observed = p.session
- self.state = sent
+ self.response = 1
+ else:
self.response = 1
elif self.state == sent:
if p.observed != None and p.session != None:
self.observed = p.session
elif self.observed != p.session:
self.state = pending
- self.resetMessages()
+ self.observed = p.session
+ self.resetConnection()
+ self.response = 1
+ if hasattr(self.protocol, "resetConnection") and callable(self.protocol.resetConnection):
+ self.protocol.resetConnection()
self.inSeq = p.seq
- self.response = 1
+ self.schedule()
+ return
+ elif p.session == None and p.observed == None:
+ self.response = 1
+ self.schedule()
+
elif self.state == confirmed:
if p.session != None or p.observed != None :
- if p.session != self.observed or p.observed != self.sessionID:
+ if (p.session != None and p.session != self.observed) or (p.observed != None and p.observed != self.sessionID):
self.state = pending
- if seq == 0:
- self.resetMessages()
- self.inSeq = p.seq
+ self.observed = p.session
+ self.resetConnection()
+ self.inSeq = p.seq
+ if hasattr(self.protocol, "resetConnection") and callable(self.protocol.resetConnection):
+ self.protocol.resetConnection()
+ self.schedule()
+ return
+ # check to make sure sequence number isn't out of order
+ if (p.seq - self.inSeq) % 2**16 >= 256:
+ return
- if self.state != pending:
+ if self.state == confirmed:
msgs = []
missed = []
# session / observed logic
if self.state == pending:
+ if self.observed != None:
+ flags = flags | FLAG_OBSERVED
+ ids += pack("!L", self.observed)
flags = flags | FLAG_SESSION
ids += pack("!L", self.sessionID)
self.state = sent
ids += pack("!L", self.sessionID)
else:
- if self.state == sent or self.sendSession:
+ if self.state == sent or self.sendSession != None:
if self.state == confirmed and (self.obSeq - self.sendSession) % 2**16 < 256:
self.sendSession = None
else:
if self.obSeq >= 0:
self.weMissed = filter(lambda a: a[0] > self.obSeq, self.weMissed)
- if self.weMissed:
+ if len(self.weMissed) > 0:
flags = flags | FLAG_MISSED
missed += chr(len(self.weMissed) - 1)
for i in self.weMissed:
self.lastTransmit = time()
self.transport.write(packet, self.addr)
+ self.scheduled = 0
self.schedule()
def timeToSend(self):
+ if self.state == pending:
+ return (1, 0)
+
+ outstanding = (256 + (((self.next - 1) % 256) - self.outMsgNums[self.obSeq % 256])) % 256
# any outstanding messages and are we not too far ahead of our counterparty?
- if self.omsgq and (self.next + 1) % 256 != self.outMsgNums[self.obSeq % 256] and (self.outSeq - self.obSeq) % 2**16 < 256:
- return 1
+ if len(self.omsgq) > 0 and self.state != sent and (self.next + 1) % 256 != self.outMsgNums[self.obSeq % 256] and (self.outSeq - self.obSeq) % 2**16 < 256:
+ #if len(self.omsgq) > 0 and self.state != sent and (self.next + 1) % 256 != self.outMsgNums[self.obSeq % 256] and not outstanding:
+ return (1, 0)
+
# do we explicitly need to send a response?
- elif self.response:
+ if self.response:
self.response = 0
- return 1
+ return (1, 0)
# have we not sent anything in a while?
- elif time() - self.lastTransmit > 1.0:
- return 1
-
+ #elif time() - self.lastTransmit > 1.0:
+ #return (1, 1)
+
# nothing to send
- return 0
+ return (0, 0)
def schedule(self):
- if self.timeToSend():
- reactor.callLater(0, self.sendNext)
- else:
- reactor.callLater(1, self.sendNext)
+ tts, t = self.timeToSend()
+ if tts and not self.scheduled:
+ self.scheduled = 1
+ reactor.callLater(t, self.sendNext)
def write(self, data):
# micropackets can only be 255 bytes or less
"""
def __init__(self):
AirhookConnection.__init__(self)
+ self.resetStream()
+
+ def resetStream(self):
self.oseq = 0
self.iseq = 0
self.q = []
+ def resetConnection(self):
+ AirhookConnection.resetConnection(self)
+ self.resetStream()
+
+ def loseConnection(self):
+ pass
+
def dataCameIn(self):
# put 'em together
for msg in self.imsgq:
self.omsgq.insert(0, p)
data = data[253:]
self.oseq = (self.oseq + 1) % 2**16
-
self.schedule()
def writeSequence(self, sequence):