X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=apt_dht_Khashmir%2Fkrpc.py;fp=apt_dht_Khashmir%2Fkrpc.py;h=8a6009250e619f88606629d21c1d142ce61bad31;hb=dd75e47b4d4ee40dae492753a226d5a42ac73c1c;hp=0000000000000000000000000000000000000000;hpb=de97c2578c709b19e96c0419c9482a5487ab20ce;p=quix0rs-apt-p2p.git diff --git a/apt_dht_Khashmir/krpc.py b/apt_dht_Khashmir/krpc.py new file mode 100644 index 0000000..8a60092 --- /dev/null +++ b/apt_dht_Khashmir/krpc.py @@ -0,0 +1,159 @@ +## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved +# see LICENSE.txt for license information + +from bencode import bencode, bdecode +from time import asctime +import sys +from traceback import format_exception + +from twisted.internet.defer import Deferred +from twisted.internet import protocol +from twisted.internet import reactor + +KRPC_TIMEOUT = 20 + +KRPC_ERROR = 1 +KRPC_ERROR_METHOD_UNKNOWN = 2 +KRPC_ERROR_RECEIVED_UNKNOWN = 3 +KRPC_ERROR_TIMEOUT = 4 + +# commands +TID = 't' +REQ = 'q' +RSP = 'r' +TYP = 'y' +ARG = 'a' +ERR = 'e' + +class hostbroker(protocol.DatagramProtocol): + def __init__(self, server): + self.server = server + # this should be changed to storage that drops old entries + self.connections = {} + + def datagramReceived(self, datagram, addr): + #print `addr`, `datagram` + #if addr != self.addr: + c = self.connectionForAddr(addr) + c.datagramReceived(datagram, addr) + #if c.idle(): + # del self.connections[addr] + + def connectionForAddr(self, addr): + if addr == self.addr: + raise Exception + if not self.connections.has_key(addr): + conn = self.protocol(addr, self.server, self.transport) + self.connections[addr] = conn + else: + conn = self.connections[addr] + return conn + + def makeConnection(self, transport): + protocol.DatagramProtocol.makeConnection(self, transport) + tup = transport.getHost() + self.addr = (tup.host, tup.port) + +## connection +class KRPC: + noisy = 1 + def __init__(self, addr, server, transport): + self.transport = transport + self.factory = server + self.addr = addr + self.tids = {} + self.mtid = 0 + + def datagramReceived(self, str, addr): + # bdecode + try: + msg = bdecode(str) + except Exception, e: + if self.noisy: + print "response decode error: " + `e` + else: + #if self.noisy: + # print msg + # look at msg type + if msg[TYP] == REQ: + ilen = len(str) + # if request + # tell factory to handle + f = getattr(self.factory ,"krpc_" + msg[REQ], None) + msg[ARG]['_krpc_sender'] = self.addr + if f and callable(f): + try: + ret = apply(f, (), msg[ARG]) + except Exception, e: + ## send error + out = bencode({TID:msg[TID], TYP:ERR, ERR :`format_exception(type(e), e, sys.exc_info()[2])`}) + olen = len(out) + self.transport.write(out, addr) + else: + if ret: + # make response + out = bencode({TID : msg[TID], TYP : RSP, RSP : ret}) + else: + out = bencode({TID : msg[TID], TYP : RSP, RSP : {}}) + # send response + olen = len(out) + self.transport.write(out, addr) + + else: + if self.noisy: + print "don't know about method %s" % msg[REQ] + # unknown method + out = bencode({TID:msg[TID], TYP:ERR, ERR : KRPC_ERROR_METHOD_UNKNOWN}) + olen = len(out) + self.transport.write(out, addr) + if self.noisy: + print "%s %s >>> %s - %s %s %s" % (asctime(), addr, self.factory.node.port, + ilen, msg[REQ], olen) + elif msg[TYP] == RSP: + # if response + # lookup tid + if self.tids.has_key(msg[TID]): + df = self.tids[msg[TID]] + # callback + del(self.tids[msg[TID]]) + df.callback({'rsp' : msg[RSP], '_krpc_sender': addr}) + else: + print 'timeout ' + `msg[RSP]['id']` + # no tid, this transaction timed out already... + elif msg[TYP] == ERR: + # if error + # lookup tid + if self.tids.has_key(msg[TID]): + df = self.tids[msg[TID]] + # callback + df.errback(msg[ERR]) + del(self.tids[msg[TID]]) + else: + # day late and dollar short + pass + else: + print "unknown message type " + `msg` + # unknown message type + df = self.tids[msg[TID]] + # callback + df.errback(KRPC_ERROR_RECEIVED_UNKNOWN) + del(self.tids[msg[TID]]) + + def sendRequest(self, method, args): + # make message + # send it + msg = {TID : chr(self.mtid), TYP : REQ, REQ : method, ARG : args} + self.mtid = (self.mtid + 1) % 256 + str = bencode(msg) + d = Deferred() + self.tids[msg[TID]] = d + def timeOut(tids = self.tids, id = msg[TID]): + if tids.has_key(id): + df = tids[id] + del(tids[id]) + print ">>>>>> KRPC_ERROR_TIMEOUT" + df.errback(KRPC_ERROR_TIMEOUT) + reactor.callLater(KRPC_TIMEOUT, timeOut) + self.transport.write(str, self.addr) + return d +