1 ## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
4 from bencode import bencode, bdecode
5 from time import asctime
7 from traceback import format_exception
9 from twisted.internet.defer import Deferred
10 from twisted.internet import protocol, reactor
11 from twisted.python import log
12 from twisted.trial import unittest
14 from khash import newID
19 KRPC_ERROR_METHOD_UNKNOWN = 2
20 KRPC_ERROR_RECEIVED_UNKNOWN = 3
21 KRPC_ERROR_TIMEOUT = 4
31 class ProtocolError(Exception):
34 class hostbroker(protocol.DatagramProtocol):
35 def __init__(self, server, config):
38 # this should be changed to storage that drops old entries
def datagramReceived(self, datagram, addr):
    """Route an incoming datagram to the connection object for its sender.

    datagram -- raw payload exactly as received
    addr     -- sender address; passed to ``self.connectionForAddr`` to
                obtain the connection that will handle the payload

    Returns None.  Anything raised by the lookup or by the connection's own
    ``datagramReceived`` propagates to the caller.
    """
    # One lookup, one hand-off: the connection receives both the payload and
    # the sender address unchanged.
    self.connectionForAddr(addr).datagramReceived(datagram, addr)
    # NOTE: connections are not pruned from ``self.connections`` here; the
    # original carried only a commented-out ``del self.connections[addr]``.
49 def connectionForAddr(self, addr):
52 if not self.connections.has_key(addr):
53 conn = self.protocol(addr, self.server, self.transport, self.config['SPEW'])
54 self.connections[addr] = conn
56 conn = self.connections[addr]
def makeConnection(self, transport):
    """Attach *transport* and record the local address it reports.

    The base ``DatagramProtocol.makeConnection`` runs first; afterwards the
    value returned by ``transport.getHost()`` is unpacked into a
    ``(host, port)`` tuple and stored on ``self.addr``.
    """
    protocol.DatagramProtocol.makeConnection(self, transport)
    local = transport.getHost()
    self.addr = (local.host, local.port)
