1 ## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
4 """The KRPC communication protocol implementation.
6 @var KRPC_TIMEOUT: the number of seconds after which requests timeout
7 @var UDP_PACKET_LIMIT: the maximum number of bytes that can be sent in a
8 UDP packet without fragmentation
10 @var KRPC_ERROR: the code for a generic error
11 @var KRPC_ERROR_SERVER_ERROR: the code for a server error
12 @var KRPC_ERROR_MALFORMED_PACKET: the code for a malformed packet error
13 @var KRPC_ERROR_METHOD_UNKNOWN: the code for a method unknown error
14 @var KRPC_ERROR_MALFORMED_REQUEST: the code for a malformed request error
15 @var KRPC_ERROR_INVALID_TOKEN: the code for an invalid token error
16 @var KRPC_ERROR_RESPONSE_TOO_LONG: the code for a response too long error
18 @var KRPC_ERROR_INTERNAL: the code for an internal error
19 @var KRPC_ERROR_RECEIVED_UNKNOWN: the code for an unknown message type error
20 @var KRPC_ERROR_TIMEOUT: the code for a timeout error
21 @var KRPC_ERROR_PROTOCOL_STOPPED: the code for a stopped protocol error
23 @var TID: the identifier for the transaction ID
24 @var REQ: the identifier for a request packet
25 @var RSP: the identifier for a response packet
26 @var TYP: the identifier for the type of packet
27 @var ARG: the identifier for the argument to the request
28 @var ERR: the identifier for an error packet
30 @group Remote node error codes: KRPC_ERROR, KRPC_ERROR_SERVER_ERROR,
31 KRPC_ERROR_MALFORMED_PACKET, KRPC_ERROR_METHOD_UNKNOWN,
32 KRPC_ERROR_MALFORMED_REQUEST, KRPC_ERROR_INVALID_TOKEN,
33 KRPC_ERROR_RESPONSE_TOO_LONG
34 @group Local node error codes: KRPC_ERROR_INTERNAL, KRPC_ERROR_RECEIVED_UNKNOWN,
35 KRPC_ERROR_TIMEOUT, KRPC_ERROR_PROTOCOL_STOPPED
36 @group Command identifiers: TID, REQ, RSP, TYP, ARG, ERR
from math import ceil
from time import asctime

from twisted.internet.defer import Deferred
from twisted.internet import protocol, reactor
from twisted.python import log
from twisted.trial import unittest

from bencode import bencode, bdecode
from khash import newID
# Largest UDP payload that fits in a 1500 byte Ethernet frame without
# fragmentation (1500 - 20 byte IP header - 8 byte UDP header).
UDP_PACKET_LIMIT = 1472

# NOTE(review): KRPC_TIMEOUT is documented in the module docstring but its
# value is not visible in the reviewed text, so it is not defined here.

# Remote node errors
# NOTE(review): 200 is assumed for the generic error from the numbering of
# the other remote codes -- confirm.
KRPC_ERROR = 200
KRPC_ERROR_SERVER_ERROR = 201
KRPC_ERROR_MALFORMED_PACKET = 202
KRPC_ERROR_METHOD_UNKNOWN = 203
KRPC_ERROR_MALFORMED_REQUEST = 204
KRPC_ERROR_INVALID_TOKEN = 205
KRPC_ERROR_RESPONSE_TOO_LONG = 206

# Local errors
KRPC_ERROR_INTERNAL = 100
KRPC_ERROR_RECEIVED_UNKNOWN = 101
KRPC_ERROR_TIMEOUT = 102
KRPC_ERROR_PROTOCOL_STOPPED = 103

# Command identifiers: the single-letter dictionary keys of a KRPC message.
# NOTE(review): values follow the standard KRPC wire format -- confirm
# against the peers this node talks to.
TID = 't'
REQ = 'q'
RSP = 'r'
TYP = 'y'
ARG = 'a'
ERR = 'e'
class KrpcError(Exception):
    """Signals a failure in the KRPC protocol.

    Raised throughout this module with two positional arguments: a numeric
    error code and a human-readable message.
    """
    pass
def verifyMessage(msg):
    """Check received message for corruption and errors.

    The checks compare exact types (C{type(x) != dict}) rather than using
    C{isinstance}, matching the original checks that are visible.

    A message whose type is none of request, response or error is NOT
    rejected here; only its transaction ID is checked, so that the caller
    can still report the failure to the matching outstanding request.

    @type msg: C{dictionary}
    @param msg: the dictionary of information received on the connection
    @raise KrpcError: if the message is corrupt
    """
    def malformed(reason):
        # Every failure here is reported with the same error code
        return KrpcError(KRPC_ERROR_MALFORMED_PACKET, reason)

    if type(msg) != dict:
        raise malformed("not a dictionary")
    if TYP not in msg:
        raise malformed("no message type")

    if msg[TYP] == REQ:
        if REQ not in msg:
            raise malformed("request type not specified")
        if type(msg[REQ]) != str:
            raise malformed("request type is not a string")
        if ARG not in msg:
            raise malformed("no arguments for request")
        if type(msg[ARG]) != dict:
            raise malformed("arguments for request are not in a dictionary")
    elif msg[TYP] == RSP:
        if RSP not in msg:
            raise malformed("response not specified")
        if type(msg[RSP]) != dict:
            raise malformed("response is not a dictionary")
    elif msg[TYP] == ERR:
        if ERR not in msg:
            raise malformed("error not specified")
        if type(msg[ERR]) != list:
            raise malformed("error is not a list")
        if len(msg[ERR]) != 2:
            raise malformed("error is not a 2-element list")
        if type(msg[ERR][0]) not in (int, long):
            raise malformed("error number is not a number")
        if type(msg[ERR][1]) != str:
            raise malformed("error string is not a string")
    # Unknown message types deliberately fall through (see docstring).

    if TID not in msg:
        raise malformed("no transaction ID specified")
    if type(msg[TID]) != str:
        raise malformed("transaction id is not a string")
class hostbroker(protocol.DatagramProtocol):
    """The factory for the KRPC protocol.

    @type server: L{khashmir.Khashmir}
    @ivar server: the main Khashmir program
    @type stats: L{stats.StatsLogger}
    @ivar stats: the statistics logger to save transport info
    @type config: C{dictionary}
    @ivar config: the configuration parameters for the DHT
    @type connections: C{dictionary}
    @ivar connections: all the connections that have ever been made to the
        protocol, keys are IP address and port pairs, values are L{KRPC}
        protocols for the addresses
    @ivar protocol: the protocol to use to handle incoming connections
        (not assigned by this class; it must be set on the instance before
        the first connection is requested)
    @type addr: (C{string}, C{int})
    @ivar addr: the IP address and port of this node
        (available only after L{makeConnection} has run)
    """

    def __init__(self, server, stats, config):
        """Initialize the factory.

        @type server: L{khashmir.Khashmir}
        @param server: the main DHT program
        @type stats: L{stats.StatsLogger}
        @param stats: the statistics logger to save transport info
        @type config: C{dictionary}
        @param config: the configuration parameters for the DHT
        """
        self.server = server
        self.stats = stats
        self.config = config
        # this should be changed to storage that drops old entries
        self.connections = {}

    def datagramReceived(self, datagram, addr):
        """Optionally create a new protocol object, and handle the new datagram.

        @type datagram: C{string}
        @param datagram: the data received from the transport.
        @type addr: (C{string}, C{int})
        @param addr: source IP address and port of datagram.
        """
        c = self.connectionForAddr(addr)
        c.datagramReceived(datagram, addr)

    def connectionForAddr(self, addr):
        """Get a protocol object for the source.

        @type addr: (C{string}, C{int})
        @param addr: source IP address and port of datagram.
        @return: the (possibly newly created) protocol object for C{addr}
        @raise KrpcError: if C{addr} is this node's own address
        """
        # Don't connect to ourself
        if addr == self.addr:
            # NOTE(review): the body of this guard is not visible in the
            # reviewed text; raising is assumed -- confirm.
            raise KrpcError(KRPC_ERROR_INTERNAL,
                            "cannot connect to our own address: %r" % (addr, ))

        # Create a new protocol object if necessary
        conn = self.connections.get(addr)
        if conn is None:
            conn = self.protocol(addr, self.server, self.stats,
                                 self.transport, self.config['SPEW'])
            self.connections[addr] = conn
        return conn

    def makeConnection(self, transport):
        """Make a connection to a transport and save our address."""
        protocol.DatagramProtocol.makeConnection(self, transport)
        tup = transport.getHost()
        self.addr = (tup.host, tup.port)

    def stopProtocol(self):
        """Stop all the open connections."""
        # Iterate over a snapshot so the table may change while stopping
        for conn in list(self.connections.values()):
            conn.stop()
        protocol.DatagramProtocol.stopProtocol(self)
class KrpcRequest(Deferred):
    """An outstanding request to a remote node.

    The request is itself the deferred that fires with the response (or
    fails with the error) for the transaction.

    @ivar protocol: the protocol object used to send the data and to
        report timeouts to
    @ivar tid: the transaction ID of the request
    @ivar method: the name of the method being called on the remote node
    @ivar data: the encoded message to send
    @ivar delay: the number of seconds to wait before timing out
    @ivar later: the pending timeout call, or C{None} if there is none
    """

    def __init__(self, protocol, newTID, method, data, initDelay):
        """Initialize the request; nothing is sent until L{send} is called.

        @param protocol: the protocol to send the request with
        @param newTID: the transaction ID to use for the request
        @param method: the name of the method to call on the remote node
        @param data: the message to send to the remote node
        @param initDelay: the number of seconds to wait for a response
        """
        Deferred.__init__(self)
        self.protocol = protocol
        self.tid = newTID
        self.method = method
        self.data = data
        self.delay = initDelay
        self.later = None

    def send(self):
        """Send the request and start its timeout timer."""
        assert not self.later, 'There is already a pending request'
        self.later = reactor.callLater(self.delay, self.timeOut)
        self.protocol.sendData(self.method, self.data)

    def timeOut(self):
        """Call the deferred's errback if a timeout occurs."""
        # The delayed call has fired, so there is nothing left to cancel
        self.later = None
        self.protocol.timeOut(self.tid, self.method)

    def callback(self, resp):
        """Cancel the timeout, then fire the deferred with the response."""
        self.dropTimeOut()
        Deferred.callback(self, resp)

    def errback(self, resp):
        """Cancel the timeout, then fail the deferred with the error."""
        self.dropTimeOut()
        Deferred.errback(self, resp)

    def dropTimeOut(self):
        """Cancel the timeout call when a response is received."""
        if self.later and self.later.active():
            self.later.cancel()
        self.later = None
