1 ## Airhook Protocol http://airhook.org/protocol.html
2 ## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
3 # see LICENSE.txt for license information
5 from random import uniform as rand
6 from struct import pack, unpack
9 from bisect import insort_left
11 from twisted.internet import protocol
12 from twisted.internet import abstract
13 from twisted.internet import reactor
14 from twisted.internet import app
15 from twisted.internet import interfaces
25 MAX_PACKET_SIZE = 1450
36 # this should be changed to storage that drops old entries
def datagramReceived(self, datagram, addr):
    """
    hand an incoming datagram to the connection that belongs to addr

    datagram -- passed through unchanged to the connection's own
                datagramReceived()
    addr -- given to connectionForAddr() to select the connection
    """
    conn = self.connectionForAddr(addr)
    conn.datagramReceived(datagram)
44 def connectionForAddr(self, addr):
47 if not self.connections.has_key(addr):
48 conn = self.connection()
50 conn.makeConnection(self.transport)
51 conn.protocol = self.factory.buildProtocol(addr)
52 conn.protocol.makeConnection(conn)
53 self.connections[addr] = conn
55 conn = self.connections[addr]
def makeConnection(self, transport):
    """
    attach the transport and remember the (host, port) pair it reports

    transport -- must provide getHost(); the object it returns must
                 expose .host and .port attributes
    """
    self.transport = transport
    host_info = transport.getHost()
    self.addr = (host_info.host, host_info.port)
68 def __init__(self, msg):
70 self.oseq = ord(msg[1])
71 self.seq = unpack("!H", msg[2:4])[0]
72 self.flags = ord(msg[0])
79 if self.flags & FLAG_OBSERVED:
80 self.observed = unpack("!L", msg[skip:skip+4])[0]
82 if self.flags & FLAG_SESSION:
83 self.session = unpack("!L", msg[skip:skip+4])[0]
85 if self.flags & FLAG_NEXT:
86 self.next = ord(msg[skip])
88 if self.flags & FLAG_MISSED:
89 num = ord(msg[skip]) + 1
92 self.missed.append( ord(msg[skip+i]))
94 if self.flags & FLAG_NEXT:
95 while len(msg) - skip > 0:
96 n = ord(msg[skip]) + 1
98 self.msgs.append( msg[skip:skip+n])
101 class AirhookConnection:
103 self.outSeq = 0 # highest sequence we have sent, can't be 255 more than obSeq
104 self.observed = None # their session id
105 self.sessionID = long(rand(0, 2**32)) # our session id
107 self.lastTransmit = 0 # time we last sent a packet with messages
108 self.lastReceived = 0 # time we last received a packet with messages
109 self.lastTransmitSeq = -1 # last sequence we sent a packet
110 self.state = pending # one of pending, sent, confirmed
112 self.sendSession = None # send session/observed fields until obSeq > sendSession
113 self.response = 0 # if we know we have a response now (like resending missed packets)
115 self.resetConnection()
def makeConnection(self, transport):
    """
    store the transport for this connection

    transport -- saved on the instance as self.transport
    """
    self.transport = transport
