]> git.mxchange.org Git - quix0rs-apt-p2p.git/blobdiff - krpc.py
Make it also work from the command-line.
[quix0rs-apt-p2p.git] / krpc.py
diff --git a/krpc.py b/krpc.py
index b2700e14b9cd74e418ed1c9c5e5518f23b36bd27..8a6009250e619f88606629d21c1d142ce61bad31 100644 (file)
--- a/krpc.py
+++ b/krpc.py
-import airhook
-from twisted.internet.defer import Deferred
-from twisted.protocols import basic
+## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
+# see LICENSE.txt for license information
+
 from bencode import bencode, bdecode
-from twisted.internet import reactor
-import time
+from time import asctime
+import sys
+from traceback import format_exception
 
-import hash
+from twisted.internet.defer import Deferred
+from twisted.internet import protocol
+from twisted.internet import reactor
 
-KRPC_TIMEOUT = 30
+KRPC_TIMEOUT = 20
 
 KRPC_ERROR = 1
 KRPC_ERROR_METHOD_UNKNOWN = 2
 KRPC_ERROR_RECEIVED_UNKNOWN = 3
 KRPC_ERROR_TIMEOUT = 4
 
-class KRPC(basic.NetstringReceiver):
+# commands
+TID = 't'
+REQ = 'q'
+RSP = 'r'
+TYP = 'y'
+ARG = 'a'
+ERR = 'e'
+
+class hostbroker(protocol.DatagramProtocol):       
+    def __init__(self, server):
+        self.server = server
+        # this should be changed to storage that drops old entries
+        self.connections = {}
+        
+    def datagramReceived(self, datagram, addr):
+        #print `addr`, `datagram`
+        #if addr != self.addr:
+        c = self.connectionForAddr(addr)
+        c.datagramReceived(datagram, addr)
+        #if c.idle():
+        #    del self.connections[addr]
+
+    def connectionForAddr(self, addr):
+        if addr == self.addr:
+            raise Exception
+        if not self.connections.has_key(addr):
+            conn = self.protocol(addr, self.server, self.transport)
+            self.connections[addr] = conn
+        else:
+            conn = self.connections[addr]
+        return conn
+
+    def makeConnection(self, transport):
+        protocol.DatagramProtocol.makeConnection(self, transport)
+        tup = transport.getHost()
+        self.addr = (tup.host, tup.port)
+
+## connection
+class KRPC:
     noisy = 1
-    def __init__(self):
+    def __init__(self, addr, server, transport):
+        self.transport = transport
+        self.factory = server
+        self.addr = addr
         self.tids = {}
+        self.mtid = 0
 
-    def resetConnection(self):
-        self.brokenPeer = 0
-        self._readerState = basic.LENGTH
-        self._readerLength = 0
-
-    def stringReceived(self, str):
+    def datagramReceived(self, str, addr):
         # bdecode
         try:
             msg = bdecode(str)
         except Exception, e:
-            if self.naisy:
+            if self.noisy:
                 print "response decode error: " + `e`
-            self.d.errback()
         else:
+            #if self.noisy:
+            #    print msg
             # look at msg type
-            if msg['typ']  == 'req':
+            if msg[TYP]  == REQ:
                 ilen = len(str)
                 # if request
                 #      tell factory to handle
-                f = getattr(self.factory ,"krpc_" + msg['req'], None)
+                f = getattr(self.factory ,"krpc_" + msg[REQ], None)
+                msg[ARG]['_krpc_sender'] =  self.addr
                 if f and callable(f):
-                    msg['arg']['_krpc_sender'] =  self.transport.addr
                     try:
-                        ret = apply(f, (), msg['arg'])
+                        ret = apply(f, (), msg[ARG])
                     except Exception, e:
                         ## send error
-                        str = bencode({'tid':msg['tid'], 'typ':'err', 'err' :`e`})
-                        olen = len(str)
-                        self.sendString(str)
+                        out = bencode({TID:msg[TID], TYP:ERR, ERR :`format_exception(type(e), e, sys.exc_info()[2])`})
+                        olen = len(out)
+                        self.transport.write(out, addr)
                     else:
                         if ret:
                             #  make response
