1 ## Airhook Protocol http://airhook.org/protocol.html
2 ## Copyright 2002, Andrew Loewenstern, All Rights Reserved
4 from random import uniform as rand
5 from struct import pack, unpack
7 from StringIO import StringIO
9 from bisect import insort_left
11 from twisted.internet import protocol
12 from twisted.internet import reactor
# Upper bound on the length of an assembled outgoing packet; the send path
# stops adding messages once the packet would exceed this size.
22 MAX_PACKET_SIZE = 1496
def setDelegate(self, delegate):
    """Remember ``delegate`` as the object that msgDelegate forwards calls to.

    delegate -- any object; it is stored as-is on ``self.delegate``
    """
    self.delegate = delegate
# Accessor counterpart to setDelegate.
# NOTE(review): no body line follows this header in the listing - presumably
# it returns self.delegate; confirm against the full file.
31 def getDelegate(self):
def msgDelegate(self, method, args=(), kwargs=None):
    """Call ``method`` on the delegate, if there is a delegate that has it.

    The call is silently skipped when no delegate has been set, when the
    delegate has no attribute called ``method``, or when that attribute is
    not callable.  The delegate's return value is discarded.

    method -- name of the delegate method to invoke
    args   -- sequence of positional arguments to pass along
    kwargs -- mapping of keyword arguments to pass along; None (the
              default) means no keyword arguments
    """
    delegate = getattr(self, 'delegate', None)
    handler = getattr(delegate, method, None)
    if callable(handler):
        # Extended call syntax replaces the deprecated apply() builtin, and a
        # None default avoids sharing one dict object between calls.
        handler(*args, **(kwargs or {}))
37 class Airhook(protocol.DatagramProtocol):
def __init__(self, connection_class):
    """Record the factory used to build one connection object per peer.

    connection_class -- callable stored on ``self.connection_class``
    """
    self.connection_class = connection_class
# NOTE(review): the body of startProtocol is not shown in this listing.
# connectionForAddr reads self.connections, which no visible line creates -
# presumably it is initialised here; confirm against the full file.
41 def startProtocol(self):
# Handle one datagram arriving from addr by handing it to the connection
# object kept for that address.
# NOTE(review): `flag` is not assigned on any line shown here - presumably it
# is the first byte of the datagram (AirhookPacket reads its flags from
# ord(msg[0])); confirm against the full file.
44 def datagramReceived(self, datagram, addr):
46 if not flag & FLAG_AIRHOOK: # first bit always must be 0
47 conn = self.connectionForAddr(addr)
# NOTE(review): "datagramReceieved" looks like a misspelling - the
# AirhookConnection class in this file defines datagramReceived, so this
# call would raise AttributeError on such a connection. Verify and fix.
48 conn.datagramReceieved(datagram)
def connectionForAddr(self, addr):
    """Return the connection object for ``addr``, creating it on first use.

    addr -- peer address; used as the key into ``self.connections``

    A new connection is built by calling ``self.connection_class`` with
    (self.transport, addr, self.delegate) and is cached, so later lookups
    for the same address return the same object.
    """
    # NOTE(review): self.delegate is not assigned by any visible Airhook
    # method; confirm the owner sets it before the first lookup.
    if addr not in self.connections:
        # The factory lives on the instance (set in __init__); the previous
        # bare ``connection_class`` name was undefined in this scope.
        self.connections[addr] = self.connection_class(
            self.transport, addr, self.delegate)
    return self.connections[addr]
