]> git.mxchange.org Git - quix0rs-apt-p2p.git/blobdiff - airhook.py
added a simple tcp over airhook proxy, not fully tested
[quix0rs-apt-p2p.git] / airhook.py
index 96009d4a751b2fe42ab4b9feaae36a6d3a671384..397cf53e41c4eb1a8c2b4d04b14ae5e7dd5de133 100644 (file)
@@ -1,5 +1,6 @@
 ##  Airhook Protocol http://airhook.org/protocol.html
-##  Copyright 2002, Andrew Loewenstern, All Rights Reserved
+## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
+# see LICENSE.txt for license information
 
 from random import uniform as rand
 from struct import pack, unpack
@@ -29,26 +30,39 @@ confirmed = 2
 
 
 
-class Airhook(protocol.DatagramProtocol):       
+class Airhook:       
     def __init__(self):
         self.noisy = None
         # this should be changed to storage that drops old entries
         self.connections = {}
         
     def datagramReceived(self, datagram, addr):
+        #print `addr`, `datagram`
+        #if addr != self.addr:
         self.connectionForAddr(addr).datagramReceived(datagram)
 
     def connectionForAddr(self, addr):
+        if addr == self.addr:
+            raise Exception
         if not self.connections.has_key(addr):
             conn = self.connection()
+            conn.addr = addr
+            conn.makeConnection(self.transport)
             conn.protocol = self.factory.buildProtocol(addr)
             conn.protocol.makeConnection(conn)
-            conn.makeConnection(self.transport)
-            conn.addr = addr
             self.connections[addr] = conn
         else:
             conn = self.connections[addr]
         return conn
+    def makeConnection(self, transport):
+        self.transport = transport
+        addr = transport.getHost()
+        self.addr = (addr.host, addr.port)
+
+    def doStart(self):
+        pass
+    def doStop(self):
+        pass
     
 class AirhookPacket:
     def __init__(self, msg):
@@ -84,41 +98,44 @@ class AirhookPacket:
                 self.msgs.append( msg[skip:skip+n])
                 skip += n
 
-class AirhookConnection(protocol.ConnectedDatagramProtocol, interfaces.IUDPConnectedTransport):
+class AirhookConnection:
     def __init__(self):        
         self.outSeq = 0  # highest sequence we have sent, can't be 255 more than obSeq
-        self.obSeq = 0   # highest sequence confirmed by remote
-        self.inSeq = 0   # last received sequence
         self.observed = None  # their session id
         self.sessionID = long(rand(0, 2**32))  # our session id
         
-        self.lastTransmit = -1  # time we last sent a packet with messages
-        self.lastReceieved = 0 # time we last received a packet with messages
+        self.lastTransmit = 0  # time we last sent a packet with messages
+        self.lastReceived = 0 # time we last received a packet with messages
         self.lastTransmitSeq = -1 # last sequence we sent a packet
-        self.state = pending
+        self.state = pending # one of pending, sent, confirmed
         
-        self.outMsgs = [None] * 256  # outgoing messages  (seq sent, message), index = message number
-        self.omsgq = [] # list of messages to go out
-        self.imsgq = [] # list of messages coming in
         self.sendSession = None  # send session/observed fields until obSeq > sendSession
         self.response = 0 # if we know we have a response now (like resending missed packets)
         self.noisy = 0
-        self.resetMessages()
-    
-    def resetMessages(self):
+        self.resetConnection()
+
+    def makeConnection(self, transport):
+        self.transport = transport
+        
+    def resetConnection(self):
         self.weMissed = []
+        self.outMsgs = [None] * 256  # outgoing messages  (seq sent, message), index = message number
         self.inMsg = 0   # next incoming message number
-        self.outMsgNums = [None] * 256 # outgoing message numbers i = outNum % 256
+        self.outMsgNums = [0] * 256 # outgoing message numbers i = outNum % 256
         self.next = 0  # next outgoing message number
+        self.scheduled = 0 # a sendNext is scheduled, don't schedule another
+        self.omsgq = [] # list of messages to go out
+        self.imsgq = [] # list of messages coming in
+        self.obSeq = 0   # highest sequence confirmed by remote
+        self.inSeq = 0   # last received sequence
 
     def datagramReceived(self, datagram):
         if not datagram:
             return
+        if self.noisy:
+            print `datagram`
         p = AirhookPacket(datagram)
         
-        # check to make sure sequence number isn't out of order
-        if (p.seq - self.inSeq) % 2**16 >= 256:
-            return
             
         # check for state change
         if self.state == pending:
