## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
# see LICENSE.txt for license information
+"""The KRPC communication protocol implementation.
+@var KRPC_TIMEOUT: the number of seconds after which requests timeout
+@var UDP_PACKET_LIMIT: the maximum number of bytes that can be sent in a
+ UDP packet without fragmentation
+@var KRPC_ERROR: the code for a generic error
+@var KRPC_ERROR_SERVER_ERROR: the code for a server error
+@var KRPC_ERROR_MALFORMED_PACKET: the code for a malformed packet error
+@var KRPC_ERROR_METHOD_UNKNOWN: the code for a method unknown error
+@var KRPC_ERROR_MALFORMED_REQUEST: the code for a malformed request error
+@var KRPC_ERROR_INVALID_TOKEN: the code for an invalid token error
+@var KRPC_ERROR_RESPONSE_TOO_LONG: the code for a response too long error
+@var KRPC_ERROR_INTERNAL: the code for an internal error
+@var KRPC_ERROR_RECEIVED_UNKNOWN: the code for an unknown message type error
+@var KRPC_ERROR_TIMEOUT: the code for a timeout error
+@var KRPC_ERROR_PROTOCOL_STOPPED: the code for a stopped protocol error
+@var TID: the identifier for the transaction ID
+@var REQ: the identifier for a request packet
+@var RSP: the identifier for a response packet
+@var TYP: the identifier for the type of packet
+@var ARG: the identifier for the argument to the request
+@var ERR: the identifier for an error packet
+@group Remote node error codes: KRPC_ERROR, KRPC_ERROR_SERVER_ERROR,
+@group Command identifiers: TID, REQ, RSP, TYP, ARG, ERR
from bencode import bencode, bdecode
from time import asctime
from math import ceil
ERR = 'e'
class KrpcError(Exception):
+ """An error occurred in the KRPC protocol."""
def verifyMessage(msg):
if type(msg[TID]) != str:
raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "transaction id is not a string")
-class hostbroker(protocol.DatagramProtocol):
+class hostbroker(protocol.DatagramProtocol):
+ """The factory for the KRPC protocol.
+ @type server: L{khashmir.Khashmir}
+ @ivar server: the main Khashmir program
+ @type config: C{dictionary}
+ @ivar config: the configuration parameters for the DHT
+ @type connections: C{dictionary}
+ @ivar connections: all the connections that have ever been made to the
+ protocol, keys are IP address and port pairs, values are L{KRPC}
+ protocols for the addresses
+ @ivar protocol: the protocol to use to handle incoming connections
+ (added externally)
+ @type addr: (C{string}, C{int})
+ @ivar addr: the IP address and port of this node
+ """
def __init__(self, server, config):
+ """Initialize the factory.
+ @type server: L{khashmir.Khashmir}
+ @param server: the main DHT program
+ @type config: C{dictionary}
+ @param config: the configuration parameters for the DHT
+ """
self.server = server
self.config = config
# this should be changed to storage that drops old entries
self.connections = {}
def datagramReceived(self, datagram, addr):
- #print `addr`, `datagram`
- #if addr != self.addr:
+ """Optionally create a new protocol object, and handle the new datagram.
+ @type datagram: C{string}
+ @param datagram: the data received from the transport.
+ @type addr: (C{string}, C{int})
+ @param addr: source IP address and port of datagram.
+ """
c = self.connectionForAddr(addr)
c.datagramReceived(datagram, addr)
#if c.idle():
# del self.connections[addr]
def connectionForAddr(self, addr):
+ """Get a protocol object for the source.
+ @type addr: (C{string}, C{int})
+ @param addr: source IP address and port of datagram.
+ """
+ # Don't connect to ourself
if addr == self.addr:
- raise Exception
+ raise KrcpError
+ # Create a new protocol object if necessary
if not self.connections.has_key(addr):
conn = self.protocol(addr, self.server, self.transport, self.config['SPEW'])
self.connections[addr] = conn
return conn
def makeConnection(self, transport):
+ """Make a connection to a transport and save our address."""
protocol.DatagramProtocol.makeConnection(self, transport)
tup = transport.getHost()
self.addr = (tup.host, tup.port)
def stopProtocol(self):
+ """Stop all the open connections."""
for conn in self.connections.values():
-## connection
class KRPC:
+ """The KRPC protocol implementation.
+ @ivar transport: the transport to use for the protocol
+ @type factory: L{khashmir.Khashmir}
+ @ivar factory: the main Khashmir program
+ @type addr: (C{string}, C{int})
+ @ivar addr: the IP address and port of the source node
+ @type noisy: C{boolean}
+ @ivar noisy: whether to log additional details of the protocol
+ @type tids: C{dictionary}
+ @ivar tids: the transaction IDs outstanding for requests, keys are the
+ transaction ID of the request, values are the deferreds to call with
+ the results
+ @type stopped: C{boolean}
+ @ivar stopped: whether the protocol has been stopped
+ """
def __init__(self, addr, server, transport, spew = False):
+ """Initialize the protocol.
+ @type addr: (C{string}, C{int})
+ @param addr: the IP address and port of the source node
+ @type server: L{khashmir.Khashmir}
+ @param server: the main Khashmir program
+ @param transport: the transport to use for the protocol
+ @type spew: C{boolean}
+ @param spew: whether to log additional details of the protocol
+ (optional, defaults to False)
+ """
self.transport = transport
self.factory = server
self.addr = addr
self.stopped = False
def datagramReceived(self, data, addr):
+ """Process the new datagram.
+ @type data: C{string}
+ @param data: the data received from the transport.
+ @type addr: (C{string}, C{int})
+ @param addr: source IP address and port of datagram.
+ """
if self.stopped:
if self.noisy:
log.msg("stopped, dropping message from %r: %s" % (addr, data))
- # bdecode
+ # Bdecode the message
msg = bdecode(data)
except Exception, e:
+ # Make sure the remote node isn't trying anything funny
except Exception, e:
if self.noisy:
log.msg("%d received from %r: %s" % (self.factory.port, addr, msg))
- # look at msg type
+ # Process it based on its type
if msg[TYP] == REQ:
ilen = len(data)
- # if request
- # tell factory to handle
+ # Requests are handled by the factory
f = getattr(self.factory ,"krpc_" + msg[REQ], None)
msg[ARG]['_krpc_sender'] = self.addr
if f and callable(f):
olen = self._sendResponse(addr, msg[TID], RSP, ret)
- # unknown method
+ # Request for unknown method
log.msg("ERROR: don't know about method %s" % msg[REQ])
olen = self._sendResponse(addr, msg[TID], ERR,
[KRPC_ERROR_METHOD_UNKNOWN, "unknown method "+str(msg[REQ])])
log.msg("%s >>> %s - %s %s %s" % (addr, self.factory.node.port,
ilen, msg[REQ], olen))
elif msg[TYP] == RSP:
- # if response
- # lookup tid
+ # Responses get processed by their TID's deferred
if self.tids.has_key(msg[TID]):
df = self.tids[msg[TID]]
# callback
if self.noisy:
log.msg('timeout: %r' % msg[RSP]['id'])
elif msg[TYP] == ERR:
- # if error
- # lookup tid
+ # Errors get processed by their TID's deferred's errback
if self.tids.has_key(msg[TID]):
df = self.tids[msg[TID]]
log.msg("Got an error for an unknown request: %r" % (msg[ERR], ))
+ # Received an unknown message type
if self.noisy:
log.msg("unknown message type: %r" % msg)
- # unknown message type
if msg[TID] in self.tids:
df = self.tids[msg[TID]]
"Received an unknown message type: %r" % msg[TYP]))
def _sendResponse(self, addr, tid, msgType, response):
+ """Helper function for sending responses to nodes.
+ @type addr: (C{string}, C{int})
+ @param addr: source IP address and port of datagram.
+ @param tid: the transaction ID of the request
+ @param msgType: the type of message to respond with
+ @param response: the arguments for the response
+ """
if not response:
response = {}
+ # Create the response message
msg = {TID : tid, TYP : msgType, msgType : response}
if self.noisy:
out = bencode(msg)
+ # Make sure its not too long
if len(out) > UDP_PACKET_LIMIT:
+ # Can we remove some values to shorten it?
if 'values' in response:
# Save the original list of values
orig_values = response['values']
return len(out)
def sendRequest(self, method, args):
+ """Send a request to the remote node.
+ @type method: C{string}
+ @param method: the methiod name to call on the remote node
+ @param args: the arguments to send to the remote node's method
+ """
if self.stopped:
raise KrpcError, (KRPC_ERROR_PROTOCOL_STOPPED, "cannot send, connection has been stopped")
- # make message
- # send it
+ # Create the request message
msg = {TID : newID(), TYP : REQ, REQ : method, ARG : args}
if self.noisy:
log.msg("%d sending to %r: %s" % (self.factory.port, self.addr, msg))
data = bencode(msg)
+ # Create the deferred and save it with the TID
d = Deferred()
self.tids[msg[TID]] = d
+ # Schedule a later timeout call
def timeOut(tids = self.tids, id = msg[TID], method = method, addr = self.addr):
+ """Call the deferred's errback if a timeout occurs."""
if tids.has_key(id):
df = tids[id]
df.errback(KrpcError(KRPC_ERROR_TIMEOUT, "timeout waiting for '%s' from %r" % (method, addr)))
later = reactor.callLater(KRPC_TIMEOUT, timeOut)
+ # Cancel the timeout call if a response is received
def dropTimeOut(dict, later_call = later):
+ """Cancel the timeout call when a response is received."""
if later_call.active():
return dict
self.transport.write(data, self.addr)
return d
df.errback(KrpcError(KRPC_ERROR_PROTOCOL_STOPPED, 'connection has been stopped while waiting for response'))
self.tids = {}
self.stopped = True
+#{ For testing the KRPC protocol
def connectionForAddr(host, port):
return host
def gotLongRsp(self, dict):
# Not quite accurate, but good enough
self.failUnless(len(bencode(dict))-10 < UDP_PACKET_LIMIT)
\ No newline at end of file