1 ## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
4 from bencode import bencode, bdecode
5 from time import asctime
8 from twisted.internet.defer import Deferred
9 from twisted.internet import protocol, reactor
10 from twisted.python import log
11 from twisted.trial import unittest
13 from khash import newID
# Largest bencoded packet, in bytes, that _sendResponse sends unchanged;
# anything longer is shortened or replaced by an error message.
# NOTE(review): 1472 is presumably a 1500-byte MTU minus 28 bytes of IP and
# UDP headers -- confirm.
16 UDP_PACKET_LIMIT = 1472
# 2xx error codes. The ones used in this module are either raised while
# verifying a received packet (MALFORMED_PACKET) or sent to the remote node
# inside an ERR message (SERVER_ERROR, METHOD_UNKNOWN, MALFORMED_REQUEST,
# RESPONSE_TOO_LONG).
20 KRPC_ERROR_SERVER_ERROR = 201
21 KRPC_ERROR_MALFORMED_PACKET = 202
22 KRPC_ERROR_METHOD_UNKNOWN = 203
23 KRPC_ERROR_MALFORMED_REQUEST = 204
24 KRPC_ERROR_INVALID_TOKEN = 205
25 KRPC_ERROR_RESPONSE_TOO_LONG = 206
# 1xx error codes. The ones used in this module are generated locally and
# handed to the caller through a Deferred errback or a raised KrpcError
# (RECEIVED_UNKNOWN, TIMEOUT, PROTOCOL_STOPPED).
28 KRPC_ERROR_INTERNAL = 100
29 KRPC_ERROR_RECEIVED_UNKNOWN = 101
30 KRPC_ERROR_TIMEOUT = 102
31 KRPC_ERROR_PROTOCOL_STOPPED = 103
# Exception type for KRPC failures. Throughout this module it is constructed
# with two arguments: one of the KRPC_ERROR_* codes followed by a descriptive
# string.
41 class KrpcError(Exception):
# Every failure in this function is reported as a KrpcError carrying
# KRPC_ERROR_MALFORMED_PACKET and a string naming the field that is wrong.
44 def verifyMessage(msg):
45 """Check received message for corruption and errors.
47 @type msg: C{dictionary}
48 @param msg: the dictionary of information received on the connection
49 @raise KrpcError: if the message is corrupt
53 raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "not a dictionary")
55 raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "no message type")
# Request checks: the request name must be a string and its arguments a
# dictionary.
58 raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "request type not specified")
59 if type(msg[REQ]) != str:
60 raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "request type is not a string")
62 raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "no arguments for request")
63 if type(msg[ARG]) != dict:
64 raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "arguments for request are not in a dictionary")
# Response checks: the response payload must be a dictionary.
67 raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "response not specified")
68 if type(msg[RSP]) != dict:
69 raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "response is not a dictionary")
# Error checks: the error payload must be a two-element list holding an
# integer (int or long) error number followed by a string.
72 raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error not specified")
73 if type(msg[ERR]) != list:
74 raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error is not a list")
75 if len(msg[ERR]) != 2:
76 raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error is not a 2-element list")
77 if type(msg[ERR][0]) not in (int, long):
78 raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error number is not a number")
79 if type(msg[ERR][1]) != str:
80 raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error string is not a string")
# The "unknown message type" rejection below is deliberately disabled;
# datagramReceived has its own handling for unknown message types.
82 # raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "unknown message type")
# The transaction id must be a string.
84 raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "no transaction ID specified")
85 if type(msg[TID]) != str:
86 raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "transaction id is not a string")
# Datagram protocol that keeps one connection object per remote address in
# self.connections, creating them on demand, and hands every received
# datagram to the connection belonging to its sender.
88 class hostbroker(protocol.DatagramProtocol):
89 def __init__(self, server, config):
92 # this should be changed to storage that drops old entries
# Route an incoming datagram to the connection object for the sending address.
95 def datagramReceived(self, datagram, addr):
96 #print `addr`, `datagram`
97 #if addr != self.addr:
98 c = self.connectionForAddr(addr)
99 c.datagramReceived(datagram, addr)
101 # del self.connections[addr]
# Return the connection object for addr. The first time an address is seen a
# new self.protocol instance is built (sharing this broker's server and
# transport, with the 'SPEW' config value) and cached in self.connections.
103 def connectionForAddr(self, addr):
# Our own listening address is special-cased before the cache lookup.
104 if addr == self.addr:
106 if not self.connections.has_key(addr):
107 conn = self.protocol(addr, self.server, self.transport, self.config['SPEW'])
108 self.connections[addr] = conn
110 conn = self.connections[addr]
# Record our own (host, port), as reported by the transport, in self.addr.
113 def makeConnection(self, transport):
114 protocol.DatagramProtocol.makeConnection(self, transport)
115 tup = transport.getHost()
116 self.addr = (tup.host, tup.port)
# Visit every known connection, then stop the underlying datagram protocol.
118 def stopProtocol(self):
119 for conn in self.connections.values():
121 protocol.DatagramProtocol.stopProtocol(self)
# Set up the connection object for one remote address.
#   addr      -- remote address; hostbroker.connectionForAddr passes the
#                address the connection is being created for
#   server    -- kept as self.factory; request handlers named krpc_<method>
#                are looked up on it
#   transport -- kept as self.transport and used to write outgoing packets
#   spew      -- flag, default False; hostbroker passes config['SPEW']
#                NOTE(review): presumably enables verbose logging -- confirm
125 def __init__(self, addr, server, transport, spew = False):
126 self.transport = transport
127 self.factory = server
# Process one datagram received from addr. The log messages below show the
# stages: drop if stopped, bdecode, verify, then dispatch on msg[TYP]
# (request, response, error, or an unknown type).
133 def datagramReceived(self, data, addr):
136 log.msg("stopped, dropping message from %r: %s" % (addr, data))
142 log.msg("krpc bdecode error: ")
149 log.msg("krpc message verification error: ")
154 log.msg("%d received from %r: %s" % (self.factory.port, addr, msg))
159 # tell factory to handle
# A request is served by the factory attribute named "krpc_" + request name;
# getattr's default of None covers methods the factory does not have.
160 f = getattr(self.factory ,"krpc_" + msg[REQ], None)
# The handler gets the request arguments as keyword arguments, with the
# sender's address added under '_krpc_sender'.
161 msg[ARG]['_krpc_sender'] = self.addr
162 if f and callable(f):
164 ret = f(*(), **msg[ARG])
# A KrpcError from the handler is passed back with its own code and string.
166 log.msg('Got a Krpc error while running: krpc_%s' % msg[REQ])
168 olen = self._sendResponse(addr, msg[TID], ERR, [e[0], e[1]])
# A malformed request is answered with KRPC_ERROR_MALFORMED_REQUEST.
170 log.msg('Got a malformed request for: krpc_%s' % msg[REQ])
172 olen = self._sendResponse(addr, msg[TID], ERR,
173 [KRPC_ERROR_MALFORMED_REQUEST, str(e)])
# Any other failure is answered with KRPC_ERROR_SERVER_ERROR.
175 log.msg('Got an unknown error while running: krpc_%s' % msg[REQ])
177 olen = self._sendResponse(addr, msg[TID], ERR,
178 [KRPC_ERROR_SERVER_ERROR, str(e)])
# Success: the handler's return value becomes the response payload.
180 olen = self._sendResponse(addr, msg[TID], RSP, ret)
183 log.msg("ERROR: don't know about method %s" % msg[REQ])
184 olen = self._sendResponse(addr, msg[TID], ERR,
185 [KRPC_ERROR_METHOD_UNKNOWN, "unknown method "+str(msg[REQ])])
# NOTE(review): this log line reads self.factory.node.port while the other
# log lines in this class read self.factory.port -- confirm both attributes
# exist on every factory used with this class.
187 log.msg("%s >>> %s - %s %s %s" % (addr, self.factory.node.port,
188 ilen, msg[REQ], olen))
189 elif msg[TYP] == RSP:
# A response completes the pending request with the same transaction id: the
# Deferred is removed from self.tids and called back with the response
# payload and the sender's address.
192 if self.tids.has_key(msg[TID]):
193 df = self.tids[msg[TID]]
195 del(self.tids[msg[TID]])
196 df.callback({'rsp' : msg[RSP], '_krpc_sender': addr})
198 # no tid, this transaction timed out already...
# NOTE(review): verifyMessage only checks that msg[RSP] is a dictionary, so
# this lookup of 'id' assumes every response carries that key -- confirm.
200 log.msg('timeout: %r' % msg[RSP]['id'])
201 elif msg[TYP] == ERR:
# An error fails the pending request: the Deferred is removed from self.tids
# and errbacks with a KrpcError built from the [number, string] payload.
204 if self.tids.has_key(msg[TID]):
205 df = self.tids[msg[TID]]
206 del(self.tids[msg[TID]])
208 df.errback(KrpcError(*msg[ERR]))
210 # day late and dollar short, just log it
211 log.msg("Got an error for an unknown request: %r" % (msg[ERR], ))
215 log.msg("unknown message type: %r" % msg)
216 # unknown message type
# If the transaction id matches a pending request, fail that request with
# KRPC_ERROR_RECEIVED_UNKNOWN.
217 if msg[TID] in self.tids:
218 df = self.tids[msg[TID]]
219 del(self.tids[msg[TID]])
221 df.errback(KrpcError(KRPC_ERROR_RECEIVED_UNKNOWN,
222 "Received an unknown message type: %r" % msg[TYP]))
# Send a message of type msgType (the callers pass RSP or ERR) for
# transaction tid to addr, with `response` as its payload.
# A packet longer than UDP_PACKET_LIMIT is shortened by sending only a
# leading slice of response['values'] when that key is present; otherwise an
# error message with KRPC_ERROR_RESPONSE_TOO_LONG is built instead.
# The callers log the return value as the outgoing length (olen).
224 def _sendResponse(self, addr, tid, msgType, response):
# The payload is stored under a key equal to the message type itself.
229 msg = {TID : tid, TYP : msgType, msgType : response}
232 log.msg("%d responding to %r: %s" % (self.factory.port, addr, msg))
236 if len(out) > UDP_PACKET_LIMIT:
237 if 'values' in response:
238 # Save the original list of values
239 orig_values = response['values']
240 len_orig_values = len(bencode(orig_values))
242 # Calculate the maximum value length possible
# The bencoded values list may keep its current size minus the number of
# bytes by which the whole packet exceeds the limit.
243 max_len_values = len_orig_values - (len(out) - UDP_PACKET_LIMIT)
# NOTE(review): the asserts in this method disappear when Python runs with
# -O, so they are not a reliable guard -- confirm that is acceptable.
244 assert max_len_values > 0
246 # Start with a calculation of how many values should be included
247 # (assumes all values are the same length)
# The 2.0 subtracted here accounts for two bytes of the encoded list that do
# not belong to any single value.
248 per_value = (float(len_orig_values) - 2.0) / float(len(orig_values))
249 num_values = len(orig_values) - int(ceil(float(len(out) - UDP_PACKET_LIMIT) / per_value))
251 # Do a linear search for the actual maximum number possible
# The estimate is corrected in both directions: first grow while there is
# room, then shrink while the encoding is still too long.
252 bencoded_values = len(bencode(orig_values[:num_values]))
253 while bencoded_values < max_len_values and num_values + 1 < len(orig_values):
254 bencoded_values += len(bencode(orig_values[num_values]))
256 while bencoded_values > max_len_values and num_values > 0:
258 bencoded_values -= len(bencode(orig_values[num_values]))
259 assert num_values > 0
# Only the first num_values entries are kept in the payload.
262 response['values'] = orig_values[:num_values]
# NOTE(review): this check is strict (<) while the test that triggers
# shortening is len(out) > UDP_PACKET_LIMIT, so a shortened packet of exactly
# UDP_PACKET_LIMIT bytes would fail here -- confirm intent.
264 assert len(out) < UDP_PACKET_LIMIT
265 log.msg('Shortened a long packet from %d to %d values, new packet length: %d' %
266 (len(orig_values), num_values, len(out)))
268 # Too long a response, send an error
269 log.msg('Could not send response, too long: %d bytes' % len(out))
270 msg = {TID : tid, TYP : ERR, ERR : [KRPC_ERROR_RESPONSE_TOO_LONG, "response was %d bytes" % len(out)]}
274 # Unknown error, send an error message
275 msg = {TID : tid, TYP : ERR, ERR : [KRPC_ERROR_SERVER_ERROR, "unknown error sending response: %s" % str(e)]}
278 self.transport.write(out, addr)
# Send a request named `method` with argument dictionary `args` to self.addr.
# Raises KrpcError(KRPC_ERROR_PROTOCOL_STOPPED, ...) when the connection has
# been stopped. The pending request is tracked by storing `d` in self.tids
# under the new transaction id; the tests in this file attach callbacks to
# the value this method returns.
281 def sendRequest(self, method, args):
283 raise KrpcError, (KRPC_ERROR_PROTOCOL_STOPPED, "cannot send, connection has been stopped")
# Each request gets a fresh transaction id from newID().
286 msg = {TID : newID(), TYP : REQ, REQ : method, ARG : args}
288 log.msg("%d sending to %r: %s" % (self.factory.port, self.addr, msg))
291 self.tids[msg[TID]] = d
# Timeout handler: everything it needs is bound through default arguments
# when the function is defined. It errbacks with KRPC_ERROR_TIMEOUT.
292 def timeOut(tids = self.tids, id = msg[TID], method = method, addr = self.addr):
296 df.errback(KrpcError(KRPC_ERROR_TIMEOUT, "timeout waiting for '%s' from %r" % (method, addr)))
297 later = reactor.callLater(KRPC_TIMEOUT, timeOut)
# Registered with addBoth so it runs whether the request succeeds or fails;
# it checks whether the scheduled timeout call is still pending.
# NOTE(review): the parameter name `dict` shadows the builtin inside this
# helper.
298 def dropTimeOut(dict, later_call = later):
299 if later_call.active():
302 d.addBoth(dropTimeOut)
303 self.transport.write(data, self.addr)
307 """Timeout all pending requests."""
308 for df in self.tids.values():
309 df.errback(KrpcError(KRPC_ERROR_PROTOCOL_STOPPED, 'connection has been stopped while waiting for response'))
# Module-level helper taking host and port as separate arguments, unlike
# hostbroker.connectionForAddr, which takes one address tuple.
# NOTE(review): the tests below only call the hostbroker method -- confirm
# this helper is still needed.
313 def connectionForAddr(host, port):
# Factory used by the tests in this file. Its krpc_* methods are the request
# handlers for the 'store', 'echo' and 'values' requests that the tests send;
# each receives the request arguments plus _krpc_sender as keyword arguments.
316 class Receiver(protocol.Factory):
320 def krpc_store(self, msg, _krpc_sender):
323 def krpc_echo(self, msg, _krpc_sender):
# Reply with `num` copies of a string made of `length` '1' characters; the
# long-packet test uses this to produce an oversized response.
325 def krpc_values(self, length, num, _krpc_sender):
326 return {'values': ['1'*length]*num}
330 a = hostbroker(af, {'SPEW': False})
332 p = reactor.listenUDP(port, a)
# Tests that exchange KRPC messages between two local endpoints, "a" and "b",
# created with make() on ports 1180 and 1181. Every request is sent from a to
# b at ('127.0.0.1', 1181).
335 class KRPCTests(unittest.TestCase):
# make() returns three objects per endpoint; the third is the listening port
# that is stopped again during cleanup.
339 self.af, self.a, self.ap = make(1180)
340 self.bf, self.b, self.bp = make(1181)
343 self.ap.stopListening()
344 self.bp.stopListening()
# Callback helper: compare the buffer kept by b's factory with `value`; the
# callback result itself is ignored.
346 def bufEquals(self, result, value):
347 self.failUnlessEqual(self.bf.buf, value)
# One 'store' request should leave exactly that message in b's buffer.
349 def testSimpleMessage(self):
350 d = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."})
351 d.addCallback(self.bufEquals, ["This is a test."])
# Many 'store' requests: the buffer is expected to hold 100 copies.
354 def testMessageBlast(self):
356 d = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."})
357 d.addCallback(self.bufEquals, ["This is a test."] * 100)
361 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
362 df.addCallback(self.gotMsg, "This is a test.")
# Callback helper for 'echo': the response's 'msg' entry must equal should_be.
365 def gotMsg(self, dict, should_be):
366 _krpc_sender = dict['_krpc_sender']
368 self.failUnlessEqual(msg['msg'], should_be)
# Repeat the same echo request 100 times.
370 def testManyEcho(self):
371 for i in xrange(100):
372 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
373 df.addCallback(self.gotMsg, "This is a test.")
# Three echo requests with different payloads over the same connection.
376 def testMultiEcho(self):
377 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
378 df.addCallback(self.gotMsg, "This is a test.")
380 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is another test."})
381 df.addCallback(self.gotMsg, "This is another test.")
383 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is yet another test."})
384 df.addCallback(self.gotMsg, "This is yet another test.")
# Echo twice, then continue in echoReset, which discards a's cached
# connection before sending a third request.
388 def testEchoReset(self):
389 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
390 df.addCallback(self.gotMsg, "This is a test.")
392 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is another test."})
393 df.addCallback(self.gotMsg, "This is another test.")
394 df.addCallback(self.echoReset)
# Delete the cached connection so connectionForAddr has to create a new one,
# then check that echo still works.
397 def echoReset(self, dict):
398 del(self.a.connections[('127.0.0.1', 1181)])
399 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is yet another test."})
400 df.addCallback(self.gotMsg, "This is yet another test.")
# A request for a method the receiver lacks must fail with
# KRPC_ERROR_METHOD_UNKNOWN.
403 def testUnknownMeth(self):
404 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('blahblah', {'msg' : "This is a test."})
405 df.addBoth(self.gotErr, KRPC_ERROR_METHOD_UNKNOWN)
# An argument that krpc_echo does not accept ('foo') must fail with
# KRPC_ERROR_MALFORMED_REQUEST.
408 def testMalformedRequest(self):
409 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test.", 'foo': 'bar'})
410 df.addBoth(self.gotErr, KRPC_ERROR_MALFORMED_REQUEST)
# Callback helper: the first element of the failure's exception value is the
# error code and must equal should_be.
413 def gotErr(self, err, should_be):
414 self.failUnlessEqual(err.value[0], should_be)
# Ask for 2000 one-character values, a response too long for a single
# packet, and check the size of what comes back.
416 def testLongPackets(self):
417 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('values', {'length' : 1, 'num': 2000})
418 df.addCallback(self.gotLongRsp)
# The callback dictionary is not the exact packet that was sent, hence the
# 10-byte allowance.
421 def gotLongRsp(self, dict):
422 # Not quite accurate, but good enough
423 self.failUnless(len(bencode(dict))-10 < UDP_PACKET_LIMIT)