Document the DHT's main khashmir module.
authorCameron Dale <camrdale@gmail.com>
Wed, 5 Mar 2008 23:03:11 +0000 (15:03 -0800)
committerCameron Dale <camrdale@gmail.com>
Wed, 5 Mar 2008 23:03:11 +0000 (15:03 -0800)
apt_dht_Khashmir/khashmir.py

index eeaab0a..126a30e 100644 (file)
@@ -1,6 +1,8 @@
 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
 # see LICENSE.txt for license information
 
+"""The main Khashmir program."""
+
 import warnings
 warnings.simplefilter("ignore", DeprecationWarning)
 
@@ -20,155 +22,257 @@ from khash import newID, newIDInRange
 from actions import FindNode, FindValue, GetValue, StoreValue
 import krpc
 
-# this is the base class, has base functionality and find node, no key-value mappings
 class KhashmirBase(protocol.Factory):
+    """The base Khashmir class, with base functionality and find node, no key-value mappings.
+    
+    @type _Node: L{node.Node}
+    @ivar _Node: the knode implementation to use for this class of DHT
+    @type config: C{dictionary}
+    @ivar config: the configuration parameters for the DHT
+    @type port: C{int}
+    @ivar port: the port to listen on
+    @type store: L{db.DB}
+    @ivar store: the database to store nodes and key/value pairs in
+    @type node: L{node.Node}
+    @ivar node: this node
+    @type table: L{ktable.KTable}
+    @ivar table: the routing table
+    @type token_secrets: C{list} of C{string}
+    @ivar token_secrets: the current secrets to use to create tokens
+    @type udp: L{krpc.hostbroker}
+    @ivar udp: the factory for the KRPC protocol
+    @type listenport: L{twisted.internet.interfaces.IListeningPort}
+    @ivar listenport: the UDP listening port
+    @type next_checkpoint: L{twisted.internet.interfaces.IDelayedCall}
+    @ivar next_checkpoint: the delayed call for the next checkpoint
+    """
+    
     _Node = KNodeBase
+    
     def __init__(self, config, cache_dir='/tmp'):
+        """Initialize the Khashmir class and call the L{setup} method.
+        
+        @type config: C{dictionary}
+        @param config: the configuration parameters for the DHT
+        @type cache_dir: C{string}
+        @param cache_dir: the directory to store all files in
+            (optional, defaults to the /tmp directory)
+        """
         self.config = None
         self.setup(config, cache_dir)
         
     def setup(self, config, cache_dir):
+        """Setup all the Khashmir sub-modules.
+        
+        @type config: C{dictionary}
+        @param config: the configuration parameters for the DHT
+        @type cache_dir: C{string}
+        @param cache_dir: the directory to store all files in
+        """
         self.config = config
         self.port = config['PORT']
         self.store = DB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db'))
         self.node = self._loadSelfNode('', self.port)
         self.table = KTable(self.node, config)
         self.token_secrets = [newID()]
-        #self.app = service.Application("krpc")
+        
+        # Start listening
         self.udp = krpc.hostbroker(self, config)
         self.udp.protocol = krpc.KRPC
         self.listenport = reactor.listenUDP(self.port, self.udp)
+        
+        # Load the routing table and begin checkpointing
         self._loadRoutingTable()
-        self.refreshTable(force=1)
-        self.next_checkpoint = reactor.callLater(60, self.checkpoint, (1,))
+        self.refreshTable(force = True)
+        self.next_checkpoint = reactor.callLater(60, self.checkpoint)
 
     def Node(self, id, host = None, port = None):
-        """Create a new node."""
+        """Create a new node.
+        
+        @see: L{node.Node.__init__}
+        """
         n = self._Node(id, host, port)
         n.table = self.table
         n.conn = self.udp.connectionForAddr((n.host, n.port))
         return n
     
     def __del__(self):
+        """Stop listening for packets."""
         self.listenport.stopListening()
         
     def _loadSelfNode(self, host, port):
+        """Create this node, loading any previously saved one."""
         id = self.store.getSelfNode()
         if not id:
             id = newID()
         return self._Node(id, host, port)
         
-    def checkpoint(self, auto=0):
+    def checkpoint(self):
+        """Perform some periodic maintenance operations."""
+        # Create a new token secret
         self.token_secrets.insert(0, newID())
         if len(self.token_secrets) > 3:
             self.token_secrets.pop()
+            
+        # Save some parameters for reloading
         self.store.saveSelfNode(self.node.id)
         self.store.dumpRoutingTable(self.table.buckets)
+        
+        # DHT maintenance
         self.store.expireValues(self.config['KEY_EXPIRE'])
         self.refreshTable()
