1 ## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
4 """The KRPC communication protocol implementation.
6 @var UDP_PACKET_LIMIT: the maximum number of bytes that can be sent in a
7 UDP packet without fragmentation
9 @var KRPC_ERROR: the code for a generic error
10 @var KRPC_ERROR_SERVER_ERROR: the code for a server error
11 @var KRPC_ERROR_MALFORMED_PACKET: the code for a malformed packet error
12 @var KRPC_ERROR_METHOD_UNKNOWN: the code for a method unknown error
13 @var KRPC_ERROR_MALFORMED_REQUEST: the code for a malformed request error
14 @var KRPC_ERROR_INVALID_TOKEN: the code for an invalid token error
15 @var KRPC_ERROR_RESPONSE_TOO_LONG: the code for a response too long error
17 @var KRPC_ERROR_INTERNAL: the code for an internal error
18 @var KRPC_ERROR_RECEIVED_UNKNOWN: the code for an unknown message type error
19 @var KRPC_ERROR_TIMEOUT: the code for a timeout error
20 @var KRPC_ERROR_PROTOCOL_STOPPED: the code for a stopped protocol error
22 @var TID: the identifier for the transaction ID
23 @var REQ: the identifier for a request packet
24 @var RSP: the identifier for a response packet
25 @var TYP: the identifier for the type of packet
26 @var ARG: the identifier for the argument to the request
27 @var ERR: the identifier for an error packet
29 @group Remote node error codes: KRPC_ERROR, KRPC_ERROR_SERVER_ERROR,
30 KRPC_ERROR_MALFORMED_PACKET, KRPC_ERROR_METHOD_UNKNOWN,
31 KRPC_ERROR_MALFORMED_REQUEST, KRPC_ERROR_INVALID_TOKEN,
32 KRPC_ERROR_RESPONSE_TOO_LONG
33 @group Local node error codes: KRPC_ERROR_INTERNAL, KRPC_ERROR_RECEIVED_UNKNOWN,
34 KRPC_ERROR_TIMEOUT, KRPC_ERROR_PROTOCOL_STOPPED
35 @group Command identifiers: TID, REQ, RSP, TYP, ARG, ERR
39 from bencode import bencode, bdecode
40 from datetime import datetime, timedelta
43 from twisted.internet import defer
44 from twisted.internet import protocol, reactor
45 from twisted.python import log
46 from twisted.trial import unittest
48 from khash import newID
# Largest payload sent in one datagram.  1472 bytes is what remains of a
# 1500 byte Ethernet MTU after the 20 byte IPv4 and 8 byte UDP headers.
UDP_PACKET_LIMIT = 1472

# Remote node error codes (carried in the ERR element of a packet)
KRPC_ERROR = 200
KRPC_ERROR_SERVER_ERROR = 201
KRPC_ERROR_MALFORMED_PACKET = 202
KRPC_ERROR_METHOD_UNKNOWN = 203
KRPC_ERROR_MALFORMED_REQUEST = 204
KRPC_ERROR_INVALID_TOKEN = 205
KRPC_ERROR_RESPONSE_TOO_LONG = 206

# Local node error codes (used for errors raised on this side)
KRPC_ERROR_INTERNAL = 100
KRPC_ERROR_RECEIVED_UNKNOWN = 101
KRPC_ERROR_TIMEOUT = 102
KRPC_ERROR_PROTOCOL_STOPPED = 103

# Command identifiers: the single-letter keys (and message type values) of
# the bencoded KRPC dictionaries.
# NOTE(review): KRPC_ERROR and these identifiers are documented in the module
# docstring and used by the rest of the file; the values follow the KRPC wire
# format -- confirm against the canonical copy of this module.
TID = 't'
REQ = 'q'
RSP = 'r'
TYP = 'y'
ARG = 'a'
ERR = 'e'
class KrpcError(Exception):
    """An error occurred in the KRPC protocol.

    Throughout this module instances are created with two positional
    arguments: a numeric error code (one of the C{KRPC_ERROR_*} constants)
    followed by a string describing the problem.
    """
    pass
def verifyMessage(msg):
    """Check received message for corruption and errors.

    Only the structure of the message is checked: the presence and types of
    the elements required for its message type, and of the transaction ID.
    A message whose type is not a request, response or error is deliberately
    not rejected here; the protocol reports those to the waiting request.

    @type msg: C{dictionary}
    @param msg: the dictionary of information received on the connection
    @raise KrpcError: if the message is corrupt
    """
    import numbers

    def require(condition, reason):
        # Every structural problem is reported with the same error code
        if not condition:
            raise KrpcError(KRPC_ERROR_MALFORMED_PACKET, reason)

    require(isinstance(msg, dict), "not a dictionary")
    require(TYP in msg, "no message type")

    msgType = msg[TYP]
    if msgType == REQ:
        require(REQ in msg, "request type not specified")
        require(isinstance(msg[REQ], str), "request type is not a string")
        require(ARG in msg, "no arguments for request")
        require(isinstance(msg[ARG], dict),
                "arguments for request are not in a dictionary")
    elif msgType == RSP:
        require(RSP in msg, "response not specified")
        require(isinstance(msg[RSP], dict), "response is not a dictionary")
    elif msgType == ERR:
        require(ERR in msg, "error not specified")
        error = msg[ERR]
        require(isinstance(error, list), "error is not a list")
        require(len(error) == 2, "error is not a 2-element list")
        # numbers.Integral covers both int and (on Python 2) long
        require(isinstance(error[0], numbers.Integral),
                "error number is not a number")
        require(isinstance(error[1], str), "error string is not a string")
    # Unknown message types fall through on purpose (see docstring)

    require(TID in msg, "no transaction ID specified")
    require(isinstance(msg[TID], str), "transaction id is not a string")
class hostbroker(protocol.DatagramProtocol):
    """The factory for the KRPC protocol.

    @type server: L{khashmir.Khashmir}
    @ivar server: the main Khashmir program
    @type stats: L{stats.StatsLogger}
    @ivar stats: the statistics logger to save transport info
    @type config: C{dictionary}
    @ivar config: the configuration parameters for the DHT
    @type connections: C{dictionary}
    @ivar connections: all the connections that have ever been made to the
        protocol, keys are IP address and port pairs, values are L{KRPC}
        protocols for the addresses
    @ivar protocol: the protocol to use to handle incoming connections
        (added to make testing easier)
    @type addr: (C{string}, C{int})
    @ivar addr: the IP address and port of this node
    """

    def __init__(self, server, stats, config):
        """Initialize the factory.

        @type server: L{khashmir.Khashmir}
        @param server: the main DHT program
        @type stats: L{stats.StatsLogger}
        @param stats: the statistics logger to save transport info
        @type config: C{dictionary}
        @param config: the configuration parameters for the DHT
        """
        self.server = server
        self.stats = stats
        self.config = config
        # TODO: this should be changed to storage that drops old entries,
        # currently a connection is kept for every address ever seen
        self.connections = {}
        self.protocol = KRPC

    def datagramReceived(self, datagram, addr):
        """Optionally create a new protocol object, and handle the new datagram.

        @type datagram: C{string}
        @param datagram: the data received from the transport.
        @type addr: (C{string}, C{int})
        @param addr: source IP address and port of datagram.
        """
        self.connectionForAddr(addr).datagramReceived(datagram, addr)

    def connectionForAddr(self, addr):
        """Get a protocol object for the source.

        @type addr: (C{string}, C{int})
        @param addr: source IP address and port of datagram.
        @raise KrpcError: if the address is this node's own address
        @return: the protocol object to use for that address
        """
        # Don't connect to ourself
        # NOTE(review): assumes refusing our own address is signalled with a
        # KrpcError -- confirm against the callers of this method
        if addr == self.addr:
            raise KrpcError(KRPC_ERROR_INTERNAL,
                            "cannot connect to own address %r" % (addr,))

        # Create a new protocol object if necessary
        conn = self.connections.get(addr)
        if conn is None:
            conn = self.protocol(addr, self.server, self.stats,
                                 self.transport, self.config)
            self.connections[addr] = conn
        return conn

    def makeConnection(self, transport):
        """Make a connection to a transport and save our address."""
        protocol.DatagramProtocol.makeConnection(self, transport)
        host = transport.getHost()
        self.addr = (host.host, host.port)

    def stopProtocol(self):
        """Stop all the open connections."""
        # Iterate over a snapshot, stopping a connection runs errbacks that
        # could look up (and so create) further connections
        for conn in list(self.connections.values()):
            conn.stop()
        protocol.DatagramProtocol.stopProtocol(self)
class KrpcRequest(defer.Deferred):
    """An outstanding request to a remote node.

    The request is itself the deferred that fires with the response, or
    fails with the error. Until then it is resent each time its timeout
    expires, with a longer delay every time.

    @type protocol: L{KRPC}
    @ivar protocol: the protocol to send data with
    @ivar tid: the transaction ID of the request
    @type method: C{string}
    @ivar method: the name of the method to call on the remote node
    @type data: C{string}
    @ivar data: the message to send to the remote node
    @type config: C{dictionary}
    @ivar config: the configuration parameters for the DHT
    @type delay: C{int}
    @ivar delay: the last timeout delay sent
    @type start: C{datetime}
    @ivar start: the time to request was started at
    @type later: L{twisted.internet.interfaces.IDelayedCall}
    @ivar later: the pending call to timeout the last sent request
    """

    def __init__(self, protocol, newTID, method, data, config):
        """Initialize the request, and send it out.

        @type protocol: L{KRPC}
        @param protocol: the protocol to send data with
        @param newTID: the transaction ID of the request
        @type method: C{string}
        @param method: the name of the method to call on the remote node
        @type data: C{string}
        @param data: the message to send to the remote node
        @type config: C{dictionary}
        @param config: the configuration parameters for the DHT
        """
        defer.Deferred.__init__(self)
        self.protocol = protocol
        self.tid = newTID
        self.method = method
        self.data = data
        self.config = config
        self.delay = self.config.get('KRPC_INITIAL_DELAY', 2)
        self.start = datetime.now()
        self.later = None
        # The first transmission happens from the reactor, not from here
        reactor.callLater(0, self.send)

    def send(self):
        """Send the request to the remote node."""
        assert not self.later, 'There is already a pending request'
        # Arm the timeout before writing, so a failed write is retried too
        self.later = reactor.callLater(self.delay, self.timeOut)
        try:
            self.protocol.sendData(self.method, self.data)
        except:
            # Best effort: log the failure and let the timeout handle it
            log.err()

    def timeOut(self):
        """Check for a unrecoverable timeout, otherwise resend."""
        self.later = None
        elapsed = datetime.now() - self.start
        limit = timedelta(seconds = self.config.get('KRPC_TIMEOUT', 14))

        if elapsed > limit:
            # Waited long enough, give up on the request
            seconds = elapsed.seconds + elapsed.microseconds/1000000.0
            log.msg('%r timed out after %0.2f sec' % (self.tid, seconds))
            self.protocol.timeOut(self.tid, self.method)
            return

        if self.protocol.stopped:
            log.msg('Timeout but can not resend %r, protocol has been stopped' % self.tid)
            return

        # Wait twice as long for the response to the next attempt
        self.delay *= 2
        log.msg('Trying to resend %r now with delay %d sec' % (self.tid, self.delay))
        reactor.callLater(0, self.send)

    def callback(self, resp):
        """Stop waiting for a timeout, then fire the deferred with the result."""
        self.dropTimeOut()
        defer.Deferred.callback(self, resp)

    def errback(self, resp):
        """Stop waiting for a timeout, then fail the deferred with the error."""
        self.dropTimeOut()
        defer.Deferred.errback(self, resp)

    def dropTimeOut(self):
        """Cancel the timeout call when a response is received."""
        pending = self.later
        if pending and pending.active():
            pending.cancel()
            self.later = None
class KRPC:
    """The KRPC protocol implementation.

    @ivar transport: the transport to use for the protocol
    @type factory: L{khashmir.Khashmir}
    @ivar factory: the main Khashmir program
    @type stats: L{stats.StatsLogger}
    @ivar stats: the statistics logger to save transport info
    @type addr: (C{string}, C{int})
    @ivar addr: the IP address and port of the source node
    @type config: C{dictionary}
    @ivar config: the configuration parameters for the DHT
    @type tids: C{dictionary}
    @ivar tids: the transaction IDs outstanding for requests, keys are the
        transaction ID of the request, values are the deferreds to call with
        the results
    @type stopped: C{boolean}
    @ivar stopped: whether the protocol has been stopped
    """

    def __init__(self, addr, server, stats, transport, config = None):
        """Initialize the protocol.

        @type addr: (C{string}, C{int})
        @param addr: the IP address and port of the source node
        @type server: L{khashmir.Khashmir}
        @param server: the main Khashmir program
        @type stats: L{stats.StatsLogger}
        @param stats: the statistics logger to save transport info
        @param transport: the transport to use for the protocol
        @type config: C{dictionary}
        @param config: the configuration parameters for the DHT
            (optional, defaults to using defaults)
        """
        self.transport = transport
        self.factory = server
        self.stats = stats
        self.addr = addr
        # A fresh dictionary per instance, a shared default could be
        # modified by one connection and seen by all the others
        if config is None:
            config = {}
        self.config = config
        self.tids = {}
        self.stopped = False

    def datagramReceived(self, data, addr):
        """Process the new datagram.

        @type data: C{string}
        @param data: the data received from the transport.
        @type addr: (C{string}, C{int})
        @param addr: source IP address and port of datagram.
        """
        self.stats.receivedBytes(len(data))
        spew = self.config.get('SPEW', False)

        if self.stopped:
            if spew:
                log.msg("stopped, dropping message from %r: %s" % (addr, data))
            # Really drop it, nothing is left to answer or to wait for
            return

        # Bdecode the message
        try:
            msg = bdecode(data)
        except Exception as e:
            if spew:
                log.msg("krpc bdecode error: ")
                log.err(e)
            return

        # Make sure the remote node isn't trying anything funny
        try:
            verifyMessage(msg)
        except Exception as e:
            log.msg("krpc message verification error: ")
            log.err(e)
            return

        if spew:
            log.msg("%d received from %r: %s" % (self.factory.port, addr, msg))

        # Process it based on its type
        msgType = msg[TYP]
        if msgType == REQ:
            self._handleRequest(msg, addr, len(data))
        elif msgType == RSP:
            self._handleResponse(msg, addr)
        elif msgType == ERR:
            self._handleError(msg, addr)
        else:
            # Received an unknown message type
            if spew:
                log.msg("unknown message type: %r" % msg)
            req = self.tids.pop(msg[TID], None)
            if req is not None:
                req.errback(KrpcError(KRPC_ERROR_RECEIVED_UNKNOWN,
                                      "Received an unknown message type: %r" % msg[TYP]))

    def _handleRequest(self, msg, addr, ilen):
        """Run the requested method on the factory and send back the result.

        @param msg: the verified request message
        @type addr: (C{string}, C{int})
        @param addr: source IP address and port of datagram.
        @param ilen: the length of the received datagram (only logged)
        """
        method = msg[REQ]

        # Requests are handled by the factory
        f = getattr(self.factory, "krpc_" + method, None)
        msg[ARG]['_krpc_sender'] = self.addr
        if f and callable(f):
            self.stats.receivedAction(method)
            try:
                ret = f(**msg[ARG])
            except KrpcError as e:
                # The method chose the error to report to the remote node
                log.msg('Got a Krpc error while running: krpc_%s' % method)
                log.err(e)
                self.stats.errorAction(method)
                olen = self._sendResponse(method, addr, msg[TID], ERR,
                                          [e.args[0], e.args[1]])
            except TypeError as e:
                # Most likely the arguments did not match the method
                log.msg('Got a malformed request for: krpc_%s' % method)
                log.err(e)
                self.stats.errorAction(method)
                olen = self._sendResponse(method, addr, msg[TID], ERR,
                                          [KRPC_ERROR_MALFORMED_REQUEST, str(e)])
            except Exception as e:
                log.msg('Got an unknown error while running: krpc_%s' % method)
                log.err(e)
                self.stats.errorAction(method)
                olen = self._sendResponse(method, addr, msg[TID], ERR,
                                          [KRPC_ERROR_SERVER_ERROR, str(e)])
            else:
                olen = self._sendResponse(method, addr, msg[TID], RSP, ret)
        else:
            # Request for unknown method
            log.msg("ERROR: don't know about method %s" % method)
            self.stats.receivedAction('unknown')
            olen = self._sendResponse(method, addr, msg[TID], ERR,
                                      [KRPC_ERROR_METHOD_UNKNOWN, "unknown method "+str(method)])

        if self.config.get('SPEW', False):
            log.msg("%s >>> %s %s %s" % (addr, ilen, method, olen))

    def _handleResponse(self, msg, addr):
        """Pass a response to the request waiting for its transaction ID.

        @param msg: the verified response message
        @type addr: (C{string}, C{int})
        @param addr: source IP address and port of datagram.
        """
        # Responses get processed by their TID's deferred
        req = self.tids.pop(msg[TID], None)
        if req is not None:
            msg[RSP]['_krpc_sender'] = addr
            req.callback(msg[RSP])
        elif self.config.get('SPEW', False):
            # no tid, this transaction was finished already...
            # (the remote node may not have included an 'id' in the response)
            log.msg('received response from %r for completed request: %r' %
                    (msg[RSP].get('id', addr), msg[TID]))

    def _handleError(self, msg, addr):
        """Pass an error to the request waiting for its transaction ID.

        @param msg: the verified error message
        @type addr: (C{string}, C{int})
        @param addr: source IP address and port of datagram.
        """
        # Errors get processed by their TID's deferred's errback
        req = self.tids.pop(msg[TID], None)
        if req is not None:
            req.errback(KrpcError(*msg[ERR]))
        else:
            # no tid, this transaction was finished already...
            # Error packets have no response element, so the sender is
            # identified by its address
            log.msg('received an error %r from %r for completed request: %r' %
                    (msg[ERR], addr, msg[TID]))

    def _sendResponse(self, request, addr, tid, msgType, response):
        """Helper function for sending responses to nodes.

        A response that does not fit in a single packet is shortened by
        dropping entries from the end of its 'values' list. If it has no
        such list, or sending fails in any other way, an error packet is
        sent instead.

        @param request: the name of the requested method
        @type addr: (C{string}, C{int})
        @param addr: source IP address and port of datagram.
        @param tid: the transaction ID of the request
        @param msgType: the type of message to respond with
        @param response: the arguments for the response
        @rtype: C{int}
        @return: the length in bytes of the packet that was sent
        """
        if not response:
            response = {}

        try:
            # Create the response message
            msg = {TID : tid, TYP : msgType, msgType : response}

            if self.config.get('SPEW', False):
                log.msg("%d responding to %r: %s" % (self.factory.port, addr, msg))

            out = bencode(msg)

            # Make sure its not too long
            if len(out) > UDP_PACKET_LIMIT:
                # Can we remove some values to shorten it?
                if isinstance(response, dict) and 'values' in response:
                    orig_values = response['values']
                    num_values = self._countValuesThatFit(orig_values,
                                                          len(out) - UDP_PACKET_LIMIT)

                    # Encode the result, shortening a copy so the dictionary
                    # returned by the method is left as it was
                    shortened = dict(response)
                    shortened['values'] = orig_values[:num_values]
                    msg[msgType] = shortened
                    out = bencode(msg)
                    if len(out) > UDP_PACKET_LIMIT:
                        raise ValueError("response is still %d bytes after shortening" % len(out))
                    log.msg('Shortened a long packet from %d to %d values, new packet length: %d' %
                            (len(orig_values), num_values, len(out)))
                else:
                    # Too long a response, send an error
                    log.msg('Could not send response, too long: %d bytes' % len(out))
                    self.stats.errorAction(request)
                    msg = {TID : tid, TYP : ERR, ERR : [KRPC_ERROR_RESPONSE_TOO_LONG, "response was %d bytes" % len(out)]}
                    out = bencode(msg)

        except Exception as e:
            # Unknown error, send an error message
            self.stats.errorAction(request)
            msg = {TID : tid, TYP : ERR, ERR : [KRPC_ERROR_SERVER_ERROR, "unknown error sending response: %s" % str(e)]}
            out = bencode(msg)

        self.stats.sentBytes(len(out))
        self.transport.write(out, addr)
        return len(out)

    def _countValuesThatFit(self, values, excess):
        """Find how many leading values remain after removing C{excess} bytes.

        @param values: the complete list of values of the response
        @type excess: C{int}
        @param excess: the number of bytes the packet is over the limit
        @rtype: C{int}
        @return: the number of values to keep, always at least 1
        @raise ValueError: if not even a single value can be kept
        """
        from math import ceil

        len_values = len(bencode(values))

        # Calculate the maximum value length possible
        max_len_values = len_values - excess
        if max_len_values <= 0:
            raise ValueError("response is too long even without any values")

        # Start with a calculation of how many values should be included
        # (assumes all values are the same length, the 2 bytes are the
        # start and end markers of the bencoded list)
        per_value = (float(len_values) - 2.0) / float(len(values))
        num_values = len(values) - int(ceil(float(excess) / per_value))
        # The estimate can overshoot when the values differ in length
        num_values = max(0, min(num_values, len(values)))

        # Do a linear search for the actual maximum number possible
        bencoded_values = len(bencode(values[:num_values]))
        while bencoded_values < max_len_values and num_values + 1 < len(values):
            bencoded_values += len(bencode(values[num_values]))
            num_values += 1
        while bencoded_values > max_len_values and num_values > 0:
            num_values -= 1
            bencoded_values -= len(bencode(values[num_values]))
        if num_values <= 0:
            raise ValueError("not even one value fits in the response")
        return num_values

    def sendRequest(self, method, args):
        """Send a request to the remote node.

        @type method: C{string}
        @param method: the method name to call on the remote node
        @param args: the arguments to send to the remote node's method
        @rtype: L{twisted.internet.defer.Deferred}
        @return: the deferred that will fire with the response, or fail
            with a L{KrpcError}
        """
        if self.stopped:
            return defer.fail(KrpcError(KRPC_ERROR_PROTOCOL_STOPPED,
                                        "cannot send, connection has been stopped"))

        # Create the request message
        newTID = newID()
        msg = {TID : newTID, TYP : REQ, REQ : method, ARG : args}
        if self.config.get('SPEW', False):
            log.msg("%d sending to %r: %s" % (self.factory.port, self.addr, msg))
        data = bencode(msg)

        # Create the request object and save it with the TID
        req = KrpcRequest(self, newTID, method, data, self.config)
        self.tids[newTID] = req

        # Save the conclusion of the action
        started = datetime.now()
        req.addCallbacks(self.stats.responseAction, self.stats.failedAction,
                         callbackArgs = (method, started),
                         errbackArgs = (method, started))

        return req

    def sendData(self, method, data):
        """Write a request to the transport and save the stats.

        @type method: C{string}
        @param method: the name of the method to call on the remote node
        @type data: C{string}
        @param data: the message to send to the remote node
        """
        self.transport.write(data, self.addr)
        self.stats.sentAction(method)
        self.stats.sentBytes(len(data))

    def timeOut(self, badTID, method):
        """Call the deferred's errback if a timeout occurs.

        @param badTID: the transaction ID of the request
        @type method: C{string}
        @param method: the name of the method that timed out on the remote node
        """
        req = self.tids.pop(badTID, None)
        if req is not None:
            req.errback(KrpcError(KRPC_ERROR_TIMEOUT, "timeout waiting for '%s' from %r" %
                                                      (method, self.addr)))
        else:
            log.msg('Received a timeout for an unknown request for %s from %r' % (method, self.addr))

    def stop(self):
        """Cancel all pending requests."""
        # Set the flag and forget the requests first, so that an errback
        # that tries to send again is refused instead of being lost
        self.stopped = True
        pending, self.tids = self.tids, {}
        for req in pending.values():
            req.errback(KrpcError(KRPC_ERROR_PROTOCOL_STOPPED,
                                  'connection has been stopped while waiting for response'))
568 #{ For testing the KRPC protocol
569 def connectionForAddr(host, port):
class Receiver(protocol.Factory):
    """A minimal stand-in for the main DHT program, used by the tests.

    @type buf: C{list}
    @ivar buf: every message stored so far, in the order it was received
    """

    protocol = KRPC

    def __init__(self):
        """Start with no stored messages."""
        self.buf = []

    def krpc_store(self, msg, _krpc_sender):
        """Remember the message and send back an empty response."""
        self.buf.append(msg)
        return {}

    def krpc_echo(self, msg, _krpc_sender):
        """Send the message back to the node that sent it."""
        return {'msg': msg}

    def krpc_values(self, length, num, _krpc_sender):
        """Respond with C{num} values, each a string of C{length} characters."""
        value = '1'*length
        return {'values': [value]*num}
def make(port):
    """Create a receiver and a KRPC factory, listening on a UDP port.

    @type port: C{int}
    @param port: the UDP port to listen on
    @return: the receiver, the factory and the listening port object
    """
    from stats import StatsLogger

    config = {'KRPC_TIMEOUT': 14, 'KRPC_INITIAL_DELAY': 2, 'SPEW': False}
    af = Receiver()
    a = hostbroker(af, StatsLogger(None, None), config)
    a.protocol = KRPC
    p = reactor.listenUDP(port, a)
    return af, a, p
class KRPCTests(unittest.TestCase):
    """Tests for the KRPC protocol, run between two local UDP ports."""

    timeout = 2

    def setUp(self):
        """Start the sending node (a) and the receiving node (b)."""
        self.af, self.a, self.ap = make(1180)
        self.bf, self.b, self.bp = make(1181)

    def tearDown(self):
        """Close both of the listening ports."""
        for port in (self.ap, self.bp):
            port.stopListening()

    def _request(self, method, args):
        """Send a request from node a to node b, returning its deferred."""
        conn = self.a.connectionForAddr(('127.0.0.1', 1181))
        return conn.sendRequest(method, args)

    def bufEquals(self, result, value):
        """Check the messages that were stored by the receiving node."""
        self.failUnlessEqual(self.bf.buf, value)

    def testSimpleMessage(self):
        d = self._request('store', {'msg' : "This is a test."})
        d.addCallback(self.bufEquals, ["This is a test."])
        return d

    def testMessageBlast(self):
        # Only the last request is checked, once all 100 should be stored
        for i in range(100):
            d = self._request('store', {'msg' : "This is a test."})
        d.addCallback(self.bufEquals, ["This is a test."] * 100)
        return d

    def testEcho(self):
        df = self._request('echo', {'msg' : "This is a test."})
        df.addCallback(self.gotMsg, "This is a test.")
        return df

    def gotMsg(self, dict, should_be):
        """Check an echo response for the sender and the expected message."""
        # The lookup fails the test if the sender was not added
        _krpc_sender = dict['_krpc_sender']
        self.failUnlessEqual(dict['msg'], should_be)

    def testManyEcho(self):
        for i in xrange(100):
            df = self._request('echo', {'msg' : "This is a test."})
            df.addCallback(self.gotMsg, "This is a test.")
        return df

    def testMultiEcho(self):
        for text in ("This is a test.",
                     "This is another test.",
                     "This is yet another test."):
            df = self._request('echo', {'msg' : text})
            df.addCallback(self.gotMsg, text)
        return df

    def testEchoReset(self):
        df = self._request('echo', {'msg' : "This is a test."})
        df.addCallback(self.gotMsg, "This is a test.")

        df = self._request('echo', {'msg' : "This is another test."})
        df.addCallback(self.gotMsg, "This is another test.")
        df.addCallback(self.echoReset)
        return df

    def echoReset(self, dict):
        """Forget the connection to node b, then check a new one still works."""
        del(self.a.connections[('127.0.0.1', 1181)])
        df = self._request('echo', {'msg' : "This is yet another test."})
        df.addCallback(self.gotMsg, "This is yet another test.")
        return df

    def testUnknownMeth(self):
        df = self._request('blahblah', {'msg' : "This is a test."})
        df = self.failUnlessFailure(df, KrpcError)
        df.addBoth(self.gotErr, KRPC_ERROR_METHOD_UNKNOWN)
        return df

    def testMalformedRequest(self):
        df = self._request('echo', {'msg' : "This is a test.", 'foo': 'bar'})
        df = self.failUnlessFailure(df, KrpcError)
        df.addBoth(self.gotErr, KRPC_ERROR_MALFORMED_REQUEST, TypeError)
        return df

    def gotErr(self, value, should_be, *errorTypes):
        """Check the code of a received error, and clear the expected logs."""
        self.failUnlessEqual(value[0], should_be)
        if errorTypes:
            self.flushLoggedErrors(*errorTypes)

    def testLongPackets(self):
        df = self._request('values', {'length' : 1, 'num': 2000})
        df.addCallback(self.gotLongRsp)
        return df

    def gotLongRsp(self, dict):
        """Check that a shortened response would fit in a single packet."""
        # Not quite accurate, but good enough
        self.failUnless(len(bencode(dict))-10 < UDP_PACKET_LIMIT)