]> git.mxchange.org Git - quix0rs-apt-p2p.git/blobdiff - apt_dht_Khashmir/krpc.py
Moved the files to appropriate package directories.
[quix0rs-apt-p2p.git] / apt_dht_Khashmir / krpc.py
diff --git a/apt_dht_Khashmir/krpc.py b/apt_dht_Khashmir/krpc.py
new file mode 100644 (file)
index 0000000..8a60092
--- /dev/null
@@ -0,0 +1,159 @@
+## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
+# see LICENSE.txt for license information
+
+from bencode import bencode, bdecode
+from time import asctime
+import sys
+from traceback import format_exception
+
+from twisted.internet.defer import Deferred
+from twisted.internet import protocol
+from twisted.internet import reactor
+
+KRPC_TIMEOUT = 20
+
+KRPC_ERROR = 1
+KRPC_ERROR_METHOD_UNKNOWN = 2
+KRPC_ERROR_RECEIVED_UNKNOWN = 3
+KRPC_ERROR_TIMEOUT = 4
+
+# commands
+TID = 't'
+REQ = 'q'
+RSP = 'r'
+TYP = 'y'
+ARG = 'a'
+ERR = 'e'
+
+class hostbroker(protocol.DatagramProtocol):       
+    def __init__(self, server):
+        self.server = server
+        # this should be changed to storage that drops old entries
+        self.connections = {}
+        
+    def datagramReceived(self, datagram, addr):
+        #print `addr`, `datagram`
+        #if addr != self.addr:
+        c = self.connectionForAddr(addr)
+        c.datagramReceived(datagram, addr)
+        #if c.idle():
+        #    del self.connections[addr]
+
+    def connectionForAddr(self, addr):
+        if addr == self.addr:
+            raise Exception
+        if not self.connections.has_key(addr):
+            conn = self.protocol(addr, self.server, self.transport)
+            self.connections[addr] = conn
+        else:
+            conn = self.connections[addr]
+        return conn
+
+    def makeConnection(self, transport):
+        protocol.DatagramProtocol.makeConnection(self, transport)
+        tup = transport.getHost()
+        self.addr = (tup.host, tup.port)
+
+## connection
+class KRPC:
+    noisy = 1
+    def __init__(self, addr, server, transport):
+        self.transport = transport
+        self.factory = server
+        self.addr = addr
+        self.tids = {}
+        self.mtid = 0
+
+    def datagramReceived(self, str, addr):
+        # bdecode
+        try:
+            msg = bdecode(str)
+        except Exception, e:
+            if self.noisy:
+                print "response decode error: " + `e`
+        else:
+            #if self.noisy:
+            #    print msg
+            # look at msg type
+            if msg[TYP]  == REQ:
+                ilen = len(str)
+                # if request
+                #      tell factory to handle
+                f = getattr(self.factory ,"krpc_" + msg[REQ], None)
+                msg[ARG]['_krpc_sender'] =  self.addr
+                if f and callable(f):
+                    try:
+                        ret = apply(f, (), msg[ARG])
+                    except Exception, e:
+                        ## send error
+                        out = bencode({TID:msg[TID], TYP:ERR, ERR :`format_exception(type(e), e, sys.exc_info()[2])`})
+                        olen = len(out)
+                        self.transport.write(out, addr)
+                    else:
+                        if ret:
+                            #  make response
+                            out = bencode({TID : msg[TID], TYP : RSP, RSP : ret})
+                        else:
+                            out = bencode({TID : msg[TID], TYP : RSP, RSP : {}})
+                        #      send response
+                        olen = len(out)
+                        self.transport.write(out, addr)
+
+                else:
+                    if self.noisy:
+                        print "don't know about method %s" % msg[REQ]
+                    # unknown method
+                    out = bencode({TID:msg[TID], TYP:ERR, ERR : KRPC_ERROR_METHOD_UNKNOWN})
+                    olen = len(out)
+                    self.transport.write(out, addr)
+                if self.noisy:
+                    print "%s %s >>> %s - %s %s %s" % (asctime(), addr, self.factory.node.port, 
+                                                    ilen, msg[REQ], olen)
+            elif msg[TYP] == RSP:
+                # if response
+                #      lookup tid
+                if self.tids.has_key(msg[TID]):
+                    df = self.tids[msg[TID]]
+                    #  callback
+                    del(self.tids[msg[TID]])
+                    df.callback({'rsp' : msg[RSP], '_krpc_sender': addr})
+                else:
+                    print 'timeout ' + `msg[RSP]['id']`
+                    # no tid, this transaction timed out already...
+            elif msg[TYP] == ERR:
+                # if error
+                #      lookup tid
+                if self.tids.has_key(msg[TID]):
+                    df = self.tids[msg[TID]]
+                    #  callback
+                    df.errback(msg[ERR])
+                    del(self.tids[msg[TID]])
+                else:
+                    # day late and dollar short
+                    pass
+            else:
+                print "unknown message type " + `msg`
+                # unknown message type
+                df = self.tids[msg[TID]]
+                #      callback
+                df.errback(KRPC_ERROR_RECEIVED_UNKNOWN)
+                del(self.tids[msg[TID]])
+                
+    def sendRequest(self, method, args):
+        # make message
+        # send it
+        msg = {TID : chr(self.mtid), TYP : REQ,  REQ : method, ARG : args}
+        self.mtid = (self.mtid + 1) % 256
+        str = bencode(msg)
+        d = Deferred()
+        self.tids[msg[TID]] = d
+        def timeOut(tids = self.tids, id = msg[TID]):
+            if tids.has_key(id):
+                df = tids[id]
+                del(tids[id])
+                print ">>>>>> KRPC_ERROR_TIMEOUT"
+                df.errback(KRPC_ERROR_TIMEOUT)
+        reactor.callLater(KRPC_TIMEOUT, timeOut)
+        self.transport.write(str, self.addr)
+        return d