64 def stopProtocol(self):
65 for conn in self.connections.values():
67 protocol.DatagramProtocol.stopProtocol(self)
71 def __init__(self, addr, server, transport, spew = False):
72 self.transport = transport
79 def datagramReceived(self, str, addr):
82 log.msg("stopped, dropping message from %r: %s" % (addr, str))
88 log.msg("response decode error: ")
92 log.msg("%d received from %r: %s" % (self.factory.port, addr, msg))
97 # tell factory to handle
98 f = getattr(self.factory ,"krpc_" + msg[REQ], None)
99 msg[ARG]['_krpc_sender'] = self.addr
100 if f and callable(f):
102 ret = f(*(), **msg[ARG])
104 olen = self._sendResponse(addr, msg[TID], ERR, `format_exception(type(e), e, sys.exc_info()[2])`)
106 olen = self._sendResponse(addr, msg[TID], RSP, ret)
109 log.msg("don't know about method %s" % msg[REQ])
111 olen = self._sendResponse(addr, msg[TID], ERR, KRPC_ERROR_METHOD_UNKNOWN)
113 log.msg("%s >>> %s - %s %s %s" % (addr, self.factory.node.port,
114 ilen, msg[REQ], olen))
115 elif msg[TYP] == RSP:
118 if self.tids.has_key(msg[TID]):
119 df = self.tids[msg[TID]]
121 del(self.tids[msg[TID]])
122 df.callback({'rsp' : msg[RSP], '_krpc_sender': addr})
124 # no tid, this transaction timed out already...
126 log.msg('timeout: %r' % msg[RSP]['id'])
127 elif msg[TYP] == ERR:
130 if self.tids.has_key(msg[TID]):
131 df = self.tids[msg[TID]]
134 del(self.tids[msg[TID]])
136 # day late and dollar short
140 log.msg("unknown message type: %r" % msg)
141 # unknown message type
142 df = self.tids[msg[TID]]
144 df.errback(KRPC_ERROR_RECEIVED_UNKNOWN)
145 del(self.tids[msg[TID]])
147 def _sendResponse(self, addr, tid, msgType, response):
151 msg = {TID : tid, TYP : msgType, msgType : response}
154 log.msg("%d responding to %r: %s" % (self.factory.port, addr, msg))
157 self.transport.write(out, addr)
160 def sendRequest(self, method, args):
162 raise ProtocolError, "connection has been stopped"
165 msg = {TID : newID(), TYP : REQ, REQ : method, ARG : args}
167 log.msg("%d sending to %r: %s" % (self.factory.port, self.addr, msg))
170 self.tids[msg[TID]] = d
171 def timeOut(tids = self.tids, id = msg[TID], msg = msg):
175 log.msg(">>>>>> KRPC_ERROR_TIMEOUT")
176 df.errback(ProtocolError('timeout waiting for %r' % msg))
177 later = reactor.callLater(KRPC_TIMEOUT, timeOut)
178 def dropTimeOut(dict, later_call = later):
179 if later_call.active():
182 d.addBoth(dropTimeOut)
183 self.transport.write(str, self.addr)
187 """Timeout all pending requests."""
188 for df in self.tids.values():
189 df.errback(ProtocolError('connection has been closed'))
193 def connectionForAddr(host, port):
196 class Receiver(protocol.Factory):
200 def krpc_store(self, msg, _krpc_sender):
202 def krpc_echo(self, msg, _krpc_sender):
207 a = hostbroker(af, {'SPEW': False})
209 p = reactor.listenUDP(port, a)
212 class KRPCTests(unittest.TestCase):
214 self.af, self.a, self.ap = make(1180)
215 self.bf, self.b, self.bp = make(1181)
218 self.ap.stopListening()
219 self.bp.stopListening()
def bufEquals(self, result, value):
    """Callback helper: assert that ``self.bf.buf`` equals *value*.

    result -- whatever the preceding callback produced; not inspected
    value  -- the expected contents of ``self.bf.buf``
    """
    received = self.bf.buf
    self.failUnlessEqual(received, value)
224 def testSimpleMessage(self):
225 d = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."})
226 d.addCallback(self.bufEquals, ["This is a test."])
229 def testMessageBlast(self):
231 d = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."})
232 d.addCallback(self.bufEquals, ["This is a test."] * 100)
236 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
237 df.addCallback(self.gotMsg, "This is a test.")
240 def gotMsg(self, dict, should_be):
241 _krpc_sender = dict['_krpc_sender']
243 self.failUnlessEqual(msg, should_be)
245 def testManyEcho(self):
246 for i in xrange(100):
247 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
248 df.addCallback(self.gotMsg, "This is a test.")
251 def testMultiEcho(self):
252 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
253 df.addCallback(self.gotMsg, "This is a test.")
255 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is another test."})
256 df.addCallback(self.gotMsg, "This is another test.")
258 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is yet another test."})
259 df.addCallback(self.gotMsg, "This is yet another test.")
263 def testEchoReset(self):
264 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
265 df.addCallback(self.gotMsg, "This is a test.")
267 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is another test."})
268 df.addCallback(self.gotMsg, "This is another test.")
269 df.addCallback(self.echoReset)
272 def echoReset(self, dict):
273 del(self.a.connections[('127.0.0.1', 1181)])
274 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is yet another test."})
275 df.addCallback(self.gotMsg, "This is yet another test.")
278 def testUnknownMeth(self):
279 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('blahblah', {'msg' : "This is a test."})
280 df.addErrback(self.gotErr, KRPC_ERROR_METHOD_UNKNOWN)
def gotErr(self, err, should_be):
    """Errback helper: assert that ``err.value`` equals *should_be*.

    err       -- object exposing the error payload as ``.value``
    should_be -- the expected payload
    """
    actual = err.value
    self.failUnlessEqual(actual, should_be)