]> git.mxchange.org Git - quix0rs-apt-p2p.git/blobdiff - krpc.py
major cleanup, updated for twisted
[quix0rs-apt-p2p.git] / krpc.py
diff --git a/krpc.py b/krpc.py
index 5f277692e1abf2e73af66c6e3b1665490e030b37..24a0f16c9a06f4c703f401bda85f473f2f4fd318 100644 (file)
--- a/krpc.py
+++ b/krpc.py
@@ -5,10 +5,12 @@ import airhook
 from twisted.internet.defer import Deferred
 from twisted.protocols import basic
 from bencode import bencode, bdecode
+from twisted.internet import protocol
+
 from twisted.internet import reactor
 import time
 
-import hash
+import khash as hash
 
 KRPC_TIMEOUT = 60
 
@@ -17,110 +19,143 @@ KRPC_ERROR_METHOD_UNKNOWN = 2
 KRPC_ERROR_RECEIVED_UNKNOWN = 3
 KRPC_ERROR_TIMEOUT = 4
 
-class KRPC(basic.NetstringReceiver):
-    noisy = 1
-    def __init__(self):
-        self.tids = {}
+# commands
+TID = 'tid'
+REQ = 'req'
+RSP = 'rsp'
+TYP = 'typ'
+ARG = 'arg'
+ERR = 'err'
+
+class hostbroker(protocol.DatagramProtocol):       
+    def __init__(self, server):
+        self.noisy = 0
+        self.server = server
+        # this should be changed to storage that drops old entries
+        self.connections = {}
+        
+    def datagramReceived(self, datagram, addr):
+        #print `addr`, `datagram`
+        #if addr != self.addr:
+        c = self.connectionForAddr(addr)
+        c.datagramReceived(datagram)
+        #if c.idle():
+        #    del self.connections[addr]
 
+    def connectionForAddr(self, addr):
+        if addr == self.addr:
+            raise Exception
+        if not self.connections.has_key(addr):
+            conn = self.protocol(addr, self.server, self.transport)
+            self.connections[addr] = conn
+        else:
+            conn = self.connections[addr]
+        return conn
+
+    def makeConnection(self, transport):
+        protocol.DatagramProtocol.makeConnection(self, transport)
+        tup = transport.getHost()
+        self.addr = (tup.host, tup.port)
 
-    def dataRecieved(self, data):
-        basic.NetstringReceiver(self, data)
-        if self.brokenPeer:
-            self.resetConnection()
-            
-    def resetConnection(self):
-        self.brokenPeer = 0
-        self._readerState = basic.LENGTH
-        self._readerLength = 0
+## connection
+class KRPC:
+    noisy = 1
+    def __init__(self, addr, server, transport):
+        self.transport = transport
+        self.factory = server
+        self.addr = addr
+        self.tids = {}
 
-    def stringReceived(self, str):
+    def datagramReceived(self, str):
         # bdecode
         try:
             msg = bdecode(str)
         except Exception, e:
-            if self.naisy:
+            if self.noisy:
                 print "response decode error: " + `e`
             self.d.errback()
         else:
+            #if self.noisy:
+            #    print msg
             # look at msg type
-            if msg['typ']  == 'req':
+            if msg[TYP]  == REQ:
                 ilen = len(str)
                 # if request
                 #      tell factory to handle
-                f = getattr(self.factory ,"krpc_" + msg['req'], None)
+                f = getattr(self.factory ,"krpc_" + msg[REQ], None)
                 if f and callable(f):
-                    msg['arg']['_krpc_sender'] =  self.transport.addr
+                    msg[ARG]['_krpc_sender'] =  self.addr
                     try:
-                        ret = apply(f, (), msg['arg'])
+                        ret = apply(f, (), msg[ARG])
                     except Exception, e:
                         ## send error
-                        out = bencode({'tid':msg['tid'], 'typ':'err', 'err' :`e`})
+                        out = bencode({TID:msg[TID], TYP:ERR, ERR :`e`})
                         olen = len(out)
-                        self.sendString(out)
+                        self.transport.write(out, self.addr)
                     else:
                         if ret:
                             #  make response
