1 ## Airhook Protocol http://airhook.org/protocol.html
2 ## Copyright 2002, Andrew Loewenstern, All Rights Reserved
4 from random import uniform as rand
5 from struct import pack, unpack
8 from bisect import insort_left
10 from twisted.internet import protocol
11 from twisted.internet import abstract
12 from twisted.internet import reactor
13 from twisted.internet import app
14 from twisted.internet import interfaces
24 MAX_PACKET_SIZE = 1450
32 class Airhook(protocol.DatagramProtocol):
35 # this should be changed to storage that drops old entries
38 def datagramReceived(self, datagram, addr):
39 self.connectionForAddr(addr).datagramReceived(datagram)
41 def connectionForAddr(self, addr):
42 if not self.connections.has_key(addr):
43 conn = self.connection()
44 conn.protocol = self.factory.buildProtocol(addr)
45 conn.protocol.makeConnection(conn)
46 conn.makeConnection(self.transport)
48 self.connections[addr] = conn
50 conn = self.connections[addr]
54 def __init__(self, msg):
56 self.oseq = ord(msg[1])
57 self.seq = unpack("!H", msg[2:4])[0]
58 self.flags = ord(msg[0])
65 if self.flags & FLAG_OBSERVED:
66 self.observed = unpack("!L", msg[skip:skip+4])[0]
68 if self.flags & FLAG_SESSION:
69 self.session = unpack("!L", msg[skip:skip+4])[0]
71 if self.flags & FLAG_NEXT:
72 self.next = ord(msg[skip])
74 if self.flags & FLAG_MISSED:
75 num = ord(msg[skip]) + 1
78 self.missed.append( ord(msg[skip+i]))
80 if self.flags & FLAG_NEXT:
81 while len(msg) - skip > 0:
82 n = ord(msg[skip]) + 1
84 self.msgs.append( msg[skip:skip+n])
87 class AirhookConnection(protocol.ConnectedDatagramProtocol, interfaces.IUDPConnectedTransport):
89 self.outSeq = 0 # highest sequence we have sent, can't be 255 more than obSeq
90 self.obSeq = 0 # highest sequence confirmed by remote
91 self.inSeq = 0 # last received sequence
92 self.observed = None # their session id
93 self.sessionID = long(rand(0, 2**32)) # our session id
95 self.lastTransmit = -1 # time we last sent a packet with messages
96 self.lastReceieved = 0 # time we last received a packet with messages
97 self.lastTransmitSeq = -1 # last sequence we sent a packet
100 self.outMsgs = [None] * 256 # outgoing messages (seq sent, message), index = message number
101 self.omsgq = [] # list of messages to go out
102 self.imsgq = [] # list of messages coming in
103 self.sendSession = None # send session/observed fields until obSeq > sendSession
104 self.response = 0 # if we know we have a response now (like resending missed packets)
108 def resetMessages(self):
110 self.inMsg = 0 # next incoming message number
111 self.outMsgNums = [None] * 256 # outgoing message numbers i = outNum % 256
112 self.next = 0 # next outgoing message number
114 def datagramReceived(self, datagram):
117 p = AirhookPacket(datagram)
119 # check to make sure sequence number isn't out of order
120 if (p.seq - self.inSeq) % 2**16 >= 256:
123 # check for state change
124 if self.state == pending:
125 if p.observed != None and p.session != None:
126 if p.observed == self.sessionID:
127 self.observed = p.session
128 self.state = confirmed
132 elif p.session != None:
133 self.observed = p.session
136 elif self.state == sent:
137 if p.observed != None and p.session != None:
138 if p.observed == self.sessionID:
139 self.observed = p.session
140 self.sendSession = self.outSeq
141 self.state = confirmed
142 if p.session != None:
143 if not self.observed:
144 self.observed = p.session
145 elif self.observed != p.session:
150 elif self.state == confirmed:
151 if p.session != None or p.observed != None :
152 if p.session != self.observed or p.observed != self.sessionID:
158 if self.state != pending:
162 # see if they need us to resend anything
164 if self.outMsgs[i] != None:
165 self.omsgq.append(self.outMsgs[i])
166 self.outMsgs[i] = None
168 # see if we missed any messages
170 missed_count = (p.next - self.inMsg) % 256
172 self.lastReceived = time()
173 for i in range(missed_count):
174 missed += [(self.outSeq, (self.inMsg + i) % 256)]
175 self.weMissed += missed
177 # record highest message number seen
178 self.inMsg = (p.next + len(p.msgs)) % 256
180 # append messages, update sequence
183 if self.state == confirmed:
184 # unpack the observed sequence
185 tseq = unpack('!H', pack('!H', self.outSeq)[0] + chr(p.oseq))[0]
186 if ((self.outSeq - tseq)) % 2**16 > 255:
187 tseq = unpack('!H', chr(ord(pack('!H', self.outSeq)[0]) - 1) + chr(p.oseq))[0]
192 self.lastReceived = time()
199 header = chr(self.inSeq & 255) + pack("!H", self.outSeq)
204 # session / observed logic
205 if self.state == pending:
206 flags = flags | FLAG_SESSION
207 ids += pack("!L", self.sessionID)
209 elif self.state == sent:
210 if self.observed != None:
211 flags = flags | FLAG_SESSION | FLAG_OBSERVED
212 ids += pack("!LL", self.observed, self.sessionID)
214 flags = flags | FLAG_SESSION
215 ids += pack("!L", self.sessionID)
218 if self.state == sent or self.sendSession:
219 if self.state == confirmed and (self.obSeq - self.sendSession) % 2**16 < 256:
220 self.sendSession = None
222 flags = flags | FLAG_SESSION | FLAG_OBSERVED
223 ids += pack("!LL", self.observed, self.sessionID)
227 self.weMissed = filter(lambda a: a[0] > self.obSeq, self.weMissed)
230 flags = flags | FLAG_MISSED
231 missed += chr(len(self.weMissed) - 1)
232 for i in self.weMissed:
235 # append any outgoing messages
236 if self.state == confirmed and self.omsgq:
238 outstanding = (256 + (((self.next - 1) % 256) - self.outMsgNums[self.obSeq % 256])) % 256
239 while len(self.omsgq) and outstanding < 255 and len(self.omsgq[-1]) + len(msgs) + len(missed) + len(ids) + len(header) + 1 <= MAX_PACKET_SIZE:
240 msg = self.omsgq.pop()
241 msgs += chr(len(msg) - 1) + msg
242 self.outMsgs[self.next] = msg
243 self.next = (self.next + 1) % 256
245 # update outgoing message stat
247 flags = flags | FLAG_NEXT
249 self.lastTransmitSeq = self.outSeq
250 #self.outMsgNums[self.outSeq % 256] = first
252 self.outMsgNums[self.outSeq % 256] = (self.next - 1) % 256
254 # do we need a NEXT flag despite not having sent any messages?
255 if not flags & FLAG_NEXT and (256 + (((self.next - 1) % 256) - self.outMsgNums[self.obSeq % 256])) % 256 > 0:
256 flags = flags | FLAG_NEXT
257 ids += chr(self.next)
259 # update stats and send packet
260 packet = chr(flags) + header + ids + missed + msgs
261 self.outSeq = (self.outSeq + 1) % 2**16
262 self.lastTransmit = time()
263 self.transport.write(packet, self.addr)
267 def timeToSend(self):
268 # any outstanding messages and are we not too far ahead of our counterparty?
269 if self.omsgq and (self.next + 1) % 256 != self.outMsgNums[self.obSeq % 256] and (self.outSeq - self.obSeq) % 2**16 < 256:
271 # do we explicitly need to send a response?
275 # have we not sent anything in a while?
276 elif time() - self.lastTransmit > 1.0:
283 if self.timeToSend():
284 reactor.callLater(0, self.sendNext)
286 reactor.callLater(1, self.sendNext)
288 def write(self, data):
289 # micropackets can only be 255 bytes or less
291 self.omsgq.insert(0, data)
294 def dataCameIn(self):
296 called when we get a packet bearing messages
298 for msg in self.imsgq:
299 self.protocol.dataReceived(msg)
304 this subclass of string encapsulates each ordered message, caches it's sequence number,
305 and has comparison functions to sort by sequence number
308 if not hasattr(self, 'seq'):
309 self.seq = unpack("!H", self[0:2])[0]
311 def __lt__(self, other):
312 return (self.getseq() - other.getseq()) % 2**16 > 255
313 def __le__(self, other):
314 return (self.getseq() - other.getseq()) % 2**16 > 255 or self.__eq__(other)
315 def __eq__(self, other):
316 return self.getseq() == other.getseq()
317 def __ne__(self, other):
318 return self.getseq() != other.getseq()
319 def __gt__(self, other):
320 return (self.getseq() - other.getseq()) % 2**16 < 256 and not self.__eq__(other)
321 def __ge__(self, other):
322 return (self.getseq() - other.getseq()) % 2**16 < 256
324 class StreamConnection(AirhookConnection):
326 this implements a simple protocol for a stream over airhook
327 this is done for convenience, instead of making it a twisted.internet.protocol....
328 the first two octets of each message are interpreted as a 16-bit sequence number
329 253 bytes are used for payload
333 AirhookConnection.__init__(self)
338 def dataCameIn(self):
340 for msg in self.imsgq:
341 insort_left(self.q, ustr(msg))
344 while self.q and self.iseq == self.q[0].getseq():
345 data += self.q[0][2:]
347 self.iseq = (self.iseq + 1) % 2**16
349 self.protocol.dataReceived(data)
351 def write(self, data):
352 # chop it up and queue it up
354 p = pack("!H", self.oseq) + data[:253]
355 self.omsgq.insert(0, p)
357 self.oseq = (self.oseq + 1) % 2**16
361 def writeSequence(self, sequence):
362 for data in sequence:
366 def listenAirhook(port, factory):
368 ah.connection = AirhookConnection
370 reactor.listenUDP(port, ah)
373 def listenAirhookStream(port, factory):
375 ah.connection = StreamConnection
377 reactor.listenUDP(port, ah)