120 def resetConnection(self):
122 self.outMsgs = [None] * 256 # outgoing messages (seq sent, message), index = message number
123 self.inMsg = 0 # next incoming message number
124 self.outMsgNums = [0] * 256 # outgoing message numbers i = outNum % 256
125 self.next = 0 # next outgoing message number
126 self.scheduled = 0 # a sendNext is scheduled, don't schedule another
127 self.omsgq = [] # list of messages to go out
128 self.imsgq = [] # list of messages coming in
129 self.obSeq = 0 # highest sequence confirmed by remote
130 self.inSeq = 0 # last received sequence
132 def datagramReceived(self, datagram):
137 p = AirhookPacket(datagram)
140 # check for state change
141 if self.state == pending:
142 if p.observed != None and p.session != None:
143 if p.observed == self.sessionID:
144 self.observed = p.session
145 self.state = confirmed
148 self.observed = p.session
150 elif p.session != None:
151 self.observed = p.session
155 elif self.state == sent:
156 if p.observed != None and p.session != None:
157 if p.observed == self.sessionID:
158 self.observed = p.session
159 self.sendSession = self.outSeq
160 self.state = confirmed
161 if p.session != None:
162 if not self.observed:
163 self.observed = p.session
164 elif self.observed != p.session:
166 self.observed = p.session
167 self.resetConnection()
169 if hasattr(self.protocol, "resetConnection") and callable(self.protocol.resetConnection):
170 self.protocol.resetConnection()
174 elif p.session == None and p.observed == None:
178 elif self.state == confirmed:
179 if p.session != None or p.observed != None :
180 if (p.session != None and p.session != self.observed) or (p.observed != None and p.observed != self.sessionID):
182 self.observed = p.session
183 self.resetConnection()
185 if hasattr(self.protocol, "resetConnection") and callable(self.protocol.resetConnection):
186 self.protocol.resetConnection()
189 # check to make sure sequence number isn't out of order
190 if (p.seq - self.inSeq) % 2**16 >= 256:
193 if self.state == confirmed:
197 # see if they need us to resend anything
199 if self.outMsgs[i] != None:
200 self.omsgq.append(self.outMsgs[i])
201 self.outMsgs[i] = None
203 # see if we missed any messages
205 missed_count = (p.next - self.inMsg) % 256
207 self.lastReceived = time()
208 for i in range(missed_count):
209 missed += [(self.outSeq, (self.inMsg + i) % 256)]
210 self.weMissed += missed
212 # record highest message number seen
213 self.inMsg = (p.next + len(p.msgs)) % 256
215 # append messages, update sequence
218 if self.state == confirmed:
219 # unpack the observed sequence
220 tseq = unpack('!H', pack('!H', self.outSeq)[0] + chr(p.oseq))[0]
221 if ((self.outSeq - tseq)) % 2**16 > 255:
222 tseq = unpack('!H', chr(ord(pack('!H', self.outSeq)[0]) - 1) + chr(p.oseq))[0]
227 self.lastReceived = time()
234 header = chr(self.inSeq & 255) + pack("!H", self.outSeq)
239 # session / observed logic
240 if self.state == pending:
241 if self.observed != None:
242 flags = flags | FLAG_OBSERVED
243 ids += pack("!L", self.observed)
244 flags = flags | FLAG_SESSION
245 ids += pack("!L", self.sessionID)
247 elif self.state == sent:
248 if self.observed != None:
249 flags = flags | FLAG_SESSION | FLAG_OBSERVED
250 ids += pack("!LL", self.observed, self.sessionID)
252 flags = flags | FLAG_SESSION
253 ids += pack("!L", self.sessionID)
256 if self.state == sent or self.sendSession != None:
257 if self.state == confirmed and (self.obSeq - self.sendSession) % 2**16 < 256:
258 self.sendSession = None
260 flags = flags | FLAG_SESSION | FLAG_OBSERVED
261 ids += pack("!LL", self.observed, self.sessionID)
265 self.weMissed = filter(lambda a: a[0] > self.obSeq, self.weMissed)
267 if len(self.weMissed) > 0:
268 flags = flags | FLAG_MISSED
269 missed += chr(len(self.weMissed) - 1)
270 for i in self.weMissed:
273 # append any outgoing messages
274 if self.state == confirmed and self.omsgq:
276 outstanding = (256 + (((self.next - 1) % 256) - self.outMsgNums[self.obSeq % 256])) % 256
277 while len(self.omsgq) and outstanding < 255 and len(self.omsgq[-1]) + len(msgs) + len(missed) + len(ids) + len(header) + 1 <= MAX_PACKET_SIZE:
278 msg = self.omsgq.pop()
279 msgs += chr(len(msg) - 1) + msg
280 self.outMsgs[self.next] = msg
281 self.next = (self.next + 1) % 256
283 # update outgoing message stat
285 flags = flags | FLAG_NEXT
287 self.lastTransmitSeq = self.outSeq
288 #self.outMsgNums[self.outSeq % 256] = first
290 self.outMsgNums[self.outSeq % 256] = (self.next - 1) % 256
292 # do we need a NEXT flag despite not having sent any messages?
293 if not flags & FLAG_NEXT and (256 + (((self.next - 1) % 256) - self.outMsgNums[self.obSeq % 256])) % 256 > 0:
294 flags = flags | FLAG_NEXT
295 ids += chr(self.next)
297 # update stats and send packet
298 packet = chr(flags) + header + ids + missed + msgs
299 self.outSeq = (self.outSeq + 1) % 2**16
300 self.lastTransmit = time()
301 self.transport.write(packet, self.addr)
306 def timeToSend(self):
307 if self.state == pending:
310 outstanding = (256 + (((self.next - 1) % 256) - self.outMsgNums[self.obSeq % 256])) % 256
311 # any outstanding messages and are we not too far ahead of our counterparty?
312 if len(self.omsgq) > 0 and self.state != sent and (self.next + 1) % 256 != self.outMsgNums[self.obSeq % 256] and (self.outSeq - self.obSeq) % 2**16 < 256:
313 #if len(self.omsgq) > 0 and self.state != sent and (self.next + 1) % 256 != self.outMsgNums[self.obSeq % 256] and not outstanding:
316 # do we explicitly need to send a response?
320 # have we not sent anything in a while?
321 #elif time() - self.lastTransmit > 1.0:
328 tts, t = self.timeToSend()
329 if tts and not self.scheduled:
331 reactor.callLater(t, self.sendNext)
333 def write(self, data):
334 # micropackets can only be 255 bytes or less
336 self.omsgq.insert(0, data)
339 def dataCameIn(self):
341 called when we get a packet bearing messages
343 for msg in self.imsgq:
344 self.protocol.dataReceived(msg)
349 this subclass of string encapsulates each ordered message, caches its sequence number,
350 and has comparison functions to sort by sequence number
353 if not hasattr(self, 'seq'):
354 self.seq = unpack("!H", self[0:2])[0]
def __lt__(self, other):
    """
    true when the sequence difference (self - other), taken modulo 2**16,
    is greater than 255
    """
    delta = (self.getseq() - other.getseq()) % 65536
    return delta > 255
