From: Cameron Dale Date: Wed, 5 Mar 2008 23:02:44 +0000 (-0800) Subject: Document the DHT's krpc module. X-Git-Url: https://git.mxchange.org/?p=quix0rs-apt-p2p.git;a=commitdiff_plain;h=bdfaa8f29cbc5fa9d791e967e3ad848a5a1641ee Document the DHT's krpc module. --- diff --git a/apt_dht_Khashmir/krpc.py b/apt_dht_Khashmir/krpc.py index 1400030..a4fbacc 100644 --- a/apt_dht_Khashmir/krpc.py +++ b/apt_dht_Khashmir/krpc.py @@ -1,6 +1,42 @@ ## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved # see LICENSE.txt for license information +"""The KRPC communication protocol implementation. + +@var KRPC_TIMEOUT: the number of seconds after which requests timeout +@var UDP_PACKET_LIMIT: the maximum number of bytes that can be sent in a + UDP packet without fragmentation + +@var KRPC_ERROR: the code for a generic error +@var KRPC_ERROR_SERVER_ERROR: the code for a server error +@var KRPC_ERROR_MALFORMED_PACKET: the code for a malformed packet error +@var KRPC_ERROR_METHOD_UNKNOWN: the code for a method unknown error +@var KRPC_ERROR_MALFORMED_REQUEST: the code for a malformed request error +@var KRPC_ERROR_INVALID_TOKEN: the code for an invalid token error +@var KRPC_ERROR_RESPONSE_TOO_LONG: the code for a response too long error + +@var KRPC_ERROR_INTERNAL: the code for an internal error +@var KRPC_ERROR_RECEIVED_UNKNOWN: the code for an unknown message type error +@var KRPC_ERROR_TIMEOUT: the code for a timeout error +@var KRPC_ERROR_PROTOCOL_STOPPED: the code for a stopped protocol error + +@var TID: the identifier for the transaction ID +@var REQ: the identifier for a request packet +@var RSP: the identifier for a response packet +@var TYP: the identifier for the type of packet +@var ARG: the identifier for the argument to the request +@var ERR: the identifier for an error packet + +@group Remote node error codes: KRPC_ERROR, KRPC_ERROR_SERVER_ERROR, + KRPC_ERROR_MALFORMED_PACKET, KRPC_ERROR_METHOD_UNKNOWN, + KRPC_ERROR_MALFORMED_REQUEST, KRPC_ERROR_INVALID_TOKEN, + KRPC_ERROR_RESPONSE_TOO_LONG +@group Local node error codes: KRPC_ERROR_INTERNAL, KRPC_ERROR_RECEIVED_UNKNOWN, + KRPC_ERROR_TIMEOUT, KRPC_ERROR_PROTOCOL_STOPPED +@group Command identifiers: TID, REQ, RSP, TYP, ARG, ERR + +""" + from bencode import bencode, bdecode from time import asctime from math import ceil @@ -39,6 +75,7 @@ ARG = 'a' ERR = 'e' class KrpcError(Exception): + """An error occurred in the KRPC protocol.""" pass def verifyMessage(msg): @@ -85,24 +122,60 @@ def verifyMessage(msg): if type(msg[TID]) != str: raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "transaction id is not a string") -class hostbroker(protocol.DatagramProtocol): +class hostbroker(protocol.DatagramProtocol): + """The factory for the KRPC protocol. + + @type server: L{khashmir.Khashmir} + @ivar server: the main Khashmir program + @type config: C{dictionary} + @ivar config: the configuration parameters for the DHT + @type connections: C{dictionary} + @ivar connections: all the connections that have ever been made to the + protocol, keys are IP address and port pairs, values are L{KRPC} + protocols for the addresses + @ivar protocol: the protocol to use to handle incoming connections + (added externally) + @type addr: (C{string}, C{int}) + @ivar addr: the IP address and port of this node + """ + def __init__(self, server, config): + """Initialize the factory. + + @type server: L{khashmir.Khashmir} + @param server: the main DHT program + @type config: C{dictionary} + @param config: the configuration parameters for the DHT + """ self.server = server self.config = config # this should be changed to storage that drops old entries self.connections = {} def datagramReceived(self, datagram, addr): - #print `addr`, `datagram` - #if addr != self.addr: + """Optionally create a new protocol object, and handle the new datagram. + + @type datagram: C{string} + @param datagram: the data received from the transport. + @type addr: (C{string}, C{int}) + @param addr: source IP address and port of datagram. + """ c = self.connectionForAddr(addr) c.datagramReceived(datagram, addr) #if c.idle(): # del self.connections[addr] def connectionForAddr(self, addr): + """Get a protocol object for the source. + + @type addr: (C{string}, C{int}) + @param addr: source IP address and port of datagram. + """ + # Don't connect to ourself if addr == self.addr: - raise Exception + raise KrcpError + + # Create a new protocol object if necessary if not self.connections.has_key(addr): conn = self.protocol(addr, self.server, self.transport, self.config['SPEW']) self.connections[addr] = conn @@ -111,18 +184,47 @@ class hostbroker(protocol.DatagramProtocol): return conn def makeConnection(self, transport): + """Make a connection to a transport and save our address.""" protocol.DatagramProtocol.makeConnection(self, transport) tup = transport.getHost() self.addr = (tup.host, tup.port) def stopProtocol(self): + """Stop all the open connections.""" for conn in self.connections.values(): conn.stop() protocol.DatagramProtocol.stopProtocol(self) -## connection class KRPC: + """The KRPC protocol implementation. + + @ivar transport: the transport to use for the protocol + @type factory: L{khashmir.Khashmir} + @ivar factory: the main Khashmir program + @type addr: (C{string}, C{int}) + @ivar addr: the IP address and port of the source node + @type noisy: C{boolean} + @ivar noisy: whether to log additional details of the protocol + @type tids: C{dictionary} + @ivar tids: the transaction IDs outstanding for requests, keys are the + transaction ID of the request, values are the deferreds to call with + the results + @type stopped: C{boolean} + @ivar stopped: whether the protocol has been stopped + """ + def __init__(self, addr, server, transport, spew = False): + """Initialize the protocol. + + @type addr: (C{string}, C{int}) + @param addr: the IP address and port of the source node + @type server: L{khashmir.Khashmir} + @param server: the main Khashmir program + @param transport: the transport to use for the protocol + @type spew: C{boolean} + @param spew: whether to log additional details of the protocol + (optional, defaults to False) + """ self.transport = transport self.factory = server self.addr = addr @@ -131,10 +233,18 @@ class KRPC: self.stopped = False def datagramReceived(self, data, addr): + """Process the new datagram. + + @type data: C{string} + @param data: the data received from the transport. + @type addr: (C{string}, C{int}) + @param addr: source IP address and port of datagram. + """ if self.stopped: if self.noisy: log.msg("stopped, dropping message from %r: %s" % (addr, data)) - # bdecode + + # Bdecode the message try: msg = bdecode(data) except Exception, e: @@ -143,6 +253,7 @@ class KRPC: log.err(e) return + # Make sure the remote node isn't trying anything funny try: verifyMessage(msg) except Exception, e: @@ -152,11 +263,12 @@ class KRPC: if self.noisy: log.msg("%d received from %r: %s" % (self.factory.port, addr, msg)) - # look at msg type + + # Process it based on its type if msg[TYP] == REQ: ilen = len(data) - # if request - # tell factory to handle + + # Requests are handled by the factory f = getattr(self.factory ,"krpc_" + msg[REQ], None) msg[ARG]['_krpc_sender'] = self.addr if f and callable(f): @@ -179,7 +291,7 @@ class KRPC: else: olen = self._sendResponse(addr, msg[TID], RSP, ret) else: - # unknown method + # Request for unknown method log.msg("ERROR: don't know about method %s" % msg[REQ]) olen = self._sendResponse(addr, msg[TID], ERR, [KRPC_ERROR_METHOD_UNKNOWN, "unknown method "+str(msg[REQ])]) @@ -187,8 +299,7 @@ class KRPC: log.msg("%s >>> %s - %s %s %s" % (addr, self.factory.node.port, ilen, msg[REQ], olen)) elif msg[TYP] == RSP: - # if response - # lookup tid + # Responses get processed by their TID's deferred if self.tids.has_key(msg[TID]): df = self.tids[msg[TID]] # callback @@ -199,8 +310,7 @@ class KRPC: if self.noisy: log.msg('timeout: %r' % msg[RSP]['id']) elif msg[TYP] == ERR: - # if error - # lookup tid + # Errors get processed by their TID's deferred's errback if self.tids.has_key(msg[TID]): df = self.tids[msg[TID]] del(self.tids[msg[TID]]) @@ -211,9 +321,9 @@ class KRPC: log.msg("Got an error for an unknown request: %r" % (msg[ERR], )) pass else: + # Received an unknown message type if self.noisy: log.msg("unknown message type: %r" % msg) - # unknown message type if msg[TID] in self.tids: df = self.tids[msg[TID]] del(self.tids[msg[TID]]) @@ -222,10 +332,19 @@ class KRPC: "Received an unknown message type: %r" % msg[TYP])) def _sendResponse(self, addr, tid, msgType, response): + """Helper function for sending responses to nodes. + + @type addr: (C{string}, C{int}) + @param addr: source IP address and port of datagram. + @param tid: the transaction ID of the request + @param msgType: the type of message to respond with + @param response: the arguments for the response + """ if not response: response = {} try: + # Create the response message msg = {TID : tid, TYP : msgType, msgType : response} if self.noisy: @@ -233,7 +352,9 @@ class KRPC: out = bencode(msg) + # Make sure its not too long if len(out) > UDP_PACKET_LIMIT: + # Can we remove some values to shorten it? if 'values' in response: # Save the original list of values orig_values = response['values'] @@ -279,27 +400,42 @@ class KRPC: return len(out) def sendRequest(self, method, args): + """Send a request to the remote node. + + @type method: C{string} + @param method: the methiod name to call on the remote node + @param args: the arguments to send to the remote node's method + """ if self.stopped: raise KrpcError, (KRPC_ERROR_PROTOCOL_STOPPED, "cannot send, connection has been stopped") - # make message - # send it + + # Create the request message msg = {TID : newID(), TYP : REQ, REQ : method, ARG : args} if self.noisy: log.msg("%d sending to %r: %s" % (self.factory.port, self.addr, msg)) data = bencode(msg) + + # Create the deferred and save it with the TID d = Deferred() self.tids[msg[TID]] = d + + # Schedule a later timeout call def timeOut(tids = self.tids, id = msg[TID], method = method, addr = self.addr): + """Call the deferred's errback if a timeout occurs.""" if tids.has_key(id): df = tids[id] del(tids[id]) df.errback(KrpcError(KRPC_ERROR_TIMEOUT, "timeout waiting for '%s' from %r" % (method, addr))) later = reactor.callLater(KRPC_TIMEOUT, timeOut) + + # Cancel the timeout call if a response is received def dropTimeOut(dict, later_call = later): + """Cancel the timeout call when a response is received.""" if later_call.active(): later_call.cancel() return dict d.addBoth(dropTimeOut) + self.transport.write(data, self.addr) return d @@ -309,7 +445,8 @@ class KRPC: df.errback(KrpcError(KRPC_ERROR_PROTOCOL_STOPPED, 'connection has been stopped while waiting for response')) self.tids = {} self.stopped = True - + +#{ For testing the KRPC protocol def connectionForAddr(host, port): return host @@ -421,5 +558,4 @@ class KRPCTests(unittest.TestCase): def gotLongRsp(self, dict): # Not quite accurate, but good enough self.failUnless(len(bencode(dict))-10 < UDP_PACKET_LIMIT) - \ No newline at end of file