## Copyright 2002 Andrew Loewenstern, All Rights Reserved
from const import reactor
+import const
+
import time
-from pickle import loads, dumps
+
from sha import sha
from ktable import KTable, K
from knode import KNode as Node
-from hash import newID
+from hash import newID, newIDInRange
from actions import FindNode, GetValue, KeyExpirer
from twisted.web import xmlrpc
from twisted.web import server
threadable.init()
-from bsddb3 import db ## find this at http://pybsddb.sf.net/
-from bsddb3._db import DBNotFoundError
-
-from xmlrpclib import Binary
-
-# don't ping unless it's been at least this many seconds since we've heard from a peer
-MAX_PING_INTERVAL = 60 * 15 # fifteen minutes
-
+import sqlite ## find this at http://pysqlite.sourceforge.net/
+import pysqlite_exceptions
+KhashmirDBExcept = "KhashmirDBExcept"
# this is the main class!
class Khashmir(xmlrpc.XMLRPC):
- __slots__ = ['listener', 'node', 'table', 'store', 'itime', 'kw', 'app']
- def __init__(self, host, port):
+ __slots__ = ('listener', 'node', 'table', 'store', 'app', 'last')
+ def __init__(self, host, port, db='khashmir.db'):
self.node = Node().init(newID(), host, port)
self.table = KTable(self.node)
self.app = Application("xmlrpc")
self.app.listenTCP(port, server.Site(self))
-
- ## these databases may be more suited to on-disk rather than in-memory
- # h((key, value)) -> (key, value, time) mappings
- self.store = db.DB()
- self.store.open(None, None, db.DB_BTREE)
-
- # <insert time> -> h((key, value))
- self.itime = db.DB()
- self.itime.set_flags(db.DB_DUP)
- self.itime.open(None, None, db.DB_BTREE)
-
- # key -> h((key, value))
- self.kw = db.DB()
- self.kw.set_flags(db.DB_DUP)
- self.kw.open(None, None, db.DB_BTREE)
-
- KeyExpirer(store=self.store, itime=self.itime, kw=self.kw)
-
+ self.findDB(db)
+ self.last = time.time()
+ KeyExpirer(store=self.store)
+
+ def findDB(self, db):
+ import os
+ try:
+ os.stat(db)
+ except OSError:
+ self.createNewDB(db)
+ else:
+ self.loadDB(db)
+
+ def loadDB(self, db):
+ try:
+ self.store = sqlite.connect(db=db)
+ self.store.autocommit = 1
+ except:
+ import traceback
+ raise KhashmirDBExcept, "Couldn't open DB", traceback.exc_traceback
+
+ def createNewDB(self, db):
+ self.store = sqlite.connect(db=db)
+ self.store.autocommit = 1
+ s = """
+ create table kv (key text, value text, time timestamp, primary key (key, value));
+ create index kv_key on kv(key);
+ create index kv_timestamp on kv(time);
+
+ create table nodes (id text primary key, host text, port number);
+ """
+ c = self.store.cursor()
+ c.execute(s)
+
def render(self, request):
"""
Override the built in render so we can have access to the request object!
"""
ping this node and add the contact info to the table on pong!
"""
- n =Node().init(" "*20, host, port) # note, we
+ n =Node().init(const.NULL_ID, host, port) # note, we
self.sendPing(n)
# get K nodes out of local table/cache, or the node we want
nodes = self.table.findNodes(id)
d = Deferred()
- d.addCallbacks(callback, errback)
+ if errback:
+ d.addCallbacks(callback, errback)
+ else:
+ d.addCallback(callback)
if len(nodes) == 1 and nodes[0].id == id :
d.callback(nodes)
else:
## also async
def valueForKey(self, key, callback):
- """ returns the values found for key in global table """
+ """ returns the values found for key in global table
+ callback will be called with a list of values for each peer that returns unique values
+ final callback will be an empty list - probably should change to 'more coming' arg
+ """
nodes = self.table.findNodes(key)
- # create our search state
- state = GetValue(self, key, callback)
- reactor.callFromThread(state.goWithNodes, nodes)
-
+
# get locals
l = self.retrieveValues(key)
if len(l) > 0:
- callback(l)
+ reactor.callFromThread(callback, map(lambda a: a.decode('base64'), l))
+ # create our search state
+ state = GetValue(self, key, callback)
+ reactor.callFromThread(state.goWithNodes, nodes, l)
+
- ## async, but in the current implementation there is no guarantee a store does anything so there is no callback right now
+ ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
def storeValueForKey(self, key, value, callback=None):
""" stores the value for key in the global table, returns immediately, no status
        in this implementation, peers respond but don't indicate the status of stored values