class KRPC:
    """The KRPC protocol implementation.

    @ivar transport: the transport to use for the protocol
    @type factory: L{khashmir.Khashmir}
    @ivar factory: the main Khashmir program
    @type stats: L{stats.StatsLogger}
    @ivar stats: the statistics logger to save transport info
    @type addr: (C{string}, C{int})
    @ivar addr: the IP address and port of the source node
    @type noisy: C{boolean}
    @ivar noisy: whether to log additional details of the protocol
    @type tids: C{dictionary}
    @ivar tids: the transaction IDs outstanding for requests, keys are the
        transaction ID of the request, values are the L{KrpcRequest}
        deferreds waiting for the response
    @type stopped: C{boolean}
    @ivar stopped: whether the protocol has been stopped
    """

    def __init__(self, addr, server, stats, transport, spew = False):
        """Initialize the protocol.

        @type addr: (C{string}, C{int})
        @param addr: the IP address and port of the source node
        @type server: L{khashmir.Khashmir}
        @param server: the main Khashmir program
        @type stats: L{stats.StatsLogger}
        @param stats: the statistics logger to save transport info
        @param transport: the transport to use for the protocol
        @type spew: C{boolean}
        @param spew: whether to log additional details of the protocol
            (optional, defaults to False)
        """
        self.transport = transport
        self.factory = server
        self.stats = stats
        self.addr = addr
        self.noisy = spew
        self.tids = {}
        self.stopped = False

    def datagramReceived(self, data, addr):
        """Process the new datagram.

        Undecodable or malformed datagrams are logged and dropped. Requests
        are dispatched to the factory; responses and errors are delivered to
        the outstanding request with the same transaction ID.

        @type data: C{string}
        @param data: the data received from the transport.
        @type addr: (C{string}, C{int})
        @param addr: source IP address and port of datagram.
        """
        self.stats.receivedBytes(len(data))
        if self.stopped:
            if self.noisy:
                log.msg("stopped, dropping message from %r: %s" % (addr, data))
            # NOTE(review): returning here makes the code match its own
            # "dropping message" log text -- confirm no caller relies on
            # datagrams being processed after stop().
            return

        # Bdecode the message
        try:
            msg = bdecode(data)
        except Exception as e:
            if self.noisy:
                log.msg("krpc bdecode error: ")
                log.err(e)
            return

        # Make sure the remote node isn't trying anything funny
        try:
            verifyMessage(msg)
        except Exception as e:
            log.msg("krpc message verification error: ")
            log.err(e)
            return

        if self.noisy:
            log.msg("%d received from %r: %s" % (self.factory.port, addr, msg))

        # Process it based on its type
        if msg[TYP] == REQ:
            self._handleRequest(msg, addr, len(data))
        elif msg[TYP] == RSP:
            # Responses get processed by their TID's deferred
            req = self.tids.pop(msg[TID], None)
            if req is not None:
                msg[RSP]['_krpc_sender'] = addr
                req.callback(msg[RSP])
            elif self.noisy:
                # no tid, this transaction timed out already...
                # (.get: a late response is not guaranteed to carry an id)
                log.msg('timeout: %r' % msg[RSP].get('id'))
        elif msg[TYP] == ERR:
            # Errors get processed by their TID's deferred's errback
            req = self.tids.pop(msg[TID], None)
            if req is not None:
                req.errback(KrpcError(*msg[ERR]))
            else:
                # day late and dollar short, just log it
                log.msg("Got an error for an unknown request: %r" % (msg[ERR], ))
        else:
            # Received an unknown message type
            if self.noisy:
                log.msg("unknown message type: %r" % msg)
            req = self.tids.pop(msg[TID], None)
            if req is not None:
                req.errback(KrpcError(KRPC_ERROR_RECEIVED_UNKNOWN,
                                      "Received an unknown message type: %r" % msg[TYP]))

    def _handleRequest(self, msg, addr, ilen):
        """Run the factory's C{krpc_<method>} for a request and reply to it.

        @param msg: the verified request message
        @type addr: (C{string}, C{int})
        @param addr: source IP address and port of datagram.
        @param ilen: the length of the received datagram, for logging
        """
        method = msg[REQ]

        # Requests are handled by the factory
        f = getattr(self.factory, "krpc_" + method, None)
        msg[ARG]['_krpc_sender'] = self.addr
        if f and callable(f):
            self.stats.receivedAction(method)
            try:
                ret = f(**msg[ARG])
            except KrpcError as e:
                log.msg('Got a Krpc error while running: krpc_%s' % method)
                log.err(e)
                self.stats.errorAction(method)
                # NOTE(review): assumes the handler raised KrpcError with
                # (code, message) arguments, as the rest of this module does.
                olen = self._sendResponse(method, addr, msg[TID], ERR,
                                          [e.args[0], e.args[1]])
            except TypeError as e:
                # Usually means the arguments did not match the signature
                log.msg('Got a malformed request for: krpc_%s' % method)
                log.err(e)
                self.stats.errorAction(method)
                olen = self._sendResponse(method, addr, msg[TID], ERR,
                                          [KRPC_ERROR_MALFORMED_REQUEST, str(e)])
            except Exception as e:
                log.msg('Got an unknown error while running: krpc_%s' % method)
                log.err(e)
                self.stats.errorAction(method)
                olen = self._sendResponse(method, addr, msg[TID], ERR,
                                          [KRPC_ERROR_SERVER_ERROR, str(e)])
            else:
                olen = self._sendResponse(method, addr, msg[TID], RSP, ret)
        else:
            # Request for unknown method
            log.msg("ERROR: don't know about method %s" % method)
            self.stats.receivedAction('unknown')
            olen = self._sendResponse(method, addr, msg[TID], ERR,
                                      [KRPC_ERROR_METHOD_UNKNOWN, "unknown method "+str(method)])
        if self.noisy:
            log.msg("%s >>> %s - %s %s %s" % (addr, self.factory.node.port,
                                              ilen, method, olen))

    def _sendResponse(self, request, addr, tid, msgType, response):
        """Helper function for sending responses to nodes.

        A response that does not fit in a single UDP packet is shortened by
        dropping entries from its C{'values'} list when it has one;
        otherwise an error is sent in its place.

        @param request: the name of the requested method
        @type addr: (C{string}, C{int})
        @param addr: source IP address and port of datagram.
        @param tid: the transaction ID of the request
        @param msgType: the type of message to respond with
        @param response: the arguments for the response
        @return: the number of bytes written to the transport
        """
        # A handler that returns nothing still gets an (empty) response
        if not response:
            response = {}

        try:
            # Create the response message
            msg = {TID : tid, TYP : msgType, msgType : response}

            if self.noisy:
                log.msg("%d responding to %r: %s" % (self.factory.port, addr, msg))

            out = bencode(msg)

            # Make sure it's not too long
            if len(out) > UDP_PACKET_LIMIT:
                # Can we remove some values to shorten it?
                # (only dictionaries can carry a 'values' list; error
                # responses are lists and must not be indexed by key)
                if isinstance(response, dict) and 'values' in response:
                    # Save the original list of values
                    orig_values = response['values']
                    len_orig_values = len(bencode(orig_values))

                    # Calculate the maximum value length possible
                    excess = len(out) - UDP_PACKET_LIMIT
                    max_len_values = len_orig_values - excess
                    assert max_len_values > 0

                    # Start with a calculation of how many values should be included
                    # (assumes all values are the same length)
                    per_value = (float(len_orig_values) - 2.0) / float(len(orig_values))
                    num_values = len(orig_values) - int(ceil(float(excess) / per_value))

                    # Do a linear search for the actual maximum number possible
                    bencoded_values = len(bencode(orig_values[:num_values]))
                    while bencoded_values < max_len_values and num_values + 1 < len(orig_values):
                        bencoded_values += len(bencode(orig_values[num_values]))
                        num_values += 1
                    while bencoded_values > max_len_values and num_values > 0:
                        num_values -= 1
                        bencoded_values -= len(bencode(orig_values[num_values]))
                    assert num_values > 0

                    # Encode the result
                    response['values'] = orig_values[:num_values]
                    out = bencode(msg)
                    assert len(out) < UDP_PACKET_LIMIT
                    log.msg('Shortened a long packet from %d to %d values, new packet length: %d' %
                            (len(orig_values), num_values, len(out)))
                else:
                    # Too long a response, send an error
                    log.msg('Could not send response, too long: %d bytes' % len(out))
                    self.stats.errorAction(request)
                    msg = {TID : tid, TYP : ERR, ERR : [KRPC_ERROR_RESPONSE_TOO_LONG, "response was %d bytes" % len(out)]}
                    out = bencode(msg)

        except Exception as e:
            # Unknown error, send an error message
            # (this also catches the sanity-check assertions above)
            self.stats.errorAction(request)
            msg = {TID : tid, TYP : ERR, ERR : [KRPC_ERROR_SERVER_ERROR, "unknown error sending response: %s" % str(e)]}
            out = bencode(msg)

        self.stats.sentBytes(len(out))
        self.transport.write(out, addr)
        return len(out)

    def sendRequest(self, method, args):
        """Send a request to the remote node.

        @type method: C{string}
        @param method: the method name to call on the remote node
        @param args: the arguments to send to the remote node's method
        @return: the L{KrpcRequest} deferred that will fire with the
            response, or fail with a L{KrpcError}
        @raise KrpcError: if the protocol has been stopped
        """
        if self.stopped:
            raise KrpcError(KRPC_ERROR_PROTOCOL_STOPPED, "cannot send, connection has been stopped")

        # Create the request message
        newTID = newID()
        msg = {TID : newTID, TYP : REQ, REQ : method, ARG : args}
        if self.noisy:
            log.msg("%d sending to %r: %s" % (self.factory.port, self.addr, msg))
        data = bencode(msg)

        # Create the deferred and save it with the TID
        req = KrpcRequest(self, newTID, method, data, 10)
        self.tids[newTID] = req

        # Save the conclusion of the action
        req.addCallbacks(self.stats.responseAction, self.stats.failedAction,
                         callbackArgs = (method, ), errbackArgs = (method, ))

        req.send()
        return req

    def sendData(self, method, data):
        """Write an already encoded request to the remote node.

        @type method: C{string}
        @param method: the name of the method, used for the statistics
        @param data: the encoded message to send
        """
        # Save some stats
        self.stats.sentAction(method)
        self.stats.sentBytes(len(data))

        self.transport.write(data, self.addr)

    def timeOut(self, badTID, method):
        """Call the deferred's errback if a timeout occurs."""
        req = self.tids.pop(badTID, None)
        if req is not None:
            req.errback(KrpcError(KRPC_ERROR_TIMEOUT, "timeout waiting for '%s' from %r" %
                                  (method, self.addr)))
        else:
            log.msg('Received a timeout for an unknown request for %s from %r' % (method, self.addr))

    def stop(self):
        """Timeout all pending requests."""
        # Mark as stopped and detach the pending table first, so an errback
        # that tries to send again is refused instead of registering a
        # request that would never be answered.
        self.stopped = True
        pending, self.tids = self.tids, {}
        for req in pending.values():
            req.errback(KrpcError(KRPC_ERROR_PROTOCOL_STOPPED,
                                  'connection has been stopped while waiting for response'))
