import const
from khash import intify
-from knode import KNode as Node
from ktable import KTable, K
class ActionBase:
sender = {'id' : dict["id"]}
sender['port'] = _krpc_sender[1]
sender['host'] = _krpc_sender[0]
- sender = Node().initWithDict(sender)
+ sender = self.table.Node().initWithDict(sender)
sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port))
self.table.table.insertNode(sender)
if self.finished or self.answered.has_key(sender.id):
self.outstanding = self.outstanding - 1
self.answered[sender.id] = 1
for node in l:
- n = Node().initWithDict(node)
+ n = self.table.Node().initWithDict(node)
n.conn = self.table.udp.connectionForAddr((n.host, n.port))
if not self.found.has_key(n.id):
self.found[n.id] = n
def makeMsgFailed(self, node):
def defaultGotNodes(err, self=self, node=node):
- print ">>> find failed %s/%s" % (node.host, node.port)
+ print ">>> find failed %s/%s" % (node.host, node.port), err
self.table.table.nodeFailed(node)
self.outstanding = self.outstanding - 1
self.schedule()
self.schedule()
-GET_VALUE_TIMEOUT = 15
+get_value_timeout = 15
class GetValue(FindNode):
+ def __init__(self, table, target, callback, find="findValue"):
+ FindNode.__init__(self, table, target, callback)
+ self.findValue = find
+
""" get value task """
def handleGotNodes(self, dict):
_krpc_sender = dict['_krpc_sender']
sender = {'id' : dict["id"]}
sender['port'] = _krpc_sender[1]
sender['host'] = _krpc_sender[0]
- sender = Node().initWithDict(sender)
+ sender = self.table.Node().initWithDict(sender)
sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port))
self.table.table.insertNode(sender)
if self.finished or self.answered.has_key(sender.id):
# if we have any closer than what we already got, query them
if dict.has_key('nodes'):
for node in dict['nodes']:
- n = Node().initWithDict(node)
+ n = self.table.Node().initWithDict(node)
n.conn = self.table.udp.connectionForAddr((n.host, n.port))
if not self.found.has_key(n.id):
self.found[n.id] = n
for node in l[:K]:
if (not self.queried.has_key(node.id)) and node.id != self.table.node.id:
#xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
- df = node.findValue(self.target, self.table.node.id)
- df.addCallback(self.handleGotNodes)
- df.addErrback(self.makeMsgFailed(node))
- self.outstanding = self.outstanding + 1
- self.queried[node.id] = 1
+ try:
+ f = getattr(node, self.findValue)
+ except AttributeError:
+ print ">>> findValue %s doesn't have a %s method!" % (node, self.findValue)
+ else:
+ df = f(self.target, self.table.node.id)
+ df.addCallback(self.handleGotNodes)
+ df.addErrback(self.makeMsgFailed(node))
+ self.outstanding = self.outstanding + 1
+ self.queried[node.id] = 1
if self.outstanding >= const.CONCURRENT_REQS:
break
assert(self.outstanding) >=0
class StoreValue(ActionBase):
- def __init__(self, table, target, value, callback):
+ def __init__(self, table, target, value, callback, store="storeValue"):
ActionBase.__init__(self, table, target, callback)
self.value = value
self.stored = []
-
+ self.store = store
+
def storedValue(self, t, node):
self.outstanding -= 1
self.table.insertNode(node)
else:
if not len(self.stored) + self.outstanding >= const.STORE_REDUNDANCY:
self.schedule()
-
+ return t
+
def storeFailed(self, t, node):
print ">>> store failed %s/%s" % (node.host, node.port)
self.table.nodeFailed(node)
self.outstanding -= 1
if self.finished:
- return
+ return t
self.schedule()
-
+ return t
+
def schedule(self):
if self.finished:
return
else:
if not node.id == self.table.node.id:
self.outstanding += 1
- if type(self.value) == type([]):
- df = node.storeValues(self.target, self.value, self.table.node.id)
+ try:
+ f = getattr(node, self.store)
+ except AttributeError:
+ print ">>> %s doesn't have a %s method!" % (node, self.store)
else:
- df = node.storeValue(self.target, self.value, self.table.node.id)
-
- df.addCallback(self.storedValue, node=node)
- df.addErrback(self.storeFailed, node=node)
+ df = f(self.target, self.value, self.table.node.id)
+ df.addCallback(self.storedValue, node=node)
+ df.addErrback(self.storeFailed, node=node)
def goWithNodes(self, nodes):
self.nodes = nodes
from sha import sha
from ktable import KTable, K
-from knode import KNode as Node
+from knode import *
from khash import newID, newIDInRange
threadable.init()
import sys
+from random import randrange
+
import sqlite ## find this at http://pysqlite.sourceforge.net/
class KhashmirDBExcept(Exception):
pass
-# this is the main class!
-class Khashmir(protocol.Factory):
+# This is the base class: it has the base functionality and find node, but no key-value mappings.
+class KhashmirBase(protocol.Factory):
__slots__ = ('listener', 'node', 'table', 'store', 'app', 'last', 'protocol')
+ _Node = KNodeBase
def __init__(self, host, port, db='khashmir.db'):
self.setup(host, port, db)
self.port = port
self.node = self._loadSelfNode(host, port)
self.table = KTable(self.node)
- self.app = service.Application("krpc")
+ #self.app = service.Application("krpc")
self.udp = krpc.hostbroker(self)
self.udp.protocol = krpc.KRPC
self.listenport = reactor.listenUDP(port, self.udp)
self.last = time.time()
self._loadRoutingTable()
KeyExpirer(store=self.store)
- #self.refreshTable(force=1)
+ self.refreshTable(force=1)
reactor.callLater(60, self.checkpoint, (1,))
-
+
+ def Node(self):
+ n = self._Node()
+ n.table = self.table
+ return n
+
def __del__(self):
self.listenport.stopListening()
id = c.fetchone()[0]
else:
id = newID()
- return Node().init(id, host, port)
+ return self._Node().init(id, host, port)
def _saveSelfNode(self):
- self.store.autocommit = 0
c = self.store.cursor()
c.execute('delete from self where num = 0;')
c.execute("insert into self values (0, %s);", sqlite.encode(self.node.id))
self.store.commit()
- self.store.autocommit = 1
def checkpoint(self, auto=0):
self._saveSelfNode()
self._dumpRoutingTable()
+ self.refreshTable()
if auto:
- reactor.callLater(const.CHECKPOINT_INTERVAL, self.checkpoint)
+ reactor.callLater(randrange(int(const.CHECKPOINT_INTERVAL * .9), int(const.CHECKPOINT_INTERVAL * 1.1)), self.checkpoint, (1,))
def _findDB(self, db):
import os
def _loadDB(self, db):
try:
self.store = sqlite.connect(db=db)
- self.store.autocommit = 1
+ #self.store.autocommit = 0
except:
import traceback
raise KhashmirDBExcept, "Couldn't open DB", traceback.exc_traceback
def _createNewDB(self, db):
self.store = sqlite.connect(db=db)
- self.store.autocommit = 1
s = """
create table kv (key binary, value binary, time timestamp, primary key (key, value));
create index kv_key on kv(key);
"""
c = self.store.cursor()
c.execute(s)
+ self.store.commit()
def _dumpRoutingTable(self):
"""
save routing table nodes to the database
"""
- self.store.autocommit = 0;
c = self.store.cursor()
c.execute("delete from nodes where id not NULL;")
for bucket in self.table.buckets:
for node in bucket.l:
c.execute("insert into nodes values (%s, %s, %s);", (sqlite.encode(node.id), node.host, node.port))
self.store.commit()
- self.store.autocommit = 1;
def _loadRoutingTable(self):
"""
c = self.store.cursor()
c.execute("select * from nodes;")
for rec in c.fetchall():
- n = Node().initWithDict({'id':rec[0], 'host':rec[1], 'port':int(rec[2])})
+ n = self.Node().initWithDict({'id':rec[0], 'host':rec[1], 'port':int(rec[2])})
n.conn = self.udp.connectionForAddr((n.host, n.port))
self.table.insertNode(n, contacted=0)
"""
ping this node and add the contact info to the table on pong!
"""
- n =Node().init(const.NULL_ID, host, port)
+ n =self.Node().init(const.NULL_ID, host, port)
n.conn = self.udp.connectionForAddr((n.host, n.port))
self.sendPing(n, callback=callback)
state = FindNode(self, id, d.callback)
reactor.callFromThread(state.goWithNodes, nodes)
-
- ## also async
- def valueForKey(self, key, callback, searchlocal = 1):
- """ returns the values found for key in global table
- callback will be called with a list of values for each peer that returns unique values
- final callback will be an empty list - probably should change to 'more coming' arg
- """
- nodes = self.table.findNodes(key)
-
- # get locals
- if searchlocal:
- l = self.retrieveValues(key)
- if len(l) > 0:
- reactor.callLater(0, callback, (l))
- else:
- l = []
-
- # create our search state
- state = GetValue(self, key, callback)
- reactor.callFromThread(state.goWithNodes, nodes, l)
-
- ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
- def storeValueForKey(self, key, value, callback=None):
- """ stores the value for key in the global table, returns immediately, no status
- in this implementation, peers respond but don't indicate status to storing values
- a key can have many values
- """
- def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
- if not response:
- # default callback
- def _storedValueHandler(sender):
- pass
- response=_storedValueHandler
- action = StoreValue(self.table, key, value, response)
- reactor.callFromThread(action.goWithNodes, nodes)
-
- # this call is asynch
- self.findNode(key, _storeValueForKey)
-
-
def insertNode(self, n, contacted=1):
"""
insert a node in our local table, pinging oldest contact in bucket, if necessary
_krpc_sender = dict['_krpc_sender']
dict = dict['rsp']
sender = {'id' : dict['id']}
- if node.id != const.NULL_ID and node.id != sender['id']:
- # whoah, got response from different peer than we were expecting
- self.table.invalidateNode(node)
- else:
- sender['host'] = _krpc_sender[0]
- sender['port'] = _krpc_sender[1]
- n = Node().initWithDict(sender)
- n.conn = self.udp.connectionForAddr((n.host, n.port))
- table.insertNode(n)
- if callback:
- callback()
+ sender['host'] = _krpc_sender[0]
+ sender['port'] = _krpc_sender[1]
+ n = self.Node().initWithDict(sender)
+ n.conn = self.udp.connectionForAddr((n.host, n.port))
+ table.insertNode(n)
+ if callback:
+ callback()
def _defaultPong(err, node=node, table=self.table, callback=callback):
table.nodeFailed(node)
if callback:
id = newIDInRange(bucket.min, bucket.max)
self.findNode(id, callback)
+ def stats(self):
+ """
+ Returns (num_contacts, num_nodes)
+ num_contacts: number of contacts in our routing table
+ num_nodes: estimated number of nodes in the entire DHT
+ """
+ num_contacts = reduce(lambda a, b: a + len(b.l), self.table.buckets, 0)
+ num_nodes = const.K * (2**(len(self.table.buckets) - 1))
+ return (num_contacts, num_nodes)
- def retrieveValues(self, key):
- c = self.store.cursor()
- c.execute("select value from kv where key = %s;", sqlite.encode(key))
- t = c.fetchone()
- l = []
- while t:
- l.append(t['value'])
- t = c.fetchone()
- return l
-
- #####
- ##### INCOMING MESSAGE HANDLERS
-
def krpc_ping(self, id, _krpc_sender):
sender = {'id' : id}
sender['host'] = _krpc_sender[0]
sender['port'] = _krpc_sender[1]
- n = Node().initWithDict(sender)
+ n = self.Node().initWithDict(sender)
n.conn = self.udp.connectionForAddr((n.host, n.port))
self.insertNode(n, contacted=0)
return {"id" : self.node.id}
sender = {'id' : id}
sender['host'] = _krpc_sender[0]
sender['port'] = _krpc_sender[1]
- n = Node().initWithDict(sender)
+ n = self.Node().initWithDict(sender)
n.conn = self.udp.connectionForAddr((n.host, n.port))
self.insertNode(n, contacted=0)
return {"nodes" : nodes, "id" : self.node.id}
-
- def krpc_store_value(self, key, value, id, _krpc_sender):
- t = "%0.6f" % time.time()
- c = self.store.cursor()
- try:
- c.execute("insert into kv values (%s, %s, %s);", (sqlite.encode(key), sqlite.encode(value), t))
- except sqlite.IntegrityError, reason:
- # update last insert time
- c.execute("update kv set time = %s where key = %s and value = %s;", (t, sqlite.encode(key), sqlite.encode(value)))
- sender = {'id' : id}
- sender['host'] = _krpc_sender[0]
- sender['port'] = _krpc_sender[1]
- n = Node().initWithDict(sender)
- n.conn = self.udp.connectionForAddr((n.host, n.port))
- self.insertNode(n, contacted=0)
- return {"id" : self.node.id}
- ## multiple values per key
- def krpc_store_values(self, key, values, id, _krpc_sender):
- t = "%0.6f" % time.time()
+
+## This class provides read-only access to the DHT through valueForKey.
+## You probably want to build on this class and provide your own write methods.
+class KhashmirRead(KhashmirBase):
+ _Node = KNodeRead
+ def retrieveValues(self, key):
c = self.store.cursor()
- key = sqlite.encode(key)
- for value in values:
- value = sqlite.encode(value)
- try:
- c.execute("insert into kv values (%s, %s, %s);", key, value, t)
- except sqlite.IntegrityError, reason:
- # update last insert time
- c.execute("update kv set time = %s where key = %s and value = %s;", (t, key, value))
- sender = {'id' : id}
- sender['host'] = _krpc_sender[0]
- sender['port'] = _krpc_sender[1]
- n = Node().initWithDict(sender)
- n.conn = self.udp.connectionForAddr((n.host, n.port))
- self.insertNode(n, contacted=0)
- return {"id" : self.node.id}
+ c.execute("select value from kv where key = %s;", sqlite.encode(key))
+ t = c.fetchone()
+ l = []
+ while t:
+ l.append(t['value'])
+ t = c.fetchone()
+ return l
+ ## also async
+ def valueForKey(self, key, callback, searchlocal = 1):
+ """ returns the values found for key in global table
+ callback will be called with a list of values for each peer that returns unique values
+ final callback will be an empty list - probably should change to 'more coming' arg
+ """
+ nodes = self.table.findNodes(key)
+
+ # get locals
+ if searchlocal:
+ l = self.retrieveValues(key)
+ if len(l) > 0:
+ reactor.callLater(0, callback, (l))
+ else:
+ l = []
+
+ # create our search state
+ state = GetValue(self, key, callback)
+ reactor.callFromThread(state.goWithNodes, nodes, l)
def krpc_find_value(self, key, id, _krpc_sender):
sender = {'id' : id}
sender['host'] = _krpc_sender[0]
sender['port'] = _krpc_sender[1]
- n = Node().initWithDict(sender)
+ n = self.Node().initWithDict(sender)
n.conn = self.udp.connectionForAddr((n.host, n.port))
self.insertNode(n, contacted=0)
nodes = map(lambda node: node.senderDict(), nodes)
return {'nodes' : nodes, "id": self.node.id}
+### Provides a generic write method. You probably don't want to deploy something that allows
+### arbitrary value storage.
+class KhashmirWrite(KhashmirRead):
+ _Node = KNodeWrite
+ ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
+ def storeValueForKey(self, key, value, callback=None):
+ """ stores the value for key in the global table; returns immediately, with no status.
+ In this implementation, peers respond but don't indicate whether the value was stored.
+ A key can have many values.
+ """
+ def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
+ if not response:
+ # default callback
+ def _storedValueHandler(sender):
+ pass
+ response=_storedValueHandler
+ action = StoreValue(self.table, key, value, response)
+ reactor.callFromThread(action.goWithNodes, nodes)
+
+ # this call is asynch
+ self.findNode(key, _storeValueForKey)
+
+ def krpc_store_value(self, key, value, id, _krpc_sender):
+ t = "%0.6f" % time.time()
+ c = self.store.cursor()
+ try:
+ c.execute("insert into kv values (%s, %s, %s);", (sqlite.encode(key), sqlite.encode(value), t))
+ except sqlite.IntegrityError, reason:
+ # update last insert time
+ c.execute("update kv set time = %s where key = %s and value = %s;", (t, sqlite.encode(key), sqlite.encode(value)))
+ self.store.commit()
+ sender = {'id' : id}
+ sender['host'] = _krpc_sender[0]
+ sender['port'] = _krpc_sender[1]
+ n = self.Node().initWithDict(sender)
+ n.conn = self.udp.connectionForAddr((n.host, n.port))
+ self.insertNode(n, contacted=0)
+ return {"id" : self.node.id}
+
+# the whole shebang, for testing
+class Khashmir(KhashmirWrite):
+ _Node = KNodeWrite