- values are stored in peers on a first-come first-served basis
- this will probably change so more than one value can be stored under a key
+ a key can have many values
"""
- def _storeValueForKey(nodes, key=key, value=value, response=callback , default= lambda t: "didn't respond"):
+ def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
if not callback:
- # default callback - this will get called for each successful store value
+ # default callback
def _storedValueHandler(sender):
pass
response=_storedValueHandler
- for node in nodes:
+
+ for node in nodes[:const.STORE_REDUNDANCY]:
+ def cb(t, table = table, node=node, resp=response):
+ self.table.insertNode(node)
+ response(t)
if node.id != self.node.id:
+ def default(err, node=node, table=table):
+ table.nodeFailed(node)
df = node.storeValue(key, value, self.node.senderDict())
- df.addCallbacks(response, default)
+ df.addCallbacks(cb, lambda x: None)
# this call is asynch
self.findNode(key, _storeValueForKey)
- def insertNode(self, n):
+ def insertNode(self, n, contacted=1):
"""
insert a node in our local table, pinging oldest contact in bucket, if necessary
        a node into the table without its peer-ID. That means of course the node passed into this
method needs to be a properly formed Node object with a valid ID.
"""
- old = self.table.insertNode(n)
- if old and (time.time() - old.lastSeen) > MAX_PING_INTERVAL and old.id != self.node.id:
+ old = self.table.insertNode(n, contacted=contacted)
+ if old and (time.time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id:
# the bucket is full, check to see if old node is still around and if so, replace it
## these are the callbacks used when we ping the oldest node in a bucket
self.table.replaceStaleNode(old, newnode)
def _notStaleNodeHandler(sender, old=old):
- """ called when we get a ping from the remote node """
+ """ called when we get a pong from the old node """
sender = Node().initWithDict(sender)
if sender.id == old.id:
- self.table.insertNode(old)
+ self.table.justSeenNode(old)
df = old.ping(self.node.senderDict())
df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
"""
df = node.ping(self.node.senderDict())
## these are the callbacks we use when we issue a PING
- def _pongHandler(sender, id=node.id, host=node.host, port=node.port, table=self.table):
- if id != 20 * ' ' and id != sender['id'].data:
+ def _pongHandler(args, id=node.id, host=node.host, port=node.port, table=self.table):
+ l, sender = args
+ if id != const.NULL_ID and id != sender['id'].decode('base64'):
            # whoa, got a response from a different peer than we were expecting
