1 ## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
4 from bencode import bencode, bdecode
5 from time import asctime
7 from traceback import format_exception
9 from twisted.internet.defer import Deferred
10 from twisted.internet import protocol, reactor
11 from twisted.trial import unittest
13 from khash import newID
18 KRPC_ERROR_METHOD_UNKNOWN = 2
19 KRPC_ERROR_RECEIVED_UNKNOWN = 3
20 KRPC_ERROR_TIMEOUT = 4
# Exception type used for KRPC protocol-level failures. In this listing it is
# raised by sendRequest ("connection has been stopped") and handed to errbacks
# for a request that timed out and for requests failed with
# "connection has been closed".
# NOTE(review): the class body is not visible in this listing.
30 class ProtocolError(Exception):
# Datagram protocol that keeps one connection object per remote address in
# self.connections and forwards every received datagram to the matching one.
# NOTE(review): this listing omits some lines and all indentation; the notes
# below describe only the statements that are visible.
33 class hostbroker(protocol.DatagramProtocol):
# Takes the server object and a config mapping (config['SPEW'] is read in
# connectionForAddr). The attribute assignments are on lines not shown.
34 def __init__(self, server, config):
# NOTE(review): presumably refers to the self.connections table - confirm.
37 # this should be changed to storage that drops old entries
# Called for each incoming datagram: obtain the connection object for the
# sender address and hand it the raw datagram.
40 def datagramReceived(self, datagram, addr):
41 #print `addr`, `datagram`
42 #if addr != self.addr:
43 c = self.connectionForAddr(addr)
44 c.datagramReceived(datagram, addr)
46 # del self.connections[addr]
# Look up the connection object for addr, creating and caching one on first
# use. Callers use the result directly (e.g. .sendRequest / .datagramReceived),
# so the connection is returned on a line not shown.
48 def connectionForAddr(self, addr):
# First contact with this address: build a connection via self.protocol,
# passing the shared transport and the SPEW flag, and remember it.
51 if not self.connections.has_key(addr):
52 conn = self.protocol(addr, self.server, self.transport, self.config['SPEW'])
53 self.connections[addr] = conn
# Known address: reuse the cached connection.
55 conn = self.connections[addr]
# Let the base class attach the transport, then record our own (host, port)
# as reported by the transport in self.addr.
58 def makeConnection(self, transport):
59 protocol.DatagramProtocol.makeConnection(self, transport)
60 tup = transport.getHost()
61 self.addr = (tup.host, tup.port)
# Visit every cached connection (the loop body is on a line not shown), then
# run the base-class stopProtocol.
63 def stopProtocol(self):
64 for conn in self.connections.values():
66 protocol.DatagramProtocol.stopProtocol(self)
# Constructor of the per-peer connection object. The argument order matches
# the self.protocol(addr, server, transport, SPEW) call in
# hostbroker.connectionForAddr - presumably that call builds this class.
# NOTE(review): only the transport assignment is visible; self.addr,
# self.factory and self.tids, which other methods read, are presumably set on
# lines not shown.
70 def __init__(self, addr, server, transport, spew = False):
# Shared transport used later to write requests and responses.
71 self.transport = transport
# Handle one datagram from this peer and dispatch on the message type
# (msg[TYP]): a request is passed to a krpc_<method> handler on the factory,
# a response or error completes the pending entry stored in self.tids.
# NOTE(review): the `str` parameter shadows the builtin. The decode step,
# several branch headers and the try/except lines are not visible in this
# listing, so the notes below cover only the statements shown.
78 def datagramReceived(self, str, addr):
# Connection already stopped: report and drop the message (the guarding
# condition is on a line not shown).
81 print "stopped, dropping message from", addr, str
# Decoding raised exception `e`; report it.
87 print "response decode error: " + `e`
# Debug trace of the decoded message (presumably only when spew is enabled -
# the guard is not visible).
90 print self.factory.port, "received from", addr, self.addr, ":", msg
95 # tell factory to handle
# Find the handler named "krpc_" + method on the factory; None if absent.
96 f = getattr(self.factory ,"krpc_" + msg[REQ], None)
# Pass the peer address to the handler as the `_krpc_sender` keyword argument.
97 msg[ARG]['_krpc_sender'] = self.addr
# Call the handler with the request arguments as keyword arguments.
100 ret = f(*(), **msg[ARG])
# Handler raised `e`: reply with an ERR message carrying the repr of the
# formatted traceback.
# NOTE(review): uses sys.exc_info(); no import of `sys` is visible in this
# listing - confirm it is imported.
102 olen = self._sendResponse(addr, msg[TID], ERR, `format_exception(type(e), e, sys.exc_info()[2])`)
# Handler succeeded: reply with an RSP message carrying its return value.
104 olen = self._sendResponse(addr, msg[TID], RSP, ret)
# No handler for the requested method: report it and reply with
# KRPC_ERROR_METHOD_UNKNOWN.
107 print "don't know about method %s" % msg[REQ]
109 olen = self._sendResponse(addr, msg[TID], ERR, KRPC_ERROR_METHOD_UNKNOWN)
# Trace line: timestamp, peer address, local node port, ilen (assigned on a
# line not shown), method name and the value returned by _sendResponse.
111 print "%s %s >>> %s - %s %s %s" % (asctime(), addr, self.factory.node.port,
112 ilen, msg[REQ], olen)
# Response to a request sent earlier by sendRequest.
113 elif msg[TYP] == RSP:
# Only act if the transaction id is still pending.
116 if self.tids.has_key(msg[TID]):
117 df = self.tids[msg[TID]]
# Forget the transaction, then fire the callback with the response payload
# and the sender address.
119 del(self.tids[msg[TID]])
120 df.callback({'rsp' : msg[RSP], '_krpc_sender': addr})
# Transaction id no longer pending: just report it.
122 print 'timeout ' + `msg[RSP]['id']`
123 # no tid, this transaction timed out already...
# Error reply to a request sent earlier.
124 elif msg[TYP] == ERR:
# Pending transaction: fetch its entry and forget the tid (the errback call
# is presumably on a line not shown).
127 if self.tids.has_key(msg[TID]):
128 df = self.tids[msg[TID]]
131 del(self.tids[msg[TID]])
133 # day late and dollar short
# Message type is none of the handled ones.
136 print "unknown message type " + `msg`
137 # unknown message type
# NOTE(review): in the visible lines this lookup is not guarded by has_key,
# so an unknown tid would raise KeyError - confirm against the full source.
138 df = self.tids[msg[TID]]
# Fail the pending request with KRPC_ERROR_RECEIVED_UNKNOWN and forget it.
140 df.errback(KRPC_ERROR_RECEIVED_UNKNOWN)
141 del(self.tids[msg[TID]])
# Build a reply of type `msgType` (RSP or ERR at the call sites in this file)
# for transaction `tid` and write it to `addr`.
# Callers bind the result to `olen`, so a value is presumably returned on a
# line not shown (likely the encoded length - confirm).
143 def _sendResponse(self, addr, tid, msgType, response):
# The payload is stored under the message-type key itself.
147 msg = {TID : tid, TYP : msgType, msgType : response}
# Debug trace of the outgoing reply (guard, if any, is not visible).
150 print self.factory.port, "responding to", addr, ":", msg
# `out` is assigned on a line not shown (presumably the bencoded msg).
153 self.transport.write(out, addr)
# Send a request for `method` with argument dict `args` to this peer and
# register it as pending, with a timeout.
# Callers in this file chain addCallback/addErrback on the result, so the
# object stored in self.tids (`d`, presumably a Deferred created on a line not
# shown) is returned on a line not shown.
# Raises ProtocolError when the connection has been stopped.
156 def sendRequest(self, method, args):
# Refuse to send on a stopped connection (the guarding condition is on a line
# not shown).
158 raise ProtocolError, "connection has been stopped"
# Build the request envelope; the transaction id comes from newID().
161 msg = {TID : newID(), TYP : REQ, REQ : method, ARG : args}
# Debug trace of the outgoing request (guard, if any, is not visible).
163 print self.factory.port, "sending to", self.addr, ":", msg
# Remember the pending request under its transaction id so datagramReceived
# can complete it when a reply arrives.
166 self.tids[msg[TID]] = d
# Timeout handler. The tid table, transaction id and message are bound as
# default arguments when the function is defined.
# NOTE(review): the `id` parameter shadows the builtin.
167 def timeOut(tids = self.tids, id = msg[TID], msg = msg):
171 print ">>>>>> KRPC_ERROR_TIMEOUT"
# Fail the pending request with a ProtocolError naming the timed-out message
# (`df` is assigned on a line not shown).
172 df.errback(ProtocolError('timeout waiting for %r' % msg))
# Schedule timeOut to run after KRPC_TIMEOUT (defined outside this listing).
173 later = reactor.callLater(KRPC_TIMEOUT, timeOut)
# Runs whether `d` succeeds or fails (see addBoth below). It checks whether
# the scheduled call is still pending - presumably to cancel it; the body
# after the check is not shown.
174 def dropTimeOut(dict, later_call = later):
175 if later_call.active():
178 d.addBoth(dropTimeOut)
# Put the request on the wire. `str` is assigned on a line not shown
# (presumably the bencoded msg) and shadows the builtin.
179 self.transport.write(str, self.addr)
183 """Timeout all pending requests."""
184 for df in self.tids.values():
185 df.errback(ProtocolError('connection has been closed'))
# Helper taking (host, port). Its body falls in a gap of this listing, so its
# behaviour cannot be stated from here.
# NOTE(review): shares its name with the connectionForAddr(self, addr) method
# defined earlier in this file, which takes a single address tuple.
189 def connectionForAddr(host, port):
# Factory exposing krpc_* handlers. The method names match the
# "krpc_" + method lookup done on self.factory when a request arrives, and the
# tests in this file send 'store' and 'echo' requests.
# NOTE(review): both handler bodies are on lines not shown.
192 class Receiver(protocol.Factory):
# Handler for 'store' requests; receives the message and the sender address.
# The tests compare a `buf` attribute against the stored messages, so this
# presumably appends msg to it - confirm.
196 def krpc_store(self, msg, _krpc_sender):
# Handler for 'echo' requests; the tests expect the same msg back.
198 def krpc_echo(self, msg, _krpc_sender):
203 a = hostbroker(af, {'SPEW': False})
205 p = reactor.listenUDP(port, a)
# Trial test case exercising request/response round trips between two local
# endpoints, "a" on port 1180 and "b" on port 1181.
# NOTE(review): several def lines (for the fixture code and one echo test)
# fall in gaps of this listing; only the visible statements are described.
208 class KRPCTests(unittest.TestCase):
# Fixture creation (the def line is not shown, presumably setUp): make()
# yields three objects per endpoint.
210 self.af, self.a, self.ap = make(1180)
211 self.bf, self.b, self.bp = make(1181)
# Fixture cleanup (the def line is not shown, presumably tearDown): stop
# listening on both ports.
214 self.ap.stopListening()
215 self.bp.stopListening()
# Callback helper: assert that endpoint b's buffer equals `value`; the
# callback result itself is ignored.
217 def bufEquals(self, result, value):
218 self.failUnlessEqual(self.bf.buf, value)
# One 'store' request from a to b; b's buffer must hold exactly that message.
220 def testSimpleMessage(self):
221 d = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."})
222 d.addCallback(self.bufEquals, ["This is a test."])
# Expects 100 stored copies; the loop line is presumably in a gap of this
# listing.
225 def testMessageBlast(self):
227 d = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."})
228 d.addCallback(self.bufEquals, ["This is a test."] * 100)
# Single 'echo' round trip (the def line of this test is not shown).
232 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
233 df.addCallback(self.gotMsg, "This is a test.")
# Callback helper: check the echoed message against `should_be`. `msg` is
# assigned on a line not shown.
# NOTE(review): the `dict` parameter shadows the builtin.
236 def gotMsg(self, dict, should_be):
237 _krpc_sender = dict['_krpc_sender']
239 self.failUnlessEqual(msg, should_be)
# 100 echo requests, each checked individually.
241 def testManyEcho(self):
242 for i in xrange(100):
243 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
244 df.addCallback(self.gotMsg, "This is a test.")
# Three echo requests with different payloads.
247 def testMultiEcho(self):
248 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
249 df.addCallback(self.gotMsg, "This is a test.")
251 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is another test."})
252 df.addCallback(self.gotMsg, "This is another test.")
254 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is yet another test."})
255 df.addCallback(self.gotMsg, "This is yet another test.")
# Two echoes, then echoReset is chained after the second one completes.
259 def testEchoReset(self):
260 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
261 df.addCallback(self.gotMsg, "This is a test.")
263 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is another test."})
264 df.addCallback(self.gotMsg, "This is another test.")
265 df.addCallback(self.echoReset)
# Drop a's cached connection for the peer so the next request goes through a
# newly created connection object, then echo once more.
268 def echoReset(self, dict):
269 del(self.a.connections[('127.0.0.1', 1181)])
270 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is yet another test."})
271 df.addCallback(self.gotMsg, "This is yet another test.")
# Request a method name with no matching handler and expect the errback to
# carry KRPC_ERROR_METHOD_UNKNOWN.
274 def testUnknownMeth(self):
275 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('blahblah', {'msg' : "This is a test."})
276 df.addErrback(self.gotErr, KRPC_ERROR_METHOD_UNKNOWN)
# Errback helper: compare the failure's value with the expected error code.
279 def gotErr(self, err, should_be):
280 self.failUnlessEqual(err.value, should_be)