from twisted.web import server
threadable.init()
-from bsddb3 import db ## find this at http://pybsddb.sf.net/
-from bsddb3._db import DBNotFoundError
-
-from xmlrpclib import Binary
+import sqlite ## find this at http://pysqlite.sourceforge.net/
+KhashmirDBExcept = "KhashmirDBExcept"
# this is the main class!
class Khashmir(xmlrpc.XMLRPC):
- __slots__ = ['listener', 'node', 'table', 'store', 'itime', 'kw', 'app']
- def __init__(self, host, port):
+ __slots__ = ('listener', 'node', 'table', 'store', 'app', 'last')
+ def __init__(self, host, port, db='khashmir.db'):
self.node = Node().init(newID(), host, port)
self.table = KTable(self.node)
self.app = Application("xmlrpc")
self.app.listenTCP(port, server.Site(self))
-
- ## these databases may be more suited to on-disk rather than in-memory
- # h((key, value)) -> (key, value, time) mappings
- self.store = db.DB()
- self.store.open(None, None, db.DB_BTREE)
-
- # <insert time> -> h((key, value))
- self.itime = db.DB()
- self.itime.set_flags(db.DB_DUP)
- self.itime.open(None, None, db.DB_BTREE)
-
- # key -> h((key, value))
- self.kw = db.DB()
- self.kw.set_flags(db.DB_DUP)
- self.kw.open(None, None, db.DB_BTREE)
-
- KeyExpirer(store=self.store, itime=self.itime, kw=self.kw)
-
+ self.findDB(db)
+ self.last = time.time()
+ KeyExpirer(store=self.store)
+
+ def findDB(self, db):
+ import os
+ try:
+ os.stat(db)
+ except OSError:
+ self.createNewDB(db)
+ else:
+ self.loadDB(db)
+
+ def loadDB(self, db):
+ try:
+ self.store = sqlite.connect(db=db)
+ except:
+ import traceback
+ raise KhashmirDBExcept, "Couldn't open DB", traceback.exc_traceback
+
+ def createNewDB(self, db):
+ self.store = sqlite.connect(db=db)
+ s = """
+ create table kv (hkv text primary key, key text, value text, time timestamp);
+ create index kv_key on kv(key);
+
+ create table nodes (id text primary key, host text, port number);
+ """
+ c = self.store.cursor()
+ c.execute(s)
+ self.store.commit()
+
def render(self, request):
"""
Override the built in render so we can have access to the request object!
## also async
def valueForKey(self, key, callback):
- """ returns the values found for key in global table """
+ """ returns the values found for key in global table
+ callback will be called with a list of values for each peer that returns unique values
+ final callback will be an empty list - probably should change to 'more coming' arg
+ """
nodes = self.table.findNodes(key)
# get locals
l = self.retrieveValues(key)
if len(l) > 0:
- reactor.callFromThread(callback, l)
+ reactor.callFromThread(callback, map(lambda a: a.decode('base64'), l))
# create our search state
state = GetValue(self, key, callback)
- ## async, but in the current implementation there is no guarantee a store does anything so there is no callback right now
+ ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
def storeValueForKey(self, key, value, callback=None):
""" stores the value for key in the global table, returns immediately, no status
in this implementation, peers respond but don't indicate status to storing values
- values are stored in peers on a first-come first-served basis
- this will probably change so more than one value can be stored under a key
+ a key can have many values
"""
def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
if not callback:
- # default callback - this will get called for each successful store value
+ # default callback
def _storedValueHandler(sender):
pass
response=_storedValueHandler
## these are the callbacks we use when we issue a PING
def _pongHandler(args, id=node.id, host=node.host, port=node.port, table=self.table):
l, sender = args
- if id != const.NULL_ID and id != sender['id'].data:
+ if id != const.NULL_ID and id != sender['id'].decode('base64'):
# whoah, got response from different peer than we were expecting
pass
else:
def retrieveValues(self, key):
- if self.kw.has_key(key):
- c = self.kw.cursor()
- tup = c.set(key)
- l = []
- while(tup and tup[0] == key):
- h1 = tup[1]
- v = loads(self.store[h1])[1]
- l.append(v)
- tup = c.next()
- return l
- return []
+ s = "select value from kv where key = '%s';" % key.encode('base64')
+ c = self.store.cursor()
+ c.execute(s)
+ t = c.fetchone()
+ l = []
+ while t:
+ l.append(t['value'])
+ t = c.fetchone()
+ return l
#####
##### INCOMING MESSAGE HANDLERS
return (), self.node.senderDict()
def xmlrpc_find_node(self, target, sender):
- nodes = self.table.findNodes(target.data)
+ nodes = self.table.findNodes(target.decode('base64'))
nodes = map(lambda node: node.senderDict(), nodes)
ip = self.crequest.getClientIP()
sender['host'] = ip
n = Node().initWithDict(sender)
self.insertNode(n, contacted=0)
return nodes, self.node.senderDict()
-
+
def xmlrpc_store_value(self, key, value, sender):
- key = key.data
- h1 = sha(key+value.data).digest()
+ h1 = sha(key+value).digest().encode('base64')
t = `time.time()`
- if not self.store.has_key(h1):
- v = dumps((key, value.data, t))
- self.store.put(h1, v)
- self.itime.put(t, h1)
- self.kw.put(key, h1)
- else:
+ s = "insert into kv values ('%s', '%s', '%s', '%s')" % (h1, key, value, t)
+ c = self.store.cursor()
+ try:
+ c.execute(s)
+ except:
# update last insert time
- tup = loads(self.store[h1])
- self.store[h1] = dumps((tup[0], tup[1], t))
- self.itime.put(t, h1)
-
+ s = "update kv set time = '%s' where hkv = '%s'" % (t, h1)
+ c.execute(s)
+ self.store.commit()
ip = self.crequest.getClientIP()
sender['host'] = ip
n = Node().initWithDict(sender)
def xmlrpc_find_value(self, key, sender):
ip = self.crequest.getClientIP()
- key = key.data
+ key = key.decode('base64')
sender['host'] = ip
n = Node().initWithDict(sender)
self.insertNode(n, contacted=0)
l = self.retrieveValues(key)
if len(l) > 0:
- l = map(lambda v: Binary(v), l)
return {'values' : l}, self.node.senderDict()
else:
nodes = self.table.findNodes(key)
print "Building %s peer table." % peers
for i in xrange(peers):
- a = Khashmir(host, port + i)
+ a = Khashmir(host, port + i, db = '/tmp/test'+`i`)
l.append(a)
d.valueForKey(key, cb(fc).callback)
fc.wait()
-def test_one(host, port):
+def test_one(host, port, db='/tmp/test'):
import thread
- k = Khashmir(host, port)
+ k = Khashmir(host, port, db)
thread.start_new_thread(k.app.run, ())
return k
from twisted.internet.defer import Deferred
from xmlrpcclient import XMLRPCClientFactory as factory
from const import reactor, NULL_ID
-from xmlrpclib import Binary
-
class KNode(Node):
def makeResponse(self, df):
except:
d.callback(args)
else:
- if self.id != NULL_ID and sender['id'].data != self.id:
+ if self.id != NULL_ID and sender['id'] != self._senderDict['id']:
d.errback()
else:
d.callback(args)
return df
def findNode(self, target, sender):
df = Deferred()
- f = factory('find_node', (Binary(target), sender), self.makeResponse(df), df.errback)
+ f = factory('find_node', (target.encode('base64'), sender), self.makeResponse(df), df.errback)
reactor.connectTCP(self.host, self.port, f)
return df
def storeValue(self, key, value, sender):
df = Deferred()
- f = factory('store_value', (Binary(key), Binary(value), sender), self.makeResponse(df), df.errback)
+ f = factory('store_value', (key.encode('base64'), value.encode('base64'), sender), self.makeResponse(df), df.errback)
reactor.connectTCP(self.host, self.port, f)
return df
def findValue(self, key, sender):
df = Deferred()
- f = factory('find_value', (Binary(key), sender), self.makeResponse(df), df.errback)
+ f = factory('find_value', (key.encode('base64'), sender), self.makeResponse(df), df.errback)
reactor.connectTCP(self.host, self.port, f)
return df