from traceback import format_exception
from twisted.internet.defer import Deferred
-from twisted.internet import protocol
-from twisted.internet import reactor
+from twisted.internet import protocol, reactor
+from twisted.trial import unittest
KRPC_TIMEOUT = 20
ARG = 'a'
ERR = 'e'
+class ProtocolError(Exception):
+ pass
+
class hostbroker(protocol.DatagramProtocol):
def __init__(self, server):
self.server = server
protocol.DatagramProtocol.makeConnection(self, transport)
tup = transport.getHost()
self.addr = (tup.host, tup.port)
+
+ def stopProtocol(self):
+ for conn in self.connections.values():
+ conn.stop()
+ protocol.DatagramProtocol.stopProtocol(self)
## connection
class KRPC:
self.addr = addr
self.tids = {}
self.mtid = 0
+ self.stopped = False
def datagramReceived(self, str, addr):
+ if self.stopped:
+ if self.noisy:
+ print "stopped, dropping message from", addr, str
# bdecode
try:
msg = bdecode(str)
if self.noisy:
print "response decode error: " + `e`
else:
- #if self.noisy:
- # print msg
+ if self.noisy:
+ print msg
# look at msg type
if msg[TYP] == REQ:
ilen = len(str)
del(self.tids[msg[TID]])
def sendRequest(self, method, args):
+ if self.stopped:
+ raise ProtocolError, "connection has been stopped"
# make message
# send it
msg = {TID : chr(self.mtid), TYP : REQ, REQ : method, ARG : args}
str = bencode(msg)
d = Deferred()
self.tids[msg[TID]] = d
- def timeOut(tids = self.tids, id = msg[TID]):
+ def timeOut(tids = self.tids, id = msg[TID], msg = msg):
if tids.has_key(id):
df = tids[id]
del(tids[id])
print ">>>>>> KRPC_ERROR_TIMEOUT"
- df.errback(KRPC_ERROR_TIMEOUT)
- reactor.callLater(KRPC_TIMEOUT, timeOut)
+ df.errback(ProtocolError('timeout waiting for %r' % msg))
+ later = reactor.callLater(KRPC_TIMEOUT, timeOut)
+ def dropTimeOut(dict, later_call = later):
+ if later_call.active():
+ later_call.cancel()
+ return dict
+ d.addBoth(dropTimeOut)
self.transport.write(str, self.addr)
return d
+
+ def stop(self):
+ """Timeout all pending requests."""
+ for df in self.tids.values():
+ df.errback(ProtocolError('connection has been closed'))
+ self.tids = {}
+ self.stopped = True
+def connectionForAddr(host, port):
+ return host
+
+class Receiver(protocol.Factory):
+ protocol = KRPC
+ def __init__(self):
+ self.buf = []
+ def krpc_store(self, msg, _krpc_sender):
+ self.buf += [msg]
+ def krpc_echo(self, msg, _krpc_sender):
+ return msg
+
+def make(port):
+ af = Receiver()
+ a = hostbroker(af)
+ a.protocol = KRPC
+ p = reactor.listenUDP(port, a)
+ return af, a, p
+
+class KRPCTests(unittest.TestCase):
+ def setUp(self):
+ KRPC.noisy = 0
+ self.af, self.a, self.ap = make(1180)
+ self.bf, self.b, self.bp = make(1181)
+
+ def tearDown(self):
+ self.ap.stopListening()
+ self.bp.stopListening()
+
+ def bufEquals(self, result, value):
+ self.assertEqual(self.bf.buf, value)
+
+ def testSimpleMessage(self):
+ d = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."})
+ d.addCallback(self.bufEquals, ["This is a test."])
+ return d
+
+ def testMessageBlast(self):
+ for i in range(100):
+ d = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."})
+ d.addCallback(self.bufEquals, ["This is a test."] * 100)
+ return d
+
+ def testEcho(self):
+ df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
+ df.addCallback(self.gotMsg, "This is a test.")
+ return df
+
+ def gotMsg(self, dict, should_be):
+ _krpc_sender = dict['_krpc_sender']
+ msg = dict['rsp']
+ self.assertEqual(msg, should_be)
+
+ def testManyEcho(self):
+ for i in xrange(100):
+ df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
+ df.addCallback(self.gotMsg, "This is a test.")
+ return df
+
+ def testMultiEcho(self):
+ df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
+ df.addCallback(self.gotMsg, "This is a test.")
+
+ df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is another test."})
+ df.addCallback(self.gotMsg, "This is another test.")
+
+ df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is yet another test."})
+ df.addCallback(self.gotMsg, "This is yet another test.")
+
+ return df
+
+ def testEchoReset(self):
+ df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
+ df.addCallback(self.gotMsg, "This is a test.")
+
+ df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is another test."})
+ df.addCallback(self.gotMsg, "This is another test.")
+ df.addCallback(self.echoReset)
+ return df
+
+ def echoReset(self, dict):
+ del(self.a.connections[('127.0.0.1', 1181)])
+ df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is yet another test."})
+ df.addCallback(self.gotMsg, "This is yet another test.")
+ return df
+
+ def testUnknownMeth(self):
+ df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('blahblah', {'msg' : "This is a test."})
+ df.addErrback(self.gotErr, KRPC_ERROR_METHOD_UNKNOWN)
+ return df
+
+ def gotErr(self, err, should_be):
+ self.assertEqual(err.value, should_be)