@@ -126,12 +143,14 @@ class AirhookConnection(protocol.ConnectedDatagramProtocol, interfaces.IUDPConne
                 if p.observed == self.sessionID:
                     self.observed = p.session
                     self.state = confirmed
+                    self.response = 1
                 else:
-                    # bogus packet!
-                    return
+                    self.observed = p.session
+                    self.response = 1
             elif p.session != None:
                 self.observed = p.session
-                self.state = sent
+                self.response = 1
+            else:
                 self.response = 1
         elif self.state == sent:
             if p.observed != None and p.session != None:
@@ -144,18 +163,34 @@ class AirhookConnection(protocol.ConnectedDatagramProtocol, interfaces.IUDPConne
                     self.observed = p.session
                 elif self.observed != p.session:
                     self.state = pending
-                    self.resetMessages()
+                    self.observed = p.session
+                    self.resetConnection()
+                    self.response = 1
+                    if hasattr(self.protocol, "resetConnection") and callable(self.protocol.resetConnection):
+                        self.protocol.resetConnection()
                     self.inSeq = p.seq
-            self.response = 1
+                    self.schedule()
+                    return
+            elif p.session == None and p.observed == None:
+                self.response = 1
+                self.schedule()
+    
         elif self.state == confirmed:
             if p.session != None or p.observed != None :
-                if p.session != self.observed or p.observed != self.sessionID:
+                if (p.session != None and p.session != self.observed) or (p.observed != None and p.observed != self.sessionID):
                     self.state = pending
-                    if seq == 0:
-                        self.resetMessages()
-                        self.inSeq = p.seq
+                    self.observed = p.session
+                    self.resetConnection()
+                    self.inSeq = p.seq
+                    if hasattr(self.protocol, "resetConnection") and callable(self.protocol.resetConnection):
+                        self.protocol.resetConnection()
+                    self.schedule()
+                    return
+        # check to make sure sequence number isn't out of order
+        if (p.seq - self.inSeq) % 2**16 >= 256:
+            return
     
-        if self.state != pending:      
+        if self.state == confirmed:    
             msgs = []          
             missed = []
             
@@ -203,6 +238,9 @@ class AirhookConnection(protocol.ConnectedDatagramProtocol, interfaces.IUDPConne
         
         # session / observed logic
         if self.state == pending:
+            if self.observed != None:
+                flags = flags | FLAG_OBSERVED
+                ids +=  pack("!L", self.observed)
             flags = flags | FLAG_SESSION
             ids +=  pack("!L", self.sessionID)
             self.state = sent
@@ -215,7 +253,7 @@ class AirhookConnection(protocol.ConnectedDatagramProtocol, interfaces.IUDPConne
                 ids +=  pack("!L", self.sessionID)
 
         else:
-            if self.state == sent or self.sendSession:
+            if self.state == sent or self.sendSession != None:
                 if self.state == confirmed and (self.obSeq - self.sendSession) % 2**16 < 256:
                     self.sendSession = None
                 else:
@@ -226,7 +264,7 @@ class AirhookConnection(protocol.ConnectedDatagramProtocol, interfaces.IUDPConne
         if self.obSeq >= 0:
             self.weMissed = filter(lambda a: a[0] > self.obSeq, self.weMissed)
 
-        if self.weMissed:
+        if len(self.weMissed) > 0:
             flags = flags | FLAG_MISSED
             missed += chr(len(self.weMissed) - 1)
             for i in self.weMissed:
@@ -262,28 +300,35 @@ class AirhookConnection(protocol.ConnectedDatagramProtocol, interfaces.IUDPConne
         self.lastTransmit = time()
         self.transport.write(packet, self.addr)
         
+        self.scheduled = 0
         self.schedule()
         
     def timeToSend(self):
+        if self.state == pending:
+            return (1, 0)
+
+        outstanding = (256 + (((self.next - 1) % 256) - self.outMsgNums[self.obSeq % 256])) % 256
         # any outstanding messages and are we not too far ahead of our counterparty?
-        if self.omsgq and (self.next + 1) % 256 != self.outMsgNums[self.obSeq % 256] and (self.outSeq - self.obSeq) % 2**16 < 256:
-            return 1
+        if len(self.omsgq) > 0 and self.state != sent and (self.next + 1) % 256 != self.outMsgNums[self.obSeq % 256] and (self.outSeq - self.obSeq) % 2**16 < 256:
+        #if len(self.omsgq) > 0 and self.state != sent and (self.next + 1) % 256 != self.outMsgNums[self.obSeq % 256] and not outstanding:
+            return (1, 0)
+
         # do we explicitly need to send a response?
-        elif self.response:
+        if self.response:
             self.response = 0
-            return 1
+            return (1, 0)
         # have we not sent anything in a while?
-        elif time() - self.lastTransmit > 1.0:
-            return 1
-        
+        #elif time() - self.lastTransmit > 1.0:
+        #return (1, 1)
+            
         # nothing to send
-        return 0
+        return (0, 0)
 
     def schedule(self):
-        if self.timeToSend():
-            reactor.callLater(0, self.sendNext)
-        else:
-            reactor.callLater(1, self.sendNext)
+        tts, t = self.timeToSend()
+        if tts and not self.scheduled:
+            self.scheduled = 1
+            reactor.callLater(t, self.sendNext)
         
     def write(self, data):
         # micropackets can only be 255 bytes or less
@@ -331,10 +376,20 @@ class StreamConnection(AirhookConnection):
     """
     def __init__(self):
         AirhookConnection.__init__(self)
+        self.resetStream()
+        
+    def resetStream(self):
         self.oseq = 0
         self.iseq = 0
         self.q = []
 
+    def resetConnection(self):
+        AirhookConnection.resetConnection(self)
+        self.resetStream()
+
+    def loseConnection(self):
+        pass
+    
     def dataCameIn(self):
         # put 'em together
         for msg in self.imsgq:
@@ -355,7 +410,6 @@ class StreamConnection(AirhookConnection):
             self.omsgq.insert(0, p)
             data = data[253:]
             self.oseq = (self.oseq + 1) % 2**16
-
         self.schedule()
         
     def writeSequence(self, sequence):