X-Git-Url: https://git.mxchange.org/?p=quix0rs-apt-p2p.git;a=blobdiff_plain;f=apt_dht_Khashmir%2Fkrpc.py;h=a4fbacc0a1fb1a6ed3b1744d8900c32ee36594ef;hp=40ab8d84145e3e7e5650f67abbb7edd95b785861;hb=bdfaa8f29cbc5fa9d791e967e3ad848a5a1641ee;hpb=e6beef78406c3093fc4eb98c4293b9257b44aa61 diff --git a/apt_dht_Khashmir/krpc.py b/apt_dht_Khashmir/krpc.py index 40ab8d8..a4fbacc 100644 --- a/apt_dht_Khashmir/krpc.py +++ b/apt_dht_Khashmir/krpc.py @@ -1,21 +1,70 @@ ## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved # see LICENSE.txt for license information +"""The KRPC communication protocol implementation. + +@var KRPC_TIMEOUT: the number of seconds after which requests timeout +@var UDP_PACKET_LIMIT: the maximum number of bytes that can be sent in a + UDP packet without fragmentation + +@var KRPC_ERROR: the code for a generic error +@var KRPC_ERROR_SERVER_ERROR: the code for a server error +@var KRPC_ERROR_MALFORMED_PACKET: the code for a malformed packet error +@var KRPC_ERROR_METHOD_UNKNOWN: the code for a method unknown error +@var KRPC_ERROR_MALFORMED_REQUEST: the code for a malformed request error +@var KRPC_ERROR_INVALID_TOKEN: the code for an invalid token error +@var KRPC_ERROR_RESPONSE_TOO_LONG: the code for a response too long error + +@var KRPC_ERROR_INTERNAL: the code for an internal error +@var KRPC_ERROR_RECEIVED_UNKNOWN: the code for an unknown message type error +@var KRPC_ERROR_TIMEOUT: the code for a timeout error +@var KRPC_ERROR_PROTOCOL_STOPPED: the code for a stopped protocol error + +@var TID: the identifier for the transaction ID +@var REQ: the identifier for a request packet +@var RSP: the identifier for a response packet +@var TYP: the identifier for the type of packet +@var ARG: the identifier for the argument to the request +@var ERR: the identifier for an error packet + +@group Remote node error codes: KRPC_ERROR, KRPC_ERROR_SERVER_ERROR, + KRPC_ERROR_MALFORMED_PACKET, KRPC_ERROR_METHOD_UNKNOWN, + KRPC_ERROR_MALFORMED_REQUEST, KRPC_ERROR_INVALID_TOKEN, + KRPC_ERROR_RESPONSE_TOO_LONG +@group Local node error codes: KRPC_ERROR_INTERNAL, KRPC_ERROR_RECEIVED_UNKNOWN, + KRPC_ERROR_TIMEOUT, KRPC_ERROR_PROTOCOL_STOPPED +@group Command identifiers: TID, REQ, RSP, TYP, ARG, ERR + +""" + from bencode import bencode, bdecode from time import asctime -import sys -from traceback import format_exception +from math import ceil from twisted.internet.defer import Deferred from twisted.internet import protocol, reactor +from twisted.python import log from twisted.trial import unittest +from khash import newID + KRPC_TIMEOUT = 20 +UDP_PACKET_LIMIT = 1472 + +# Remote node errors +KRPC_ERROR = 200 +KRPC_ERROR_SERVER_ERROR = 201 +KRPC_ERROR_MALFORMED_PACKET = 202 +KRPC_ERROR_METHOD_UNKNOWN = 203 +KRPC_ERROR_MALFORMED_REQUEST = 204 +KRPC_ERROR_INVALID_TOKEN = 205 +KRPC_ERROR_RESPONSE_TOO_LONG = 206 -KRPC_ERROR = 1 -KRPC_ERROR_METHOD_UNKNOWN = 2 -KRPC_ERROR_RECEIVED_UNKNOWN = 3 -KRPC_ERROR_TIMEOUT = 4 +# Local errors +KRPC_ERROR_INTERNAL = 100 +KRPC_ERROR_RECEIVED_UNKNOWN = 101 +KRPC_ERROR_TIMEOUT = 102 +KRPC_ERROR_PROTOCOL_STOPPED = 103 # commands TID = 't' @@ -25,27 +74,108 @@ TYP = 'y' ARG = 'a' ERR = 'e' -class ProtocolError(Exception): +class KrpcError(Exception): + """An error occurred in the KRPC protocol.""" pass -class hostbroker(protocol.DatagramProtocol): +def verifyMessage(msg): + """Check received message for corruption and errors. + + @type msg: C{dictionary} + @param msg: the dictionary of information received on the connection + @raise KrpcError: if the message is corrupt + """ + + if type(msg) != dict: + raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "not a dictionary") + if TYP not in msg: + raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "no message type") + if msg[TYP] == REQ: + if REQ not in msg: + raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "request type not specified") + if type(msg[REQ]) != str: + raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "request type is not a string") + if ARG not in msg: + raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "no arguments for request") + if type(msg[ARG]) != dict: + raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "arguments for request are not in a dictionary") + elif msg[TYP] == RSP: + if RSP not in msg: + raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "response not specified") + if type(msg[RSP]) != dict: + raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "response is not a dictionary") + elif msg[TYP] == ERR: + if ERR not in msg: + raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error not specified") + if type(msg[ERR]) != list: + raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error is not a list") + if len(msg[ERR]) != 2: + raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error is not a 2-element list") + if type(msg[ERR][0]) not in (int, long): + raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error number is not a number") + if type(msg[ERR][1]) != str: + raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error string is not a string") +# else: +# raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "unknown message type") + if TID not in msg: + raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "no transaction ID specified") + if type(msg[TID]) != str: + raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "transaction id is not a string") + +class hostbroker(protocol.DatagramProtocol): + """The factory for the KRPC protocol. + + @type server: L{khashmir.Khashmir} + @ivar server: the main Khashmir program + @type config: C{dictionary} + @ivar config: the configuration parameters for the DHT + @type connections: C{dictionary} + @ivar connections: all the connections that have ever been made to the + protocol, keys are IP address and port pairs, values are L{KRPC} + protocols for the addresses + @ivar protocol: the protocol to use to handle incoming connections + (added externally) + @type addr: (C{string}, C{int}) + @ivar addr: the IP address and port of this node + """ + def __init__(self, server, config): + """Initialize the factory. + + @type server: L{khashmir.Khashmir} + @param server: the main DHT program + @type config: C{dictionary} + @param config: the configuration parameters for the DHT + """ self.server = server self.config = config # this should be changed to storage that drops old entries self.connections = {} def datagramReceived(self, datagram, addr): - #print `addr`, `datagram` - #if addr != self.addr: + """Optionally create a new protocol object, and handle the new datagram. + + @type datagram: C{string} + @param datagram: the data received from the transport. + @type addr: (C{string}, C{int}) + @param addr: source IP address and port of datagram. + """ c = self.connectionForAddr(addr) c.datagramReceived(datagram, addr) #if c.idle(): # del self.connections[addr] def connectionForAddr(self, addr): + """Get a protocol object for the source. + + @type addr: (C{string}, C{int}) + @param addr: source IP address and port of datagram. + """ + # Don't connect to ourself if addr == self.addr: - raise Exception + raise KrcpError + + # Create a new protocol object if necessary if not self.connections.has_key(addr): conn = self.protocol(addr, self.server, self.transport, self.config['SPEW']) self.connections[addr] = conn @@ -54,138 +184,269 @@ class hostbroker(protocol.DatagramProtocol): return conn def makeConnection(self, transport): + """Make a connection to a transport and save our address.""" protocol.DatagramProtocol.makeConnection(self, transport) tup = transport.getHost() self.addr = (tup.host, tup.port) def stopProtocol(self): + """Stop all the open connections.""" for conn in self.connections.values(): conn.stop() protocol.DatagramProtocol.stopProtocol(self) -## connection class KRPC: + """The KRPC protocol implementation. + + @ivar transport: the transport to use for the protocol + @type factory: L{khashmir.Khashmir} + @ivar factory: the main Khashmir program + @type addr: (C{string}, C{int}) + @ivar addr: the IP address and port of the source node + @type noisy: C{boolean} + @ivar noisy: whether to log additional details of the protocol + @type tids: C{dictionary} + @ivar tids: the transaction IDs outstanding for requests, keys are the + transaction ID of the request, values are the deferreds to call with + the results + @type stopped: C{boolean} + @ivar stopped: whether the protocol has been stopped + """ + def __init__(self, addr, server, transport, spew = False): + """Initialize the protocol. + + @type addr: (C{string}, C{int}) + @param addr: the IP address and port of the source node + @type server: L{khashmir.Khashmir} + @param server: the main Khashmir program + @param transport: the transport to use for the protocol + @type spew: C{boolean} + @param spew: whether to log additional details of the protocol + (optional, defaults to False) + """ self.transport = transport self.factory = server self.addr = addr self.noisy = spew self.tids = {} - self.mtid = 0 self.stopped = False - def datagramReceived(self, str, addr): + def datagramReceived(self, data, addr): + """Process the new datagram. + + @type data: C{string} + @param data: the data received from the transport. + @type addr: (C{string}, C{int}) + @param addr: source IP address and port of datagram. + """ if self.stopped: if self.noisy: - print "stopped, dropping message from", addr, str - # bdecode + log.msg("stopped, dropping message from %r: %s" % (addr, data)) + + # Bdecode the message try: - msg = bdecode(str) + msg = bdecode(data) except Exception, e: if self.noisy: - print "response decode error: " + `e` - else: - if self.noisy: - print self.factory.port, "received from", addr, self.addr, ":", msg - # look at msg type - if msg[TYP] == REQ: - ilen = len(str) - # if request - # tell factory to handle - f = getattr(self.factory ,"krpc_" + msg[REQ], None) - msg[ARG]['_krpc_sender'] = self.addr - if f and callable(f): - try: - ret = f(*(), **msg[ARG]) - except Exception, e: - olen = self._sendResponse(addr, msg[TID], ERR, `format_exception(type(e), e, sys.exc_info()[2])`) - else: - olen = self._sendResponse(addr, msg[TID], RSP, ret) - else: - if self.noisy: - print "don't know about method %s" % msg[REQ] - # unknown method - olen = self._sendResponse(addr, msg[TID], ERR, KRPC_ERROR_METHOD_UNKNOWN) - if self.noisy: - print "%s %s >>> %s - %s %s %s" % (asctime(), addr, self.factory.node.port, - ilen, msg[REQ], olen) - elif msg[TYP] == RSP: - # if response - # lookup tid - if self.tids.has_key(msg[TID]): - df = self.tids[msg[TID]] - # callback - del(self.tids[msg[TID]]) - df.callback({'rsp' : msg[RSP], '_krpc_sender': addr}) - else: - print 'timeout ' + `msg[RSP]['id']` - # no tid, this transaction timed out already... - elif msg[TYP] == ERR: - # if error - # lookup tid - if self.tids.has_key(msg[TID]): - df = self.tids[msg[TID]] - # callback - df.errback(msg[ERR]) - del(self.tids[msg[TID]]) + log.msg("krpc bdecode error: ") + log.err(e) + return + + # Make sure the remote node isn't trying anything funny + try: + verifyMessage(msg) + except Exception, e: + log.msg("krpc message verification error: ") + log.err(e) + return + + if self.noisy: + log.msg("%d received from %r: %s" % (self.factory.port, addr, msg)) + + # Process it based on its type + if msg[TYP] == REQ: + ilen = len(data) + + # Requests are handled by the factory + f = getattr(self.factory ,"krpc_" + msg[REQ], None) + msg[ARG]['_krpc_sender'] = self.addr + if f and callable(f): + try: + ret = f(*(), **msg[ARG]) + except KrpcError, e: + log.msg('Got a Krpc error while running: krpc_%s' % msg[REQ]) + log.err(e) + olen = self._sendResponse(addr, msg[TID], ERR, [e[0], e[1]]) + except TypeError, e: + log.msg('Got a malformed request for: krpc_%s' % msg[REQ]) + log.err(e) + olen = self._sendResponse(addr, msg[TID], ERR, + [KRPC_ERROR_MALFORMED_REQUEST, str(e)]) + except Exception, e: + log.msg('Got an unknown error while running: krpc_%s' % msg[REQ]) + log.err(e) + olen = self._sendResponse(addr, msg[TID], ERR, + [KRPC_ERROR_SERVER_ERROR, str(e)]) else: - # day late and dollar short - pass + olen = self._sendResponse(addr, msg[TID], RSP, ret) else: - print "unknown message type " + `msg` - # unknown message type + # Request for unknown method + log.msg("ERROR: don't know about method %s" % msg[REQ]) + olen = self._sendResponse(addr, msg[TID], ERR, + [KRPC_ERROR_METHOD_UNKNOWN, "unknown method "+str(msg[REQ])]) + if self.noisy: + log.msg("%s >>> %s - %s %s %s" % (addr, self.factory.node.port, + ilen, msg[REQ], olen)) + elif msg[TYP] == RSP: + # Responses get processed by their TID's deferred + if self.tids.has_key(msg[TID]): df = self.tids[msg[TID]] # callback - df.errback(KRPC_ERROR_RECEIVED_UNKNOWN) del(self.tids[msg[TID]]) + df.callback({'rsp' : msg[RSP], '_krpc_sender': addr}) + else: + # no tid, this transaction timed out already... + if self.noisy: + log.msg('timeout: %r' % msg[RSP]['id']) + elif msg[TYP] == ERR: + # Errors get processed by their TID's deferred's errback + if self.tids.has_key(msg[TID]): + df = self.tids[msg[TID]] + del(self.tids[msg[TID]]) + # callback + df.errback(KrpcError(*msg[ERR])) + else: + # day late and dollar short, just log it + log.msg("Got an error for an unknown request: %r" % (msg[ERR], )) + pass + else: + # Received an unknown message type + if self.noisy: + log.msg("unknown message type: %r" % msg) + if msg[TID] in self.tids: + df = self.tids[msg[TID]] + del(self.tids[msg[TID]]) + # callback + df.errback(KrpcError(KRPC_ERROR_RECEIVED_UNKNOWN, + "Received an unknown message type: %r" % msg[TYP])) def _sendResponse(self, addr, tid, msgType, response): + """Helper function for sending responses to nodes. + + @type addr: (C{string}, C{int}) + @param addr: source IP address and port of datagram. + @param tid: the transaction ID of the request + @param msgType: the type of message to respond with + @param response: the arguments for the response + """ if not response: response = {} + + try: + # Create the response message + msg = {TID : tid, TYP : msgType, msgType : response} + + if self.noisy: + log.msg("%d responding to %r: %s" % (self.factory.port, addr, msg)) + + out = bencode(msg) - msg = {TID : tid, TYP : msgType, msgType : response} - - if self.noisy: - print self.factory.port, "responding to", addr, ":", msg + # Make sure its not too long + if len(out) > UDP_PACKET_LIMIT: + # Can we remove some values to shorten it? + if 'values' in response: + # Save the original list of values + orig_values = response['values'] + len_orig_values = len(bencode(orig_values)) + + # Caclulate the maximum value length possible + max_len_values = len_orig_values - (len(out) - UDP_PACKET_LIMIT) + assert max_len_values > 0 + + # Start with a calculation of how many values should be included + # (assumes all values are the same length) + per_value = (float(len_orig_values) - 2.0) / float(len(orig_values)) + num_values = len(orig_values) - int(ceil(float(len(out) - UDP_PACKET_LIMIT) / per_value)) + + # Do a linear search for the actual maximum number possible + bencoded_values = len(bencode(orig_values[:num_values])) + while bencoded_values < max_len_values and num_values + 1 < len(orig_values): + bencoded_values += len(bencode(orig_values[num_values])) + num_values += 1 + while bencoded_values > max_len_values and num_values > 0: + num_values -= 1 + bencoded_values -= len(bencode(orig_values[num_values])) + assert num_values > 0 + + # Encode the result + response['values'] = orig_values[:num_values] + out = bencode(msg) + assert len(out) < UDP_PACKET_LIMIT + log.msg('Shortened a long packet from %d to %d values, new packet length: %d' % + (len(orig_values), num_values, len(out))) + else: + # Too long a response, send an error + log.msg('Could not send response, too long: %d bytes' % len(out)) + msg = {TID : tid, TYP : ERR, ERR : [KRPC_ERROR_RESPONSE_TOO_LONG, "response was %d bytes" % len(out)]} + out = bencode(msg) - out = bencode(msg) + except Exception, e: + # Unknown error, send an error message + msg = {TID : tid, TYP : ERR, ERR : [KRPC_ERROR_SERVER_ERROR, "unknown error sending response: %s" % str(e)]} + out = bencode(msg) + self.transport.write(out, addr) return len(out) def sendRequest(self, method, args): + """Send a request to the remote node. + + @type method: C{string} + @param method: the methiod name to call on the remote node + @param args: the arguments to send to the remote node's method + """ if self.stopped: - raise ProtocolError, "connection has been stopped" - # make message - # send it - msg = {TID : chr(self.mtid), TYP : REQ, REQ : method, ARG : args} - self.mtid = (self.mtid + 1) % 256 + raise KrpcError, (KRPC_ERROR_PROTOCOL_STOPPED, "cannot send, connection has been stopped") + + # Create the request message + msg = {TID : newID(), TYP : REQ, REQ : method, ARG : args} if self.noisy: - print self.factory.port, "sending to", self.addr, ":", msg - str = bencode(msg) + log.msg("%d sending to %r: %s" % (self.factory.port, self.addr, msg)) + data = bencode(msg) + + # Create the deferred and save it with the TID d = Deferred() self.tids[msg[TID]] = d - def timeOut(tids = self.tids, id = msg[TID], msg = msg): + + # Schedule a later timeout call + def timeOut(tids = self.tids, id = msg[TID], method = method, addr = self.addr): + """Call the deferred's errback if a timeout occurs.""" if tids.has_key(id): df = tids[id] del(tids[id]) - print ">>>>>> KRPC_ERROR_TIMEOUT" - df.errback(ProtocolError('timeout waiting for %r' % msg)) + df.errback(KrpcError(KRPC_ERROR_TIMEOUT, "timeout waiting for '%s' from %r" % (method, addr))) later = reactor.callLater(KRPC_TIMEOUT, timeOut) + + # Cancel the timeout call if a response is received def dropTimeOut(dict, later_call = later): + """Cancel the timeout call when a response is received.""" if later_call.active(): later_call.cancel() return dict d.addBoth(dropTimeOut) - self.transport.write(str, self.addr) + + self.transport.write(data, self.addr) return d def stop(self): """Timeout all pending requests.""" for df in self.tids.values(): - df.errback(ProtocolError('connection has been closed')) + df.errback(KrpcError(KRPC_ERROR_PROTOCOL_STOPPED, 'connection has been stopped while waiting for response')) self.tids = {} self.stopped = True - + +#{ For testing the KRPC protocol def connectionForAddr(host, port): return host @@ -195,8 +456,11 @@ class Receiver(protocol.Factory): self.buf = [] def krpc_store(self, msg, _krpc_sender): self.buf += [msg] + return {} def krpc_echo(self, msg, _krpc_sender): - return msg + return {'msg': msg} + def krpc_values(self, length, num, _krpc_sender): + return {'values': ['1'*length]*num} def make(port): af = Receiver() @@ -206,6 +470,8 @@ def make(port): return af, a, p class KRPCTests(unittest.TestCase): + timeout = 2 + def setUp(self): self.af, self.a, self.ap = make(1180) self.bf, self.b, self.bp = make(1181) @@ -236,7 +502,7 @@ class KRPCTests(unittest.TestCase): def gotMsg(self, dict, should_be): _krpc_sender = dict['_krpc_sender'] msg = dict['rsp'] - self.failUnlessEqual(msg, should_be) + self.failUnlessEqual(msg['msg'], should_be) def testManyEcho(self): for i in xrange(100): @@ -273,8 +539,23 @@ class KRPCTests(unittest.TestCase): def testUnknownMeth(self): df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('blahblah', {'msg' : "This is a test."}) - df.addErrback(self.gotErr, KRPC_ERROR_METHOD_UNKNOWN) + df.addBoth(self.gotErr, KRPC_ERROR_METHOD_UNKNOWN) + return df + + def testMalformedRequest(self): + df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test.", 'foo': 'bar'}) + df.addBoth(self.gotErr, KRPC_ERROR_MALFORMED_REQUEST) return df def gotErr(self, err, should_be): - self.failUnlessEqual(err.value, should_be) + self.failUnlessEqual(err.value[0], should_be) + + def testLongPackets(self): + df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('values', {'length' : 1, 'num': 2000}) + df.addCallback(self.gotLongRsp) + return df + + def gotLongRsp(self, dict): + # Not quite accurate, but good enough + self.failUnless(len(bencode(dict))-10 < UDP_PACKET_LIMIT) + \ No newline at end of file