1 ## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
4 """The KRPC communication protocol implementation.
6 @var KRPC_INITIAL_DELAY: the number of seconds to wait before first resending
7 a request; each subsequent resend waits twice as long
8 @var KRPC_TIMEOUT: the number of seconds after which requests timeout
9 @var UDP_PACKET_LIMIT: the maximum number of bytes that can be sent in a
10 UDP packet without fragmentation
12 @var KRPC_ERROR: the code for a generic error
13 @var KRPC_ERROR_SERVER_ERROR: the code for a server error
14 @var KRPC_ERROR_MALFORMED_PACKET: the code for a malformed packet error
15 @var KRPC_ERROR_METHOD_UNKNOWN: the code for a method unknown error
16 @var KRPC_ERROR_MALFORMED_REQUEST: the code for a malformed request error
17 @var KRPC_ERROR_INVALID_TOKEN: the code for an invalid token error
18 @var KRPC_ERROR_RESPONSE_TOO_LONG: the code for a response too long error
20 @var KRPC_ERROR_INTERNAL: the code for an internal error
21 @var KRPC_ERROR_RECEIVED_UNKNOWN: the code for an unknown message type error
22 @var KRPC_ERROR_TIMEOUT: the code for a timeout error
23 @var KRPC_ERROR_PROTOCOL_STOPPED: the code for a stopped protocol error
25 @var TID: the identifier for the transaction ID
26 @var REQ: the identifier for a request packet
27 @var RSP: the identifier for a response packet
28 @var TYP: the identifier for the type of packet
29 @var ARG: the identifier for the argument to the request
30 @var ERR: the identifier for an error packet
32 @group Remote node error codes: KRPC_ERROR, KRPC_ERROR_SERVER_ERROR,
33 KRPC_ERROR_MALFORMED_PACKET, KRPC_ERROR_METHOD_UNKNOWN,
34 KRPC_ERROR_MALFORMED_REQUEST, KRPC_ERROR_INVALID_TOKEN,
35 KRPC_ERROR_RESPONSE_TOO_LONG
36 @group Local node error codes: KRPC_ERROR_INTERNAL, KRPC_ERROR_RECEIVED_UNKNOWN,
37 KRPC_ERROR_TIMEOUT, KRPC_ERROR_PROTOCOL_STOPPED
38 @group Command identifiers: TID, REQ, RSP, TYP, ARG, ERR
42 from bencode import bencode, bdecode
43 from datetime import datetime, timedelta
46 from twisted.internet import defer
47 from twisted.internet import protocol, reactor
48 from twisted.python import log
49 from twisted.trial import unittest
51 from khash import newID
53 KRPC_INITIAL_DELAY = 2
55 UDP_PACKET_LIMIT = 1472
59 KRPC_ERROR_SERVER_ERROR = 201
60 KRPC_ERROR_MALFORMED_PACKET = 202
61 KRPC_ERROR_METHOD_UNKNOWN = 203
62 KRPC_ERROR_MALFORMED_REQUEST = 204
63 KRPC_ERROR_INVALID_TOKEN = 205
64 KRPC_ERROR_RESPONSE_TOO_LONG = 206
67 KRPC_ERROR_INTERNAL = 100
68 KRPC_ERROR_RECEIVED_UNKNOWN = 101
69 KRPC_ERROR_TIMEOUT = 102
70 KRPC_ERROR_PROTOCOL_STOPPED = 103
class KrpcError(Exception):
    """Signals a failure in the KRPC protocol.

    Code in this file constructs it with two positional arguments: a numeric
    error code followed by a descriptive string.
    """
84 def verifyMessage(msg):
85 """Check received message for corruption and errors.
87 @type msg: C{dictionary}
88 @param msg: the dictionary of information received on the connection
89 @raise KrpcError: if the message is corrupt
93 raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "not a dictionary")
95 raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "no message type")
98 raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "request type not specified")
99 if type(msg[REQ]) != str:
100 raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "request type is not a string")
102 raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "no arguments for request")
103 if type(msg[ARG]) != dict:
104 raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "arguments for request are not in a dictionary")
105 elif msg[TYP] == RSP:
107 raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "response not specified")
108 if type(msg[RSP]) != dict:
109 raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "response is not a dictionary")
110 elif msg[TYP] == ERR:
112 raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error not specified")
113 if type(msg[ERR]) != list:
114 raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error is not a list")
115 if len(msg[ERR]) != 2:
116 raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error is not a 2-element list")
117 if type(msg[ERR][0]) not in (int, long):
118 raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error number is not a number")
119 if type(msg[ERR][1]) != str:
120 raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error string is not a string")
122 # raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "unknown message type")
124 raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "no transaction ID specified")
125 if type(msg[TID]) != str:
126 raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "transaction id is not a string")
128 class hostbroker(protocol.DatagramProtocol):
129 """The factory for the KRPC protocol.
131 @type server: L{khashmir.Khashmir}
132 @ivar server: the main Khashmir program
133 @type stats: L{stats.StatsLogger}
134 @ivar stats: the statistics logger to save transport info
135 @type config: C{dictionary}
136 @ivar config: the configuration parameters for the DHT
137 @type connections: C{dictionary}
138 @ivar connections: all the connections that have ever been made to the
139 protocol, keys are IP address and port pairs, values are L{KRPC}
140 protocols for the addresses
141 @ivar protocol: the protocol to use to handle incoming connections
143 @type addr: (C{string}, C{int})
144 @ivar addr: the IP address and port of this node
147 def __init__(self, server, stats, config):
148 """Initialize the factory.
150 @type server: L{khashmir.Khashmir}
151 @param server: the main DHT program
152 @type stats: L{stats.StatsLogger}
153 @param stats: the statistics logger to save transport info
154 @type config: C{dictionary}
155 @param config: the configuration parameters for the DHT
160 # this should be changed to storage that drops old entries
161 self.connections = {}
163 def datagramReceived(self, datagram, addr):
164 """Optionally create a new protocol object, and handle the new datagram.
166 @type datagram: C{string}
167 @param datagram: the data received from the transport.
168 @type addr: (C{string}, C{int})
169 @param addr: source IP address and port of datagram.
171 c = self.connectionForAddr(addr)
172 c.datagramReceived(datagram, addr)
174 # del self.connections[addr]
176 def connectionForAddr(self, addr):
177 """Get a protocol object for the source.
179 @type addr: (C{string}, C{int})
180 @param addr: source IP address and port of datagram.
182 # Don't connect to ourself
183 if addr == self.addr:
186 # Create a new protocol object if necessary
187 if not self.connections.has_key(addr):
188 conn = self.protocol(addr, self.server, self.stats, self.transport, self.config['SPEW'])
189 self.connections[addr] = conn
191 conn = self.connections[addr]
def makeConnection(self, transport):
    """Attach to the transport and remember the address it reports.

    @param transport: the transport being connected; the C{host} and C{port}
        of its C{getHost()} result are stored as the C{self.addr} pair
    """
    protocol.DatagramProtocol.makeConnection(self, transport)
    local = transport.getHost()
    self.addr = (local.host, local.port)
