## Copyright 2002 Andrew Loewenstern, All Rights Reserved
from const import reactor
+import const
+
import time
+from bencode import bdecode as loads
+from bencode import bencode as dumps
+
+from sha import sha
from ktable import KTable, K
from knode import KNode as Node
-from hash import newID
+from hash import newID, newIDInRange
-from actions import FindNode, GetValue
+from actions import FindNode, GetValue, KeyExpirer
from twisted.web import xmlrpc
from twisted.internet.defer import Deferred
from twisted.python import threadable
+from twisted.internet.app import Application
+from twisted.web import server
threadable.init()
-from bsddb3 import db ## find this at http://pybsddb.sf.net/
-from bsddb3._db import DBNotFoundError
-
-# don't ping unless it's been at least this many seconds since we've heard from a peer
-MAX_PING_INTERVAL = 60 * 15 # fifteen minutes
+import sqlite ## find this at http://pysqlite.sourceforge.net/
+KhashmirDBExcept = "KhashmirDBExcept"
# this is the main class!
class Khashmir(xmlrpc.XMLRPC):
- __slots__ = ['listener', 'node', 'table', 'store', 'app']
- def __init__(self, host, port):
- self.node = Node(newID(), host, port)
+ __slots__ = ('listener', 'node', 'table', 'store', 'app', 'last')
+ def __init__(self, host, port, db='khashmir.db'):
+ self.node = Node().init(newID(), host, port)
self.table = KTable(self.node)
- from twisted.internet.app import Application
- from twisted.web import server
self.app = Application("xmlrpc")
self.app.listenTCP(port, server.Site(self))
- self.store = db.DB()
- self.store.open(None, None, db.DB_BTREE)
-
+ self.findDB(db)
+ self.last = time.time()
+ KeyExpirer(store=self.store)
+ def findDB(self, db):
+ import os
+ try:
+ os.stat(db)
+ except OSError:
+ self.createNewDB(db)
+ else:
+ self.loadDB(db)
+
+ def loadDB(self, db):
+ try:
+ self.store = sqlite.connect(db=db)
+ except:
+ import traceback
+ raise KhashmirDBExcept, "Couldn't open DB", traceback.exc_traceback
+
+ def createNewDB(self, db):
+ self.store = sqlite.connect(db=db)
+ s = """
+ create table kv (hkv text primary key, key text, value text, time timestamp);
+ create index kv_key on kv(key);
+
+ create table nodes (id text primary key, host text, port number);
+ """
+ c = self.store.cursor()
+ c.execute(s)
+ self.store.commit()
+
def render(self, request):
"""
Override the built in render so we can have access to the request object!
"""
ping this node and add the contact info to the table on pong!
"""
- n =Node(" "*20, host, port) # note, we
+ n =Node().init(const.NULL_ID, host, port) # note, we
self.sendPing(n)
# get K nodes out of local table/cache, or the node we want
nodes = self.table.findNodes(id)
d = Deferred()
- d.addCallbacks(callback, errback)
+ if errback:
+ d.addCallbacks(callback, errback)
+ else:
+ d.addCallback(callback)
if len(nodes) == 1 and nodes[0].id == id :
d.callback(nodes)
else:
## also async
def valueForKey(self, key, callback):
- """ returns the values found for key in global table """
+ """ returns the values found for key in global table
+ callback will be called with a list of values for each peer that returns unique values
+ final callback will be an empty list - probably should change to 'more coming' arg
+ """
nodes = self.table.findNodes(key)
+
+ # get locals
+ l = self.retrieveValues(key)
+ if len(l) > 0:
+ reactor.callFromThread(callback, map(lambda a: a.decode('base64'), l))
+
# create our search state
state = GetValue(self, key, callback)
- reactor.callFromThread(state.goWithNodes, nodes)
+ reactor.callFromThread(state.goWithNodes, nodes, l)
+
- ## async, but in the current implementation there is no guarantee a store does anything so there is no callback right now
- def storeValueForKey(self, key, value):
+ ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
+ def storeValueForKey(self, key, value, callback=None):
""" stores the value for key in the global table, returns immediately, no status
in this implementation, peers respond but don't indicate status to storing values
- values are stored in peers on a first-come first-served basis
- this will probably change so more than one value can be stored under a key
+ a key can have many values
"""
- def _storeValueForKey(nodes, key=key, value=value, response= self._storedValueHandler, default= lambda t: "didn't respond"):
- for node in nodes:
+ def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
+ if not callback:
+ # default callback
+ def _storedValueHandler(sender):
+ pass
+ response=_storedValueHandler
+
+ for node in nodes[:const.STORE_REDUNDANCY]:
+ def cb(t, table = table, node=node, resp=response):
+ self.table.insertNode(node)
+ response(t)
if node.id != self.node.id:
+ def default(err, node=node, table=table):
+ table.nodeFailed(node)
df = node.storeValue(key, value, self.node.senderDict())
- df.addCallbacks(response, default)
+ df.addCallbacks(cb, lambda x: None)
# this call is asynch
self.findNode(key, _storeValueForKey)
- def insertNode(self, n):
+ def insertNode(self, n, contacted=1):
"""
insert a node in our local table, pinging oldest contact in bucket, if necessary
a node into the table without it's peer-ID. That means of course the node passed into this
method needs to be a properly formed Node object with a valid ID.
"""
- old = self.table.insertNode(n)
- if old and (time.time() - old.lastSeen) > MAX_PING_INTERVAL and old.id != self.node.id:
+ old = self.table.insertNode(n, contacted=contacted)
+ if old and (time.time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id:
# the bucket is full, check to see if old node is still around and if so, replace it
## these are the callbacks used when we ping the oldest node in a bucket
self.table.replaceStaleNode(old, newnode)
def _notStaleNodeHandler(sender, old=old):
- """ called when we get a ping from the remote node """
- if sender['id'] == old.id:
- self.table.insertNode(old)
+ """ called when we get a pong from the old node """
+ sender = Node().initWithDict(sender)
+ if sender.id == old.id:
+ self.table.justSeenNode(old)
- df = old.ping()
- df.addCallbacks(_notStaleNodeHandler, self._staleNodeHandler)
+ df = old.ping(self.node.senderDict())
+ df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
def sendPing(self, node):
"""
df = node.ping(self.node.senderDict())
## these are the callbacks we use when we issue a PING
- def _pongHandler(sender, id=node.id, host=node.host, port=node.port, table=self.table):
- if id != 20 * ' ' and id != sender['id']:
+ def _pongHandler(args, id=node.id, host=node.host, port=node.port, table=self.table):
+ l, sender = args
+ if id != const.NULL_ID and id != sender['id'].decode('base64'):
# whoah, got response from different peer than we were expecting
pass
else:
- #print "Got PONG from %s at %s:%s" % (`msg['id']`, t.target.host, t.target.port)
- n = Node(sender['id'], host, port)
+ sender['host'] = host
+ sender['port'] = port
+ n = Node().initWithDict(sender)
table.insertNode(n)
return
- def _defaultPong(err):
- # this should probably increment a failed message counter and dump the node if it gets over a threshold
- return
+ def _defaultPong(err, node=node, table=self.table):
+ table.nodeFailed(node)
df.addCallbacks(_pongHandler,_defaultPong)
pass
for bucket in self.table.buckets:
- if time.time() - bucket.lastAccessed >= 60 * 60:
- id = randRange(bucket.min, bucket.max)
+ if time.time() - bucket.lastAccessed >= const.BUCKET_STALENESS:
+ id = newIDInRange(bucket.min, bucket.max)
self.findNode(id, callback)
+ def retrieveValues(self, key):
+ s = "select value from kv where key = '%s';" % key.encode('base64')
+ c = self.store.cursor()
+ c.execute(s)
+ t = c.fetchone()
+ l = []
+ while t:
+ l.append(t['value'])
+ t = c.fetchone()
+ return l
+
#####
##### INCOMING MESSAGE HANDLERS
returns sender dict
"""
ip = self.crequest.getClientIP()
- n = Node(sender['id'], ip, sender['port'])
- self.insertNode(n)
- return self.node.senderDict()
+ sender['host'] = ip
+ n = Node().initWithDict(sender)
+ self.insertNode(n, contacted=0)
+ return (), self.node.senderDict()
def xmlrpc_find_node(self, target, sender):
- nodes = self.table.findNodes(target)
+ nodes = self.table.findNodes(target.decode('base64'))
nodes = map(lambda node: node.senderDict(), nodes)
ip = self.crequest.getClientIP()
- n = Node(sender['id'], ip, sender['port'])
- self.insertNode(n)
+ sender['host'] = ip
+ n = Node().initWithDict(sender)
+ self.insertNode(n, contacted=0)
return nodes, self.node.senderDict()
-
+
def xmlrpc_store_value(self, key, value, sender):
- if not self.store.has_key(key):
- self.store.put(key, value)
+ h1 = sha(key+value).digest().encode('base64')
+ t = `time.time()`
+ s = "insert into kv values ('%s', '%s', '%s', '%s')" % (h1, key, value, t)
+ c = self.store.cursor()
+ try:
+ c.execute(s)
+ except:
+ # update last insert time
+ s = "update kv set time = '%s' where hkv = '%s'" % (t, h1)
+ c.execute(s)
+ self.store.commit()
ip = self.crequest.getClientIP()
- n = Node(sender['id'], ip, sender['port'])
- self.insertNode(n)
- return self.node.senderDict()
+ sender['host'] = ip
+ n = Node().initWithDict(sender)
+ self.insertNode(n, contacted=0)
+ return (), self.node.senderDict()
def xmlrpc_find_value(self, key, sender):
ip = self.crequest.getClientIP()
- n = Node(sender['id'], ip, sender['port'])
- self.insertNode(n)
- if self.store.has_key(key):
- return {'values' : self.store[key]}, self.node.senderDict()
+ key = key.decode('base64')
+ sender['host'] = ip
+ n = Node().initWithDict(sender)
+ self.insertNode(n, contacted=0)
+
+ l = self.retrieveValues(key)
+ if len(l) > 0:
+ return {'values' : l}, self.node.senderDict()
else:
- nodes = self.table.findNodes(msg['key'])
+ nodes = self.table.findNodes(key)
nodes = map(lambda node: node.senderDict(), nodes)
return {'nodes' : nodes}, self.node.senderDict()
- ###
- ### message response callbacks
- # called when we get a response to store value
- def _storedValueHandler(self, sender):
- pass
-#------
-def test_build_net(quiet=0):
+
+#------ testing
+
+def test_build_net(quiet=0, peers=24, host='localhost', pause=1):
from whrandom import randrange
import thread
port = 2001
l = []
- peers = 16
-
+
if not quiet:
print "Building %s peer table." % peers
for i in xrange(peers):
- a = Khashmir('localhost', port + i)
+ a = Khashmir(host, port + i, db = '/tmp/test'+`i`)
l.append(a)
- def run(l=l):
- while(1):
- events = 0
- for peer in l:
- events = events + peer.dispatcher.runOnce()
- if events == 0:
- time.sleep(.25)
thread.start_new_thread(l[0].app.run, ())
+ time.sleep(1)
for peer in l[1:]:
peer.app.run()
-
+ #time.sleep(.25)
+
+ print "adding contacts...."
+
for peer in l[1:]:
n = l[randrange(0, len(l))].node
- peer.addContact(n.host, n.port)
+ peer.addContact(host, n.port)
n = l[randrange(0, len(l))].node
- peer.addContact(n.host, n.port)
+ peer.addContact(host, n.port)
n = l[randrange(0, len(l))].node
- peer.addContact(n.host, n.port)
-
- time.sleep(5)
+ peer.addContact(host, n.port)
+ if pause:
+ time.sleep(.5)
+
+ time.sleep(1)
+ print "finding close nodes...."
for peer in l:
peer.findCloseNodes()
- time.sleep(5)
- for peer in l:
- peer.refreshTable()
+ if pause:
+ time.sleep(.5)
+ if pause:
+ time.sleep(1)
+# for peer in l:
+# peer.refreshTable()
return l
def test_find_nodes(l, quiet=0):
a = l[randrange(0,n)]
b = l[randrange(0,n)]
- def callback(nodes, l=l, flag=flag):
- if (len(nodes) >0) and (nodes[0].id == b.node.id):
+ def callback(nodes, flag=flag, id = b.node.id):
+ if (len(nodes) >0) and (nodes[0].id == id):
print "test_find_nodes PASSED"
else:
print "test_find_nodes FAILED"
def test_find_value(l, quiet=0):
from whrandom import randrange
from sha import sha
+ from hash import newID
import time, threading, sys
fa = threading.Event()
c = l[randrange(0,n)]
d = l[randrange(0,n)]
- key = sha(`randrange(0,100000)`).digest()
- value = sha(`randrange(0,100000)`).digest()
+ key = newID()
+ value = newID()
if not quiet:
- print "inserting value...",
+ print "inserting value..."
sys.stdout.flush()
a.storeValueForKey(key, value)
time.sleep(3)
print "finding..."
+ sys.stdout.flush()
- def mc(flag, value=value):
- def callback(values, f=flag, val=value):
+ class cb:
+ def __init__(self, flag, value=value):
+ self.flag = flag
+ self.val = value
+ self.found = 0
+ def callback(self, values):
try:
if(len(values) == 0):
- print "find FAILED"
- else:
- if values[0]['value'] != val:
- print "find FAILED"
+ if not self.found:
+ print "find NOT FOUND"
else:
print "find FOUND"
+ sys.stdout.flush()
+
+ else:
+ if self.val in values:
+ self.found = 1
finally:
- f.set()
- return callback
- b.valueForKey(key, mc(fa))
- c.valueForKey(key, mc(fb))
- d.valueForKey(key, mc(fc))
-
+ self.flag.set()
+
+ b.valueForKey(key, cb(fa).callback)
fa.wait()
+ c.valueForKey(key, cb(fb).callback)
fb.wait()
+ d.valueForKey(key, cb(fc).callback)
fc.wait()
+def test_one(host, port, db='/tmp/test'):
+ import thread
+ k = Khashmir(host, port, db)
+ thread.start_new_thread(k.app.run, ())
+ return k
+
if __name__ == "__main__":
- l = test_build_net()
+ import sys
+ n = 8
+ if len(sys.argv) > 1:
+ n = int(sys.argv[1])
+ l = test_build_net(peers=n)
time.sleep(3)
print "finding nodes..."
- test_find_nodes(l)
- test_find_nodes(l)
- test_find_nodes(l)
+ for i in range(10):
+ test_find_nodes(l)
print "inserting and fetching values..."
- test_find_value(l)
- test_find_value(l)
- test_find_value(l)
- test_find_value(l)
- test_find_value(l)
- test_find_value(l)
+ for i in range(10):
+ test_find_value(l)