1 ## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
4 """The KRPC communication protocol implementation.
6 @var KRPC_TIMEOUT: the number of seconds after which requests timeout
7 @var UDP_PACKET_LIMIT: the maximum number of bytes that can be sent in a
8 UDP packet without fragmentation
10 @var KRPC_ERROR: the code for a generic error
11 @var KRPC_ERROR_SERVER_ERROR: the code for a server error
12 @var KRPC_ERROR_MALFORMED_PACKET: the code for a malformed packet error
13 @var KRPC_ERROR_METHOD_UNKNOWN: the code for a method unknown error
14 @var KRPC_ERROR_MALFORMED_REQUEST: the code for a malformed request error
15 @var KRPC_ERROR_INVALID_TOKEN: the code for an invalid token error
16 @var KRPC_ERROR_RESPONSE_TOO_LONG: the code for a response too long error
18 @var KRPC_ERROR_INTERNAL: the code for an internal error
19 @var KRPC_ERROR_RECEIVED_UNKNOWN: the code for an unknown message type error
20 @var KRPC_ERROR_TIMEOUT: the code for a timeout error
21 @var KRPC_ERROR_PROTOCOL_STOPPED: the code for a stopped protocol error
23 @var TID: the identifier for the transaction ID
24 @var REQ: the identifier for a request packet
25 @var RSP: the identifier for a response packet
26 @var TYP: the identifier for the type of packet
27 @var ARG: the identifier for the argument to the request
28 @var ERR: the identifier for an error packet
30 @group Remote node error codes: KRPC_ERROR, KRPC_ERROR_SERVER_ERROR,
31 KRPC_ERROR_MALFORMED_PACKET, KRPC_ERROR_METHOD_UNKNOWN,
32 KRPC_ERROR_MALFORMED_REQUEST, KRPC_ERROR_INVALID_TOKEN,
33 KRPC_ERROR_RESPONSE_TOO_LONG
34 @group Local node error codes: KRPC_ERROR_INTERNAL, KRPC_ERROR_RECEIVED_UNKNOWN,
35 KRPC_ERROR_TIMEOUT, KRPC_ERROR_PROTOCOL_STOPPED
36 @group Command identifiers: TID, REQ, RSP, TYP, ARG, ERR
40 from bencode import bencode, bdecode
41 from time import asctime
44 from twisted.internet.defer import Deferred
45 from twisted.internet import protocol, reactor
46 from twisted.python import log
47 from twisted.trial import unittest
49 from khash import newID
52 UDP_PACKET_LIMIT = 1472
56 KRPC_ERROR_SERVER_ERROR = 201
57 KRPC_ERROR_MALFORMED_PACKET = 202
58 KRPC_ERROR_METHOD_UNKNOWN = 203
59 KRPC_ERROR_MALFORMED_REQUEST = 204
60 KRPC_ERROR_INVALID_TOKEN = 205
61 KRPC_ERROR_RESPONSE_TOO_LONG = 206
64 KRPC_ERROR_INTERNAL = 100
65 KRPC_ERROR_RECEIVED_UNKNOWN = 101
66 KRPC_ERROR_TIMEOUT = 102
67 KRPC_ERROR_PROTOCOL_STOPPED = 103
77 class KrpcError(Exception):
78 """An error occurred in the KRPC protocol."""
81 def verifyMessage(msg):
82 """Check received message for corruption and errors.
84 @type msg: C{dictionary}
85 @param msg: the dictionary of information received on the connection
86 @raise KrpcError: if the message is corrupt
90 raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "not a dictionary")
92 raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "no message type")
95 raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "request type not specified")
96 if type(msg[REQ]) != str:
97 raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "request type is not a string")
99 raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "no arguments for request")
100 if type(msg[ARG]) != dict:
101 raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "arguments for request are not in a dictionary")
102 elif msg[TYP] == RSP:
104 raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "response not specified")
105 if type(msg[RSP]) != dict:
106 raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "response is not a dictionary")
107 elif msg[TYP] == ERR:
109 raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error not specified")
110 if type(msg[ERR]) != list:
111 raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error is not a list")
112 if len(msg[ERR]) != 2:
113 raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error is not a 2-element list")
114 if type(msg[ERR][0]) not in (int, long):
115 raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error number is not a number")
116 if type(msg[ERR][1]) != str:
117 raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error string is not a string")
119 # raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "unknown message type")
121 raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "no transaction ID specified")
122 if type(msg[TID]) != str:
123 raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "transaction id is not a string")
125 class hostbroker(protocol.DatagramProtocol):
126 """The factory for the KRPC protocol.
128 @type server: L{khashmir.Khashmir}
129 @ivar server: the main Khashmir program
130 @type config: C{dictionary}
131 @ivar config: the configuration parameters for the DHT
132 @type connections: C{dictionary}
133 @ivar connections: all the connections that have ever been made to the
134 protocol, keys are IP address and port pairs, values are L{KRPC}
135 protocols for the addresses
136 @ivar protocol: the protocol to use to handle incoming connections
138 @type addr: (C{string}, C{int})
139 @ivar addr: the IP address and port of this node
142 def __init__(self, server, config):
143 """Initialize the factory.
145 @type server: L{khashmir.Khashmir}
146 @param server: the main DHT program
147 @type config: C{dictionary}
148 @param config: the configuration parameters for the DHT
152 # this should be changed to storage that drops old entries
153 self.connections = {}
155 def datagramReceived(self, datagram, addr):
156 """Optionally create a new protocol object, and handle the new datagram.
158 @type datagram: C{string}
159 @param datagram: the data received from the transport.
160 @type addr: (C{string}, C{int})
161 @param addr: source IP address and port of datagram.
163 c = self.connectionForAddr(addr)
164 c.datagramReceived(datagram, addr)
166 # del self.connections[addr]
168 def connectionForAddr(self, addr):
169 """Get a protocol object for the source.
171 @type addr: (C{string}, C{int})
172 @param addr: source IP address and port of datagram.
174 # Don't connect to ourself
175 if addr == self.addr:
178 # Create a new protocol object if necessary
179 if not self.connections.has_key(addr):
180 conn = self.protocol(addr, self.server, self.transport, self.config['SPEW'])
181 self.connections[addr] = conn
183 conn = self.connections[addr]
186 def makeConnection(self, transport):
187 """Make a connection to a transport and save our address."""
188 protocol.DatagramProtocol.makeConnection(self, transport)
189 tup = transport.getHost()
190 self.addr = (tup.host, tup.port)
192 def stopProtocol(self):
193 """Stop all the open connections."""
194 for conn in self.connections.values():
196 protocol.DatagramProtocol.stopProtocol(self)
199 """The KRPC protocol implementation.
201 @ivar transport: the transport to use for the protocol
202 @type factory: L{khashmir.Khashmir}
203 @ivar factory: the main Khashmir program
204 @type addr: (C{string}, C{int})
205 @ivar addr: the IP address and port of the source node
206 @type noisy: C{boolean}
207 @ivar noisy: whether to log additional details of the protocol
208 @type tids: C{dictionary}
209 @ivar tids: the transaction IDs outstanding for requests, keys are the
210 transaction ID of the request, values are the deferreds to call with
212 @type stopped: C{boolean}
213 @ivar stopped: whether the protocol has been stopped
216 def __init__(self, addr, server, transport, spew = False):
217 """Initialize the protocol.
219 @type addr: (C{string}, C{int})
220 @param addr: the IP address and port of the source node
221 @type server: L{khashmir.Khashmir}
222 @param server: the main Khashmir program
223 @param transport: the transport to use for the protocol
224 @type spew: C{boolean}
225 @param spew: whether to log additional details of the protocol
226 (optional, defaults to False)
228 self.transport = transport
229 self.factory = server
235 def datagramReceived(self, data, addr):
236 """Process the new datagram.
238 @type data: C{string}
239 @param data: the data received from the transport.
240 @type addr: (C{string}, C{int})
241 @param addr: source IP address and port of datagram.
245 log.msg("stopped, dropping message from %r: %s" % (addr, data))
247 # Bdecode the message
252 log.msg("krpc bdecode error: ")
256 # Make sure the remote node isn't trying anything funny
260 log.msg("krpc message verification error: ")
265 log.msg("%d received from %r: %s" % (self.factory.port, addr, msg))
267 # Process it based on its type
271 # Requests are handled by the factory
272 f = getattr(self.factory ,"krpc_" + msg[REQ], None)
273 msg[ARG]['_krpc_sender'] = self.addr
274 if f and callable(f):
276 ret = f(*(), **msg[ARG])
278 log.msg('Got a Krpc error while running: krpc_%s' % msg[REQ])
280 olen = self._sendResponse(addr, msg[TID], ERR, [e[0], e[1]])
282 log.msg('Got a malformed request for: krpc_%s' % msg[REQ])
284 olen = self._sendResponse(addr, msg[TID], ERR,
285 [KRPC_ERROR_MALFORMED_REQUEST, str(e)])
287 log.msg('Got an unknown error while running: krpc_%s' % msg[REQ])
289 olen = self._sendResponse(addr, msg[TID], ERR,
290 [KRPC_ERROR_SERVER_ERROR, str(e)])
292 olen = self._sendResponse(addr, msg[TID], RSP, ret)
294 # Request for unknown method
295 log.msg("ERROR: don't know about method %s" % msg[REQ])
296 olen = self._sendResponse(addr, msg[TID], ERR,
297 [KRPC_ERROR_METHOD_UNKNOWN, "unknown method "+str(msg[REQ])])
299 log.msg("%s >>> %s - %s %s %s" % (addr, self.factory.node.port,
300 ilen, msg[REQ], olen))
301 elif msg[TYP] == RSP:
302 # Responses get processed by their TID's deferred
303 if self.tids.has_key(msg[TID]):
304 df = self.tids[msg[TID]]
306 del(self.tids[msg[TID]])
307 df.callback({'rsp' : msg[RSP], '_krpc_sender': addr})
309 # no tid, this transaction timed out already...
311 log.msg('timeout: %r' % msg[RSP]['id'])
312 elif msg[TYP] == ERR:
313 # Errors get processed by their TID's deferred's errback
314 if self.tids.has_key(msg[TID]):
315 df = self.tids[msg[TID]]
316 del(self.tids[msg[TID]])
318 df.errback(KrpcError(*msg[ERR]))
320 # day late and dollar short, just log it
321 log.msg("Got an error for an unknown request: %r" % (msg[ERR], ))
324 # Received an unknown message type
326 log.msg("unknown message type: %r" % msg)
327 if msg[TID] in self.tids:
328 df = self.tids[msg[TID]]
329 del(self.tids[msg[TID]])
331 df.errback(KrpcError(KRPC_ERROR_RECEIVED_UNKNOWN,
332 "Received an unknown message type: %r" % msg[TYP]))
334 def _sendResponse(self, addr, tid, msgType, response):
335 """Helper function for sending responses to nodes.
337 @type addr: (C{string}, C{int})
338 @param addr: source IP address and port of datagram.
339 @param tid: the transaction ID of the request
340 @param msgType: the type of message to respond with
341 @param response: the arguments for the response
347 # Create the response message
348 msg = {TID : tid, TYP : msgType, msgType : response}
351 log.msg("%d responding to %r: %s" % (self.factory.port, addr, msg))
355 # Make sure its not too long
356 if len(out) > UDP_PACKET_LIMIT:
357 # Can we remove some values to shorten it?
358 if 'values' in response:
359 # Save the original list of values
360 orig_values = response['values']
361 len_orig_values = len(bencode(orig_values))
363 # Caclulate the maximum value length possible
364 max_len_values = len_orig_values - (len(out) - UDP_PACKET_LIMIT)
365 assert max_len_values > 0
367 # Start with a calculation of how many values should be included
368 # (assumes all values are the same length)
369 per_value = (float(len_orig_values) - 2.0) / float(len(orig_values))
370 num_values = len(orig_values) - int(ceil(float(len(out) - UDP_PACKET_LIMIT) / per_value))
372 # Do a linear search for the actual maximum number possible
373 bencoded_values = len(bencode(orig_values[:num_values]))
374 while bencoded_values < max_len_values and num_values + 1 < len(orig_values):
375 bencoded_values += len(bencode(orig_values[num_values]))
377 while bencoded_values > max_len_values and num_values > 0:
379 bencoded_values -= len(bencode(orig_values[num_values]))
380 assert num_values > 0
383 response['values'] = orig_values[:num_values]
385 assert len(out) < UDP_PACKET_LIMIT
386 log.msg('Shortened a long packet from %d to %d values, new packet length: %d' %
387 (len(orig_values), num_values, len(out)))
389 # Too long a response, send an error
390 log.msg('Could not send response, too long: %d bytes' % len(out))
391 msg = {TID : tid, TYP : ERR, ERR : [KRPC_ERROR_RESPONSE_TOO_LONG, "response was %d bytes" % len(out)]}
395 # Unknown error, send an error message
396 msg = {TID : tid, TYP : ERR, ERR : [KRPC_ERROR_SERVER_ERROR, "unknown error sending response: %s" % str(e)]}
399 self.transport.write(out, addr)
402 def sendRequest(self, method, args):
403 """Send a request to the remote node.
405 @type method: C{string}
406 @param method: the methiod name to call on the remote node
407 @param args: the arguments to send to the remote node's method
410 raise KrpcError, (KRPC_ERROR_PROTOCOL_STOPPED, "cannot send, connection has been stopped")
412 # Create the request message
413 msg = {TID : newID(), TYP : REQ, REQ : method, ARG : args}
415 log.msg("%d sending to %r: %s" % (self.factory.port, self.addr, msg))
418 # Create the deferred and save it with the TID
420 self.tids[msg[TID]] = d
422 # Schedule a later timeout call
423 def timeOut(tids = self.tids, id = msg[TID], method = method, addr = self.addr):
424 """Call the deferred's errback if a timeout occurs."""
428 df.errback(KrpcError(KRPC_ERROR_TIMEOUT, "timeout waiting for '%s' from %r" % (method, addr)))
429 later = reactor.callLater(KRPC_TIMEOUT, timeOut)
431 # Cancel the timeout call if a response is received
432 def dropTimeOut(dict, later_call = later):
433 """Cancel the timeout call when a response is received."""
434 if later_call.active():
437 d.addBoth(dropTimeOut)
439 self.transport.write(data, self.addr)
443 """Timeout all pending requests."""
444 for df in self.tids.values():
445 df.errback(KrpcError(KRPC_ERROR_PROTOCOL_STOPPED, 'connection has been stopped while waiting for response'))
449 #{ For testing the KRPC protocol
450 def connectionForAddr(host, port):
453 class Receiver(protocol.Factory):
457 def krpc_store(self, msg, _krpc_sender):
460 def krpc_echo(self, msg, _krpc_sender):
462 def krpc_values(self, length, num, _krpc_sender):
463 return {'values': ['1'*length]*num}
467 a = hostbroker(af, {'SPEW': False})
469 p = reactor.listenUDP(port, a)
472 class KRPCTests(unittest.TestCase):
476 self.af, self.a, self.ap = make(1180)
477 self.bf, self.b, self.bp = make(1181)
480 self.ap.stopListening()
481 self.bp.stopListening()
483 def bufEquals(self, result, value):
484 self.failUnlessEqual(self.bf.buf, value)
486 def testSimpleMessage(self):
487 d = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."})
488 d.addCallback(self.bufEquals, ["This is a test."])
491 def testMessageBlast(self):
493 d = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."})
494 d.addCallback(self.bufEquals, ["This is a test."] * 100)
498 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
499 df.addCallback(self.gotMsg, "This is a test.")
502 def gotMsg(self, dict, should_be):
503 _krpc_sender = dict['_krpc_sender']
505 self.failUnlessEqual(msg['msg'], should_be)
507 def testManyEcho(self):
508 for i in xrange(100):
509 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
510 df.addCallback(self.gotMsg, "This is a test.")
513 def testMultiEcho(self):
514 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
515 df.addCallback(self.gotMsg, "This is a test.")
517 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is another test."})
518 df.addCallback(self.gotMsg, "This is another test.")
520 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is yet another test."})
521 df.addCallback(self.gotMsg, "This is yet another test.")
525 def testEchoReset(self):
526 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
527 df.addCallback(self.gotMsg, "This is a test.")
529 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is another test."})
530 df.addCallback(self.gotMsg, "This is another test.")
531 df.addCallback(self.echoReset)
534 def echoReset(self, dict):
535 del(self.a.connections[('127.0.0.1', 1181)])
536 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is yet another test."})
537 df.addCallback(self.gotMsg, "This is yet another test.")
540 def testUnknownMeth(self):
541 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('blahblah', {'msg' : "This is a test."})
542 df.addBoth(self.gotErr, KRPC_ERROR_METHOD_UNKNOWN)
545 def testMalformedRequest(self):
546 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test.", 'foo': 'bar'})
547 df.addBoth(self.gotErr, KRPC_ERROR_MALFORMED_REQUEST)
550 def gotErr(self, err, should_be):
551 self.failUnlessEqual(err.value[0], should_be)
553 def testLongPackets(self):
554 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('values', {'length' : 1, 'num': 2000})
555 df.addCallback(self.gotLongRsp)
558 def gotLongRsp(self, dict):
559 # Not quite accurate, but good enough
560 self.failUnless(len(bencode(dict))-10 < UDP_PACKET_LIMIT)