X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=apt_dht_Khashmir%2Fkrpc.py;h=2c1f7e83b972c1a4e8fb3cdfcacb64de4b87f2fe;hb=74e8a91349d0b55447de62ce72db692c177bcec7;hp=8a6009250e619f88606629d21c1d142ce61bad31;hpb=dd75e47b4d4ee40dae492753a226d5a42ac73c1c;p=quix0rs-apt-p2p.git diff --git a/apt_dht_Khashmir/krpc.py b/apt_dht_Khashmir/krpc.py index 8a60092..2c1f7e8 100644 --- a/apt_dht_Khashmir/krpc.py +++ b/apt_dht_Khashmir/krpc.py @@ -7,8 +7,8 @@ import sys from traceback import format_exception from twisted.internet.defer import Deferred -from twisted.internet import protocol -from twisted.internet import reactor +from twisted.internet import protocol, reactor +from twisted.trial import unittest KRPC_TIMEOUT = 20 @@ -25,6 +25,9 @@ TYP = 'y' ARG = 'a' ERR = 'e' +class ProtocolError(Exception): + pass + class hostbroker(protocol.DatagramProtocol): def __init__(self, server): self.server = server @@ -53,6 +56,11 @@ class hostbroker(protocol.DatagramProtocol): protocol.DatagramProtocol.makeConnection(self, transport) tup = transport.getHost() self.addr = (tup.host, tup.port) + + def stopProtocol(self): + for conn in self.connections.values(): + conn.stop() + protocol.DatagramProtocol.stopProtocol(self) ## connection class KRPC: @@ -63,8 +71,12 @@ class KRPC: self.addr = addr self.tids = {} self.mtid = 0 + self.stopped = False def datagramReceived(self, str, addr): + if self.stopped: + if self.noisy: + print "stopped, dropping message from", addr, str # bdecode try: msg = bdecode(str) @@ -72,8 +84,8 @@ class KRPC: if self.noisy: print "response decode error: " + `e` else: - #if self.noisy: - # print msg + if self.noisy: + print msg # look at msg type if msg[TYP] == REQ: ilen = len(str) @@ -140,6 +152,8 @@ class KRPC: del(self.tids[msg[TID]]) def sendRequest(self, method, args): + if self.stopped: + raise ProtocolError, "connection has been stopped" # make message # send it msg = {TID : chr(self.mtid), TYP : REQ, REQ : method, ARG : args} @@ -147,13 +161,118 @@ class KRPC: str = bencode(msg) d = Deferred() self.tids[msg[TID]] = d - def timeOut(tids = self.tids, id = msg[TID]): + def timeOut(tids = self.tids, id = msg[TID], msg = msg): if tids.has_key(id): df = tids[id] del(tids[id]) print ">>>>>> KRPC_ERROR_TIMEOUT" - df.errback(KRPC_ERROR_TIMEOUT) - reactor.callLater(KRPC_TIMEOUT, timeOut) + df.errback(ProtocolError('timeout waiting for %r' % msg)) + later = reactor.callLater(KRPC_TIMEOUT, timeOut) + def dropTimeOut(dict, later_call = later): + if later_call.active(): + later_call.cancel() + return dict + d.addBoth(dropTimeOut) self.transport.write(str, self.addr) return d + + def stop(self): + """Timeout all pending requests.""" + for df in self.tids.values(): + df.errback(ProtocolError('connection has been closed')) + self.tids = {} + self.stopped = True +def connectionForAddr(host, port): + return host + +class Receiver(protocol.Factory): + protocol = KRPC + def __init__(self): + self.buf = [] + def krpc_store(self, msg, _krpc_sender): + self.buf += [msg] + def krpc_echo(self, msg, _krpc_sender): + return msg + +def make(port): + af = Receiver() + a = hostbroker(af) + a.protocol = KRPC + p = reactor.listenUDP(port, a) + return af, a, p + +class KRPCTests(unittest.TestCase): + def setUp(self): + KRPC.noisy = 0 + self.af, self.a, self.ap = make(1180) + self.bf, self.b, self.bp = make(1181) + + def tearDown(self): + self.ap.stopListening() + self.bp.stopListening() + + def bufEquals(self, result, value): + self.assertEqual(self.bf.buf, value) + + def testSimpleMessage(self): + d = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."}) + d.addCallback(self.bufEquals, ["This is a test."]) + return d + + def testMessageBlast(self): + for i in range(100): + d = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."}) + d.addCallback(self.bufEquals, ["This is a test."] * 100) + return d + + def testEcho(self): + df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."}) + df.addCallback(self.gotMsg, "This is a test.") + return df + + def gotMsg(self, dict, should_be): + _krpc_sender = dict['_krpc_sender'] + msg = dict['rsp'] + self.assertEqual(msg, should_be) + + def testManyEcho(self): + for i in xrange(100): + df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."}) + df.addCallback(self.gotMsg, "This is a test.") + return df + + def testMultiEcho(self): + df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."}) + df.addCallback(self.gotMsg, "This is a test.") + + df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is another test."}) + df.addCallback(self.gotMsg, "This is another test.") + + df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is yet another test."}) + df.addCallback(self.gotMsg, "This is yet another test.") + + return df + + def testEchoReset(self): + df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."}) + df.addCallback(self.gotMsg, "This is a test.") + + df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is another test."}) + df.addCallback(self.gotMsg, "This is another test.") + df.addCallback(self.echoReset) + return df + + def echoReset(self, dict): + del(self.a.connections[('127.0.0.1', 1181)]) + df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is yet another test."}) + df.addCallback(self.gotMsg, "This is yet another test.") + return df + + def testUnknownMeth(self): + df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('blahblah', {'msg' : "This is a test."}) + df.addErrback(self.gotErr, KRPC_ERROR_METHOD_UNKNOWN) + return df + + def gotErr(self, err, should_be): + self.assertEqual(err.value, should_be)