200 def stopProtocol(self):
201 """Stop all the open connections."""
202 for conn in self.connections.values():
204 protocol.DatagramProtocol.stopProtocol(self)
206 class KrpcRequest(defer.Deferred):
207 """An outstanding request to a remote node.
209 @type protocol: L{KRPC}
210 @ivar protocol: the protocol to send data with
211 @ivar tid: the transaction ID of the request
212 @type method: C{string}
213 @ivar method: the name of the method to call on the remote node
214 @type data: C{string}
215 @ivar data: the message to send to the remote node
217 @ivar delay: the last timeout delay sent
218 @type start: C{datetime}
219 @ivar start: the time the request was started at
220 @type later: L{twisted.internet.interfaces.IDelayedCall}
221 @ivar later: the pending call to timeout the last sent request
224 def __init__(self, protocol, newTID, method, data):
225 """Initialize the request, and send it out.
227 @type protocol: L{KRPC}
228 @param protocol: the protocol to send data with
229 @param newTID: the transaction ID of the request
230 @type method: C{string}
231 @param method: the name of the method to call on the remote node
232 @type data: C{string}
233 @param data: the message to send to the remote node
235 defer.Deferred.__init__(self)
236 self.protocol = protocol
240 self.delay = KRPC_INITIAL_DELAY
241 self.start = datetime.now()
246 """Send the request to the remote node."""
247 assert not self.later, 'There is already a pending request'
248 self.later = reactor.callLater(self.delay, self.timeOut)
249 self.protocol.sendData(self.method, self.data)
252 """Check for an unrecoverable timeout, otherwise resend."""
254 delay = datetime.now() - self.start
255 if delay > timedelta(seconds = KRPC_TIMEOUT):
256 log.msg('%r timed out after %0.2f sec' %
257 (self.tid, delay.seconds + delay.microseconds/1000000.0))
258 self.protocol.timeOut(self.tid, self.method)
259 elif self.protocol.stopped:
260 log.msg('Timeout but can not resend %r, protocol has been stopped' % self.tid)
263 log.msg('Trying to resend %r now with delay %d sec' % (self.tid, self.delay))
266 def callback(self, resp):
268 defer.Deferred.callback(self, resp)
270 def errback(self, resp):
272 defer.Deferred.errback(self, resp)
274 def dropTimeOut(self):
275 """Cancel the timeout call when a response is received."""
276 if self.later and self.later.active():
281 """The KRPC protocol implementation.
283 @ivar transport: the transport to use for the protocol
284 @type factory: L{khashmir.Khashmir}
285 @ivar factory: the main Khashmir program
286 @type stats: L{stats.StatsLogger}
287 @ivar stats: the statistics logger to save transport info
288 @type addr: (C{string}, C{int})
289 @ivar addr: the IP address and port of the source node
290 @type noisy: C{boolean}
291 @ivar noisy: whether to log additional details of the protocol
292 @type tids: C{dictionary}
293 @ivar tids: the transaction IDs outstanding for requests, keys are the
294 transaction ID of the request, values are the deferreds to call with
296 @type stopped: C{boolean}
297 @ivar stopped: whether the protocol has been stopped
300 def __init__(self, addr, server, stats, transport, spew = False):
301 """Initialize the protocol.
303 @type addr: (C{string}, C{int})
304 @param addr: the IP address and port of the source node
305 @type server: L{khashmir.Khashmir}
306 @param server: the main Khashmir program
307 @type stats: L{stats.StatsLogger}
308 @param stats: the statistics logger to save transport info
309 @param transport: the transport to use for the protocol
310 @type spew: C{boolean}
311 @param spew: whether to log additional details of the protocol
312 (optional, defaults to False)
314 self.transport = transport
315 self.factory = server
322 def datagramReceived(self, data, addr):
323 """Process the new datagram.
325 @type data: C{string}
326 @param data: the data received from the transport.
327 @type addr: (C{string}, C{int})
328 @param addr: source IP address and port of datagram.
330 self.stats.receivedBytes(len(data))
333 log.msg("stopped, dropping message from %r: %s" % (addr, data))
335 # Bdecode the message
340 log.msg("krpc bdecode error: ")
344 # Make sure the remote node isn't trying anything funny
348 log.msg("krpc message verification error: ")
353 log.msg("%d received from %r: %s" % (self.factory.port, addr, msg))
355 # Process it based on its type
359 # Requests are handled by the factory
360 f = getattr(self.factory ,"krpc_" + msg[REQ], None)
361 msg[ARG]['_krpc_sender'] = self.addr
362 if f and callable(f):
363 self.stats.receivedAction(msg[REQ])
365 ret = f(*(), **msg[ARG])
367 log.msg('Got a Krpc error while running: krpc_%s' % msg[REQ])
369 self.stats.errorAction(msg[REQ])
370 olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
373 log.msg('Got a malformed request for: krpc_%s' % msg[REQ])
375 self.stats.errorAction(msg[REQ])
376 olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
377 [KRPC_ERROR_MALFORMED_REQUEST, str(e)])
379 log.msg('Got an unknown error while running: krpc_%s' % msg[REQ])
381 self.stats.errorAction(msg[REQ])
382 olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
383 [KRPC_ERROR_SERVER_ERROR, str(e)])
385 olen = self._sendResponse(msg[REQ], addr, msg[TID], RSP, ret)
387 # Request for unknown method
388 log.msg("ERROR: don't know about method %s" % msg[REQ])
389 self.stats.receivedAction('unknown')
390 olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
391 [KRPC_ERROR_METHOD_UNKNOWN, "unknown method "+str(msg[REQ])])
393 log.msg("%s >>> %s - %s %s %s" % (addr, self.factory.node.port,
394 ilen, msg[REQ], olen))
395 elif msg[TYP] == RSP:
396 # Responses get processed by their TID's deferred
397 if self.tids.has_key(msg[TID]):
398 req = self.tids[msg[TID]]
400 del(self.tids[msg[TID]])
401 msg[RSP]['_krpc_sender'] = addr
402 req.callback(msg[RSP])
404 # no tid, this transaction was finished already...
406 log.msg('received response from %r for completed request: %r' %
407 (msg[RSP]['id'], msg[TID]))
408 elif msg[TYP] == ERR:
409 # Errors get processed by their TID's deferred's errback
410 if self.tids.has_key(msg[TID]):
411 req = self.tids[msg[TID]]
412 del(self.tids[msg[TID]])
414 req.errback(KrpcError(*msg[ERR]))
416 # no tid, this transaction was finished already...
417 log.msg('received an error %r from %r for completed request: %r' %
418 (msg[ERR], msg[RSP]['id'], msg[TID]))
420 # Received an unknown message type
422 log.msg("unknown message type: %r" % msg)
423 if msg[TID] in self.tids:
424 req = self.tids[msg[TID]]
425 del(self.tids[msg[TID]])
427 req.errback(KrpcError(KRPC_ERROR_RECEIVED_UNKNOWN,
428 "Received an unknown message type: %r" % msg[TYP]))
430 def _sendResponse(self, request, addr, tid, msgType, response):
431 """Helper function for sending responses to nodes.
433 @param request: the name of the requested method
434 @type addr: (C{string}, C{int})
435 @param addr: source IP address and port of datagram.
436 @param tid: the transaction ID of the request
437 @param msgType: the type of message to respond with
438 @param response: the arguments for the response
444 # Create the response message
445 msg = {TID : tid, TYP : msgType, msgType : response}
448 log.msg("%d responding to %r: %s" % (self.factory.port, addr, msg))
452 # Make sure it's not too long
453 if len(out) > UDP_PACKET_LIMIT:
454 # Can we remove some values to shorten it?
455 if 'values' in response:
456 # Save the original list of values
457 orig_values = response['values']
458 len_orig_values = len(bencode(orig_values))
460 # Calculate the maximum value length possible
461 max_len_values = len_orig_values - (len(out) - UDP_PACKET_LIMIT)
462 assert max_len_values > 0
464 # Start with a calculation of how many values should be included
465 # (assumes all values are the same length)
466 per_value = (float(len_orig_values) - 2.0) / float(len(orig_values))
467 num_values = len(orig_values) - int(ceil(float(len(out) - UDP_PACKET_LIMIT) / per_value))
469 # Do a linear search for the actual maximum number possible
470 bencoded_values = len(bencode(orig_values[:num_values]))
471 while bencoded_values < max_len_values and num_values + 1 < len(orig_values):
472 bencoded_values += len(bencode(orig_values[num_values]))
474 while bencoded_values > max_len_values and num_values > 0:
476 bencoded_values -= len(bencode(orig_values[num_values]))
477 assert num_values > 0
480 response['values'] = orig_values[:num_values]
482 assert len(out) < UDP_PACKET_LIMIT
483 log.msg('Shortened a long packet from %d to %d values, new packet length: %d' %
484 (len(orig_values), num_values, len(out)))
486 # Too long a response, send an error
487 log.msg('Could not send response, too long: %d bytes' % len(out))
488 self.stats.errorAction(request)
489 msg = {TID : tid, TYP : ERR, ERR : [KRPC_ERROR_RESPONSE_TOO_LONG, "response was %d bytes" % len(out)]}
493 # Unknown error, send an error message
494 self.stats.errorAction(request)
495 msg = {TID : tid, TYP : ERR, ERR : [KRPC_ERROR_SERVER_ERROR, "unknown error sending response: %s" % str(e)]}
498 self.stats.sentBytes(len(out))
499 self.transport.write(out, addr)
502 def sendRequest(self, method, args):
503 """Send a request to the remote node.
505 @type method: C{string}
506 @param method: the method name to call on the remote node
507 @param args: the arguments to send to the remote node's method
510 return defer.fail(KrpcError(KRPC_ERROR_PROTOCOL_STOPPED,
511 "cannot send, connection has been stopped"))
513 # Create the request message
515 msg = {TID : newTID, TYP : REQ, REQ : method, ARG : args}
517 log.msg("%d sending to %r: %s" % (self.factory.port, self.addr, msg))
520 # Create the request object and save it with the TID
521 req = KrpcRequest(self, newTID, method, data)
522 self.tids[newTID] = req
524 # Save the conclusion of the action
525 req.addCallbacks(self.stats.responseAction, self.stats.failedAction,
526 callbackArgs = (method, datetime.now()),
527 errbackArgs = (method, datetime.now()))
531 def sendData(self, method, data):
532 """Write a request to the transport and save the stats.
534 @type method: C{string}
535 @param method: the name of the method to call on the remote node
536 @type data: C{string}
537 @param data: the message to send to the remote node
539 self.stats.sentAction(method)
540 self.stats.sentBytes(len(data))
542 self.transport.write(data, self.addr)
544 def timeOut(self, badTID, method):
545 """Call the deferred's errback if a timeout occurs.
547 @param badTID: the transaction ID of the request
548 @type method: C{string}
549 @param method: the name of the method that timed out on the remote node
551 if badTID in self.tids:
552 req = self.tids[badTID]
553 del(self.tids[badTID])
554 req.errback(KrpcError(KRPC_ERROR_TIMEOUT, "timeout waiting for '%s' from %r" %
555 (method, self.addr)))
557 log.msg('Received a timeout for an unknown request for %s from %r' % (method, self.addr))
560 """Cancel all pending requests."""
561 for req in self.tids.values():
562 req.errback(KrpcError(KRPC_ERROR_PROTOCOL_STOPPED,
563 'connection has been stopped while waiting for response'))
567 #{ For testing the KRPC protocol
568 def connectionForAddr(host, port):
571 class Receiver(protocol.Factory):
575 def krpc_store(self, msg, _krpc_sender):
578 def krpc_echo(self, msg, _krpc_sender):
def krpc_values(self, length, num, _krpc_sender):
    """Answer a 'values' request with a list of identical strings.

    @param length: the number of '1' characters in each value
    @param num: how many copies of that value to return
    @param _krpc_sender: the sender supplied with the request (unused)
    @return: a dictionary whose 'values' entry is the list of C{num} strings
    """
    value = '1' * length
    return {'values': [value] * num}
584 from stats import StatsLogger
586 a = hostbroker(af, StatsLogger(None, None), {'SPEW': False})
588 p = reactor.listenUDP(port, a)
591 class KRPCTests(unittest.TestCase):
595 self.af, self.a, self.ap = make(1180)
596 self.bf, self.b, self.bp = make(1181)
599 self.ap.stopListening()
600 self.bp.stopListening()
def bufEquals(self, result, value):
    """Callback that compares the C{self.bf.buf} buffer with an expected value.

    @param result: the value handed down the callback chain (ignored)
    @param value: the expected contents of C{self.bf.buf}
    """
    check = self.failUnlessEqual
    check(self.bf.buf, value)
605 def testSimpleMessage(self):
606 d = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."})
607 d.addCallback(self.bufEquals, ["This is a test."])
610 def testMessageBlast(self):
612 d = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."})
613 d.addCallback(self.bufEquals, ["This is a test."] * 100)
617 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
618 df.addCallback(self.gotMsg, "This is a test.")
def gotMsg(self, dict, should_be):
    """Callback that checks a response carries the expected message.

    @param dict: the response dictionary; it must contain both the
        '_krpc_sender' and 'msg' keys
    @param should_be: the expected value of the 'msg' entry
    """
    response = dict
    # The sender is not used; the lookup raises KeyError if the key is absent.
    sender = response['_krpc_sender']
    self.failUnlessEqual(response['msg'], should_be)
625 def testManyEcho(self):
626 for i in xrange(100):
627 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
628 df.addCallback(self.gotMsg, "This is a test.")
631 def testMultiEcho(self):
632 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
633 df.addCallback(self.gotMsg, "This is a test.")
635 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is another test."})
636 df.addCallback(self.gotMsg, "This is another test.")
638 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is yet another test."})
639 df.addCallback(self.gotMsg, "This is yet another test.")
643 def testEchoReset(self):
644 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
645 df.addCallback(self.gotMsg, "This is a test.")
647 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is another test."})
648 df.addCallback(self.gotMsg, "This is another test.")
649 df.addCallback(self.echoReset)
652 def echoReset(self, dict):
653 del(self.a.connections[('127.0.0.1', 1181)])
654 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is yet another test."})
655 df.addCallback(self.gotMsg, "This is yet another test.")
658 def testUnknownMeth(self):
659 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('blahblah', {'msg' : "This is a test."})
660 df = self.failUnlessFailure(df, KrpcError)
661 df.addBoth(self.gotErr, KRPC_ERROR_METHOD_UNKNOWN)
664 def testMalformedRequest(self):
665 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test.", 'foo': 'bar'})
666 df = self.failUnlessFailure(df, KrpcError)
667 df.addBoth(self.gotErr, KRPC_ERROR_MALFORMED_REQUEST, TypeError)
670 def gotErr(self, value, should_be, *errorTypes):
671 self.failUnlessEqual(value[0], should_be)
673 self.flushLoggedErrors(*errorTypes)
675 def testLongPackets(self):
676 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('values', {'length' : 1, 'num': 2000})
677 df.addCallback(self.gotLongRsp)
def gotLongRsp(self, dict):
    """Callback that checks a response's bencoded size against the UDP limit.

    @param dict: the response dictionary received for the request
    """
    # Approximate check only: 10 bytes are subtracted from the bencoded
    # length before it is compared with UDP_PACKET_LIMIT.
    encoded_len = len(bencode(dict))
    self.failUnless(encoded_len - 10 < UDP_PACKET_LIMIT)