Document the DHT's krpc module.
[quix0rs-apt-p2p.git] / apt_dht_Khashmir / krpc.py
index 14000309ceaf441a4261706f29803e6d5803c29d..a4fbacc0a1fb1a6ed3b1744d8900c32ee36594ef 100644 (file)
@@ -1,6 +1,42 @@
 ## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
 # see LICENSE.txt for license information
 
+"""The KRPC communication protocol implementation.
+
+@var KRPC_TIMEOUT: the number of seconds after which requests timeout
+@var UDP_PACKET_LIMIT: the maximum number of bytes that can be sent in a
+    UDP packet without fragmentation
+
+@var KRPC_ERROR: the code for a generic error
+@var KRPC_ERROR_SERVER_ERROR: the code for a server error
+@var KRPC_ERROR_MALFORMED_PACKET: the code for a malformed packet error
+@var KRPC_ERROR_METHOD_UNKNOWN: the code for a method unknown error
+@var KRPC_ERROR_MALFORMED_REQUEST: the code for a malformed request error
+@var KRPC_ERROR_INVALID_TOKEN: the code for an invalid token error
+@var KRPC_ERROR_RESPONSE_TOO_LONG: the code for a response too long error
+
+@var KRPC_ERROR_INTERNAL: the code for an internal error
+@var KRPC_ERROR_RECEIVED_UNKNOWN: the code for an unknown message type error
+@var KRPC_ERROR_TIMEOUT: the code for a timeout error
+@var KRPC_ERROR_PROTOCOL_STOPPED: the code for a stopped protocol error
+
+@var TID: the identifier for the transaction ID
+@var REQ: the identifier for a request packet
+@var RSP: the identifier for a response packet
+@var TYP: the identifier for the type of packet
+@var ARG: the identifier for the argument to the request
+@var ERR: the identifier for an error packet
+
+@group Remote node error codes: KRPC_ERROR, KRPC_ERROR_SERVER_ERROR,
+    KRPC_ERROR_MALFORMED_PACKET, KRPC_ERROR_METHOD_UNKNOWN,
+    KRPC_ERROR_MALFORMED_REQUEST, KRPC_ERROR_INVALID_TOKEN,
+    KRPC_ERROR_RESPONSE_TOO_LONG
+@group Local node error codes: KRPC_ERROR_INTERNAL, KRPC_ERROR_RECEIVED_UNKNOWN,
+    KRPC_ERROR_TIMEOUT, KRPC_ERROR_PROTOCOL_STOPPED
+@group Command identifiers: TID, REQ, RSP, TYP, ARG, ERR
+
+"""
+
 from bencode import bencode, bdecode
 from time import asctime
 from math import ceil
@@ -39,6 +75,7 @@ ARG = 'a'
 ERR = 'e'
 
 class KrpcError(Exception):
+    """An error occurred in the KRPC protocol."""
     pass
 
 def verifyMessage(msg):
@@ -85,24 +122,60 @@ def verifyMessage(msg):
     if type(msg[TID]) != str:
         raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "transaction id is not a string")
 
-class hostbroker(protocol.DatagramProtocol):       
+class hostbroker(protocol.DatagramProtocol):
+    """The factory for the KRPC protocol.
+    
+    @type server: L{khashmir.Khashmir}
+    @ivar server: the main Khashmir program
+    @type config: C{dictionary}
+    @ivar config: the configuration parameters for the DHT
+    @type connections: C{dictionary}
+    @ivar connections: all the connections that have ever been made to the
+        protocol, keys are IP address and port pairs, values are L{KRPC}
+        protocols for the addresses
+    @ivar protocol: the protocol to use to handle incoming connections
+        (added externally)
+    @type addr: (C{string}, C{int})
+    @ivar addr: the IP address and port of this node
+    """
+    
     def __init__(self, server, config):
+        """Initialize the factory.
+        
+        @type server: L{khashmir.Khashmir}
+        @param server: the main DHT program
+        @type config: C{dictionary}
+        @param config: the configuration parameters for the DHT
+        """
         self.server = server
         self.config = config
         # this should be changed to storage that drops old entries
         self.connections = {}
         
     def datagramReceived(self, datagram, addr):
