## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
# see LICENSE.txt for license information
+"""The main Khashmir program."""
+
import warnings
warnings.simplefilter("ignore", DeprecationWarning)
-from time import time
-from random import randrange
+from datetime import datetime, timedelta
+from random import randrange, shuffle
from sha import sha
import os
-import sqlite ## find this at http://pysqlite.sourceforge.net/
from twisted.internet.defer import Deferred
from twisted.internet import protocol, reactor
from twisted.trial import unittest
+from db import DB
from ktable import KTable
from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID
from khash import newID, newIDInRange
-from actions import FindNode, GetValue, KeyExpirer, StoreValue
+from actions import FindNode, FindValue, GetValue, StoreValue
import krpc
-class KhashmirDBExcept(Exception):
- pass
-
-# this is the base class, has base functionality and find node, no key-value mappings
class KhashmirBase(protocol.Factory):
+ """The base Khashmir class, with base functionality and find node, no key-value mappings.
+
+ @type _Node: L{node.Node}
+ @ivar _Node: the knode implementation to use for this class of DHT
+ @type config: C{dictionary}
+ @ivar config: the configuration parameters for the DHT
+ @type port: C{int}
+ @ivar port: the port to listen on
+ @type store: L{db.DB}
+ @ivar store: the database to store nodes and key/value pairs in
+ @type node: L{node.Node}
+ @ivar node: this node
+ @type table: L{ktable.KTable}
+ @ivar table: the routing table
+ @type token_secrets: C{list} of C{string}
+ @ivar token_secrets: the current secrets to use to create tokens
+ @type udp: L{krpc.hostbroker}
+ @ivar udp: the factory for the KRPC protocol
+ @type listenport: L{twisted.internet.interfaces.IListeningPort}
+ @ivar listenport: the UDP listening port
+ @type next_checkpoint: L{twisted.internet.interfaces.IDelayedCall}
+ @ivar next_checkpoint: the delayed call for the next checkpoint
+ """
+
_Node = KNodeBase
+
def __init__(self, config, cache_dir='/tmp'):
+ """Initialize the Khashmir class and call the L{setup} method.
+
+ @type config: C{dictionary}
+ @param config: the configuration parameters for the DHT
+ @type cache_dir: C{string}
+ @param cache_dir: the directory to store all files in
+ (optional, defaults to the /tmp directory)
+ """
self.config = None
self.setup(config, cache_dir)
def setup(self, config, cache_dir):
+ """Setup all the Khashmir sub-modules.
+
+ @type config: C{dictionary}
+ @param config: the configuration parameters for the DHT
+ @type cache_dir: C{string}
+ @param cache_dir: the directory to store all files in
+ """
self.config = config
self.port = config['PORT']
- self._findDB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db'))
+ self.store = DB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db'))
self.node = self._loadSelfNode('', self.port)
self.table = KTable(self.node, config)
- #self.app = service.Application("krpc")
- self.udp = krpc.hostbroker(self)
+ self.token_secrets = [newID()]
+
+ # Start listening
+ self.udp = krpc.hostbroker(self, config)
self.udp.protocol = krpc.KRPC
self.listenport = reactor.listenUDP(self.port, self.udp)
- self.last = time()
+
+ # Load the routing table and begin checkpointing
self._loadRoutingTable()
- self.expirer = KeyExpirer(self.store, config)
- self.refreshTable(force=1)
- self.next_checkpoint = reactor.callLater(60, self.checkpoint, (1,))
+ self.refreshTable(force = True)
+ self.next_checkpoint = reactor.callLater(60, self.checkpoint)
- def Node(self):
- n = self._Node()
+ def Node(self, id, host = None, port = None):
+ """Create a new node.
+
+ @see: L{node.Node.__init__}
+ """
+ n = self._Node(id, host, port)
n.table = self.table
+ n.conn = self.udp.connectionForAddr((n.host, n.port))
return n
def __del__(self):
+ """Stop listening for packets."""
self.listenport.stopListening()
def _loadSelfNode(self, host, port):
- c = self.store.cursor()
- c.execute('select id from self where num = 0;')
- if c.rowcount > 0:
- id = c.fetchone()[0]
- else:
+ """Create this node, loading any previously saved one."""
+ id = self.store.getSelfNode()
+ if not id:
id = newID()
- return self._Node().init(id, host, port)
+ return self._Node(id, host, port)
- def _saveSelfNode(self):
- c = self.store.cursor()
- c.execute('delete from self where num = 0;')
- c.execute("insert into self values (0, %s);", sqlite.encode(self.node.id))
- self.store.commit()
+ def checkpoint(self):
+ """Perform some periodic maintenance operations."""
+ # Create a new token secret
+ self.token_secrets.insert(0, newID())
+ if len(self.token_secrets) > 3:
+ self.token_secrets.pop()
+
+ # Save some parameters for reloading
+ self.store.saveSelfNode(self.node.id)
+ self.store.dumpRoutingTable(self.table.buckets)
- def checkpoint(self, auto=0):
- self._saveSelfNode()
- self._dumpRoutingTable()
+ # DHT maintenance
+ self.store.expireValues(self.config['KEY_EXPIRE'])
self.refreshTable()
- if auto:
- self.next_checkpoint = reactor.callLater(randrange(int(self.config['CHECKPOINT_INTERVAL'] * .9),
- int(self.config['CHECKPOINT_INTERVAL'] * 1.1)),
- self.checkpoint, (1,))
- def _findDB(self, db):
- self.db = db
- try:
- os.stat(db)
- except OSError:
- self._createNewDB(db)
- else:
- self._loadDB(db)
-
- def _loadDB(self, db):
- try:
- self.store = sqlite.connect(db=db)
- #self.store.autocommit = 0
- except:
- import traceback
- raise KhashmirDBExcept, "Couldn't open DB", traceback.format_exc()
-
- def _createNewDB(self, db):
- self.store = sqlite.connect(db=db)
- s = """
- create table kv (key binary, value binary, time timestamp, primary key (key, value));
- create index kv_key on kv(key);
- create index kv_timestamp on kv(time);
-
- create table nodes (id binary primary key, host text, port number);
-
- create table self (num number primary key, id binary);
- """
- c = self.store.cursor()
- c.execute(s)
- self.store.commit()
-
- def _dumpRoutingTable(self):
- """
- save routing table nodes to the database
- """
- c = self.store.cursor()
- c.execute("delete from nodes where id not NULL;")
- for bucket in self.table.buckets:
- for node in bucket.l:
- c.execute("insert into nodes values (%s, %s, %s);", (sqlite.encode(node.id), node.host, node.port))
- self.store.commit()
+ self.next_checkpoint = reactor.callLater(randrange(int(self.config['CHECKPOINT_INTERVAL'] * .9),
+ int(self.config['CHECKPOINT_INTERVAL'] * 1.1)),
+ self.checkpoint)
def _loadRoutingTable(self):
+ """Load the previous routing table nodes from the database.
+
+ It's usually a good idea to call refreshTable(force = True) after
+ loading the table.
"""
- load routing table nodes from database
- it's usually a good idea to call refreshTable(force=1) after loading the table
- """
- c = self.store.cursor()
- c.execute("select * from nodes;")
- for rec in c.fetchall():
- n = self.Node().initWithDict({'id':rec[0], 'host':rec[1], 'port':int(rec[2])})
- n.conn = self.udp.connectionForAddr((n.host, n.port))
- self.table.insertNode(n, contacted=0)
+ nodes = self.store.getRoutingTable()
+ for rec in nodes:
+ n = self.Node(rec[0], rec[1], int(rec[2]))
+ self.table.insertNode(n, contacted = False)
-
- #######
- ####### LOCAL INTERFACE - use these methods!
- def addContact(self, host, port, callback=None):
- """
- ping this node and add the contact info to the table on pong!
+ #{ Local interface
+ def addContact(self, host, port, callback=None, errback=None):
+ """Ping this node and add the contact info to the table on pong.
+
+ @type host: C{string}
+ @param host: the IP address of the node to contact
+ @type port: C{int}
+ @param port:the port of the node to contact
+ @type callback: C{method}
+ @param callback: the method to call with the results, it must take 1
+ parameter, the contact info returned by the node
+ (optional, defaults to doing nothing with the results)
+ @type errback: C{method}
+ @param errback: the method to call if an error occurs
+ (optional, defaults to calling the callback with None)
"""
- n =self.Node().init(NULL_ID, host, port)
- n.conn = self.udp.connectionForAddr((n.host, n.port))
- self.sendPing(n, callback=callback)
+ n = self.Node(NULL_ID, host, port)
+ self.sendJoin(n, callback=callback, errback=errback)
- ## this call is async!
def findNode(self, id, callback, errback=None):
- """ returns the contact info for node, or the k closest nodes, from the global table """
- # get K nodes out of local table/cache, or the node we want
+ """Find the contact info for the K closest nodes in the global table.
+
+ @type id: C{string}
+ @param id: the target ID to find the K closest nodes of
+ @type callback: C{method}
+ @param callback: the method to call with the results, it must take 1
+ parameter, the list of K closest nodes
+ @type errback: C{method}
+ @param errback: the method to call if an error occurs
+ (optional, defaults to doing nothing when an error occurs)
+ """
+ # Get K nodes out of local table/cache
nodes = self.table.findNodes(id)
d = Deferred()
if errback:
d.addCallbacks(callback, errback)
else:
d.addCallback(callback)
- if len(nodes) == 1 and nodes[0].id == id :
+
+ # If the target ID was found
+ if len(nodes) == 1 and nodes[0].id == id:
d.callback(nodes)
else:
- # create our search state
+ # Start the finding nodes action
state = FindNode(self, id, d.callback, self.config)
reactor.callLater(0, state.goWithNodes, nodes)
- def insertNode(self, n, contacted=1):
- """
- insert a node in our local table, pinging oldest contact in bucket, if necessary
+ def insertNode(self, node, contacted = True):
+ """Try to insert a node in our local table, pinging oldest contact if necessary.
- If all you have is a host/port, then use addContact, which calls this method after
- receiving the PONG from the remote node. The reason for the seperation is we can't insert
- a node into the table without it's peer-ID. That means of course the node passed into this
- method needs to be a properly formed Node object with a valid ID.
+ If all you have is a host/port, then use L{addContact}, which calls this
+ method after receiving the PONG from the remote node. The reason for
+ the seperation is we can't insert a node into the table without its
+ node ID. That means of course the node passed into this method needs
+ to be a properly formed Node object with a valid ID.
+
+ @type node: L{node.Node}
+ @param node: the new node to try and insert
+ @type contacted: C{boolean}
+ @param contacted: whether the new node is known to be good, i.e.
+ responded to a request (optional, defaults to True)
"""
- old = self.table.insertNode(n, contacted=contacted)
- if old and (time() - old.lastSeen) > self.config['MIN_PING_INTERVAL'] and old.id != self.node.id:
- # the bucket is full, check to see if old node is still around and if so, replace it
+ old = self.table.insertNode(node, contacted=contacted)
+ if (old and old.id != self.node.id and
+ (datetime.now() - old.lastSeen) >
+ timedelta(seconds=self.config['MIN_PING_INTERVAL'])):
- ## these are the callbacks used when we ping the oldest node in a bucket
- def _staleNodeHandler(oldnode=old, newnode = n):
- """ called if the pinged node never responds """
- self.table.replaceStaleNode(old, newnode)
+ def _staleNodeHandler(oldnode = old, newnode = node):
+ """The pinged node never responded, so replace it."""
+ self.table.replaceStaleNode(oldnode, newnode)
def _notStaleNodeHandler(dict, old=old):
- """ called when we get a pong from the old node """
+ """Got a pong from the old node, so update it."""
dict = dict['rsp']
if dict['id'] == old.id:
self.table.justSeenNode(old.id)
+ # Bucket is full, check to see if old node is still available
df = old.ping(self.node.id)
df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
- def sendPing(self, node, callback=None):
- """
- ping a node
+ def sendJoin(self, node, callback=None, errback=None):
+ """Join the DHT by pinging a bootstrap node.
+
+ @type node: L{node.Node}
+ @param node: the node to send the join to
+ @type callback: C{method}
+ @param callback: the method to call with the results, it must take 1
+ parameter, the contact info returned by the node
+ (optional, defaults to doing nothing with the results)
+ @type errback: C{method}
+ @param errback: the method to call if an error occurs
+ (optional, defaults to calling the callback with None)
"""
- df = node.ping(self.node.id)
- ## these are the callbacks we use when we issue a PING
- def _pongHandler(dict, node=node, table=self.table, callback=callback):
- _krpc_sender = dict['_krpc_sender']
- dict = dict['rsp']
- sender = {'id' : dict['id']}
- sender['host'] = _krpc_sender[0]
- sender['port'] = _krpc_sender[1]
- n = self.Node().initWithDict(sender)
- n.conn = self.udp.connectionForAddr((n.host, n.port))
- table.insertNode(n)
+
+ def _pongHandler(dict, node=node, self=self, callback=callback):
+ """Node responded properly, callback with response."""
+ n = self.Node(dict['rsp']['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
+ self.insertNode(n)
if callback:
- callback()
- def _defaultPong(err, node=node, table=self.table, callback=callback):
+ callback((dict['rsp']['ip_addr'], dict['rsp']['port']))
+
+ def _defaultPong(err, node=node, table=self.table, callback=callback, errback=errback):
+ """Error occurred, fail node and errback or callback with error."""
table.nodeFailed(node)
- if callback:
- callback()
+ if errback:
+ errback()
+ elif callback:
+ callback(None)
- df.addCallbacks(_pongHandler,_defaultPong)
-
- def findCloseNodes(self, callback=lambda a: None):
- """
- This does a findNode on the ID one away from our own.
- This will allow us to populate our table with nodes on our network closest to our own.
- This is called as soon as we start up with an empty table
+ df = node.join(self.node.id)
+ df.addCallbacks(_pongHandler, _defaultPong)
+
+ def findCloseNodes(self, callback=lambda a: None, errback = None):
+ """Perform a findNode on the ID one away from our own.
+
+ This will allow us to populate our table with nodes on our network
+ closest to our own. This is called as soon as we start up with an
+ empty table.
+
+ @type callback: C{method}
+ @param callback: the method to call with the results, it must take 1
+ parameter, the list of K closest nodes
+ (optional, defaults to doing nothing with the results)
+ @type errback: C{method}
+ @param errback: the method to call if an error occurs
+ (optional, defaults to doing nothing when an error occurs)
"""
id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
- self.findNode(id, callback)
+ self.findNode(id, callback, errback)
- def refreshTable(self, force=0):
- """
- force=1 will refresh table regardless of last bucket access time
+ def refreshTable(self, force = False):
+ """Check all the buckets for those that need refreshing.
+
+ @param force: refresh all buckets regardless of last bucket access time
+ (optional, defaults to False)
"""
def callback(nodes):
pass
for bucket in self.table.buckets:
- if force or (time() - bucket.lastAccessed >= self.config['BUCKET_STALENESS']):
+ if force or (datetime.now() - bucket.lastAccessed >
+ timedelta(seconds=self.config['BUCKET_STALENESS'])):
+ # Choose a random ID in the bucket and try and find it
id = newIDInRange(bucket.min, bucket.max)
self.findNode(id, callback)
def stats(self):
- """
- Returns (num_contacts, num_nodes)
- num_contacts: number contacts in our routing table
- num_nodes: number of nodes estimated in the entire dht
+ """Collect some statistics about the DHT.
+
+ @rtype: (C{int}, C{int})
+ @return: the number contacts in our routing table, and the estimated
+ number of nodes in the entire DHT
"""
num_contacts = reduce(lambda a, b: a + len(b.l), self.table.buckets, 0)
num_nodes = self.config['K'] * (2**(len(self.table.buckets) - 1))
self.next_checkpoint.cancel()
except:
pass
- self.expirer.shutdown()
self.store.close()
+ #{ Remote interface
def krpc_ping(self, id, _krpc_sender):
- sender = {'id' : id}
- sender['host'] = _krpc_sender[0]
- sender['port'] = _krpc_sender[1]
- n = self.Node().initWithDict(sender)
- n.conn = self.udp.connectionForAddr((n.host, n.port))
- self.insertNode(n, contacted=0)
+ """Pong with our ID.
+
+ @type id: C{string}
+ @param id: the node ID of the sender node
+ @type _krpc_sender: (C{string}, C{int})
+ @param _krpc_sender: the sender node's IP address and port
+ """
+ n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
+ self.insertNode(n, contacted = False)
+
return {"id" : self.node.id}
+ def krpc_join(self, id, _krpc_sender):
+ """Add the node by responding with its address and port.
+
+ @type id: C{string}
+ @param id: the node ID of the sender node
+ @type _krpc_sender: (C{string}, C{int})
+ @param _krpc_sender: the sender node's IP address and port
+ """
+ n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
+ self.insertNode(n, contacted = False)
+
+ return {"ip_addr" : _krpc_sender[0], "port" : _krpc_sender[1], "id" : self.node.id}
+
def krpc_find_node(self, target, id, _krpc_sender):
+ """Find the K closest nodes to the target in the local routing table.
+
+ @type target: C{string}
+ @param target: the target ID to find nodes for
+ @type id: C{string}
+ @param id: the node ID of the sender node
+ @type _krpc_sender: (C{string}, C{int})
+ @param _krpc_sender: the sender node's IP address and port
+ """
+ n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
+ self.insertNode(n, contacted = False)
+
nodes = self.table.findNodes(target)
- nodes = map(lambda node: node.senderDict(), nodes)
- sender = {'id' : id}
- sender['host'] = _krpc_sender[0]
- sender['port'] = _krpc_sender[1]
- n = self.Node().initWithDict(sender)
- n.conn = self.udp.connectionForAddr((n.host, n.port))
- self.insertNode(n, contacted=0)
- return {"nodes" : nodes, "id" : self.node.id}
+ nodes = map(lambda node: node.contactInfo(), nodes)
+ token = sha(self.token_secrets[0] + _krpc_sender[0]).digest()
+ return {"nodes" : nodes, "token" : token, "id" : self.node.id}
-## This class provides read-only access to the DHT, valueForKey
-## you probably want to use this mixin and provide your own write methods
class KhashmirRead(KhashmirBase):
+ """The read-only Khashmir class, which can only retrieve (not store) key/value mappings."""
+
_Node = KNodeRead
- def retrieveValues(self, key):
- c = self.store.cursor()
- c.execute("select value from kv where key = %s;", sqlite.encode(key))
- t = c.fetchone()
- l = []
- while t:
- l.append(t['value'])
- t = c.fetchone()
- return l
- ## also async
- def valueForKey(self, key, callback, searchlocal = 1):
- """ returns the values found for key in global table
- callback will be called with a list of values for each peer that returns unique values
- final callback will be an empty list - probably should change to 'more coming' arg
+
+ #{ Local interface
+ def findValue(self, key, callback, errback=None):
+ """Get the nodes that have values for the key from the global table.
+
+ @type key: C{string}
+ @param key: the target key to find the values for
+ @type callback: C{method}
+ @param callback: the method to call with the results, it must take 1
+ parameter, the list of nodes with values
+ @type errback: C{method}
+ @param errback: the method to call if an error occurs
+ (optional, defaults to doing nothing when an error occurs)
"""
+ # Get K nodes out of local table/cache
nodes = self.table.findNodes(key)
+ d = Deferred()
+ if errback:
+ d.addCallbacks(callback, errback)
+ else:
+ d.addCallback(callback)
+
+ # Search for others starting with the locally found ones
+ state = FindValue(self, key, d.callback, self.config)
+ reactor.callLater(0, state.goWithNodes, nodes)
+
+ def valueForKey(self, key, callback, searchlocal = True):
+ """Get the values found for key in global table.
- # get locals
+ Callback will be called with a list of values for each peer that
+ returns unique values. The final callback will be an empty list.
+
+ @type key: C{string}
+ @param key: the target key to get the values for
+ @type callback: C{method}
+ @param callback: the method to call with the results, it must take 2
+ parameters: the key, and the values found
+ @type searchlocal: C{boolean}
+ @param searchlocal: whether to also look for any local values
+ """
+ # Get any local values
if searchlocal:
- l = self.retrieveValues(key)
+ l = self.store.retrieveValues(key)
if len(l) > 0:
- reactor.callLater(0, callback, (l))
+ reactor.callLater(0, callback, key, l)
else:
l = []
-
- # create our search state
- state = GetValue(self, key, callback, self.config)
- reactor.callLater(0, state.goWithNodes, nodes, l)
+ def _getValueForKey(nodes, key=key, local_values=l, response=callback, self=self):
+ """Use the found nodes to send requests for values to."""
+ state = GetValue(self, key, local_values, self.config['RETRIEVE_VALUES'], response, self.config)
+ reactor.callLater(0, state.goWithNodes, nodes)
+
+ # First lookup nodes that have values for the key
+ self.findValue(key, _getValueForKey)
+
+ #{ Remote interface
def krpc_find_value(self, key, id, _krpc_sender):
- sender = {'id' : id}
- sender['host'] = _krpc_sender[0]
- sender['port'] = _krpc_sender[1]
- n = self.Node().initWithDict(sender)
- n.conn = self.udp.connectionForAddr((n.host, n.port))
- self.insertNode(n, contacted=0)
+ """Find the number of values stored locally for the key, and the K closest nodes.
+
+ @type key: C{string}
+ @param key: the target key to find the values and nodes for
+ @type id: C{string}
+ @param id: the node ID of the sender node
+ @type _krpc_sender: (C{string}, C{int})
+ @param _krpc_sender: the sender node's IP address and port
+ """
+ n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
+ self.insertNode(n, contacted = False)
+
+ nodes = self.table.findNodes(key)
+ nodes = map(lambda node: node.contactInfo(), nodes)
+ num_values = self.store.countValues(key)
+ return {'nodes' : nodes, 'num' : num_values, "id": self.node.id}
+
+ def krpc_get_value(self, key, num, id, _krpc_sender):
+ """Retrieve the values stored locally for the key.
+
+ @type key: C{string}
+ @param key: the target key to retrieve the values for
+ @type num: C{int}
+ @param num: the maximum number of values to retrieve, or 0 to
+ retrieve all of them
+ @type id: C{string}
+ @param id: the node ID of the sender node
+ @type _krpc_sender: (C{string}, C{int})
+ @param _krpc_sender: the sender node's IP address and port
+ """
+ n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
+ self.insertNode(n, contacted = False)
- l = self.retrieveValues(key)
- if len(l) > 0:
+ l = self.store.retrieveValues(key)
+ if num == 0 or num >= len(l):
return {'values' : l, "id": self.node.id}
else:
- nodes = self.table.findNodes(key)
- nodes = map(lambda node: node.senderDict(), nodes)
- return {'nodes' : nodes, "id": self.node.id}
+ shuffle(l)
+ return {'values' : l[:num], "id": self.node.id}
+
-### provides a generic write method, you probably don't want to deploy something that allows
-### arbitrary value storage
class KhashmirWrite(KhashmirRead):
+ """The read-write Khashmir class, which can store and retrieve key/value mappings."""
+
_Node = KNodeWrite
- ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
+
+ #{ Local interface
def storeValueForKey(self, key, value, callback=None):
- """ stores the value for key in the global table, returns immediately, no status
- in this implementation, peers respond but don't indicate status to storing values
- a key can have many values
+ """Stores the value for the key in the global table.
+
+ No status in this implementation, peers respond but don't indicate
+ status of storing values.
+
+ @type key: C{string}
+ @param key: the target key to store the value for
+ @type value: C{string}
+ @param value: the value to store with the key
+ @type callback: C{method}
+ @param callback: the method to call with the results, it must take 3
+ parameters: the key, the value stored, and the result of the store
+ (optional, defaults to doing nothing with the results)
"""
- def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
+ def _storeValueForKey(nodes, key=key, value=value, response=callback, self=self):
+ """Use the returned K closest nodes to store the key at."""
if not response:
- # default callback
- def _storedValueHandler(sender):
+ def _storedValueHandler(key, value, sender):
+ """Default callback that does nothing."""
pass
- response=_storedValueHandler
- action = StoreValue(self.table, key, value, response, self.config)
+ response = _storedValueHandler
+ action = StoreValue(self, key, value, self.config['STORE_REDUNDANCY'], response, self.config)
reactor.callLater(0, action.goWithNodes, nodes)
- # this call is asynch
+ # First find the K closest nodes to operate on.
self.findNode(key, _storeValueForKey)
- def krpc_store_value(self, key, value, id, _krpc_sender):
- t = "%0.6f" % time()
- c = self.store.cursor()
- try:
- c.execute("insert into kv values (%s, %s, %s);", (sqlite.encode(key), sqlite.encode(value), t))
- except sqlite.IntegrityError, reason:
- # update last insert time
- c.execute("update kv set time = %s where key = %s and value = %s;", (t, sqlite.encode(key), sqlite.encode(value)))
- self.store.commit()
- sender = {'id' : id}
- sender['host'] = _krpc_sender[0]
- sender['port'] = _krpc_sender[1]
- n = self.Node().initWithDict(sender)
- n.conn = self.udp.connectionForAddr((n.host, n.port))
- self.insertNode(n, contacted=0)
- return {"id" : self.node.id}
+ #{ Remote interface
+ def krpc_store_value(self, key, value, token, id, _krpc_sender):
+ """Store the value locally with the key.
+
+ @type key: C{string}
+ @param key: the target key to store the value for
+ @type value: C{string}
+ @param value: the value to store with the key
+ @param token: the token to confirm that this peer contacted us previously
+ @type id: C{string}
+ @param id: the node ID of the sender node
+ @type _krpc_sender: (C{string}, C{int})
+ @param _krpc_sender: the sender node's IP address and port
+ """
+ n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
+ self.insertNode(n, contacted = False)
+ for secret in self.token_secrets:
+ this_token = sha(secret + _krpc_sender[0]).digest()
+ if token == this_token:
+ self.store.storeValue(key, value)
+ return {"id" : self.node.id}
+ raise krpc.KrpcError, (krpc.KRPC_ERROR_INVALID_TOKEN, 'token is invalid, do a find_nodes to get a fresh one')
+
-# the whole shebang, for testing
class Khashmir(KhashmirWrite):
+ """The default Khashmir class (currently the read-write L{KhashmirWrite})."""
_Node = KNodeWrite
+
class SimpleTests(unittest.TestCase):
timeout = 10
DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
- 'CHECKPOINT_INTERVAL': 900, 'CONCURRENT_REQS': 4,
- 'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
+ 'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
+ 'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
+ 'MAX_FAILURES': 3,
'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
- 'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
- 'KE_AGE': 3600, }
+ 'KEY_EXPIRE': 3600, 'SPEW': False, }
def setUp(self):
- krpc.KRPC.noisy = 0
d = self.DHT_DEFAULTS.copy()
d['PORT'] = 4044
self.a = Khashmir(d)
def tearDown(self):
self.a.shutdown()
self.b.shutdown()
- os.unlink(self.a.db)
- os.unlink(self.b.db)
+ os.unlink(self.a.store.db)
+ os.unlink(self.b.store.db)
def testAddContact(self):
- self.assertEqual(len(self.a.table.buckets), 1)
- self.assertEqual(len(self.a.table.buckets[0].l), 0)
+ self.failUnlessEqual(len(self.a.table.buckets), 1)
+ self.failUnlessEqual(len(self.a.table.buckets[0].l), 0)
- self.assertEqual(len(self.b.table.buckets), 1)
- self.assertEqual(len(self.b.table.buckets[0].l), 0)
+ self.failUnlessEqual(len(self.b.table.buckets), 1)
+ self.failUnlessEqual(len(self.b.table.buckets[0].l), 0)
self.a.addContact('127.0.0.1', 4045)
reactor.iterate()
reactor.iterate()
reactor.iterate()
- self.assertEqual(len(self.a.table.buckets), 1)
- self.assertEqual(len(self.a.table.buckets[0].l), 1)
- self.assertEqual(len(self.b.table.buckets), 1)
- self.assertEqual(len(self.b.table.buckets[0].l), 1)
+ self.failUnlessEqual(len(self.a.table.buckets), 1)
+ self.failUnlessEqual(len(self.a.table.buckets[0].l), 1)
+ self.failUnlessEqual(len(self.b.table.buckets), 1)
+ self.failUnlessEqual(len(self.b.table.buckets[0].l), 1)
def testStoreRetrieve(self):
self.a.addContact('127.0.0.1', 4045)
reactor.iterate()
reactor.iterate()
- def _cb(self, val):
+ def _cb(self, key, val):
if not val:
- self.assertEqual(self.got, 1)
+ self.failUnlessEqual(self.got, 1)
elif 'foobar' in val:
self.got = 1
timeout = 30
num = 20
DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
- 'CHECKPOINT_INTERVAL': 900, 'CONCURRENT_REQS': 4,
- 'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
+ 'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
+ 'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
+ 'MAX_FAILURES': 3,
'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
- 'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
- 'KE_AGE': 3600, }
+ 'KEY_EXPIRE': 3600, 'SPEW': False, }
def _done(self, val):
self.done = 1
def tearDown(self):
for i in self.l:
i.shutdown()
- os.unlink(i.db)
+ os.unlink(i.store.db)
reactor.iterate()
for a in range(3):
self.done = 0
- def _scb(val):
+ def _scb(key, value, result):
self.done = 1
self.l[randrange(0, self.num)].storeValueForKey(K, V, _scb)
while not self.done:
reactor.iterate()
- def _rcb(val):
+ def _rcb(key, val):
if not val:
self.done = 1
- self.assertEqual(self.got, 1)
+ self.failUnlessEqual(self.got, 1)
elif V in val:
self.got = 1
for x in range(3):