X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=krpc.py;h=8a6009250e619f88606629d21c1d142ce61bad31;hb=3268020cc0bc1551f329867afa5ea08d8198d1fe;hp=40ed862ce69d340006f0162bf2dd4bbf2cfabeb8;hpb=67ea324e46570a42861fe6a342c626738134bbfc;p=quix0rs-apt-p2p.git diff --git a/krpc.py b/krpc.py index 40ed862..8a60092 100644 --- a/krpc.py +++ b/krpc.py @@ -1,102 +1,159 @@ -import airhook -from twisted.internet.defer import Deferred -from twisted.protocols import basic +## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved +# see LICENSE.txt for license information + from bencode import bencode, bdecode -from twisted.internet import reactor -import time +from time import asctime +import sys +from traceback import format_exception -import hash +from twisted.internet.defer import Deferred +from twisted.internet import protocol +from twisted.internet import reactor -KRPC_TIMEOUT = 30 +KRPC_TIMEOUT = 20 KRPC_ERROR = 1 KRPC_ERROR_METHOD_UNKNOWN = 2 KRPC_ERROR_RECEIVED_UNKNOWN = 3 KRPC_ERROR_TIMEOUT = 4 -class KRPC(basic.NetstringReceiver): +# commands +TID = 't' +REQ = 'q' +RSP = 'r' +TYP = 'y' +ARG = 'a' +ERR = 'e' + +class hostbroker(protocol.DatagramProtocol): + def __init__(self, server): + self.server = server + # this should be changed to storage that drops old entries + self.connections = {} + + def datagramReceived(self, datagram, addr): + #print `addr`, `datagram` + #if addr != self.addr: + c = self.connectionForAddr(addr) + c.datagramReceived(datagram, addr) + #if c.idle(): + # del self.connections[addr] + + def connectionForAddr(self, addr): + if addr == self.addr: + raise Exception + if not self.connections.has_key(addr): + conn = self.protocol(addr, self.server, self.transport) + self.connections[addr] = conn + else: + conn = self.connections[addr] + return conn + + def makeConnection(self, transport): + protocol.DatagramProtocol.makeConnection(self, transport) + tup = transport.getHost() + self.addr = (tup.host, tup.port) + +## connection +class KRPC: noisy = 1 - def __init__(self): + def __init__(self, addr, server, transport): + self.transport = transport + self.factory = server + self.addr = addr self.tids = {} + self.mtid = 0 - def stringReceived(self, str): + def datagramReceived(self, str, addr): # bdecode try: msg = bdecode(str) except Exception, e: - print "response decode error: " + `e` - self.d.errback() + if self.noisy: + print "response decode error: " + `e` else: + #if self.noisy: + # print msg # look at msg type - if msg['typ'] == 'req': + if msg[TYP] == REQ: ilen = len(str) # if request # tell factory to handle - f = getattr(self.factory ,"krpc_" + msg['req'], None) + f = getattr(self.factory ,"krpc_" + msg[REQ], None) + msg[ARG]['_krpc_sender'] = self.addr if f and callable(f): - msg['arg']['_krpc_sender'] = self.transport.addr try: - ret = apply(f, (), msg['arg']) + ret = apply(f, (), msg[ARG]) except Exception, e: ## send error - str = bencode({'tid':msg['tid'], 'typ':'err', 'err' :`e`}) - olen = len(str) - self.sendString(str) + out = bencode({TID:msg[TID], TYP:ERR, ERR :`format_exception(type(e), e, sys.exc_info()[2])`}) + olen = len(out) + self.transport.write(out, addr) else: if ret: # make response - str = bencode({'tid' : msg['tid'], 'typ' : 'rsp', 'rsp' : ret}) + out = bencode({TID : msg[TID], TYP : RSP, RSP : ret}) else: - str = bencode({'tid' : msg['tid'], 'typ' : 'rsp', 'rsp' : []}) + out = bencode({TID : msg[TID], TYP : RSP, RSP : {}}) # send response - olen = len(str) - self.sendString(str) + olen = len(out) + self.transport.write(out, addr) else: + if self.noisy: + print "don't know about method %s" % msg[REQ] # unknown method - str = bencode({'tid':msg['tid'], 'typ':'err', 'err' : KRPC_ERROR_METHOD_UNKNOWN}) - olen = len(str) - self.sendString(str) + out = bencode({TID:msg[TID], TYP:ERR, ERR : KRPC_ERROR_METHOD_UNKNOWN}) + olen = len(out) + self.transport.write(out, addr) if self.noisy: - print "%s %s >>> %s - %s %s %s" % (time.asctime(), self.transport.addr, self.factory.node.port, - ilen, msg['req'], olen) - elif msg['typ'] == 'rsp': + print "%s %s >>> %s - %s %s %s" % (asctime(), addr, self.factory.node.port, + ilen, msg[REQ], olen) + elif msg[TYP] == RSP: # if response # lookup tid - if self.tids.has_key(msg['tid']): - df = self.tids[msg['tid']] + if self.tids.has_key(msg[TID]): + df = self.tids[msg[TID]] # callback - df.callback(msg['rsp']) - del(self.tids[msg['tid']]) - # no tid, perhaps this transaction timed out already... - elif msg['typ'] == 'err': + del(self.tids[msg[TID]]) + df.callback({'rsp' : msg[RSP], '_krpc_sender': addr}) + else: + print 'timeout ' + `msg[RSP]['id']` + # no tid, this transaction timed out already... + elif msg[TYP] == ERR: # if error # lookup tid - df = self.tids[msg['tid']] - # callback - df.errback(msg['err']) - del(self.tids[msg['tid']]) + if self.tids.has_key(msg[TID]): + df = self.tids[msg[TID]] + # callback + df.errback(msg[ERR]) + del(self.tids[msg[TID]]) + else: + # day late and dollar short + pass else: + print "unknown message type " + `msg` # unknown message type - df = self.tids[msg['tid']] + df = self.tids[msg[TID]] # callback df.errback(KRPC_ERROR_RECEIVED_UNKNOWN) - del(self.tids[msg['tid']]) + del(self.tids[msg[TID]]) def sendRequest(self, method, args): # make message # send it - msg = {'tid' : hash.newID(), 'typ' : 'req', 'req' : method, 'arg' : args} + msg = {TID : chr(self.mtid), TYP : REQ, REQ : method, ARG : args} + self.mtid = (self.mtid + 1) % 256 str = bencode(msg) - self.sendString(str) d = Deferred() - self.tids[msg['tid']] = d - - def timeOut(tids = self.tids, id = msg['tid']): + self.tids[msg[TID]] = d + def timeOut(tids = self.tids, id = msg[TID]): if tids.has_key(id): df = tids[id] del(tids[id]) + print ">>>>>> KRPC_ERROR_TIMEOUT" df.errback(KRPC_ERROR_TIMEOUT) reactor.callLater(KRPC_TIMEOUT, timeOut) + self.transport.write(str, self.addr) return d - \ No newline at end of file +