-        if auto:
-            self.next_checkpoint = reactor.callLater(randrange(int(self.config['CHECKPOINT_INTERVAL'] * .9), 
-                                        int(self.config['CHECKPOINT_INTERVAL'] * 1.1)), 
-                              self.checkpoint, (1,))
+        
+        self.next_checkpoint = reactor.callLater(randrange(int(self.config['CHECKPOINT_INTERVAL'] * .9), 
+                                                           int(self.config['CHECKPOINT_INTERVAL'] * 1.1)), 
+                                                 self.checkpoint)
         
     def _loadRoutingTable(self):
-        """
-            load routing table nodes from database
-            it's usually a good idea to call refreshTable(force=1) after loading the table
+        """Load the previous routing table nodes from the database.
+        
+        It's usually a good idea to call refreshTable(force = True) after
+        loading the table.
         """
         nodes = self.store.getRoutingTable()
         for rec in nodes:
             n = self.Node(rec[0], rec[1], int(rec[2]))
-            self.table.insertNode(n, contacted=0)
+            self.table.insertNode(n, contacted = False)
             
-
-    #######
-    #######  LOCAL INTERFACE    - use these methods!
+    #{ Local interface
     def addContact(self, host, port, callback=None, errback=None):
-        """
-            ping this node and add the contact info to the table on pong!
+        """Ping this node and add the contact info to the table on pong.
+        
+        @type host: C{string}
+        @param host: the IP address of the node to contact
+        @type port: C{int}
+        @param port:the port of the node to contact
+        @type callback: C{method}
+        @param callback: the method to call with the results, it must take 1
+            parameter, the contact info returned by the node
+            (optional, defaults to doing nothing with the results)
+        @type errback: C{method}
+        @param errback: the method to call if an error occurs
+            (optional, defaults to calling the callback with None)
         """
         n = self.Node(NULL_ID, host, port)
         self.sendJoin(n, callback=callback, errback=errback)
 
-    ## this call is async!
     def findNode(self, id, callback, errback=None):
-        """ returns the contact info for node, or the k closest nodes, from the global table """
-        # get K nodes out of local table/cache, or the node we want
+        """Find the contact info for the K closest nodes in the global table.
+        
+        @type id: C{string}
+        @param id: the target ID to find the K closest nodes of
+        @type callback: C{method}
+        @param callback: the method to call with the results, it must take 1
+            parameter, the list of K closest nodes
+        @type errback: C{method}
+        @param errback: the method to call if an error occurs
+            (optional, defaults to doing nothing when an error occurs)
+        """
+        # Get K nodes out of local table/cache
         nodes = self.table.findNodes(id)
         d = Deferred()
         if errback:
             d.addCallbacks(callback, errback)
         else:
             d.addCallback(callback)
-        if len(nodes) == 1 and nodes[0].id == id :
+
+        # If the target ID was found
+        if len(nodes) == 1 and nodes[0].id == id:
             d.callback(nodes)
         else:
-            # create our search state
+            # Start the finding nodes action
             state = FindNode(self, id, d.callback, self.config)
             reactor.callLater(0, state.goWithNodes, nodes)
     
-    def insertNode(self, n, contacted=1):
-        """
-        insert a node in our local table, pinging oldest contact in bucket, if necessary
+    def insertNode(self, node, contacted = True):
+        """Try to insert a node in our local table, pinging oldest contact if necessary.
         