def __le__(self, other):
    """
    true when the sequence difference (self - other), taken modulo 2**16,
    is greater than 255; otherwise returns the result of self.__eq__(other)
    """
    delta = (self.getseq() - other.getseq()) % 65536
    if delta > 255:
        return True
    return self.__eq__(other)
def __eq__(self, other):
    """
    two messages are equal when getseq() returns the same value for both
    """
    mine = self.getseq()
    theirs = other.getseq()
    return mine == theirs
def __ne__(self, other):
    """
    two messages differ when getseq() returns different values for them
    """
    mine = self.getseq()
    theirs = other.getseq()
    return mine != theirs
def __gt__(self, other):
    """
    true when the sequence difference (self - other), taken modulo 2**16,
    is below 256 and self.__eq__(other) is false
    """
    delta = (self.getseq() - other.getseq()) % 65536
    if delta > 255:
        return False
    return not self.__eq__(other)
def __ge__(self, other):
    """
    true when the sequence difference (self - other), taken modulo 2**16,
    is below 256 (a difference of zero counts)
    """
    delta = (self.getseq() - other.getseq()) % 65536
    return delta < 256
369 class StreamConnection(AirhookConnection):
371 this implements a simple protocol for a stream over airhook
372 this is done for convenience, instead of making it a twisted.internet.protocol....
373 the first two octets of each message are interpreted as a 16-bit sequence number
374 253 bytes are used for payload
378 AirhookConnection.__init__(self)
381 def resetStream(self):
386 def resetConnection(self):
387 AirhookConnection.resetConnection(self)
390 def loseConnection(self):
393 def dataCameIn(self):
395 for msg in self.imsgq:
396 insort_left(self.q, ustr(msg))
399 while self.q and self.iseq == self.q[0].getseq():
400 data += self.q[0][2:]
402 self.iseq = (self.iseq + 1) % 2**16
404 self.protocol.dataReceived(data)
406 def write(self, data):
407 # chop it up and queue it up
409 p = pack("!H", self.oseq) + data[:253]
410 self.omsgq.insert(0, p)
412 self.oseq = (self.oseq + 1) % 2**16
415 def writeSequence(self, sequence):
416 for data in sequence:
420 def listenAirhook(port, factory):
422 ah.connection = AirhookConnection
424 reactor.listenUDP(port, ah)
427 def listenAirhookStream(port, factory):
429 ah.connection = StreamConnection
431 reactor.listenUDP(port, ah)