from const import reactor
import time
+from pickle import loads, dumps
+from sha import sha
from ktable import KTable, K
from knode import KNode as Node
# this is the main class!
class Khashmir(xmlrpc.XMLRPC):
- __slots__ = ['listener', 'node', 'table', 'store', 'app']
+ __slots__ = ['listener', 'node', 'table', 'store', 'itime', 'kw', 'app']
def __init__(self, host, port):
self.node = Node(newID(), host, port)
self.table = KTable(self.node)
self.app = Application("xmlrpc")
self.app.listenTCP(port, server.Site(self))
+
+ ## these databases may be more suited to on-disk rather than in-memory
+ # h((key, value)) -> (key, value, time) mappings
self.store = db.DB()
self.store.open(None, None, db.DB_BTREE)
+ # <insert time> -> h((key, value))
+ self.itime = db.DB()
+ self.itime.set_flags(db.DB_DUP)
+ self.itime.open(None, None, db.DB_BTREE)
+
+ # key -> h((key, value))
+ self.kw = db.DB()
+ self.kw.set_flags(db.DB_DUP)
+ self.kw.open(None, None, db.DB_BTREE)
+
def render(self, request):
"""
if sender['id'] == old.id:
self.table.insertNode(old)
- df = old.ping()
+ df = old.ping(self.node.senderDict())
df.addCallbacks(_notStaleNodeHandler, self._staleNodeHandler)
return nodes, self.node.senderDict()
def xmlrpc_store_value(self, key, value, sender):
- if not self.store.has_key(key):
- self.store.put(key, value)
+ h1 = sha(key+value).digest()
+ t = `time.time()`
+ if not self.store.has_key(h1):
+ v = dumps((key, value, t))
+ self.store.put(h1, v)
+ self.itime.put(t, h1)
+ self.kw.put(key, h1)
+ else:
+ # update last insert time
+ tup = loads(self.store[h1])
+ self.store[h1] = dumps((tup[0], tup[1], t))
+ self.itime.put(t, h1)
+
ip = self.crequest.getClientIP()
n = Node(sender['id'], ip, sender['port'])
self.insertNode(n)
ip = self.crequest.getClientIP()
n = Node(sender['id'], ip, sender['port'])
self.insertNode(n)
- if self.store.has_key(key):
- return {'values' : self.store[key]}, self.node.senderDict()
+ if self.kw.has_key(key):
+ c = self.kw.cursor()
+ tup = c.set_range(key)
+ l = []
+ while(tup):
+ h1 = tup[1]
+ v = loads(self.store[h1])[1]
+ l.append(v)
+ tup = c.next()
+ return {'values' : l}, self.node.senderDict()
else:
nodes = self.table.findNodes(key)
nodes = map(lambda node: node.senderDict(), nodes)
#------ testing
-def test_build_net(quiet=0, peers=256, pause=1):
+def test_build_net(quiet=0, peers=64, pause=1):
from whrandom import randrange
import thread
port = 2001
if pause:
time.sleep(.30)
- time.sleep(5)
+ time.sleep(1)
print "finding close nodes...."
for peer in l:
peer.findCloseNodes()
if pause:
- time.sleep(1)
+ time.sleep(.5)
+ if pause:
+ time.sleep(10)
# for peer in l:
# peer.refreshTable()
return l
a = l[randrange(0,n)]
b = l[randrange(0,n)]
- def callback(nodes, l=l, flag=flag):
+ def callback(nodes, flag=flag):
if (len(nodes) >0) and (nodes[0].id == b.node.id):
print "test_find_nodes PASSED"
else:
key = sha(`randrange(0,100000)`).digest()
value = sha(`randrange(0,100000)`).digest()
if not quiet:
- print "inserting value...",
+ print "inserting value..."
sys.stdout.flush()
a.storeValueForKey(key, value)
time.sleep(3)
print "finding..."
+ sys.stdout.flush()
- def mc(flag, value=value):
- def callback(values, f=flag, val=value):
+ class cb:
+ def __init__(self, flag, value=value):
+ self.flag = flag
+ self.val = value
+ self.found = 0
+ def callback(self, values):
try:
if(len(values) == 0):
- print "find FAILED"
- else:
- if values != val:
+ if not self.found:
print "find FAILED"
else:
print "find FOUND"
+ sys.stdout.flush()
+
+ else:
+ if self.val in values:
+ self.found = 1
finally:
- f.set()
- return callback
- b.valueForKey(key, mc(fa))
+ self.flag.set()
+
+ b.valueForKey(key, cb(fa).callback)
fa.wait()
- c.valueForKey(key, mc(fb))
+ c.valueForKey(key, cb(fb).callback)
fb.wait()
- d.valueForKey(key, mc(fc))
+ d.valueForKey(key, cb(fc).callback)
fc.wait()
+def test_one(port):
+ import thread
+ k = Khashmir('localhost', port)
+ thread.start_new_thread(k.app.run, ())
+ return k
+
if __name__ == "__main__":
l = test_build_net()
time.sleep(3)
print "finding nodes..."
- test_find_nodes(l)
- test_find_nodes(l)
- test_find_nodes(l)
+ for i in range(10):
+ test_find_nodes(l)
print "inserting and fetching values..."
- test_find_value(l)
- test_find_value(l)
- test_find_value(l)
- test_find_value(l)
- test_find_value(l)
- test_find_value(l)
+ for i in range(10):
+ test_find_value(l)