# Parse a raw Airhook datagram (msg) into header fields and contained messages.
# NOTE(review): several lines of this method are not shown in this listing,
# including the ones that advance `skip` and that first create self.missed
# and self.msgs; the comments below cover only the visible statements.
58 def __init__(self, msg):
# fixed header: byte 0 = flags, byte 1 = observed sequence (one byte),
# bytes 2-3 = 16-bit sequence number in network byte order
60 self.oseq = ord(msg[1])
61 self.seq = unpack("!H", msg[2:4])[0]
62 self.flags = ord(msg[0])
# optional fields: each is read only when its flag bit is set, at the
# current read offset `skip`
69 if self.flags & FLAG_OBSERVED:
70 self.observed = unpack("!L", msg[skip:skip+4])[0]
72 if self.flags & FLAG_SESSION:
73 self.session = unpack("!L", msg[skip:skip+4])[0]
75 if self.flags & FLAG_NEXT:
76 self.next = ord(msg[skip])
78 if self.flags & FLAG_MISSED:
# the count byte stores (number of entries - 1)
79 num = ord(msg[skip]) + 1
82 self.missed.append( ord(msg[skip+i]))
# messages are parsed only when the NEXT flag is set
84 if self.flags & FLAG_NEXT:
85 while len(msg) - skip > 0:
# each message is preceded by one byte holding (message length - 1)
86 n = ord(msg[skip]) + 1
88 self.msgs.append( msg[skip:skip+n])
91 class AirhookConnection(Delegate):
# Set up the state for one Airhook conversation with a single remote peer.
#   transport -- object whose write() is used to send packets
#   addr      -- 3-tuple unpacked below as (type, host, port)
#   delegate  -- object notified through msgDelegate (e.g. 'dataCameIn')
# NOTE(review): some lines of this constructor are not shown in this listing
# (for example where self.state and self.weMissed are first set).
92 def __init__(self, transport, addr, delegate):
93 self.delegate = delegate
# NOTE(review): the first element is bound to a local named `type`, which
# shadows the builtin; it is not used on any visible line.
95 type, self.host, self.port = addr
96 self.transport = transport
98 self.outSeq = 0 # highest sequence we have sent, can't be 255 more than obSeq
99 self.obSeq = 0 # highest sequence confirmed by remote
100 self.inSeq = 0 # last received sequence
101 self.observed = None # their session id
# NOTE(review): rand is random.uniform, which returns a float and may return
# its upper bound; a value of 2**32 would not fit the "!L" field this id is
# packed into when sent - confirm whether that edge matters.
102 self.sessionID = long(rand(0, 2**32)) # our session id
104 self.lastTransmit = -1 # time we last sent a packet with messages
# NOTE(review): spelled "lastReceieved" here, but datagramReceived assigns
# self.lastReceived - two different attributes; looks like a typo.
105 self.lastReceieved = 0 # time we last received a packet with messages
106 self.lastTransmitSeq = -1 # last sequence we sent a packet
# NOTE(review): the visible send code stores the bare message string in
# outMsgs, not a (seq sent, message) tuple as the trailing comment says.
109 self.outMsgs = [None] * 256 # outgoing messages (seq sent, message), index = message number
110 self.omsgq = [] # list of messages to go out
111 self.imsgq = [] # list of messages coming in
112 self.sendSession = None # send session/observed fields until obSeq > sendSession
# Reset the message-numbering state of this connection to its initial values.
# NOTE(review): a line between the header and the first assignment is not
# shown in this listing.
116 def resetMessages(self):
118 self.inMsg = 0 # next incoming message number
# the send path writes slot (outSeq % 256) with the number of the last
# message sent so far, and reads slot (obSeq % 256)
119 self.outMsgNums = [None] * 256 # outgoing message numbers i = outNum % 256
120 self.next = 0 # next outgoing message number
# Process one datagram from the peer: parse it, advance the session handshake
# state, re-queue messages the peer reports missing, record message numbers
# we missed ourselves, and schedule a send.
# NOTE(review): many lines of this method are not shown in this listing; the
# comments below describe only the visible statements.
122 def datagramReceived(self, datagram):
125 response = 0 # if we know we have a response now (like resending missed packets)
126 p = AirhookPacket(datagram)
128 # check to make sure sequence number isn't out of order
# a difference of 256 or more (modulo 2**16) from inSeq counts as out of
# order; the handling lines are not shown
129 if (p.seq - self.inSeq) % 2**16 >= 256:
132 # check for state change
# a packet whose observed field equals our sessionID confirms the session
133 if self.state == pending:
134 if p.observed != None and p.session != None:
135 if p.observed == self.sessionID:
136 self.observed = p.session
137 self.state = confirmed
141 elif p.session != None:
142 self.observed = p.session
145 elif self.state == sent:
146 if p.observed != None and p.session != None:
147 if p.observed == self.sessionID:
148 self.observed = p.session
149 self.sendSession = self.outSeq
150 self.state = confirmed
151 if p.session != None:
152 if not self.observed:
153 self.observed = p.session
154 elif self.observed != p.session:
159 elif self.state == confirmed:
160 if p.session != None or p.observed != None :
161 if p.session != self.observed or p.observed != self.sessionID:
167 if self.state != pending:
171 # see if they need us to resend anything
174 if self.outMsgs[i] != None:
# re-queued at the tail of omsgq, the same end the send code pops from
175 self.omsgq.append(self.outMsgs[i])
176 self.outMsgs[i] = None
178 # see if we missed any messages
# gap between the message number we expected and the packet's first number
180 missed_count = (p.next - self.inMsg) % 256
# NOTE(review): __init__ creates "lastReceieved" (different spelling), so
# this assignment creates a second attribute; looks like a typo.
182 self.lastReceived = time()
183 for i in range(missed_count):
# each entry pairs our current outSeq with one missed message number
184 missed += [(self.outSeq, (self.inMsg + i) % 256)]
186 self.weMissed += missed
187 # record highest message number seen
188 self.inMsg = (p.next + len(p.msgs)) % 256
190 # append messages, update sequence
193 if self.state == confirmed:
194 # unpack the observed sequence
# rebuild a 16-bit value from the high byte of outSeq and the received low
# byte p.oseq
195 tseq = unpack('!H', pack('!H', self.outSeq)[0] + chr(p.oseq))[0]
# if that value is not within 255 behind outSeq, retry with the high byte
# decremented by one
196 if ((self.outSeq - tseq)) % 2**16 > 255:
# NOTE(review): when the high byte of outSeq is 0 this evaluates chr(-1),
# which raises ValueError - confirm whether that case can occur.
197 tseq = unpack('!H', chr(ord(pack('!H', self.outSeq)[0]) - 1) + chr(p.oseq))[0]
# ask the reactor to run sendNext with zero delay
203 reactor.callLater(0, self.sendNext)
204 self.lastReceived = time()
# Assemble one outgoing packet (flags, header, session ids, missed list,
# messages), write it to the transport, and reschedule the next send.
# NOTE(review): the def line and several body lines are not shown in this
# listing; the body reschedules self.sendNext, so this is presumably the body
# of sendNext. Comments below describe only the visible statements.
# header = low byte of the last received sequence + our 16-bit sequence
209 header = chr(self.inSeq & 255) + pack("!H", self.outSeq)
214 # session / observed logic
215 if self.state == pending:
216 flags = flags | FLAG_SESSION
217 ids += pack("!L", self.sessionID)
219 elif self.state == sent:
220 if self.observed != None:
221 flags = flags | FLAG_SESSION | FLAG_OBSERVED
222 ids += pack("!LL", self.observed, self.sessionID)
224 flags = flags | FLAG_SESSION
225 ids += pack("!L", self.sessionID)
228 if self.state == sent or self.sendSession:
229 if self.state == confirmed and (self.obSeq - self.sendSession) % 2**16 < 256:
230 self.sendSession = None
232 flags = flags | FLAG_SESSION | FLAG_OBSERVED
233 ids += pack("!LL", self.observed, self.sessionID)
# keep only missed entries recorded at a sequence above obSeq
# NOTE(review): this is a plain integer comparison, unlike the modulo-2**16
# comparisons used elsewhere in this class - confirm it behaves as intended
# once sequence numbers wrap.
237 self.weMissed = filter(lambda a: a[0] > self.obSeq, self.weMissed)
240 flags = flags | FLAG_MISSED
# the count byte stores (number of entries - 1)
241 missed += chr(len(self.weMissed) - 1)
242 for i in self.weMissed:
245 # append any outgoing messages
246 if self.state == confirmed and self.omsgq:
# distance (mod 256) from the message number recorded for sequence obSeq to
# the last message number handed out so far
248 outstanding = (256 + (((self.next - 1) % 256) - self.outMsgNums[self.obSeq % 256])) % 256
# take messages from the tail of omsgq while `outstanding` is below 255 and
# the packet stays within MAX_PACKET_SIZE (the +1 is the length byte)
249 while len(self.omsgq) and outstanding < 255 and len(self.omsgq[-1]) + len(msgs) + len(missed) + len(ids) + len(header) + 1 <= MAX_PACKET_SIZE:
250 msg = self.omsgq.pop()
# each message goes on the wire as one byte holding (length - 1), then the data
251 msgs += chr(len(msg) - 1) + msg
# remember the message under its number so it can be re-queued if reported missing
252 self.outMsgs[self.next] = msg
253 self.next = (self.next + 1) % 256
255 # update outgoing message stat
257 flags = flags | FLAG_NEXT
259 self.lastTransmitSeq = self.outSeq
260 #self.outMsgNums[self.outSeq % 256] = first
# record, for this packet's sequence slot, the last message number sent so far
262 self.outMsgNums[self.outSeq % 256] = (self.next - 1) % 256
264 # do we need a NEXT flag despite not having sent any messages?
265 if not flags & FLAG_NEXT and (256 + (((self.next - 1) % 256) - self.outMsgNums[self.obSeq % 256])) % 256 > 0:
266 flags = flags | FLAG_NEXT
267 ids += chr(self.next)
269 # update stats and send packet
# wire order: flags byte, header, ids, missed list, messages
270 packet = chr(flags) + header + ids + missed + msgs
271 self.outSeq = (self.outSeq + 1) % 2**16
272 self.lastTransmit = time()
273 self.transport.write(packet)
# more queued messages and room in both windows: send again with zero delay
276 if self.omsgq and (self.next + 1) % 256 != self.outMsgNums[self.obSeq % 256] and (self.outSeq - self.obSeq) % 2**16 < 256:
277 reactor.callLater(0, self.sendNext)
# otherwise try again after a delay of 1 (the branch line itself is not shown)
279 reactor.callLater(1, self.sendNext)
# Forward the queued incoming messages to the delegate by invoking its
# 'dataCameIn' method with (host, port, list of messages).
# NOTE(review): the lines after the final `if` are not shown in this listing.
282 def dataCameIn(self):
284 called when we get a packet bearing messages
285 delegate must do something with the messages or they will get dropped
287 self.msgDelegate('dataCameIn', (self.host, self.port, self.imsgq))
288 if hasattr(self, 'delegate') and self.delegate != None:
293 this subclass of string encapsulates each ordered message, caches its sequence number,
294 and has comparison functions to sort by sequence number
# On first use, decode the first two characters as a 16-bit network-order
# integer and cache it on the instance as self.seq.
# NOTE(review): the enclosing def line is not shown here - presumably getseq,
# which the comparison methods call; confirm.
297 if not hasattr(self, 'seq'):
298 self.seq = unpack("!H", self[0:2])[0]
def __lt__(self, other):
    """Return True when self sorts before other in 16-bit wraparound order."""
    gap = (self.getseq() - other.getseq()) % 0x10000
    return gap >= 256