517 #{ For testing the KRPC protocol
518 def connectionForAddr(host, port):
521 class Receiver(protocol.Factory):
525 def krpc_store(self, msg, _krpc_sender):
528 def krpc_echo(self, msg, _krpc_sender):
530 def krpc_values(self, length, num, _krpc_sender):
531 return {'values': ['1'*length]*num}
534 from stats import StatsLogger
536 a = hostbroker(af, StatsLogger(None, None, {}), {'SPEW': False})
538 p = reactor.listenUDP(port, a)
541 class KRPCTests(unittest.TestCase):
545 self.af, self.a, self.ap = make(1180)
546 self.bf, self.b, self.bp = make(1181)
549 self.ap.stopListening()
550 self.bp.stopListening()
552 def bufEquals(self, result, value):
553 self.failUnlessEqual(self.bf.buf, value)
555 def testSimpleMessage(self):
556 d = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."})
557 d.addCallback(self.bufEquals, ["This is a test."])
560 def testMessageBlast(self):
562 d = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."})
563 d.addCallback(self.bufEquals, ["This is a test."] * 100)
567 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
568 df.addCallback(self.gotMsg, "This is a test.")
571 def gotMsg(self, dict, should_be):
572 _krpc_sender = dict['_krpc_sender']
573 self.failUnlessEqual(dict['msg'], should_be)
575 def testManyEcho(self):
576 for i in xrange(100):
577 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
578 df.addCallback(self.gotMsg, "This is a test.")
581 def testMultiEcho(self):
582 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
583 df.addCallback(self.gotMsg, "This is a test.")
585 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is another test."})
586 df.addCallback(self.gotMsg, "This is another test.")
588 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is yet another test."})
589 df.addCallback(self.gotMsg, "This is yet another test.")
593 def testEchoReset(self):
594 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
595 df.addCallback(self.gotMsg, "This is a test.")
597 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is another test."})
598 df.addCallback(self.gotMsg, "This is another test.")
599 df.addCallback(self.echoReset)
602 def echoReset(self, dict):
603 del(self.a.connections[('127.0.0.1', 1181)])
604 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is yet another test."})
605 df.addCallback(self.gotMsg, "This is yet another test.")
608 def testUnknownMeth(self):
609 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('blahblah', {'msg' : "This is a test."})
610 df = self.failUnlessFailure(df, KrpcError)
611 df.addBoth(self.gotErr, KRPC_ERROR_METHOD_UNKNOWN)
614 def testMalformedRequest(self):
615 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test.", 'foo': 'bar'})
616 df = self.failUnlessFailure(df, KrpcError)
617 df.addBoth(self.gotErr, KRPC_ERROR_MALFORMED_REQUEST, TypeError)
620 def gotErr(self, value, should_be, *errorTypes):
621 self.failUnlessEqual(value[0], should_be)
623 self.flushLoggedErrors(*errorTypes)
625 def testLongPackets(self):
626 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('values', {'length' : 1, 'num': 2000})
627 df.addCallback(self.gotLongRsp)
630 def gotLongRsp(self, dict):
631 # Not quite accurate, but good enough
632 self.failUnless(len(bencode(dict))-10 < UDP_PACKET_LIMIT)