From d78e24749831bf3803e5fbc217c442d3e93912e9 Mon Sep 17 00:00:00 2001 From: Cameron Dale Date: Wed, 5 Mar 2008 15:03:11 -0800 Subject: [PATCH] Document the DHT's main khashmir module. --- apt_dht_Khashmir/khashmir.py | 374 ++++++++++++++++++++++++++--------- 1 file changed, 286 insertions(+), 88 deletions(-) diff --git a/apt_dht_Khashmir/khashmir.py b/apt_dht_Khashmir/khashmir.py index eeaab0a..126a30e 100644 --- a/apt_dht_Khashmir/khashmir.py +++ b/apt_dht_Khashmir/khashmir.py @@ -1,6 +1,8 @@ ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved # see LICENSE.txt for license information +"""The main Khashmir program.""" + import warnings warnings.simplefilter("ignore", DeprecationWarning) @@ -20,155 +22,257 @@ from khash import newID, newIDInRange from actions import FindNode, FindValue, GetValue, StoreValue import krpc -# this is the base class, has base functionality and find node, no key-value mappings class KhashmirBase(protocol.Factory): + """The base Khashmir class, with base functionality and find node, no key-value mappings. + + @type _Node: L{node.Node} + @ivar _Node: the knode implementation to use for this class of DHT + @type config: C{dictionary} + @ivar config: the configuration parameters for the DHT + @type port: C{int} + @ivar port: the port to listen on + @type store: L{db.DB} + @ivar store: the database to store nodes and key/value pairs in + @type node: L{node.Node} + @ivar node: this node + @type table: L{ktable.KTable} + @ivar table: the routing table + @type token_secrets: C{list} of C{string} + @ivar token_secrets: the current secrets to use to create tokens + @type udp: L{krpc.hostbroker} + @ivar udp: the factory for the KRPC protocol + @type listenport: L{twisted.internet.interfaces.IListeningPort} + @ivar listenport: the UDP listening port + @type next_checkpoint: L{twisted.internet.interfaces.IDelayedCall} + @ivar next_checkpoint: the delayed call for the next checkpoint + """ + _Node = KNodeBase + def __init__(self, config, cache_dir='/tmp'): + """Initialize the Khashmir class and call the L{setup} method. + + @type config: C{dictionary} + @param config: the configuration parameters for the DHT + @type cache_dir: C{string} + @param cache_dir: the directory to store all files in + (optional, defaults to the /tmp directory) + """ self.config = None self.setup(config, cache_dir) def setup(self, config, cache_dir): + """Setup all the Khashmir sub-modules. + + @type config: C{dictionary} + @param config: the configuration parameters for the DHT + @type cache_dir: C{string} + @param cache_dir: the directory to store all files in + """ self.config = config self.port = config['PORT'] self.store = DB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db')) self.node = self._loadSelfNode('', self.port) self.table = KTable(self.node, config) self.token_secrets = [newID()] - #self.app = service.Application("krpc") + + # Start listening self.udp = krpc.hostbroker(self, config) self.udp.protocol = krpc.KRPC self.listenport = reactor.listenUDP(self.port, self.udp) + + # Load the routing table and begin checkpointing self._loadRoutingTable() - self.refreshTable(force=1) - self.next_checkpoint = reactor.callLater(60, self.checkpoint, (1,)) + self.refreshTable(force = True) + self.next_checkpoint = reactor.callLater(60, self.checkpoint) def Node(self, id, host = None, port = None): - """Create a new node.""" + """Create a new node. + + @see: L{node.Node.__init__} + """ n = self._Node(id, host, port) n.table = self.table n.conn = self.udp.connectionForAddr((n.host, n.port)) return n def __del__(self): + """Stop listening for packets.""" self.listenport.stopListening() def _loadSelfNode(self, host, port): + """Create this node, loading any previously saved one.""" id = self.store.getSelfNode() if not id: id = newID() return self._Node(id, host, port) - def checkpoint(self, auto=0): + def checkpoint(self): + """Perform some periodic maintenance operations.""" + # Create a new token secret self.token_secrets.insert(0, newID()) if len(self.token_secrets) > 3: self.token_secrets.pop() + + # Save some parameters for reloading self.store.saveSelfNode(self.node.id) self.store.dumpRoutingTable(self.table.buckets) + + # DHT maintenance self.store.expireValues(self.config['KEY_EXPIRE']) self.refreshTable() - if auto: - self.next_checkpoint = reactor.callLater(randrange(int(self.config['CHECKPOINT_INTERVAL'] * .9), - int(self.config['CHECKPOINT_INTERVAL'] * 1.1)), - self.checkpoint, (1,)) + + self.next_checkpoint = reactor.callLater(randrange(int(self.config['CHECKPOINT_INTERVAL'] * .9), + int(self.config['CHECKPOINT_INTERVAL'] * 1.1)), + self.checkpoint) def _loadRoutingTable(self): - """ - load routing table nodes from database - it's usually a good idea to call refreshTable(force=1) after loading the table + """Load the previous routing table nodes from the database. + + It's usually a good idea to call refreshTable(force = True) after + loading the table. """ nodes = self.store.getRoutingTable() for rec in nodes: n = self.Node(rec[0], rec[1], int(rec[2])) - self.table.insertNode(n, contacted=0) + self.table.insertNode(n, contacted = False) - - ####### - ####### LOCAL INTERFACE - use these methods! + #{ Local interface def addContact(self, host, port, callback=None, errback=None): - """ - ping this node and add the contact info to the table on pong! + """Ping this node and add the contact info to the table on pong. + + @type host: C{string} + @param host: the IP address of the node to contact + @type port: C{int} + @param port:the port of the node to contact + @type callback: C{method} + @param callback: the method to call with the results, it must take 1 + parameter, the contact info returned by the node + (optional, defaults to doing nothing with the results) + @type errback: C{method} + @param errback: the method to call if an error occurs + (optional, defaults to calling the callback with None) """ n = self.Node(NULL_ID, host, port) self.sendJoin(n, callback=callback, errback=errback) - ## this call is async! def findNode(self, id, callback, errback=None): - """ returns the contact info for node, or the k closest nodes, from the global table """ - # get K nodes out of local table/cache, or the node we want + """Find the contact info for the K closest nodes in the global table. + + @type id: C{string} + @param id: the target ID to find the K closest nodes of + @type callback: C{method} + @param callback: the method to call with the results, it must take 1 + parameter, the list of K closest nodes + @type errback: C{method} + @param errback: the method to call if an error occurs + (optional, defaults to doing nothing when an error occurs) + """ + # Get K nodes out of local table/cache nodes = self.table.findNodes(id) d = Deferred() if errback: d.addCallbacks(callback, errback) else: d.addCallback(callback) - if len(nodes) == 1 and nodes[0].id == id : + + # If the target ID was found + if len(nodes) == 1 and nodes[0].id == id: d.callback(nodes) else: - # create our search state + # Start the finding nodes action state = FindNode(self, id, d.callback, self.config) reactor.callLater(0, state.goWithNodes, nodes) - def insertNode(self, n, contacted=1): - """ - insert a node in our local table, pinging oldest contact in bucket, if necessary + def insertNode(self, node, contacted = True): + """Try to insert a node in our local table, pinging oldest contact if necessary. - If all you have is a host/port, then use addContact, which calls this method after - receiving the PONG from the remote node. The reason for the seperation is we can't insert - a node into the table without it's peer-ID. That means of course the node passed into this - method needs to be a properly formed Node object with a valid ID. + If all you have is a host/port, then use L{addContact}, which calls this + method after receiving the PONG from the remote node. The reason for + the seperation is we can't insert a node into the table without its + node ID. That means of course the node passed into this method needs + to be a properly formed Node object with a valid ID. + + @type node: L{node.Node} + @param node: the new node to try and insert + @type contacted: C{boolean} + @param contacted: whether the new node is known to be good, i.e. + responded to a request (optional, defaults to True) """ - old = self.table.insertNode(n, contacted=contacted) + old = self.table.insertNode(node, contacted=contacted) if (old and old.id != self.node.id and (datetime.now() - old.lastSeen) > timedelta(seconds=self.config['MIN_PING_INTERVAL'])): - # the bucket is full, check to see if old node is still around and if so, replace it - ## these are the callbacks used when we ping the oldest node in a bucket - def _staleNodeHandler(oldnode=old, newnode = n): - """ called if the pinged node never responds """ - self.table.replaceStaleNode(old, newnode) + def _staleNodeHandler(oldnode = old, newnode = node): + """The pinged node never responded, so replace it.""" + self.table.replaceStaleNode(oldnode, newnode) def _notStaleNodeHandler(dict, old=old): - """ called when we get a pong from the old node """ + """Got a pong from the old node, so update it.""" dict = dict['rsp'] if dict['id'] == old.id: self.table.justSeenNode(old.id) + # Bucket is full, check to see if old node is still available df = old.ping(self.node.id) df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler) def sendJoin(self, node, callback=None, errback=None): + """Join the DHT by pinging a bootstrap node. + + @type node: L{node.Node} + @param node: the node to send the join to + @type callback: C{method} + @param callback: the method to call with the results, it must take 1 + parameter, the contact info returned by the node + (optional, defaults to doing nothing with the results) + @type errback: C{method} + @param errback: the method to call if an error occurs + (optional, defaults to calling the callback with None) """ - ping a node - """ - df = node.join(self.node.id) - ## these are the callbacks we use when we issue a PING + def _pongHandler(dict, node=node, self=self, callback=callback): + """Node responded properly, callback with response.""" n = self.Node(dict['rsp']['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1]) self.insertNode(n) if callback: callback((dict['rsp']['ip_addr'], dict['rsp']['port'])) + def _defaultPong(err, node=node, table=self.table, callback=callback, errback=errback): + """Error occurred, fail node and errback or callback with error.""" table.nodeFailed(node) if errback: errback() - else: + elif callback: callback(None) - df.addCallbacks(_pongHandler,_defaultPong) + df = node.join(self.node.id) + df.addCallbacks(_pongHandler, _defaultPong) def findCloseNodes(self, callback=lambda a: None, errback = None): - """ - This does a findNode on the ID one away from our own. - This will allow us to populate our table with nodes on our network closest to our own. - This is called as soon as we start up with an empty table + """Perform a findNode on the ID one away from our own. + + This will allow us to populate our table with nodes on our network + closest to our own. This is called as soon as we start up with an + empty table. + + @type callback: C{method} + @param callback: the method to call with the results, it must take 1 + parameter, the list of K closest nodes + (optional, defaults to doing nothing with the results) + @type errback: C{method} + @param errback: the method to call if an error occurs + (optional, defaults to doing nothing when an error occurs) """ id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256) self.findNode(id, callback, errback) - def refreshTable(self, force=0): - """ - force=1 will refresh table regardless of last bucket access time + def refreshTable(self, force = False): + """Check all the buckets for those that need refreshing. + + @param force: refresh all buckets regardless of last bucket access time + (optional, defaults to False) """ def callback(nodes): pass @@ -176,14 +280,16 @@ class KhashmirBase(protocol.Factory): for bucket in self.table.buckets: if force or (datetime.now() - bucket.lastAccessed > timedelta(seconds=self.config['BUCKET_STALENESS'])): + # Choose a random ID in the bucket and try and find it id = newIDInRange(bucket.min, bucket.max) self.findNode(id, callback) def stats(self): - """ - Returns (num_contacts, num_nodes) - num_contacts: number contacts in our routing table - num_nodes: number of nodes estimated in the entire dht + """Collect some statistics about the DHT. + + @rtype: (C{int}, C{int}) + @return: the number contacts in our routing table, and the estimated + number of nodes in the entire DHT """ num_contacts = reduce(lambda a, b: a + len(b.l), self.table.buckets, 0) num_nodes = self.config['K'] * (2**(len(self.table.buckets) - 1)) @@ -198,35 +304,71 @@ class KhashmirBase(protocol.Factory): pass self.store.close() - #### Remote Interface - called by remote nodes + #{ Remote interface def krpc_ping(self, id, _krpc_sender): + """Pong with our ID. + + @type id: C{string} + @param id: the node ID of the sender node + @type _krpc_sender: (C{string}, C{int}) + @param _krpc_sender: the sender node's IP address and port + """ n = self.Node(id, _krpc_sender[0], _krpc_sender[1]) - self.insertNode(n, contacted=0) + self.insertNode(n, contacted = False) + return {"id" : self.node.id} def krpc_join(self, id, _krpc_sender): + """Add the node by responding with its address and port. + + @type id: C{string} + @param id: the node ID of the sender node + @type _krpc_sender: (C{string}, C{int}) + @param _krpc_sender: the sender node's IP address and port + """ n = self.Node(id, _krpc_sender[0], _krpc_sender[1]) - self.insertNode(n, contacted=0) + self.insertNode(n, contacted = False) + return {"ip_addr" : _krpc_sender[0], "port" : _krpc_sender[1], "id" : self.node.id} def krpc_find_node(self, target, id, _krpc_sender): + """Find the K closest nodes to the target in the local routing table. + + @type target: C{string} + @param target: the target ID to find nodes for + @type id: C{string} + @param id: the node ID of the sender node + @type _krpc_sender: (C{string}, C{int}) + @param _krpc_sender: the sender node's IP address and port + """ n = self.Node(id, _krpc_sender[0], _krpc_sender[1]) - self.insertNode(n, contacted=0) + self.insertNode(n, contacted = False) + nodes = self.table.findNodes(target) nodes = map(lambda node: node.contactInfo(), nodes) token = sha(self.token_secrets[0] + _krpc_sender[0]).digest() return {"nodes" : nodes, "token" : token, "id" : self.node.id} -## This class provides read-only access to the DHT, valueForKey -## you probably want to use this mixin and provide your own write methods class KhashmirRead(KhashmirBase): + """The read-only Khashmir class, which can only retrieve (not store) key/value mappings.""" + _Node = KNodeRead - ## also async + #{ Local interface def findValue(self, key, callback, errback=None): - """ returns the contact info for nodes that have values for the key, from the global table """ - # get K nodes out of local table/cache + """Get the nodes that have values for the key from the global table. + + @type key: C{string} + @param key: the target key to find the values for + @type callback: C{method} + @param callback: the method to call with the results, it must take 1 + parameter, the list of nodes with values + @type errback: C{method} + @param errback: the method to call if an error occurs + (optional, defaults to doing nothing when an error occurs) + """ + # Get K nodes out of local table/cache nodes = self.table.findNodes(key) d = Deferred() if errback: @@ -234,16 +376,25 @@ class KhashmirRead(KhashmirBase): else: d.addCallback(callback) - # create our search state + # Search for others starting with the locally found ones state = FindValue(self, key, d.callback, self.config) reactor.callLater(0, state.goWithNodes, nodes) - def valueForKey(self, key, callback, searchlocal = 1): - """ returns the values found for key in global table - callback will be called with a list of values for each peer that returns unique values - final callback will be an empty list - probably should change to 'more coming' arg + def valueForKey(self, key, callback, searchlocal = True): + """Get the values found for key in global table. + + Callback will be called with a list of values for each peer that + returns unique values. The final callback will be an empty list. + + @type key: C{string} + @param key: the target key to get the values for + @type callback: C{method} + @param callback: the method to call with the results, it must take 2 + parameters: the key, and the values found + @type searchlocal: C{boolean} + @param searchlocal: whether to also look for any local values """ - # get locals + # Get any local values if searchlocal: l = self.store.retrieveValues(key) if len(l) > 0: @@ -252,17 +403,26 @@ class KhashmirRead(KhashmirBase): l = [] def _getValueForKey(nodes, key=key, local_values=l, response=callback, self=self): - # create our search state + """Use the found nodes to send requests for values to.""" state = GetValue(self, key, local_values, self.config['RETRIEVE_VALUES'], response, self.config) reactor.callLater(0, state.goWithNodes, nodes) - # this call is asynch + # First lookup nodes that have values for the key self.findValue(key, _getValueForKey) - #### Remote Interface - called by remote nodes + #{ Remote interface def krpc_find_value(self, key, id, _krpc_sender): + """Find the number of values stored locally for the key, and the K closest nodes. + + @type key: C{string} + @param key: the target key to find the values and nodes for + @type id: C{string} + @param id: the node ID of the sender node + @type _krpc_sender: (C{string}, C{int}) + @param _krpc_sender: the sender node's IP address and port + """ n = self.Node(id, _krpc_sender[0], _krpc_sender[1]) - self.insertNode(n, contacted=0) + self.insertNode(n, contacted = False) nodes = self.table.findNodes(key) nodes = map(lambda node: node.contactInfo(), nodes) @@ -270,8 +430,20 @@ class KhashmirRead(KhashmirBase): return {'nodes' : nodes, 'num' : num_values, "id": self.node.id} def krpc_get_value(self, key, num, id, _krpc_sender): + """Retrieve the values stored locally for the key. + + @type key: C{string} + @param key: the target key to retrieve the values for + @type num: C{int} + @param num: the maximum number of values to retrieve, or 0 to + retrieve all of them + @type id: C{string} + @param id: the node ID of the sender node + @type _krpc_sender: (C{string}, C{int}) + @param _krpc_sender: the sender node's IP address and port + """ n = self.Node(id, _krpc_sender[0], _krpc_sender[1]) - self.insertNode(n, contacted=0) + self.insertNode(n, contacted = False) l = self.store.retrieveValues(key) if num == 0 or num >= len(l): @@ -280,32 +452,57 @@ class KhashmirRead(KhashmirBase): shuffle(l) return {'values' : l[:num], "id": self.node.id} -### provides a generic write method, you probably don't want to deploy something that allows -### arbitrary value storage + class KhashmirWrite(KhashmirRead): + """The read-write Khashmir class, which can store and retrieve key/value mappings.""" + _Node = KNodeWrite - ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor) + + #{ Local interface def storeValueForKey(self, key, value, callback=None): - """ stores the value and origination time for key in the global table, returns immediately, no status - in this implementation, peers respond but don't indicate status to storing values - a key can have many values + """Stores the value for the key in the global table. + + No status in this implementation, peers respond but don't indicate + status of storing values. + + @type key: C{string} + @param key: the target key to store the value for + @type value: C{string} + @param value: the value to store with the key + @type callback: C{method} + @param callback: the method to call with the results, it must take 3 + parameters: the key, the value stored, and the result of the store + (optional, defaults to doing nothing with the results) """ def _storeValueForKey(nodes, key=key, value=value, response=callback, self=self): + """Use the returned K closest nodes to store the key at.""" if not response: - # default callback def _storedValueHandler(key, value, sender): + """Default callback that does nothing.""" pass - response=_storedValueHandler + response = _storedValueHandler action = StoreValue(self, key, value, self.config['STORE_REDUNDANCY'], response, self.config) reactor.callLater(0, action.goWithNodes, nodes) - # this call is asynch + # First find the K closest nodes to operate on. self.findNode(key, _storeValueForKey) - #### Remote Interface - called by remote nodes + #{ Remote interface def krpc_store_value(self, key, value, token, id, _krpc_sender): + """Store the value locally with the key. + + @type key: C{string} + @param key: the target key to store the value for + @type value: C{string} + @param value: the value to store with the key + @param token: the token to confirm that this peer contacted us previously + @type id: C{string} + @param id: the node ID of the sender node + @type _krpc_sender: (C{string}, C{int}) + @param _krpc_sender: the sender node's IP address and port + """ n = self.Node(id, _krpc_sender[0], _krpc_sender[1]) - self.insertNode(n, contacted=0) + self.insertNode(n, contacted = False) for secret in self.token_secrets: this_token = sha(secret + _krpc_sender[0]).digest() if token == this_token: @@ -313,10 +510,12 @@ class KhashmirWrite(KhashmirRead): return {"id" : self.node.id} raise krpc.KrpcError, (krpc.KRPC_ERROR_INVALID_TOKEN, 'token is invalid, do a find_nodes to get a fresh one') -# the whole shebang, for testing + class Khashmir(KhashmirWrite): + """The default Khashmir class (currently the read-write L{KhashmirWrite}).""" _Node = KNodeWrite + class SimpleTests(unittest.TestCase): timeout = 10 @@ -328,7 +527,6 @@ class SimpleTests(unittest.TestCase): 'KEY_EXPIRE': 3600, 'SPEW': False, } def setUp(self): - krpc.KRPC.noisy = 0 d = self.DHT_DEFAULTS.copy() d['PORT'] = 4044 self.a = Khashmir(d) -- 2.30.2