-                            str = bencode({'tid' : msg['tid'], 'typ' : 'rsp', 'rsp' : ret})
+                            out = bencode({TID : msg[TID], TYP : RSP, RSP : ret})
                         else:
-                            str = bencode({'tid' : msg['tid'], 'typ' : 'rsp', 'rsp' : {}})
+                            out = bencode({TID : msg[TID], TYP : RSP, RSP : {}})
                         #      send response
-                        olen = len(str)
-                        self.sendString(str)
+                        olen = len(out)
+                        self.transport.write(out, addr)
 
                 else:
                     if self.noisy:
-                        print "don't know about method %s" % msg['req']
+                        print "don't know about method %s" % msg[REQ]
                     # unknown method
-                    str = bencode({'tid':msg['tid'], 'typ':'err', 'err' : KRPC_ERROR_METHOD_UNKNOWN})
-                    olen = len(str)
-                    self.sendString(str)
+                    out = bencode({TID:msg[TID], TYP:ERR, ERR : KRPC_ERROR_METHOD_UNKNOWN})
+                    olen = len(out)
+                    self.transport.write(out, addr)
                 if self.noisy:
-                    print "%s %s >>> %s - %s %s %s" % (time.asctime(), self.transport.addr, self.factory.node.port, 
-                                                    ilen, msg['req'], olen)
-            elif msg['typ'] == 'rsp':
+                    print "%s %s >>> %s - %s %s %s" % (asctime(), addr, self.factory.node.port, 
+                                                    ilen, msg[REQ], olen)
+            elif msg[TYP] == RSP:
                 # if response
                 #      lookup tid
-                if self.tids.has_key(msg['tid']):
-                    df = self.tids[msg['tid']]
+                if self.tids.has_key(msg[TID]):
+                    df = self.tids[msg[TID]]
                     #  callback
-                    del(self.tids[msg['tid']])
-                    df.callback({'rsp' : msg['rsp'], '_krpc_sender': self.transport.addr})
-                # no tid, this transaction timed out already...
-            elif msg['typ'] == 'err':
+                    del(self.tids[msg[TID]])
+                    df.callback({'rsp' : msg[RSP], '_krpc_sender': addr})
+                else:
+                    print 'timeout ' + `msg[RSP]['id']`
+                    # no tid, this transaction timed out already...
+            elif msg[TYP] == ERR:
                 # if error
                 #      lookup tid
-                df = self.tids[msg['tid']]
-                #      callback
-                df.errback(msg['err'])
-                del(self.tids[msg['tid']])
+                if self.tids.has_key(msg[TID]):
+                    df = self.tids[msg[TID]]
+                    #  callback
+                    df.errback(msg[ERR])
+                    del(self.tids[msg[TID]])
+                else:
+                    # day late and dollar short
+                    pass
             else:
+                print "unknown message type " + `msg`
                 # unknown message type
-                df = self.tids[msg['tid']]
+                df = self.tids[msg[TID]]
                 #      callback
                 df.errback(KRPC_ERROR_RECEIVED_UNKNOWN)
-                del(self.tids[msg['tid']])
+                del(self.tids[msg[TID]])
                 
     def sendRequest(self, method, args):
         # make message
         # send it
-        msg = {'tid' : hash.newID(), 'typ' : 'req',  'req' : method, 'arg' : args}
+        msg = {TID : chr(self.mtid), TYP : REQ,  REQ : method, ARG : args}
+        self.mtid = (self.mtid + 1) % 256
         str = bencode(msg)
         d = Deferred()
-        self.tids[msg['tid']] = d
-        def timeOut(tids = self.tids, id = msg['tid']):
+        self.tids[msg[TID]] = d
+        def timeOut(tids = self.tids, id = msg[TID]):
             if tids.has_key(id):
                 df = tids[id]
                 del(tids[id])
                 print ">>>>>> KRPC_ERROR_TIMEOUT"
                 df.errback(KRPC_ERROR_TIMEOUT)
         reactor.callLater(KRPC_TIMEOUT, timeOut)
-        self.sendString(str)
+        self.transport.write(str, self.addr)
         return d