From a6a2f7ed963e55f0b99926b84cdbdc7be5d3d47c Mon Sep 17 00:00:00 2001 From: burris Date: Thu, 30 Jan 2003 04:38:46 +0000 Subject: [PATCH] fix bug where we fail to return any values we have stored locally fixed test script to wait for store to finish before trying to find grab port from airhook connection info --- khashmir.py | 42 ++++++++++++++++++++++++++++++------------ 1 file changed, 30 insertions(+), 12 deletions(-) diff --git a/khashmir.py b/khashmir.py index f3e410a..a738a97 100644 --- a/khashmir.py +++ b/khashmir.py @@ -39,6 +39,7 @@ class Khashmir(protocol.Factory): def setup(self, host, port, db='khashmir.db'): self._findDB(db) + self.port = port self.node = self._loadSelfNode(host, port) self.table = KTable(self.node) self.app = Application("krpc") @@ -160,7 +161,7 @@ class Khashmir(protocol.Factory): ## also async - def valueForKey(self, key, callback): + def valueForKey(self, key, callback, searchlocal = 1): """ returns the values found for key in global table callback will be called with a list of values for each peer that returns unique values final callback will be an empty list - probably should change to 'more coming' arg @@ -168,7 +169,12 @@ class Khashmir(protocol.Factory): nodes = self.table.findNodes(key) # get locals - l = self.retrieveValues(key) + if searchlocal: + l = self.retrieveValues(key) + if len(l) > 0: + reactor.callLater(0, callback, (l)) + else: + l = [] # create our search state state = GetValue(self, key, callback) @@ -213,6 +219,8 @@ class Khashmir(protocol.Factory): def _notStaleNodeHandler(dict, old=old): """ called when we get a pong from the old node """ + _krpc_sender = dict['_krpc_sender'] + dict = dict['rsp'] sender = dict['sender'] if sender['id'] == old.id: self.table.justSeenNode(old.id) @@ -227,6 +235,8 @@ class Khashmir(protocol.Factory): df = node.ping(self.node.senderDict()) ## these are the callbacks we use when we issue a PING def _pongHandler(dict, node=node, table=self.table, callback=callback): + _krpc_sender = dict['_krpc_sender'] + dict = dict['rsp'] sender = dict['sender'] if node.id != const.NULL_ID and node.id != sender['id']: # whoah, got response from different peer than we were expecting @@ -288,6 +298,7 @@ class Khashmir(protocol.Factory): returns sender dict """ sender['host'] = _krpc_sender[0] + sender['port'] = _krpc_sender[1] n = Node().initWithDict(sender) n.conn = self.airhook.connectionForAddr((n.host, n.port)) self.insertNode(n, contacted=0) @@ -297,6 +308,7 @@ class Khashmir(protocol.Factory): nodes = self.table.findNodes(target) nodes = map(lambda node: node.senderDict(), nodes) sender['host'] = _krpc_sender[0] + sender['port'] = _krpc_sender[1] n = Node().initWithDict(sender) n.conn = self.airhook.connectionForAddr((n.host, n.port)) self.insertNode(n, contacted=0) @@ -313,6 +325,7 @@ class Khashmir(protocol.Factory): s = "update kv set time = '%s' where key = '%s' and value = '%s';" % (t, key.encode("hex"), value.encode("base64")) c.execute(s) sender['host'] = _krpc_sender[0] + sender['port'] = _krpc_sender[1] n = Node().initWithDict(sender) n.conn = self.airhook.connectionForAddr((n.host, n.port)) self.insertNode(n, contacted=0) @@ -320,6 +333,7 @@ class Khashmir(protocol.Factory): def krpc_find_value(self, key, sender, _krpc_sender): sender['host'] = _krpc_sender[0] + sender['port'] = _krpc_sender[1] n = Node().initWithDict(sender) n.conn = self.airhook.connectionForAddr((n.host, n.port)) self.insertNode(n, contacted=0) @@ -436,7 +450,7 @@ def test_find_nodes(l, quiet=0): flag.wait() def test_find_value(l, quiet=0): - + ff = threading.Event() fa = threading.Event() fb = threading.Event() fc = threading.Event() @@ -450,34 +464,38 @@ def test_find_value(l, quiet=0): key = newID() value = newID() if not quiet: print "inserting value..." - a.storeValueForKey(key, value) - time.sleep(3) + def acb(p, f=ff): + f.set() + a.storeValueForKey(key, value, acb) + ff.wait() + if not quiet: print "finding..." class cb: - def __init__(self, flag, value=value): + def __init__(self, flag, value=value, port=None): self.flag = flag self.val = value self.found = 0 + self.port = port def callback(self, values): try: if(len(values) == 0): if not self.found: - print "find NOT FOUND" + print "find %s NOT FOUND" % self.port else: - print "find FOUND" + print "find %s FOUND" % self.port else: if self.val in values: self.found = 1 finally: self.flag.set() - b.valueForKey(key, cb(fa).callback) + b.valueForKey(key, cb(fa, port=b.port).callback, searchlocal=0) fa.wait() - c.valueForKey(key, cb(fb).callback) + c.valueForKey(key, cb(fb, port=c.port).callback, searchlocal=0) fb.wait() - d.valueForKey(key, cb(fc).callback) + d.valueForKey(key, cb(fc, port=d.port).callback, searchlocal=0) fc.wait() def test_one(host, port, db='/tmp/test'): @@ -493,7 +511,7 @@ if __name__ == "__main__": l = test_build_net(peers=n) time.sleep(3) print "finding nodes..." - for i in range(10): + for i in range(n): test_find_nodes(l) print "inserting and fetching values..." for i in range(10): -- 2.39.5