-        #print `addr`, `datagram`
-        #if addr != self.addr:
+        """Optionally create a new protocol object, and handle the new datagram.
+        
+        @type datagram: C{string}
+        @param datagram: the data received from the transport.
+        @type addr: (C{string}, C{int})
+        @param addr: source IP address and port of datagram.
+        """
         c = self.connectionForAddr(addr)
         c.datagramReceived(datagram, addr)
         #if c.idle():
         #    del self.connections[addr]
 
     def connectionForAddr(self, addr):
+        """Get a protocol object for the source.
+        
+        @type addr: (C{string}, C{int})
+        @param addr: source IP address and port of datagram.
+        """
+        # Don't connect to ourself
         if addr == self.addr:
-            raise Exception
+            raise KrcpError
+        
+        # Create a new protocol object if necessary
         if not self.connections.has_key(addr):
             conn = self.protocol(addr, self.server, self.transport, self.config['SPEW'])
             self.connections[addr] = conn
@@ -111,18 +184,47 @@ class hostbroker(protocol.DatagramProtocol):
         return conn
 
     def makeConnection(self, transport):
+        """Make a connection to a transport and save our address."""
         protocol.DatagramProtocol.makeConnection(self, transport)
         tup = transport.getHost()
         self.addr = (tup.host, tup.port)
         
     def stopProtocol(self):
+        """Stop all the open connections."""
         for conn in self.connections.values():
             conn.stop()
         protocol.DatagramProtocol.stopProtocol(self)
 
-## connection
 class KRPC:
+    """The KRPC protocol implementation.
+    
+    @ivar transport: the transport to use for the protocol
+    @type factory: L{khashmir.Khashmir}
+    @ivar factory: the main Khashmir program
+    @type addr: (C{string}, C{int})
+    @ivar addr: the IP address and port of the source node
+    @type noisy: C{boolean}
+    @ivar noisy: whether to log additional details of the protocol
+    @type tids: C{dictionary}
+    @ivar tids: the transaction IDs outstanding for requests, keys are the
+        transaction ID of the request, values are the deferreds to call with
+        the results
+    @type stopped: C{boolean}
+    @ivar stopped: whether the protocol has been stopped
+    """
+    
     def __init__(self, addr, server, transport, spew = False):
+        """Initialize the protocol.
+        
+        @type addr: (C{string}, C{int})
+        @param addr: the IP address and port of the source node
+        @type server: L{khashmir.Khashmir}
+        @param server: the main Khashmir program
+        @param transport: the transport to use for the protocol
+        @type spew: C{boolean}
+        @param spew: whether to log additional details of the protocol
+            (optional, defaults to False)
+        """
         self.transport = transport
         self.factory = server
         self.addr = addr
@@ -131,10 +233,18 @@ class KRPC:
         self.stopped = False
 
     def datagramReceived(self, data, addr):
+        """Process the new datagram.
+        
+        @type data: C{string}
+        @param data: the data received from the transport.
+        @type addr: (C{string}, C{int})
+        @param addr: source IP address and port of datagram.
+        """
         if self.stopped:
             if self.noisy:
                 log.msg("stopped, dropping message from %r: %s" % (addr, data))
-        # bdecode
+
+        # Bdecode the message
         try:
             msg = bdecode(data)
         except Exception, e:
@@ -143,6 +253,7 @@ class KRPC:
                 log.err(e)
             return
 
+        # Make sure the remote node isn't trying anything funny
         try:
             verifyMessage(msg)
         except Exception, e:
@@ -152,11 +263,12 @@ class KRPC:
 
         if self.noisy:
             log.msg("%d received from %r: %s" % (self.factory.port, addr, msg))
-        # look at msg type
+
+        # Process it based on its type
         if msg[TYP]  == REQ:
             ilen = len(data)
-            # if request
-            #  tell factory to handle
+            
+            # Requests are handled by the factory
             f = getattr(self.factory ,"krpc_" + msg[REQ], None)
             msg[ARG]['_krpc_sender'] =  self.addr
             if f and callable(f):
@@ -179,7 +291,7 @@ class KRPC:
                 else:
                     olen = self._sendResponse(addr, msg[TID], RSP, ret)
             else:
-                # unknown method
+                # Request for unknown method
                 log.msg("ERROR: don't know about method %s" % msg[REQ])
                 olen = self._sendResponse(addr, msg[TID], ERR,
                                           [KRPC_ERROR_METHOD_UNKNOWN, "unknown method "+str(msg[REQ])])
