import const
import time
-from bencode import bdecode as loads
-from bencode import bencode as dumps
from sha import sha
from twisted.web import server
threadable.init()
-from bsddb3 import db ## find this at http://pybsddb.sf.net/
-from bsddb3._db import DBNotFoundError
-
-from xmlrpclib import Binary
-
+import sqlite ## find this at http://pysqlite.sourceforge.net/
+import pysqlite_exceptions
+KhashmirDBExcept = "KhashmirDBExcept"
# this is the main class!
class Khashmir(xmlrpc.XMLRPC):
- __slots__ = ['listener', 'node', 'table', 'store', 'itime', 'kw', 'app']
- def __init__(self, host, port):
+ __slots__ = ('listener', 'node', 'table', 'store', 'app', 'last')
+ def __init__(self, host, port, db='khashmir.db'):
self.node = Node().init(newID(), host, port)
self.table = KTable(self.node)
self.app = Application("xmlrpc")
self.app.listenTCP(port, server.Site(self))
-
- ## these databases may be more suited to on-disk rather than in-memory
- # h((key, value)) -> (key, value, time) mappings
- self.store = db.DB()
- self.store.open(None, None, db.DB_BTREE)
-
- # <insert time> -> h((key, value))
- self.itime = db.DB()
- self.itime.set_flags(db.DB_DUP)
- self.itime.open(None, None, db.DB_BTREE)
-
- # key -> h((key, value))
- self.kw = db.DB()
- self.kw.set_flags(db.DB_DUP)
- self.kw.open(None, None, db.DB_BTREE)
-
- KeyExpirer(store=self.store, itime=self.itime, kw=self.kw)
-
+ self.findDB(db)
+ self.last = time.time()
+ KeyExpirer(store=self.store)
+
+ def findDB(self, db):
+ import os
+ try:
+ os.stat(db)
+ except OSError:
+ self.createNewDB(db)
+ else:
+ self.loadDB(db)
+
+ def loadDB(self, db):
+ try:
+ self.store = sqlite.connect(db=db)
+ self.store.autocommit = 1
+ except:
+ import traceback
+ raise KhashmirDBExcept, "Couldn't open DB", traceback.exc_traceback
+
+ def createNewDB(self, db):
+ self.store = sqlite.connect(db=db)
+ self.store.autocommit = 1
+ s = """
+ create table kv (key text, value text, time timestamp, primary key (key, value));
+ create index kv_key on kv(key);
+ create index kv_timestamp on kv(time);
+
+ create table nodes (id text primary key, host text, port number);
+ """
+ c = self.store.cursor()
+ c.execute(s)
+
def render(self, request):
"""
Override the built in render so we can have access to the request object!
"""
ping this node and add the contact info to the table on pong!
"""
- n =Node().init(" "*20, host, port) # note, we
+ n =Node().init(const.NULL_ID, host, port) # note, we
self.sendPing(n)
## also async
def valueForKey(self, key, callback):
- """ returns the values found for key in global table """
+ """ returns the values found for key in global table
+ callback will be called with a list of values for each peer that returns unique values
+ final callback will be an empty list - probably should change to 'more coming' arg
+ """
nodes = self.table.findNodes(key)
# get locals
l = self.retrieveValues(key)
if len(l) > 0:
- reactor.callFromThread(callback, l)
+ reactor.callFromThread(callback, map(lambda a: a.decode('base64'), l))
# create our search state
state = GetValue(self, key, callback)
- ## async, but in the current implementation there is no guarantee a store does anything so there is no callback right now
+ ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
def storeValueForKey(self, key, value, callback=None):
""" stores the value for key in the global table, returns immediately, no status
in this implementation, peers respond but don't indicate status to storing values
- values are stored in peers on a first-come first-served basis
- this will probably change so more than one value can be stored under a key
+ a key can have many values
"""
def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
if not callback:
- # default callback - this will get called for each successful store value
+ # default callback
def _storedValueHandler(sender):
pass
response=_storedValueHandler
- for node in nodes:
+ for node in nodes[:const.STORE_REDUNDANCY]:
def cb(t, table = table, node=node, resp=response):
self.table.insertNode(node)
response(t)
"""
df = node.ping(self.node.senderDict())
## these are the callbacks we use when we issue a PING
- def _pongHandler(sender, id=node.id, host=node.host, port=node.port, table=self.table):
- if id != 20 * ' ' and id != sender['id'].data:
+ def _pongHandler(args, id=node.id, host=node.host, port=node.port, table=self.table):
+ l, sender = args
+ if id != const.NULL_ID and id != sender['id'].decode('base64'):
# whoah, got response from different peer than we were expecting
pass
else:
df.addCallbacks(_pongHandler,_defaultPong)
- def findCloseNodes(self):
+ def findCloseNodes(self, callback=lambda a: None):
"""
This does a findNode on the ID one away from our own.
This will allow us to populate our table with nodes on our network closest to our own.
This is called as soon as we start up with an empty table
"""
id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
- def callback(nodes):
- pass
self.findNode(id, callback)
def refreshTable(self):
def retrieveValues(self, key):
- if self.kw.has_key(key):
- c = self.kw.cursor()
- tup = c.set(key)
- l = []
- while(tup and tup[0] == key):
- h1 = tup[1]
- v = loads(self.store[h1])[1]
- l.append(v)
- tup = c.next()
- return l
- return []
+ s = "select value from kv where key = '%s';" % key.encode('base64')
+ c = self.store.cursor()
+ c.execute(s)
+ t = c.fetchone()
+ l = []
+ while t:
+ l.append(t['value'])
+ t = c.fetchone()
+ return l
#####
##### INCOMING MESSAGE HANDLERS
sender['host'] = ip
n = Node().initWithDict(sender)
self.insertNode(n, contacted=0)
- return self.node.senderDict()
+ return (), self.node.senderDict()
def xmlrpc_find_node(self, target, sender):
- nodes = self.table.findNodes(target.data)
+ nodes = self.table.findNodes(target.decode('base64'))
nodes = map(lambda node: node.senderDict(), nodes)
ip = self.crequest.getClientIP()
sender['host'] = ip
n = Node().initWithDict(sender)
self.insertNode(n, contacted=0)
return nodes, self.node.senderDict()
-
+
def xmlrpc_store_value(self, key, value, sender):
- key = key.data
- h1 = sha(key+value.data).digest()
- t = `time.time()`
- if not self.store.has_key(h1):
- v = dumps((key, value.data, t))
- self.store.put(h1, v)
- self.itime.put(t, h1)
- self.kw.put(key, h1)
- else:
+ t = "%0.6f" % time.time()
+ s = "insert into kv values ('%s', '%s', '%s');" % (key, value, t)
+ c = self.store.cursor()
+ try:
+ c.execute(s)
+ except pysqlite_exceptions.IntegrityError, reason:
# update last insert time
- tup = loads(self.store[h1])
- self.store[h1] = dumps((tup[0], tup[1], t))
- self.itime.put(t, h1)
-
+ s = "update kv set time = '%s' where key = '%s' and value = '%s';" % (t, key, value)
+ c.execute(s)
ip = self.crequest.getClientIP()
sender['host'] = ip
n = Node().initWithDict(sender)
self.insertNode(n, contacted=0)
- return self.node.senderDict()
+ return (), self.node.senderDict()
def xmlrpc_find_value(self, key, sender):
ip = self.crequest.getClientIP()
- key = key.data
+ key = key.decode('base64')
sender['host'] = ip
n = Node().initWithDict(sender)
self.insertNode(n, contacted=0)
l = self.retrieveValues(key)
if len(l) > 0:
- l = map(lambda v: Binary(v), l)
return {'values' : l}, self.node.senderDict()
else:
nodes = self.table.findNodes(key)
def test_build_net(quiet=0, peers=24, host='localhost', pause=1):
from whrandom import randrange
+ import threading
import thread
port = 2001
l = []
print "Building %s peer table." % peers
for i in xrange(peers):
- a = Khashmir(host, port + i)
+ a = Khashmir(host, port + i, db = '/tmp/test'+`i`)
l.append(a)
time.sleep(1)
for peer in l[1:]:
peer.app.run()
- #time.sleep(.25)
+ time.sleep(10)
print "adding contacts...."
n = l[randrange(0, len(l))].node
peer.addContact(host, n.port)
if pause:
- time.sleep(.5)
-
+ time.sleep(.33)
+
time.sleep(10)
print "finding close nodes...."
for peer in l:
- peer.findCloseNodes()
- if pause:
- time.sleep(2)
- if pause:
- time.sleep(10)
+ flag = threading.Event()
+ def cb(nodes, f=flag):
+ f.set()
+ peer.findCloseNodes(cb)
+ flag.wait()
+
# for peer in l:
# peer.refreshTable()
return l
d.valueForKey(key, cb(fc).callback)
fc.wait()
-def test_one(host, port):
+def test_one(host, port, db='/tmp/test'):
import thread
- k = Khashmir(host, port)
+ k = Khashmir(host, port, db)
thread.start_new_thread(k.app.run, ())
return k