-        If all you have is a host/port, then use addContact, which calls this method after
-        receiving the PONG from the remote node.  The reason for the seperation is we can't insert
-        a node into the table without it's peer-ID.  That means of course the node passed into this
-        method needs to be a properly formed Node object with a valid ID.
+        If all you have is a host/port, then use L{addContact}, which calls this
+        method after receiving the PONG from the remote node. The reason for
+        the seperation is we can't insert a node into the table without its
+        node ID. That means of course the node passed into this method needs
+        to be a properly formed Node object with a valid ID.
+
+        @type node: L{node.Node}
+        @param node: the new node to try and insert
+        @type contacted: C{boolean}
+        @param contacted: whether the new node is known to be good, i.e.
+            responded to a request (optional, defaults to True)
         """
-        old = self.table.insertNode(n, contacted=contacted)
+        old = self.table.insertNode(node, contacted=contacted)
         if (old and old.id != self.node.id and
             (datetime.now() - old.lastSeen) > 
              timedelta(seconds=self.config['MIN_PING_INTERVAL'])):
-            # the bucket is full, check to see if old node is still around and if so, replace it
             
-            ## these are the callbacks used when we ping the oldest node in a bucket
-            def _staleNodeHandler(oldnode=old, newnode = n):
-                """ called if the pinged node never responds """
-                self.table.replaceStaleNode(old, newnode)
+            def _staleNodeHandler(oldnode = old, newnode = node):
+                """The pinged node never responded, so replace it."""
+                self.table.replaceStaleNode(oldnode, newnode)
             
             def _notStaleNodeHandler(dict, old=old):
-                """ called when we get a pong from the old node """
+                """Got a pong from the old node, so update it."""
                 dict = dict['rsp']
                 if dict['id'] == old.id:
                     self.table.justSeenNode(old.id)
             
+            # Bucket is full, check to see if old node is still available
             df = old.ping(self.node.id)
             df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
 
     def sendJoin(self, node, callback=None, errback=None):
+        """Join the DHT by pinging a bootstrap node.
+        
+        @type node: L{node.Node}
+        @param node: the node to send the join to
+        @type callback: C{method}
+        @param callback: the method to call with the results, it must take 1
+            parameter, the contact info returned by the node
+            (optional, defaults to doing nothing with the results)
+        @type errback: C{method}
+        @param errback: the method to call if an error occurs
+            (optional, defaults to calling the callback with None)
         """
-            ping a node
-        """
-        df = node.join(self.node.id)
-        ## these are the callbacks we use when we issue a PING
+
         def _pongHandler(dict, node=node, self=self, callback=callback):
