From 75febfbd4cda5f4953f5b93d355444671e853551 Mon Sep 17 00:00:00 2001 From: burris Date: Tue, 3 Sep 2002 08:23:56 +0000 Subject: [PATCH] now we gracefully deal with multiple values per key valueForKey callback is fired repeatedly with new results until it returns an empty list --- khashmir.py | 105 +++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 75 insertions(+), 30 deletions(-) diff --git a/khashmir.py b/khashmir.py index c29b6e7..2b716e6 100644 --- a/khashmir.py +++ b/khashmir.py @@ -2,6 +2,8 @@ from const import reactor import time +from pickle import loads, dumps +from sha import sha from ktable import KTable, K from knode import KNode as Node @@ -26,15 +28,28 @@ MAX_PING_INTERVAL = 60 * 15 # fifteen minutes # this is the main class! class Khashmir(xmlrpc.XMLRPC): - __slots__ = ['listener', 'node', 'table', 'store', 'app'] + __slots__ = ['listener', 'node', 'table', 'store', 'itime', 'kw', 'app'] def __init__(self, host, port): self.node = Node(newID(), host, port) self.table = KTable(self.node) self.app = Application("xmlrpc") self.app.listenTCP(port, server.Site(self)) + + ## these databases may be more suited to on-disk rather than in-memory + # h((key, value)) -> (key, value, time) mappings self.store = db.DB() self.store.open(None, None, db.DB_BTREE) + # -> h((key, value)) + self.itime = db.DB() + self.itime.set_flags(db.DB_DUP) + self.itime.open(None, None, db.DB_BTREE) + + # key -> h((key, value)) + self.kw = db.DB() + self.kw.set_flags(db.DB_DUP) + self.kw.open(None, None, db.DB_BTREE) + def render(self, request): """ @@ -118,7 +133,7 @@ class Khashmir(xmlrpc.XMLRPC): if sender['id'] == old.id: self.table.insertNode(old) - df = old.ping() + df = old.ping(self.node.senderDict()) df.addCallbacks(_notStaleNodeHandler, self._staleNodeHandler) @@ -190,8 +205,19 @@ class Khashmir(xmlrpc.XMLRPC): return nodes, self.node.senderDict() def xmlrpc_store_value(self, key, value, sender): - if not self.store.has_key(key): - self.store.put(key, value) + h1 = sha(key+value).digest() + t = `time.time()` + if not self.store.has_key(h1): + v = dumps((key, value, t)) + self.store.put(h1, v) + self.itime.put(t, h1) + self.kw.put(key, h1) + else: + # update last insert time + tup = loads(self.store[h1]) + self.store[h1] = dumps((tup[0], tup[1], t)) + self.itime.put(t, h1) + ip = self.crequest.getClientIP() n = Node(sender['id'], ip, sender['port']) self.insertNode(n) @@ -201,8 +227,16 @@ class Khashmir(xmlrpc.XMLRPC): ip = self.crequest.getClientIP() n = Node(sender['id'], ip, sender['port']) self.insertNode(n) - if self.store.has_key(key): - return {'values' : self.store[key]}, self.node.senderDict() + if self.kw.has_key(key): + c = self.kw.cursor() + tup = c.set_range(key) + l = [] + while(tup): + h1 = tup[1] + v = loads(self.store[h1])[1] + l.append(v) + tup = c.next() + return {'values' : l}, self.node.senderDict() else: nodes = self.table.findNodes(key) nodes = map(lambda node: node.senderDict(), nodes) @@ -221,7 +255,7 @@ class Khashmir(xmlrpc.XMLRPC): #------ testing -def test_build_net(quiet=0, peers=256, pause=1): +def test_build_net(quiet=0, peers=64, pause=1): from whrandom import randrange import thread port = 2001 @@ -253,13 +287,15 @@ def test_build_net(quiet=0, peers=256, pause=1): if pause: time.sleep(.30) - time.sleep(5) + time.sleep(1) print "finding close nodes...." for peer in l: peer.findCloseNodes() if pause: - time.sleep(1) + time.sleep(.5) + if pause: + time.sleep(10) # for peer in l: # peer.refreshTable() return l @@ -274,7 +310,7 @@ def test_find_nodes(l, quiet=0): a = l[randrange(0,n)] b = l[randrange(0,n)] - def callback(nodes, l=l, flag=flag): + def callback(nodes, flag=flag): if (len(nodes) >0) and (nodes[0].id == b.node.id): print "test_find_nodes PASSED" else: @@ -301,43 +337,52 @@ def test_find_value(l, quiet=0): key = sha(`randrange(0,100000)`).digest() value = sha(`randrange(0,100000)`).digest() if not quiet: - print "inserting value...", + print "inserting value..." sys.stdout.flush() a.storeValueForKey(key, value) time.sleep(3) print "finding..." + sys.stdout.flush() - def mc(flag, value=value): - def callback(values, f=flag, val=value): + class cb: + def __init__(self, flag, value=value): + self.flag = flag + self.val = value + self.found = 0 + def callback(self, values): try: if(len(values) == 0): - print "find FAILED" - else: - if values != val: + if not self.found: print "find FAILED" else: print "find FOUND" + sys.stdout.flush() + + else: + if self.val in values: + self.found = 1 finally: - f.set() - return callback - b.valueForKey(key, mc(fa)) + self.flag.set() + + b.valueForKey(key, cb(fa).callback) fa.wait() - c.valueForKey(key, mc(fb)) + c.valueForKey(key, cb(fb).callback) fb.wait() - d.valueForKey(key, mc(fc)) + d.valueForKey(key, cb(fc).callback) fc.wait() +def test_one(port): + import thread + k = Khashmir('localhost', port) + thread.start_new_thread(k.app.run, ()) + return k + if __name__ == "__main__": l = test_build_net() time.sleep(3) print "finding nodes..." - test_find_nodes(l) - test_find_nodes(l) - test_find_nodes(l) + for i in range(10): + test_find_nodes(l) print "inserting and fetching values..." - test_find_value(l) - test_find_value(l) - test_find_value(l) - test_find_value(l) - test_find_value(l) - test_find_value(l) + for i in range(10): + test_find_value(l) -- 2.39.5