-                            out = bencode({'tid' : msg['tid'], 'typ' : 'rsp', 'rsp' : ret})
+                            out = bencode({TID : msg[TID], TYP : RSP, RSP : ret})
                         else:
-                            out = bencode({'tid' : msg['tid'], 'typ' : 'rsp', 'rsp' : {}})
+                            out = bencode({TID : msg[TID], TYP : RSP, RSP : {}})
                         #      send response
                         olen = len(out)
-                        self.sendString(out)
+                        self.transport.write(out, self.addr)
 
                 else:
                     if self.noisy:
-                        print "don't know about method %s" % msg['req']
+                        print "don't know about method %s" % msg[REQ]
                     # unknown method
-                    out = bencode({'tid':msg['tid'], 'typ':'err', 'err' : KRPC_ERROR_METHOD_UNKNOWN})
+                    out = bencode({TID:msg[TID], TYP:ERR, ERR : KRPC_ERROR_METHOD_UNKNOWN})
                     olen = len(out)
-                    self.sendString(out)
+                    self.transport.write(out, self.addr)
                 if self.noisy:
-                    print "%s %s >>> %s - %s %s %s" % (time.asctime(), self.transport.addr, self.factory.node.port, 
-                                                    ilen, msg['req'], olen)
-            elif msg['typ'] == 'rsp':
+                    print "%s %s >>> %s - %s %s %s" % (time.asctime(), self.addr, self.factory.node.port, 
+                                                    ilen, msg[REQ], olen)
+            elif msg[TYP] == RSP:
                 # if response
                 #      lookup tid
-                if self.tids.has_key(msg['tid']):
-                    df = self.tids[msg['tid']]
+                if self.tids.has_key(msg[TID]):
+                    df = self.tids[msg[TID]]
                     #  callback
-                    del(self.tids[msg['tid']])
-                    df.callback({'rsp' : msg['rsp'], '_krpc_sender': self.transport.addr})
+                    del(self.tids[msg[TID]])
+                    df.callback({RSP : msg[RSP], '_krpc_sender': self.addr})
                 else:
-                    print 'timeout ' + `msg['rsp']['sender']`
+                    print 'timeout ' + `msg[RSP]['sender']`
                     # no tid, this transaction timed out already...
-            elif msg['typ'] == 'err':
+            elif msg[TYP] == ERR:
                 # if error
                 #      lookup tid
-                if self.tids.has_key(msg['tid']):
-                    df = self.tids[msg['tid']]
+                if self.tids.has_key(msg[TID]):
+                    df = self.tids[msg[TID]]
                     #  callback
-                    df.errback(msg['err'])
-                    del(self.tids[msg['tid']])
+                    df.errback(msg[ERR])
+                    del(self.tids[msg[TID]])
                 else:
                     # day late and dollar short
                     pass
             else:
                 print "unknown message type " + `msg`
                 # unknown message type
-                df = self.tids[msg['tid']]
+                df = self.tids[msg[TID]]
                 #      callback
                 df.errback(KRPC_ERROR_RECEIVED_UNKNOWN)
-                del(self.tids[msg['tid']])
+                del(self.tids[msg[TID]])
                 
     def sendRequest(self, method, args):
         # make message
         # send it
-        msg = {'tid' : hash.newID(), 'typ' : 'req',  'req' : method, 'arg' : args}
+        msg = {TID : hash.newTID(), TYP : REQ,  REQ : method, ARG : args}
         str = bencode(msg)
         d = Deferred()
-        self.tids[msg['tid']] = d
-        def timeOut(tids = self.tids, id = msg['tid']):
+        self.tids[msg[TID]] = d
+        def timeOut(tids = self.tids, id = msg[TID]):
             if tids.has_key(id):
                 df = tids[id]
                 del(tids[id])
                 print ">>>>>> KRPC_ERROR_TIMEOUT"
                 df.errback(KRPC_ERROR_TIMEOUT)
         reactor.callLater(KRPC_TIMEOUT, timeOut)
-        self.sendString(str)
+        self.transport.write(str, self.addr)
         return d