pass
else:
- #print "Got PONG from %s at %s:%s" % (`msg['id']`, t.target.host, t.target.port)
sender['host'] = host
sender['port'] = port
n = Node().initWithDict(sender)
table.insertNode(n)
return
- def _defaultPong(err):
- # this should probably increment a failed message counter and dump the node if it gets over a threshold
- return
+ def _defaultPong(err, node=node, table=self.table):
+ table.nodeFailed(node)
df.addCallbacks(_pongHandler,_defaultPong)
- def findCloseNodes(self):
+ def findCloseNodes(self, callback=lambda a: None):
"""
This does a findNode on the ID one away from our own.
This will allow us to populate our table with nodes on our network closest to our own.
This is called as soon as we start up with an empty table
"""
id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
- def callback(nodes):
- pass
self.findNode(id, callback)
def refreshTable(self):
pass
for bucket in self.table.buckets:
- if time.time() - bucket.lastAccessed >= 60 * 60:
- id = randRange(bucket.min, bucket.max)
+ if time.time() - bucket.lastAccessed >= const.BUCKET_STALENESS:
+ id = newIDInRange(bucket.min, bucket.max)
self.findNode(id, callback)
def retrieveValues(self, key):
- if self.kw.has_key(key):
- c = self.kw.cursor()
- tup = c.set(key)
- l = []
- while(tup and tup[0] == key):
- h1 = tup[1]
- v = loads(self.store[h1])[1]
- l.append(v)
- tup = c.next()
- l = map(lambda v: Binary(v), l)
- return l
- return []
+ s = "select value from kv where key = '%s';" % key.encode('base64')
+ c = self.store.cursor()
+ c.execute(s)
+ t = c.fetchone()
+ l = []
+ while t:
+ l.append(t['value'])
+ t = c.fetchone()
+ return l
#####
##### INCOMING MESSAGE HANDLERS
ip = self.crequest.getClientIP()
sender['host'] = ip
n = Node().initWithDict(sender)
- self.insertNode(n)
- return self.node.senderDict()
+ self.insertNode(n, contacted=0)
+ return (), self.node.senderDict()
def xmlrpc_find_node(self, target, sender):
- nodes = self.table.findNodes(target.data)
+ nodes = self.table.findNodes(target.decode('base64'))
nodes = map(lambda node: node.senderDict(), nodes)
ip = self.crequest.getClientIP()
sender['host'] = ip
n = Node().initWithDict(sender)
- self.insertNode(n)
+ self.insertNode(n, contacted=0)
return nodes, self.node.senderDict()
-
+
def xmlrpc_store_value(self, key, value, sender):
- key = key.data
- h1 = sha(key+value.data).digest()
- t = `time.time()`
- if not self.store.has_key(h1):
- v = dumps((key, value.data, t))
- self.store.put(h1, v)
- self.itime.put(t, h1)
- self.kw.put(key, h1)
- else:
+ t = "%0.6f" % time.time()
+ s = "insert into kv values ('%s', '%s', '%s');" % (key, value, t)
+ c = self.store.cursor()
+ try:
+ c.execute(s)
+ except pysqlite_exceptions.IntegrityError, reason:
# update last insert time
- tup = loads(self.store[h1])
- self.store[h1] = dumps((tup[0], tup[1], t))
- self.itime.put(t, h1)
-
+ s = "update kv set time = '%s' where key = '%s' and value = '%s';" % (t, key, value)
+ c.execute(s)
ip = self.crequest.getClientIP()
sender['host'] = ip
n = Node().initWithDict(sender)
- self.insertNode(n)
- return self.node.senderDict()
+ self.insertNode(n, contacted=0)
+ return (), self.node.senderDict()
def xmlrpc_find_value(self, key, sender):
ip = self.crequest.getClientIP()
- key = key.data
+ key = key.decode('base64')
sender['host'] = ip
n = Node().initWithDict(sender)
- self.insertNode(n)
+ self.insertNode(n, contacted=0)
l = self.retrieveValues(key)
- if len(l):
+ if len(l) > 0:
return {'values' : l}, self.node.senderDict()
else:
nodes = self.table.findNodes(key)
def test_build_net(quiet=0, peers=24, host='localhost', pause=1):
from whrandom import randrange
+ import threading
import thread
port = 2001
l = []
print "Building %s peer table." % peers
for i in xrange(peers):
- a = Khashmir(host, port + i)
+ a = Khashmir(host, port + i, db = '/tmp/test'+`i`)
l.append(a)
time.sleep(1)
for peer in l[1:]:
peer.app.run()
- #time.sleep(.25)
+ time.sleep(10)
print "adding contacts...."
n = l[randrange(0, len(l))].node
peer.addContact(host, n.port)
if pause:
- time.sleep(.30)
-
- time.sleep(1)
+ time.sleep(.33)
+
+ time.sleep(10)
print "finding close nodes...."
for peer in l:
- peer.findCloseNodes()
- if pause:
- time.sleep(.5)
- if pause:
- time.sleep(2)
+ flag = threading.Event()
+ def cb(nodes, f=flag):
+ f.set()
+ peer.findCloseNodes(cb)
+ flag.wait()
+
# for peer in l:
# peer.refreshTable()
return l
try:
if(len(values) == 0):
if not self.found:
- print "find FAILED"
+ print "find NOT FOUND"
else:
print "find FOUND"
sys.stdout.flush()
d.valueForKey(key, cb(fc).callback)
fc.wait()
-def test_one(port):
+def test_one(host, port, db='/tmp/test'):
import thread
- k = Khashmir('localhost', port)
+ k = Khashmir(host, port, db)
thread.start_new_thread(k.app.run, ())
return k
if __name__ == "__main__":
- l = test_build_net()
+ import sys
+ n = 8
+ if len(sys.argv) > 1:
+ n = int(sys.argv[1])
+ l = test_build_net(peers=n)
time.sleep(3)
print "finding nodes..."
for i in range(10):