@@ -187,8 +299,7 @@ class KRPC:
                 log.msg("%s >>> %s - %s %s %s" % (addr, self.factory.node.port,
                                                   ilen, msg[REQ], olen))
         elif msg[TYP] == RSP:
-            # if response
-            #  lookup tid
+            # Responses get processed by their TID's deferred
             if self.tids.has_key(msg[TID]):
                 df = self.tids[msg[TID]]
                 #      callback
@@ -199,8 +310,7 @@ class KRPC:
                 if self.noisy:
                     log.msg('timeout: %r' % msg[RSP]['id'])
         elif msg[TYP] == ERR:
-            # if error
-            #  lookup tid
+            # Errors get processed by their TID's deferred's errback
             if self.tids.has_key(msg[TID]):
                 df = self.tids[msg[TID]]
                 del(self.tids[msg[TID]])
@@ -211,9 +321,9 @@ class KRPC:
                 log.msg("Got an error for an unknown request: %r" % (msg[ERR], ))
                 pass
         else:
+            # Received an unknown message type
             if self.noisy:
                 log.msg("unknown message type: %r" % msg)
-            # unknown message type
             if msg[TID] in self.tids:
                 df = self.tids[msg[TID]]
                 del(self.tids[msg[TID]])
@@ -222,10 +332,19 @@ class KRPC:
                                      "Received an unknown message type: %r" % msg[TYP]))
                 
     def _sendResponse(self, addr, tid, msgType, response):
+        """Helper function for sending responses to nodes.
+        
+        @type addr: (C{string}, C{int})
+        @param addr: source IP address and port of datagram.
+        @param tid: the transaction ID of the request
+        @param msgType: the type of message to respond with
+        @param response: the arguments for the response
+        """
         if not response:
             response = {}
         
         try:
+            # Create the response message
             msg = {TID : tid, TYP : msgType, msgType : response}
     
             if self.noisy:
@@ -233,7 +352,9 @@ class KRPC:
     
             out = bencode(msg)
             
+            # Make sure its not too long
             if len(out) > UDP_PACKET_LIMIT:
+                # Can we remove some values to shorten it?
                 if 'values' in response:
                     # Save the original list of values
                     orig_values = response['values']
@@ -279,27 +400,42 @@ class KRPC:
         return len(out)
     
     def sendRequest(self, method, args):
+        """Send a request to the remote node.
+        
+        @type method: C{string}
+        @param method: the methiod name to call on the remote node
+        @param args: the arguments to send to the remote node's method
+        """
         if self.stopped:
             raise KrpcError, (KRPC_ERROR_PROTOCOL_STOPPED, "cannot send, connection has been stopped")
-        # make message
-        # send it
+
+        # Create the request message
         msg = {TID : newID(), TYP : REQ,  REQ : method, ARG : args}
         if self.noisy:
             log.msg("%d sending to %r: %s" % (self.factory.port, self.addr, msg))
         data = bencode(msg)
+        
+        # Create the deferred and save it with the TID
         d = Deferred()
         self.tids[msg[TID]] = d
+
+        # Schedule a later timeout call
         def timeOut(tids = self.tids, id = msg[TID], method = method, addr = self.addr):
+            """Call the deferred's errback if a timeout occurs."""
             if tids.has_key(id):
                 df = tids[id]
                 del(tids[id])
                 df.errback(KrpcError(KRPC_ERROR_TIMEOUT, "timeout waiting for '%s' from %r" % (method, addr)))
         later = reactor.callLater(KRPC_TIMEOUT, timeOut)
+        
+        # Cancel the timeout call if a response is received
         def dropTimeOut(dict, later_call = later):
+            """Cancel the timeout call when a response is received."""
             if later_call.active():
                 later_call.cancel()
             return dict
         d.addBoth(dropTimeOut)
+        
         self.transport.write(data, self.addr)
         return d
     
@@ -309,7 +445,8 @@ class KRPC:
             df.errback(KrpcError(KRPC_ERROR_PROTOCOL_STOPPED, 'connection has been stopped while waiting for response'))
         self.tids = {}
         self.stopped = True
+
+#{ For testing the KRPC protocol
 def connectionForAddr(host, port):
     return host
     
@@ -421,5 +558,4 @@ class KRPCTests(unittest.TestCase):
     def gotLongRsp(self, dict):
         # Not quite accurate, but good enough
         self.failUnless(len(bencode(dict))-10 < UDP_PACKET_LIMIT)
-
         
\ No newline at end of file