+            """Node responded properly, callback with response."""
             n = self.Node(dict['rsp']['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
             self.insertNode(n)
             if callback:
                 callback((dict['rsp']['ip_addr'], dict['rsp']['port']))
+
         def _defaultPong(err, node=node, table=self.table, callback=callback, errback=errback):
+            """Error occurred, fail node and errback or callback with error."""
             table.nodeFailed(node)
             if errback:
                 errback()
-            else:
+            elif callback:
                 callback(None)
         
-        df.addCallbacks(_pongHandler,_defaultPong)
+        df = node.join(self.node.id)
+        df.addCallbacks(_pongHandler, _defaultPong)
 
     def findCloseNodes(self, callback=lambda a: None, errback = None):
-        """
-            This does a findNode on the ID one away from our own.  
-            This will allow us to populate our table with nodes on our network closest to our own.
-            This is called as soon as we start up with an empty table
+        """Perform a findNode on the ID one away from our own.
+
+        This will allow us to populate our table with nodes on our network
+        closest to our own. This is called as soon as we start up with an
+        empty table.
+
+        @type callback: C{method}
+        @param callback: the method to call with the results, it must take 1
+            parameter, the list of K closest nodes
+            (optional, defaults to doing nothing with the results)
+        @type errback: C{method}
+        @param errback: the method to call if an error occurs
+            (optional, defaults to doing nothing when an error occurs)
         """
         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
         self.findNode(id, callback, errback)
 
-    def refreshTable(self, force=0):
-        """
-            force=1 will refresh table regardless of last bucket access time
+    def refreshTable(self, force = False):
+        """Check all the buckets for those that need refreshing.
+        
+        @param force: refresh all buckets regardless of last bucket access time
+            (optional, defaults to False)
         """
         def callback(nodes):
             pass
@@ -176,14 +280,16 @@ class KhashmirBase(protocol.Factory):
         for bucket in self.table.buckets:
             if force or (datetime.now() - bucket.lastAccessed > 
                          timedelta(seconds=self.config['BUCKET_STALENESS'])):
+                # Choose a random ID in the bucket and try and find it
                 id = newIDInRange(bucket.min, bucket.max)
                 self.findNode(id, callback)
 
     def stats(self):
-        """
-        Returns (num_contacts, num_nodes)
-        num_contacts: number contacts in our routing table
-        num_nodes: number of nodes estimated in the entire dht
+        """Collect some statistics about the DHT.
+        
+        @rtype: (C{int}, C{int})
+        @return: the number contacts in our routing table, and the estimated
+            number of nodes in the entire DHT
         """
         num_contacts = reduce(lambda a, b: a + len(b.l), self.table.buckets, 0)
         num_nodes = self.config['K'] * (2**(len(self.table.buckets) - 1))
@@ -198,35 +304,71 @@ class KhashmirBase(protocol.Factory):
             pass
         self.store.close()
 
-    #### Remote Interface - called by remote nodes
+    #{ Remote interface
     def krpc_ping(self, id, _krpc_sender):
+        """Pong with our ID.
+        
+        @type id: C{string}
+        @param id: the node ID of the sender node
+        @type _krpc_sender: (C{string}, C{int})
+        @param _krpc_sender: the sender node's IP address and port
+        """
         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
-        self.insertNode(n, contacted=0)
+        self.insertNode(n, contacted = False)
+
         return {"id" : self.node.id}
         
     def krpc_join(self, id, _krpc_sender):
+        """Add the node by responding with its address and port.
+        
+        @type id: C{string}
+        @param id: the node ID of the sender node
+        @type _krpc_sender: (C{string}, C{int})
+        @param _krpc_sender: the sender node's IP address and port
+        """
         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
-        self.insertNode(n, contacted=0)
+        self.insertNode(n, contacted = False)
+
         return {"ip_addr" : _krpc_sender[0], "port" : _krpc_sender[1], "id" : self.node.id}
         
     def krpc_find_node(self, target, id, _krpc_sender):
+        """Find the K closest nodes to the target in the local routing table.
+        
+        @type target: C{string}
+        @param target: the target ID to find nodes for
+        @type id: C{string}
+        @param id: the node ID of the sender node
+        @type _krpc_sender: (C{string}, C{int})
+        @param _krpc_sender: the sender node's IP address and port
+        """
         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
-        self.insertNode(n, contacted=0)
+        self.insertNode(n, contacted = False)
+
         nodes = self.table.findNodes(target)
         nodes = map(lambda node: node.contactInfo(), nodes)
         token = sha(self.token_secrets[0] + _krpc_sender[0]).digest()
         return {"nodes" : nodes, "token" : token, "id" : self.node.id}
 
 
-## This class provides read-only access to the DHT, valueForKey
-## you probably want to use this mixin and provide your own write methods
 class KhashmirRead(KhashmirBase):
+    """The read-only Khashmir class, which can only retrieve (not store) key/value mappings."""
+
     _Node = KNodeRead
 
-    ## also async
+    #{ Local interface
     def findValue(self, key, callback, errback=None):
-        """ returns the contact info for nodes that have values for the key, from the global table """
-        # get K nodes out of local table/cache
+        """Get the nodes that have values for the key from the global table.
+        
+        @type key: C{string}
+        @param key: the target key to find the values for
+        @type callback: C{method}
+        @param callback: the method to call with the results, it must take 1
+            parameter, the list of nodes with values
+        @type errback: C{method}
+        @param errback: the method to call if an error occurs
+            (optional, defaults to doing nothing when an error occurs)
+        """
+        # Get K nodes out of local table/cache
         nodes = self.table.findNodes(key)
         d = Deferred()
         if errback:
@@ -234,16 +376,25 @@ class KhashmirRead(KhashmirBase):
         else:
             d.addCallback(callback)
 
-        # create our search state
+        # Search for others starting with the locally found ones
         state = FindValue(self, key, d.callback, self.config)
         reactor.callLater(0, state.goWithNodes, nodes)
 
-    def valueForKey(self, key, callback, searchlocal = 1):
-        """ returns the values found for key in global table
-            callback will be called with a list of values for each peer that returns unique values
-            final callback will be an empty list - probably should change to 'more coming' arg
+    def valueForKey(self, key, callback, searchlocal = True):
+        """Get the values found for key in global table.
+        
+        Callback will be called with a list of values for each peer that
+        returns unique values. The final callback will be an empty list.
+
+        @type key: C{string}
+        @param key: the target key to get the values for
+        @type callback: C{method}
+        @param callback: the method to call with the results, it must take 2
+            parameters: the key, and the values found
+        @type searchlocal: C{boolean}
+        @param searchlocal: whether to also look for any local values
         """
-        # get locals
+        # Get any local values
         if searchlocal:
             l = self.store.retrieveValues(key)
             if len(l) > 0:
@@ -252,17 +403,26 @@ class KhashmirRead(KhashmirBase):
             l = []
 
         def _getValueForKey(nodes, key=key, local_values=l, response=callback, self=self):
-            # create our search state
+            """Use the found nodes to send requests for values to."""
             state = GetValue(self, key, local_values, self.config['RETRIEVE_VALUES'], response, self.config)
             reactor.callLater(0, state.goWithNodes, nodes)
             
-        # this call is asynch
+        # First lookup nodes that have values for the key
         self.findValue(key, _getValueForKey)
 
-    #### Remote Interface - called by remote nodes
+    #{ Remote interface
     def krpc_find_value(self, key, id, _krpc_sender):
+        """Find the number of values stored locally for the key, and the K closest nodes.
+        
+        @type key: C{string}
+        @param key: the target key to find the values and nodes for
+        @type id: C{string}
+        @param id: the node ID of the sender node
+        @type _krpc_sender: (C{string}, C{int})
+        @param _krpc_sender: the sender node's IP address and port
+        """
         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
-        self.insertNode(n, contacted=0)
+        self.insertNode(n, contacted = False)
     
         nodes = self.table.findNodes(key)
         nodes = map(lambda node: node.contactInfo(), nodes)
@@ -270,8 +430,20 @@ class KhashmirRead(KhashmirBase):
         return {'nodes' : nodes, 'num' : num_values, "id": self.node.id}
 
     def krpc_get_value(self, key, num, id, _krpc_sender):
+        """Retrieve the values stored locally for the key.
+        
+        @type key: C{string}
+        @param key: the target key to retrieve the values for
+        @type num: C{int}
+        @param num: the maximum number of values to retrieve, or 0 to
+            retrieve all of them
+        @type id: C{string}
+        @param id: the node ID of the sender node
+        @type _krpc_sender: (C{string}, C{int})
+        @param _krpc_sender: the sender node's IP address and port
+        """
         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
-        self.insertNode(n, contacted=0)
+        self.insertNode(n, contacted = False)
     
         l = self.store.retrieveValues(key)
         if num == 0 or num >= len(l):
@@ -280,32 +452,57 @@ class KhashmirRead(KhashmirBase):
             shuffle(l)
             return {'values' : l[:num], "id": self.node.id}
 
-###  provides a generic write method, you probably don't want to deploy something that allows
-###  arbitrary value storage
+
 class KhashmirWrite(KhashmirRead):
+    """The read-write Khashmir class, which can store and retrieve key/value mappings."""
+
     _Node = KNodeWrite
-    ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
+
+    #{ Local interface
     def storeValueForKey(self, key, value, callback=None):
-        """ stores the value and origination time for key in the global table, returns immediately, no status 
-            in this implementation, peers respond but don't indicate status to storing values
-            a key can have many values
+        """Stores the value for the key in the global table.
+        
+        No status in this implementation, peers respond but don't indicate
+        status of storing values.
+
+        @type key: C{string}
+        @param key: the target key to store the value for
+        @type value: C{string}
+        @param value: the value to store with the key
+        @type callback: C{method}
+        @param callback: the method to call with the results, it must take 3
+            parameters: the key, the value stored, and the result of the store
+            (optional, defaults to doing nothing with the results)
         """
         def _storeValueForKey(nodes, key=key, value=value, response=callback, self=self):
+            """Use the returned K closest nodes to store the key at."""
             if not response:
-                # default callback
                 def _storedValueHandler(key, value, sender):
+                    """Default callback that does nothing."""
                     pass
-                response=_storedValueHandler
+                response = _storedValueHandler
             action = StoreValue(self, key, value, self.config['STORE_REDUNDANCY'], response, self.config)
             reactor.callLater(0, action.goWithNodes, nodes)
             
-        # this call is asynch
+        # First find the K closest nodes to operate on.
         self.findNode(key, _storeValueForKey)
                     
-    #### Remote Interface - called by remote nodes
+    #{ Remote interface
     def krpc_store_value(self, key, value, token, id, _krpc_sender):
+        """Store the value locally with the key.
+        
+        @type key: C{string}
+        @param key: the target key to store the value for
+        @type value: C{string}
+        @param value: the value to store with the key
+        @param token: the token to confirm that this peer contacted us previously
+        @type id: C{string}
+        @param id: the node ID of the sender node
+        @type _krpc_sender: (C{string}, C{int})
+        @param _krpc_sender: the sender node's IP address and port
+        """
         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
-        self.insertNode(n, contacted=0)
+        self.insertNode(n, contacted = False)
         for secret in self.token_secrets:
             this_token = sha(secret + _krpc_sender[0]).digest()
             if token == this_token:
@@ -313,10 +510,12 @@ class KhashmirWrite(KhashmirRead):
                 return {"id" : self.node.id}
         raise krpc.KrpcError, (krpc.KRPC_ERROR_INVALID_TOKEN, 'token is invalid, do a find_nodes to get a fresh one')
 
-# the whole shebang, for testing
+
 class Khashmir(KhashmirWrite):
+    """The default Khashmir class (currently the read-write L{KhashmirWrite})."""
     _Node = KNodeWrite
 
+
 class SimpleTests(unittest.TestCase):
     
     timeout = 10
@@ -328,7 +527,6 @@ class SimpleTests(unittest.TestCase):
                     'KEY_EXPIRE': 3600, 'SPEW': False, }
 
     def setUp(self):
-        krpc.KRPC.noisy = 0
         d = self.DHT_DEFAULTS.copy()
         d['PORT'] = 4044
         self.a = Khashmir(d)