X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=apt_dht_Khashmir%2Fkrpc.py;h=441079c7c13c4a8020b98cdaaafeb51469362ca3;hb=7201466068edeef0e76aed0e088970863371ad3a;hp=8972c6c2102322acd36d4bf96a4cc7972ec1e838;hpb=eef3246c3e73078193310f8ebeb17013c83d0b2e;p=quix0rs-apt-p2p.git diff --git a/apt_dht_Khashmir/krpc.py b/apt_dht_Khashmir/krpc.py index 8972c6c..441079c 100644 --- a/apt_dht_Khashmir/krpc.py +++ b/apt_dht_Khashmir/krpc.py @@ -10,6 +10,8 @@ from twisted.internet.defer import Deferred from twisted.internet import protocol, reactor from twisted.trial import unittest +from khash import newID + KRPC_TIMEOUT = 20 KRPC_ERROR = 1 @@ -25,9 +27,13 @@ TYP = 'y' ARG = 'a' ERR = 'e' +class ProtocolError(Exception): + pass + class hostbroker(protocol.DatagramProtocol): - def __init__(self, server): + def __init__(self, server, config): self.server = server + self.config = config # this should be changed to storage that drops old entries self.connections = {} @@ -43,7 +49,7 @@ class hostbroker(protocol.DatagramProtocol): if addr == self.addr: raise Exception if not self.connections.has_key(addr): - conn = self.protocol(addr, self.server, self.transport) + conn = self.protocol(addr, self.server, self.transport, self.config['SPEW']) self.connections[addr] = conn else: conn = self.connections[addr] @@ -53,18 +59,26 @@ class hostbroker(protocol.DatagramProtocol): protocol.DatagramProtocol.makeConnection(self, transport) tup = transport.getHost() self.addr = (tup.host, tup.port) + + def stopProtocol(self): + for conn in self.connections.values(): + conn.stop() + protocol.DatagramProtocol.stopProtocol(self) ## connection class KRPC: - noisy = 1 - def __init__(self, addr, server, transport): + def __init__(self, addr, server, transport, spew = False): self.transport = transport self.factory = server self.addr = addr + self.noisy = spew self.tids = {} - self.mtid = 0 + self.stopped = False def datagramReceived(self, str, addr): + if self.stopped: + if self.noisy: + print "stopped, dropping message from", addr, str # bdecode try: msg = bdecode(str) @@ -73,7 +87,7 @@ class KRPC: print "response decode error: " + `e` else: if self.noisy: - print msg + print self.factory.port, "received from", addr, self.addr, ":", msg # look at msg type if msg[TYP] == REQ: ilen = len(str) @@ -83,29 +97,16 @@ class KRPC: msg[ARG]['_krpc_sender'] = self.addr if f and callable(f): try: - ret = apply(f, (), msg[ARG]) + ret = f(*(), **msg[ARG]) except Exception, e: - ## send error - out = bencode({TID:msg[TID], TYP:ERR, ERR :`format_exception(type(e), e, sys.exc_info()[2])`}) - olen = len(out) - self.transport.write(out, addr) + olen = self._sendResponse(addr, msg[TID], ERR, `format_exception(type(e), e, sys.exc_info()[2])`) else: - if ret: - # make response - out = bencode({TID : msg[TID], TYP : RSP, RSP : ret}) - else: - out = bencode({TID : msg[TID], TYP : RSP, RSP : {}}) - # send response - olen = len(out) - self.transport.write(out, addr) - + olen = self._sendResponse(addr, msg[TID], RSP, ret) else: if self.noisy: print "don't know about method %s" % msg[REQ] # unknown method - out = bencode({TID:msg[TID], TYP:ERR, ERR : KRPC_ERROR_METHOD_UNKNOWN}) - olen = len(out) - self.transport.write(out, addr) + olen = self._sendResponse(addr, msg[TID], ERR, KRPC_ERROR_METHOD_UNKNOWN) if self.noisy: print "%s %s >>> %s - %s %s %s" % (asctime(), addr, self.factory.node.port, ilen, msg[REQ], olen) @@ -139,20 +140,36 @@ class KRPC: df.errback(KRPC_ERROR_RECEIVED_UNKNOWN) del(self.tids[msg[TID]]) + def _sendResponse(self, addr, tid, msgType, response): + if not response: + response = {} + + msg = {TID : tid, TYP : msgType, msgType : response} + + if self.noisy: + print self.factory.port, "responding to", addr, ":", msg + + out = bencode(msg) + self.transport.write(out, addr) + return len(out) + def sendRequest(self, method, args): + if self.stopped: + raise ProtocolError, "connection has been stopped" # make message # send it - msg = {TID : chr(self.mtid), TYP : REQ, REQ : method, ARG : args} - self.mtid = (self.mtid + 1) % 256 + msg = {TID : newID(), TYP : REQ, REQ : method, ARG : args} + if self.noisy: + print self.factory.port, "sending to", self.addr, ":", msg str = bencode(msg) d = Deferred() self.tids[msg[TID]] = d - def timeOut(tids = self.tids, id = msg[TID]): + def timeOut(tids = self.tids, id = msg[TID], msg = msg): if tids.has_key(id): df = tids[id] del(tids[id]) print ">>>>>> KRPC_ERROR_TIMEOUT" - df.errback(KRPC_ERROR_TIMEOUT) + df.errback(ProtocolError('timeout waiting for %r' % msg)) later = reactor.callLater(KRPC_TIMEOUT, timeOut) def dropTimeOut(dict, later_call = later): if later_call.active(): @@ -161,6 +178,13 @@ class KRPC: d.addBoth(dropTimeOut) self.transport.write(str, self.addr) return d + + def stop(self): + """Timeout all pending requests.""" + for df in self.tids.values(): + df.errback(ProtocolError('connection has been closed')) + self.tids = {} + self.stopped = True def connectionForAddr(host, port): return host @@ -176,14 +200,13 @@ class Receiver(protocol.Factory): def make(port): af = Receiver() - a = hostbroker(af) + a = hostbroker(af, {'SPEW': False}) a.protocol = KRPC p = reactor.listenUDP(port, a) return af, a, p class KRPCTests(unittest.TestCase): def setUp(self): - KRPC.noisy = 0 self.af, self.a, self.ap = make(1180) self.bf, self.b, self.bp = make(1181) @@ -192,7 +215,7 @@ class KRPCTests(unittest.TestCase): self.bp.stopListening() def bufEquals(self, result, value): - self.assertEqual(self.bf.buf, value) + self.failUnlessEqual(self.bf.buf, value) def testSimpleMessage(self): d = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."}) @@ -213,7 +236,7 @@ class KRPCTests(unittest.TestCase): def gotMsg(self, dict, should_be): _krpc_sender = dict['_krpc_sender'] msg = dict['rsp'] - self.assertEqual(msg, should_be) + self.failUnlessEqual(msg, should_be) def testManyEcho(self): for i in xrange(100): @@ -254,4 +277,4 @@ class KRPCTests(unittest.TestCase): return df def gotErr(self, err, should_be): - self.assertEqual(err.value, should_be) + self.failUnlessEqual(err.value, should_be)