## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
# see LICENSE.txt for license information
+"""The main Khashmir program."""
import warnings
warnings.simplefilter("ignore", DeprecationWarning)
from actions import FindNode, FindValue, GetValue, StoreValue
import krpc
-# this is the base class, has base functionality and find node, no key-value mappings
class KhashmirBase(protocol.Factory):
+ """The base Khashmir class, with base functionality and find node, no key-value mappings.
+ @type _Node: L{node.Node}
+ @ivar _Node: the knode implementation to use for this class of DHT
+ @type config: C{dictionary}
+ @ivar config: the configuration parameters for the DHT
+ @type port: C{int}
+ @ivar port: the port to listen on
+ @type store: L{db.DB}
+ @ivar store: the database to store nodes and key/value pairs in
+ @type node: L{node.Node}
+ @ivar node: this node
+ @type table: L{ktable.KTable}
+ @ivar table: the routing table
+ @type token_secrets: C{list} of C{string}
+ @ivar token_secrets: the current secrets to use to create tokens
+ @type udp: L{krpc.hostbroker}
+ @ivar udp: the factory for the KRPC protocol
+ @type listenport: L{twisted.internet.interfaces.IListeningPort}
+ @ivar listenport: the UDP listening port
+ @type next_checkpoint: L{twisted.internet.interfaces.IDelayedCall}
+ @ivar next_checkpoint: the delayed call for the next checkpoint
+ """
_Node = KNodeBase
def __init__(self, config, cache_dir='/tmp'):
+ """Initialize the Khashmir class and call the L{setup} method.
+ @type config: C{dictionary}
+ @param config: the configuration parameters for the DHT
+ @type cache_dir: C{string}
+ @param cache_dir: the directory to store all files in
+ (optional, defaults to the /tmp directory)
+ """
self.config = None
self.setup(config, cache_dir)
def setup(self, config, cache_dir):
+ """Setup all the Khashmir sub-modules.
+ @type config: C{dictionary}
+ @param config: the configuration parameters for the DHT
+ @type cache_dir: C{string}
+ @param cache_dir: the directory to store all files in
+ """
self.config = config
self.port = config['PORT']
self.store = DB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db'))
self.node = self._loadSelfNode('', self.port)
self.table = KTable(self.node, config)
self.token_secrets = [newID()]
- #self.app = service.Application("krpc")
+ # Start listening
self.udp = krpc.hostbroker(self, config)
self.udp.protocol = krpc.KRPC
self.listenport = reactor.listenUDP(self.port, self.udp)
+ # Load the routing table and begin checkpointing
- self.refreshTable(force=1)
- self.next_checkpoint = reactor.callLater(60, self.checkpoint, (1,))
+ self.refreshTable(force = True)
+ self.next_checkpoint = reactor.callLater(60, self.checkpoint)
def Node(self, id, host = None, port = None):
- """Create a new node."""
+ """Create a new node.
+ @see: L{node.Node.__init__}
+ """
n = self._Node(id, host, port)
n.table = self.table
n.conn = self.udp.connectionForAddr((n.host, n.port))
return n
def __del__(self):
+ """Stop listening for packets."""
def _loadSelfNode(self, host, port):
+ """Create this node, loading any previously saved one."""
id = self.store.getSelfNode()
if not id:
id = newID()
return self._Node(id, host, port)
- def checkpoint(self, auto=0):
+ def checkpoint(self):
+ """Perform some periodic maintenance operations."""
+ # Create a new token secret
self.token_secrets.insert(0, newID())
if len(self.token_secrets) > 3:
+ # Save some parameters for reloading
+ # DHT maintenance
- if auto:
- self.next_checkpoint = reactor.callLater(randrange(int(self.config['CHECKPOINT_INTERVAL'] * .9),
- int(self.config['CHECKPOINT_INTERVAL'] * 1.1)),
- self.checkpoint, (1,))
+ self.next_checkpoint = reactor.callLater(randrange(int(self.config['CHECKPOINT_INTERVAL'] * .9),
+ int(self.config['CHECKPOINT_INTERVAL'] * 1.1)),
+ self.checkpoint)
def _loadRoutingTable(self):
- """
- load routing table nodes from database
- it's usually a good idea to call refreshTable(force=1) after loading the table
+ """Load the previous routing table nodes from the database.
+ It's usually a good idea to call refreshTable(force = True) after
+ loading the table.
nodes = self.store.getRoutingTable()
for rec in nodes:
n = self.Node(rec[0], rec[1], int(rec[2]))
- self.table.insertNode(n, contacted=0)
+ self.table.insertNode(n, contacted = False)
- #######
- ####### LOCAL INTERFACE - use these methods!
+ #{ Local interface
def addContact(self, host, port, callback=None, errback=None):
- """
- ping this node and add the contact info to the table on pong!
+ """Ping this node and add the contact info to the table on pong.
+ @type host: C{string}
+ @param host: the IP address of the node to contact
+ @type port: C{int}
+ @param port:the port of the node to contact
+ @type callback: C{method}
+ @param callback: the method to call with the results, it must take 1
+ parameter, the contact info returned by the node
+ (optional, defaults to doing nothing with the results)
+ @type errback: C{method}
+ @param errback: the method to call if an error occurs
+ (optional, defaults to calling the callback with None)
n = self.Node(NULL_ID, host, port)
self.sendJoin(n, callback=callback, errback=errback)
- ## this call is async!
def findNode(self, id, callback, errback=None):
- """ returns the contact info for node, or the k closest nodes, from the global table """
- # get K nodes out of local table/cache, or the node we want
+ """Find the contact info for the K closest nodes in the global table.
+ @type id: C{string}
+ @param id: the target ID to find the K closest nodes of
+ @type callback: C{method}
+ @param callback: the method to call with the results, it must take 1
+ parameter, the list of K closest nodes
+ @type errback: C{method}
+ @param errback: the method to call if an error occurs
+ (optional, defaults to doing nothing when an error occurs)
+ """
+ # Get K nodes out of local table/cache
nodes = self.table.findNodes(id)
d = Deferred()
if errback:
d.addCallbacks(callback, errback)
- if len(nodes) == 1 and nodes[0].id == id :
+ # If the target ID was found
+ if len(nodes) == 1 and nodes[0].id == id:
- # create our search state
+ # Start the finding nodes action
state = FindNode(self, id, d.callback, self.config)
reactor.callLater(0, state.goWithNodes, nodes)
- def insertNode(self, n, contacted=1):
- """
- insert a node in our local table, pinging oldest contact in bucket, if necessary
+ def insertNode(self, node, contacted = True):
+ """Try to insert a node in our local table, pinging oldest contact if necessary.
- If all you have is a host/port, then use addContact, which calls this method after
- receiving the PONG from the remote node. The reason for the seperation is we can't insert
- a node into the table without it's peer-ID. That means of course the node passed into this
- method needs to be a properly formed Node object with a valid ID.
+ If all you have is a host/port, then use L{addContact}, which calls this
+ method after receiving the PONG from the remote node. The reason for
+ the seperation is we can't insert a node into the table without its
+ node ID. That means of course the node passed into this method needs
+ to be a properly formed Node object with a valid ID.
+ @type node: L{node.Node}
+ @param node: the new node to try and insert
+ @type contacted: C{boolean}
+ @param contacted: whether the new node is known to be good, i.e.
+ responded to a request (optional, defaults to True)
- old = self.table.insertNode(n, contacted=contacted)
+ old = self.table.insertNode(node, contacted=contacted)
if (old and old.id != self.node.id and
(datetime.now() - old.lastSeen) >
- # the bucket is full, check to see if old node is still around and if so, replace it
- ## these are the callbacks used when we ping the oldest node in a bucket
- def _staleNodeHandler(oldnode=old, newnode = n):
- """ called if the pinged node never responds """
- self.table.replaceStaleNode(old, newnode)
+ def _staleNodeHandler(oldnode = old, newnode = node):
+ """The pinged node never responded, so replace it."""
+ self.table.replaceStaleNode(oldnode, newnode)
def _notStaleNodeHandler(dict, old=old):
- """ called when we get a pong from the old node """
+ """Got a pong from the old node, so update it."""
dict = dict['rsp']
if dict['id'] == old.id:
+ # Bucket is full, check to see if old node is still available
df = old.ping(self.node.id)
df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
def sendJoin(self, node, callback=None, errback=None):
+ """Join the DHT by pinging a bootstrap node.
+ @type node: L{node.Node}
+ @param node: the node to send the join to
+ @type callback: C{method}
+ @param callback: the method to call with the results, it must take 1
+ parameter, the contact info returned by the node
+ (optional, defaults to doing nothing with the results)
+ @type errback: C{method}
+ @param errback: the method to call if an error occurs
+ (optional, defaults to calling the callback with None)
- ping a node
- """
- df = node.join(self.node.id)
- ## these are the callbacks we use when we issue a PING
def _pongHandler(dict, node=node, self=self, callback=callback):
+ """Node responded properly, callback with response."""
n = self.Node(dict['rsp']['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
if callback:
callback((dict['rsp']['ip_addr'], dict['rsp']['port']))
def _defaultPong(err, node=node, table=self.table, callback=callback, errback=errback):
+ """Error occurred, fail node and errback or callback with error."""
if errback:
- else:
+ elif callback:
- df.addCallbacks(_pongHandler,_defaultPong)
+ df = node.join(self.node.id)
+ df.addCallbacks(_pongHandler, _defaultPong)
def findCloseNodes(self, callback=lambda a: None, errback = None):
- """
- This does a findNode on the ID one away from our own.
- This will allow us to populate our table with nodes on our network closest to our own.
- This is called as soon as we start up with an empty table
+ """Perform a findNode on the ID one away from our own.
+ This will allow us to populate our table with nodes on our network
+ closest to our own. This is called as soon as we start up with an
+ empty table.
+ @type callback: C{method}
+ @param callback: the method to call with the results, it must take 1
+ parameter, the list of K closest nodes
+ (optional, defaults to doing nothing with the results)
+ @type errback: C{method}
+ @param errback: the method to call if an error occurs
+ (optional, defaults to doing nothing when an error occurs)
id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
self.findNode(id, callback, errback)
- def refreshTable(self, force=0):
- """
- force=1 will refresh table regardless of last bucket access time
+ def refreshTable(self, force = False):
+ """Check all the buckets for those that need refreshing.
+ @param force: refresh all buckets regardless of last bucket access time
+ (optional, defaults to False)
def callback(nodes):
for bucket in self.table.buckets:
if force or (datetime.now() - bucket.lastAccessed >
+ # Choose a random ID in the bucket and try and find it
id = newIDInRange(bucket.min, bucket.max)
self.findNode(id, callback)
def stats(self):
- """
- Returns (num_contacts, num_nodes)
- num_contacts: number contacts in our routing table
- num_nodes: number of nodes estimated in the entire dht
+ """Collect some statistics about the DHT.
+ @rtype: (C{int}, C{int})
+ @return: the number contacts in our routing table, and the estimated
+ number of nodes in the entire DHT
num_contacts = reduce(lambda a, b: a + len(b.l), self.table.buckets, 0)
num_nodes = self.config['K'] * (2**(len(self.table.buckets) - 1))
- #### Remote Interface - called by remote nodes
+ #{ Remote interface
def krpc_ping(self, id, _krpc_sender):
+ """Pong with our ID.
+ @type id: C{string}
+ @param id: the node ID of the sender node
+ @type _krpc_sender: (C{string}, C{int})
+ @param _krpc_sender: the sender node's IP address and port
+ """
n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
- self.insertNode(n, contacted=0)
+ self.insertNode(n, contacted = False)
return {"id" : self.node.id}
def krpc_join(self, id, _krpc_sender):
+ """Add the node by responding with its address and port.
+ @type id: C{string}
+ @param id: the node ID of the sender node
+ @type _krpc_sender: (C{string}, C{int})
+ @param _krpc_sender: the sender node's IP address and port
+ """
n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
- self.insertNode(n, contacted=0)
+ self.insertNode(n, contacted = False)
return {"ip_addr" : _krpc_sender[0], "port" : _krpc_sender[1], "id" : self.node.id}
def krpc_find_node(self, target, id, _krpc_sender):
+ """Find the K closest nodes to the target in the local routing table.
+ @type target: C{string}
+ @param target: the target ID to find nodes for
+ @type id: C{string}
+ @param id: the node ID of the sender node
+ @type _krpc_sender: (C{string}, C{int})
+ @param _krpc_sender: the sender node's IP address and port
+ """
n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
- self.insertNode(n, contacted=0)
+ self.insertNode(n, contacted = False)
nodes = self.table.findNodes(target)
nodes = map(lambda node: node.contactInfo(), nodes)
token = sha(self.token_secrets[0] + _krpc_sender[0]).digest()
return {"nodes" : nodes, "token" : token, "id" : self.node.id}
-## This class provides read-only access to the DHT, valueForKey
-## you probably want to use this mixin and provide your own write methods
class KhashmirRead(KhashmirBase):
+ """The read-only Khashmir class, which can only retrieve (not store) key/value mappings."""
_Node = KNodeRead
- ## also async
+ #{ Local interface
def findValue(self, key, callback, errback=None):
- """ returns the contact info for nodes that have values for the key, from the global table """
- # get K nodes out of local table/cache
+ """Get the nodes that have values for the key from the global table.
+ @type key: C{string}
+ @param key: the target key to find the values for
+ @type callback: C{method}
+ @param callback: the method to call with the results, it must take 1
+ parameter, the list of nodes with values
+ @type errback: C{method}
+ @param errback: the method to call if an error occurs
+ (optional, defaults to doing nothing when an error occurs)
+ """
+ # Get K nodes out of local table/cache
nodes = self.table.findNodes(key)
d = Deferred()
if errback:
- # create our search state
+ # Search for others starting with the locally found ones
state = FindValue(self, key, d.callback, self.config)
reactor.callLater(0, state.goWithNodes, nodes)
- def valueForKey(self, key, callback, searchlocal = 1):
- """ returns the values found for key in global table
- callback will be called with a list of values for each peer that returns unique values
- final callback will be an empty list - probably should change to 'more coming' arg
+ def valueForKey(self, key, callback, searchlocal = True):
+ """Get the values found for key in global table.
+ Callback will be called with a list of values for each peer that
+ returns unique values. The final callback will be an empty list.
+ @type key: C{string}
+ @param key: the target key to get the values for
+ @type callback: C{method}
+ @param callback: the method to call with the results, it must take 2
+ parameters: the key, and the values found
+ @type searchlocal: C{boolean}
+ @param searchlocal: whether to also look for any local values
- # get locals
+ # Get any local values
if searchlocal:
l = self.store.retrieveValues(key)
if len(l) > 0:
l = []
def _getValueForKey(nodes, key=key, local_values=l, response=callback, self=self):
- # create our search state
+ """Use the found nodes to send requests for values to."""
state = GetValue(self, key, local_values, self.config['RETRIEVE_VALUES'], response, self.config)
reactor.callLater(0, state.goWithNodes, nodes)
- # this call is asynch
+ # First lookup nodes that have values for the key
self.findValue(key, _getValueForKey)
- #### Remote Interface - called by remote nodes
+ #{ Remote interface
def krpc_find_value(self, key, id, _krpc_sender):
+ """Find the number of values stored locally for the key, and the K closest nodes.
+ @type key: C{string}
+ @param key: the target key to find the values and nodes for
+ @type id: C{string}
+ @param id: the node ID of the sender node
+ @type _krpc_sender: (C{string}, C{int})
+ @param _krpc_sender: the sender node's IP address and port
+ """
n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
- self.insertNode(n, contacted=0)
+ self.insertNode(n, contacted = False)
nodes = self.table.findNodes(key)
nodes = map(lambda node: node.contactInfo(), nodes)
return {'nodes' : nodes, 'num' : num_values, "id": self.node.id}
def krpc_get_value(self, key, num, id, _krpc_sender):
+ """Retrieve the values stored locally for the key.
+ @type key: C{string}
+ @param key: the target key to retrieve the values for
+ @type num: C{int}
+ @param num: the maximum number of values to retrieve, or 0 to
+ retrieve all of them
+ @type id: C{string}
+ @param id: the node ID of the sender node
+ @type _krpc_sender: (C{string}, C{int})
+ @param _krpc_sender: the sender node's IP address and port
+ """
n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
- self.insertNode(n, contacted=0)
+ self.insertNode(n, contacted = False)
l = self.store.retrieveValues(key)
if num == 0 or num >= len(l):
return {'values' : l[:num], "id": self.node.id}
-### provides a generic write method, you probably don't want to deploy something that allows
-### arbitrary value storage
class KhashmirWrite(KhashmirRead):
+ """The read-write Khashmir class, which can store and retrieve key/value mappings."""
_Node = KNodeWrite
- ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
+ #{ Local interface
def storeValueForKey(self, key, value, callback=None):
- """ stores the value and origination time for key in the global table, returns immediately, no status
- in this implementation, peers respond but don't indicate status to storing values
- a key can have many values
+ """Stores the value for the key in the global table.
+ No status in this implementation, peers respond but don't indicate
+ status of storing values.
+ @type key: C{string}
+ @param key: the target key to store the value for
+ @type value: C{string}
+ @param value: the value to store with the key
+ @type callback: C{method}
+ @param callback: the method to call with the results, it must take 3
+ parameters: the key, the value stored, and the result of the store
+ (optional, defaults to doing nothing with the results)
def _storeValueForKey(nodes, key=key, value=value, response=callback, self=self):
+ """Use the returned K closest nodes to store the key at."""
if not response:
- # default callback
def _storedValueHandler(key, value, sender):
+ """Default callback that does nothing."""
- response=_storedValueHandler
+ response = _storedValueHandler
action = StoreValue(self, key, value, self.config['STORE_REDUNDANCY'], response, self.config)
reactor.callLater(0, action.goWithNodes, nodes)
- # this call is asynch
+ # First find the K closest nodes to operate on.
self.findNode(key, _storeValueForKey)
- #### Remote Interface - called by remote nodes
+ #{ Remote interface
def krpc_store_value(self, key, value, token, id, _krpc_sender):
+ """Store the value locally with the key.
+ @type key: C{string}
+ @param key: the target key to store the value for
+ @type value: C{string}
+ @param value: the value to store with the key
+ @param token: the token to confirm that this peer contacted us previously
+ @type id: C{string}
+ @param id: the node ID of the sender node
+ @type _krpc_sender: (C{string}, C{int})
+ @param _krpc_sender: the sender node's IP address and port
+ """
n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
- self.insertNode(n, contacted=0)
+ self.insertNode(n, contacted = False)
for secret in self.token_secrets:
this_token = sha(secret + _krpc_sender[0]).digest()
if token == this_token:
return {"id" : self.node.id}
raise krpc.KrpcError, (krpc.KRPC_ERROR_INVALID_TOKEN, 'token is invalid, do a find_nodes to get a fresh one')
-# the whole shebang, for testing
class Khashmir(KhashmirWrite):
+ """The default Khashmir class (currently the read-write L{KhashmirWrite})."""
_Node = KNodeWrite
class SimpleTests(unittest.TestCase):
timeout = 10
'KEY_EXPIRE': 3600, 'SPEW': False, }
def setUp(self):
- krpc.KRPC.noisy = 0
d = self.DHT_DEFAULTS.copy()
d['PORT'] = 4044
self.a = Khashmir(d)