1 ## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
4 from bencode import bencode, bdecode
5 from time import asctime
7 from traceback import format_exception
9 from twisted.internet.defer import Deferred
10 from twisted.internet import protocol, reactor
11 from twisted.trial import unittest
# Numeric error codes used by the KRPC layer in this file.
# METHOD_UNKNOWN is sent back as the ERR payload of a reply when the requested
# method is not known; RECEIVED_UNKNOWN is handed to errback() when a message of
# unrecognised type arrives.
16 KRPC_ERROR_METHOD_UNKNOWN = 2
17 KRPC_ERROR_RECEIVED_UNKNOWN = 3
# NOTE(review): no line shown in this listing uses KRPC_ERROR_TIMEOUT as a
# value -- the timeout path errbacks with a ProtocolError instead. Confirm
# whether that is intended.
18 KRPC_ERROR_TIMEOUT = 4
# Exception type for KRPC failures: raised by sendRequest() with the message
# "connection has been stopped", and delivered through errback() for requests
# that time out or whose connection has been closed.
# NOTE(review): the class body is not shown in this listing.
28 class ProtocolError(Exception):
# Datagram protocol that fans incoming packets out to per-peer connection
# objects: one object per remote address is cached in self.connections.
31 class hostbroker(protocol.DatagramProtocol):
# server -- passed on to every connection object built in connectionForAddr().
# NOTE(review): the assignments done here are not shown in this listing;
# self.server, self.connections and self.protocol are all read further down and
# are presumably initialised in this constructor -- confirm.
32 def __init__(self, server):
34 # this should be changed to storage that drops old entries
# Route one received datagram to the connection object of its sender.
#   datagram -- payload, forwarded untouched
#   addr     -- sender address; also the key into self.connections
37 def datagramReceived(self, datagram, addr):
38 #print `addr`, `datagram`
39 #if addr != self.addr:
40 c = self.connectionForAddr(addr)
41 c.datagramReceived(datagram, addr)
43 # del self.connections[addr]
# Fetch the connection object for addr, building it with
# self.protocol(addr, self.server, self.transport) and caching it in
# self.connections when no entry exists yet.
# NOTE(review): presumably returns conn (callers in this file chain
# .sendRequest() onto the result); the return statement is not shown here.
45 def connectionForAddr(self, addr):
48 if not self.connections.has_key(addr):
49 conn = self.protocol(addr, self.server, self.transport)
50 self.connections[addr] = conn
52 conn = self.connections[addr]
# Run the base-class hook, then remember the local endpoint reported by
# transport.getHost() as a (host, port) tuple in self.addr.
55 def makeConnection(self, transport):
56 protocol.DatagramProtocol.makeConnection(self, transport)
57 tup = transport.getHost()
58 self.addr = (tup.host, tup.port)
# Walk every cached connection, then delegate to the base-class stopProtocol.
# NOTE(review): what is done to each conn inside the loop is not shown here.
60 def stopProtocol(self):
61 for conn in self.connections.values():
63 protocol.DatagramProtocol.stopProtocol(self)
# Set up the state of one per-peer connection.
#   addr, server -- not used on any line shown here
#   transport    -- kept on self.transport; every reply and request in this
#                   class is sent with self.transport.write()
# NOTE(review): only the transport assignment is shown. self.factory,
# self.addr, self.tids and self.mtid are read by the other methods and are
# presumably initialised here -- confirm. hostbroker.connectionForAddr() calls
# self.protocol(addr, self.server, self.transport) with this argument order,
# so this is presumably the constructor it reaches.
68 def __init__(self, addr, server, transport):
69 self.transport = transport
# Handle one incoming datagram.
# Requests are dispatched to a "krpc_<method>" attribute of self.factory and
# answered over self.transport; responses and errors are matched by transaction
# id against the Deferreds waiting in self.tids.
#   str  -- the received payload (NOTE(review): the name shadows the builtin)
#   addr -- sender address; replies are written back to it
# TID, TYP, REQ, ARG, RSP and ERR are message-dictionary keys defined outside
# the lines shown in this listing.
# NOTE(review): the branching statements around several of the lines below are
# not shown in this listing, so the grouping described here is inferred from
# the log messages -- confirm against the full source.
76 def datagramReceived(self, str, addr):
# Presumably guarded by a "stopped" check that is not shown.
79 print "stopped, dropping message from", addr, str
# Presumably the handler for a failed decode of the payload; `e` is bound on a
# line that is not shown.
85 print "response decode error: " + `e`
88 print self.factory.port, "received from", addr, self.addr, ":", msg
93 # tell factory to handle
# Handler lookup by name; f is None when the factory has no such attribute.
94 f = getattr(self.factory ,"krpc_" + msg[REQ], None)
# The handler also receives the peer address as the _krpc_sender keyword
# (self.addr here, not the addr parameter).
95 msg[ARG]['_krpc_sender'] = self.addr
# Python 2 apply(): call f with no positional arguments and msg[ARG] as the
# keyword arguments.
98 ret = apply(f, (), msg[ARG])
# Error reply carrying the repr of the formatted traceback.
# NOTE(review): sys.exc_info() is used, but no "import sys" appears among the
# imports shown in this listing -- confirm it exists.
101 out = bencode({TID:msg[TID], TYP:ERR, ERR :`format_exception(type(e), e, sys.exc_info()[2])`})
104 print self.factory.port, "responding to", addr, self.addr, ":", out
105 self.transport.write(out, addr)
# Normal reply: either the handler's return value or an empty dict
# (presumably chosen by a test on ret that is not shown).
109 out = bencode({TID : msg[TID], TYP : RSP, RSP : ret})
111 out = bencode({TID : msg[TID], TYP : RSP, RSP : {}})
115 print self.factory.port, "responding to", addr, self.addr, ":", out
116 self.transport.write(out, addr)
# Unknown method: answer with the KRPC_ERROR_METHOD_UNKNOWN code.
120 print "don't know about method %s" % msg[REQ]
122 out = bencode({TID:msg[TID], TYP:ERR, ERR : KRPC_ERROR_METHOD_UNKNOWN})
125 print self.factory.port, "responding to", addr, self.addr, ":", out
126 self.transport.write(out, addr)
# Traffic log line. ilen / olen are assigned on lines that are not shown
# (presumably the request and reply sizes). NOTE(review): this reads
# self.factory.node.port while the other log lines read self.factory.port.
128 print "%s %s >>> %s - %s %s %s" % (asctime(), addr, self.factory.node.port,
129 ilen, msg[REQ], olen)
# Response: if the transaction id is still pending, drop it from self.tids and
# fire the Deferred with a dict holding the payload and the sender address.
130 elif msg[TYP] == RSP:
133 if self.tids.has_key(msg[TID]):
134 df = self.tids[msg[TID]]
136 del(self.tids[msg[TID]])
137 df.callback({'rsp' : msg[RSP], '_krpc_sender': addr})
# NOTE(review): assumes the response payload has an 'id' key.
139 print 'timeout ' + `msg[RSP]['id']`
140 # no tid, this transaction timed out already...
# Error reply from the peer: same lookup-and-remove as above.
# NOTE(review): the errback call for this branch is not shown in this listing.
141 elif msg[TYP] == ERR:
144 if self.tids.has_key(msg[TID]):
145 df = self.tids[msg[TID]]
148 del(self.tids[msg[TID]])
150 # day late and dollar short
153 print "unknown message type " + `msg`
154 # unknown message type
# NOTE(review): unlike the two branches above, no has_key() guard is shown
# before this lookup; if there is none, an unknown-type message whose id is not
# pending raises KeyError -- confirm.
155 df = self.tids[msg[TID]]
157 df.errback(KRPC_ERROR_RECEIVED_UNKNOWN)
158 del(self.tids[msg[TID]])
# Send a request to the peer at self.addr and register a Deferred for its
# answer in self.tids.
#   method -- request name; the receiving side resolves it to "krpc_" + method
#   args   -- dictionary of arguments for the remote handler
# Raises ProtocolError when the connection has been stopped.
# NOTE(review): presumably returns the Deferred d (the tests in this file call
# addCallback/addErrback on the result); the return statement, the creation of
# d and the encoding of msg are not shown in this listing.
160 def sendRequest(self, method, args):
# Presumably guarded by a "stopped" check that is not shown.
162 raise ProtocolError, "connection has been stopped"
# The transaction id is a single character taken from a counter that wraps at
# 256. NOTE(review): with more than 256 requests in flight an id would repeat
# and replace the earlier entry in self.tids -- confirm this is acceptable.
165 msg = {TID : chr(self.mtid), TYP : REQ, REQ : method, ARG : args}
166 self.mtid = (self.mtid + 1) % 256
168 print self.factory.port, "sending to", self.addr, ":", msg
171 self.tids[msg[TID]] = d
# Timeout handler; the default arguments bind the current tids, id and msg at
# definition time. It fails the pending Deferred with a ProtocolError.
# NOTE(review): `id` shadows the builtin; the lines that fetch df are not shown.
172 def timeOut(tids = self.tids, id = msg[TID], msg = msg):
176 print ">>>>>> KRPC_ERROR_TIMEOUT"
177 df.errback(ProtocolError('timeout waiting for %r' % msg))
# Schedule the timeout; KRPC_TIMEOUT is defined outside the lines shown here.
178 later = reactor.callLater(KRPC_TIMEOUT, timeOut)
# Attached with addBoth, so it runs whether the request succeeds or fails.
# It tests whether the timer is still pending; what it does then (presumably
# cancelling it and passing the result on) is not shown.
# NOTE(review): the parameter name `dict` shadows the builtin.
179 def dropTimeOut(dict, later_call = later):
180 if later_call.active():
183 d.addBoth(dropTimeOut)
# NOTE(review): `str` is presumably the encoded msg assigned on a line that is
# not shown; otherwise this would refer to the builtin type.
184 self.transport.write(str, self.addr)
# NOTE(review): the def line of this method is not shown in this listing.
188 """Timeout all pending requests."""
# Fail every Deferred still waiting in self.tids with a ProtocolError.
189 for df in self.tids.values():
190 df.errback(ProtocolError('connection has been closed'))
# NOTE(review): the body is not shown in this listing. The signature takes host
# and port separately and has no self parameter, unlike
# hostbroker.connectionForAddr(addr), which takes one address value -- confirm
# which form callers are expected to use.
194 def connectionForAddr(host, port):
# Factory exposing KRPC handlers. Incoming requests are resolved to the
# attribute "krpc_" + <request name> on the factory, so these two methods
# presumably serve the 'store' and 'echo' requests sent by KRPCTests.
# Each handler receives the request's msg argument plus _krpc_sender, the peer
# address injected by the connection before the call.
# NOTE(review): the method bodies are not shown in this listing; the tests
# compare a `buf` attribute of the receiving factory, presumably filled by
# krpc_store -- confirm.
197 class Receiver(protocol.Factory):
201 def krpc_store(self, msg, _krpc_sender):
203 def krpc_echo(self, msg, _krpc_sender):
# Start listening for UDP datagrams on `port` with `a` as the protocol object;
# p is the object returned by reactor.listenUDP.
# NOTE(review): presumably part of the make(port) helper that KRPCTests unpacks
# into three values; its def line and return are not shown in this listing.
210 p = reactor.listenUDP(port, a)
# Loopback tests (twisted.trial): two endpoints are built with make() on ports
# 1180 and 1181, and every test sends from endpoint a to ('127.0.0.1', 1181).
# NOTE(review): several def lines and the statements returning the Deferreds
# are not shown in this listing.
213 class KRPCTests(unittest.TestCase):
# Presumably the body of setUp (def line not shown). make() yields three values
# per endpoint; the third (ap / bp) is what stopListening() is called on below.
216 self.af, self.a, self.ap = make(1180)
217 self.bf, self.b, self.bp = make(1181)
# Presumably the body of tearDown (def line not shown): stop both listeners.
220 self.ap.stopListening()
221 self.bp.stopListening()
# Deferred callback: ignores `result` and asserts that the receiving side's
# buffer (self.bf.buf) equals `value`.
223 def bufEquals(self, result, value):
224 self.assertEqual(self.bf.buf, value)
# One 'store' request; afterwards the buffer must hold exactly that message.
226 def testSimpleMessage(self):
227 d = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."})
228 d.addCallback(self.bufEquals, ["This is a test."])
# Expects the buffer to hold 100 copies of the message.
# NOTE(review): the loop that presumably repeats the send is not shown.
231 def testMessageBlast(self):
233 d = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."})
234 d.addCallback(self.bufEquals, ["This is a test."] * 100)
# Single 'echo' round trip checked by gotMsg.
# NOTE(review): the def line of this test is not shown.
238 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
239 df.addCallback(self.gotMsg, "This is a test.")
# Deferred callback for echo tests: asserts the echoed message equals should_be.
#   dict -- callback result; has a '_krpc_sender' key (NOTE(review): the name
#           shadows the builtin)
# NOTE(review): the line that sets msg is not shown (presumably taken from the
# 'rsp' key of dict); _krpc_sender is read but not used on any line shown.
242 def gotMsg(self, dict, should_be):
243 _krpc_sender = dict['_krpc_sender']
245 self.assertEqual(msg, should_be)
# 100 echo requests over the same connection, each checked by gotMsg.
247 def testManyEcho(self):
248 for i in xrange(100):
249 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
250 df.addCallback(self.gotMsg, "This is a test.")
# Three echo requests with different payloads, each checked against its own
# expected text.
253 def testMultiEcho(self):
254 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
255 df.addCallback(self.gotMsg, "This is a test.")
257 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is another test."})
258 df.addCallback(self.gotMsg, "This is another test.")
260 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is yet another test."})
261 df.addCallback(self.gotMsg, "This is yet another test.")
# Two echoes, then echoReset is chained onto the second Deferred.
265 def testEchoReset(self):
266 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
267 df.addCallback(self.gotMsg, "This is a test.")
269 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is another test."})
270 df.addCallback(self.gotMsg, "This is another test.")
271 df.addCallback(self.echoReset)
# Removes the cached entry for the peer from self.a.connections before sending
# again, so the following connectionForAddr() call finds no entry for that
# address; the echo must still succeed.
274 def echoReset(self, dict):
275 del(self.a.connections[('127.0.0.1', 1181)])
276 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is yet another test."})
277 df.addCallback(self.gotMsg, "This is yet another test.")
# A request for a method name the receiver does not define must fail with
# KRPC_ERROR_METHOD_UNKNOWN.
280 def testUnknownMeth(self):
281 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('blahblah', {'msg' : "This is a test."})
282 df.addErrback(self.gotErr, KRPC_ERROR_METHOD_UNKNOWN)
# Errback: asserts that the failure's value equals should_be.
285 def gotErr(self, err, should_be):
286 self.assertEqual(err.value, should_be)