def __le__(self, other):
    """Return True when self sorts before other (wraparound order) or equals it."""
    gap = (self.getseq() - other.getseq()) % 0x10000
    if gap >= 256:
        return True
    return self.__eq__(other)
def __eq__(self, other):
    """Return True when both messages carry the same sequence number."""
    mine = self.getseq()
    theirs = other.getseq()
    return mine == theirs
def __ne__(self, other):
    """Return True when the two messages carry different sequence numbers."""
    mine = self.getseq()
    theirs = other.getseq()
    return mine != theirs
def __gt__(self, other):
    """Return True when self sorts strictly after other in 16-bit wraparound order."""
    gap = (self.getseq() - other.getseq()) % 0x10000
    if gap > 255:
        return False
    return not self.__eq__(other)
def __ge__(self, other):
    """Return True when self equals other or sorts after it in wraparound order."""
    gap = (self.getseq() - other.getseq()) % 0x10000
    return gap <= 255
313 class OrderedConnection(AirhookConnection):
315 this implements a simple protocol for ordered messages over airhook
316 the first two octets of each message are interpreted as a 16-bit sequence number
317 253 bytes are used for payload
# Build the underlying AirhookConnection with the same arguments.
# NOTE(review): self.q, self.iseq and self.oseq are read by other methods of
# this class but not created on any visible line - presumably they are
# initialised on the lines that follow; confirm.
319 def __init__(self, transport, addr, delegate):
320 AirhookConnection.__init__(self, transport, addr, delegate)
# Put the incoming messages back in order and hand the delegate the payload
# of every message that continues the expected sequence.
# NOTE(review): several lines of this method are not shown in this listing
# (for example where `data` is created and where self.q is trimmed).
325 def dataCameIn(self):
327 for msg in self.imsgq:
# wrap each message as ustr so the sorted insert orders it by sequence number
328 insort_left(self.q, ustr(msg))
# consume from the front of the queue while it holds the next expected sequence
331 while self.q and self.iseq == self.q[0].getseq():
# drop the 2-character sequence prefix, keep the payload
332 data += self.q[0][2:]
334 self.iseq = (self.iseq + 1) % 2**16
# unlike the base class, the third element is the joined payload, not a list
336 self.msgDelegate('dataCameIn', (self.host, self.port, data))
# Queue `data` for sending as sequence-numbered messages.
# NOTE(review): the loop line and the line that advances through `data` are
# not shown in this listing.
338 def sendSomeData(self, data):
339 # chop it up and queue it up
# one message = 16-bit network-order sequence number + up to 253 characters
341 p = pack("!H", self.oseq) + data[:253]
# inserted at the front because the send code pops from the end of omsgq
342 self.omsgq.insert(0, p)
344 self.oseq = (self.oseq + 1) % 2**16