def setup(self, host, port, db='khashmir.db'):
self._findDB(db)
+ self.port = port
self.node = self._loadSelfNode(host, port)
self.table = KTable(self.node)
self.app = Application("krpc")
## also async
- def valueForKey(self, key, callback):
+ def valueForKey(self, key, callback, searchlocal = 1):
""" returns the values found for key in global table
callback will be called with a list of values for each peer that returns unique values
final callback will be an empty list - probably should change to 'more coming' arg
nodes = self.table.findNodes(key)
# get locals
- l = self.retrieveValues(key)
+ if searchlocal:
+ l = self.retrieveValues(key)
+ if len(l) > 0:
+ reactor.callLater(0, callback, (l))
+ else:
+ l = []
# create our search state
state = GetValue(self, key, callback)
def _notStaleNodeHandler(dict, old=old):
""" called when we get a pong from the old node """
+ _krpc_sender = dict['_krpc_sender']
+ dict = dict['rsp']
sender = dict['sender']
if sender['id'] == old.id:
self.table.justSeenNode(old.id)
df = node.ping(self.node.senderDict())
## these are the callbacks we use when we issue a PING
def _pongHandler(dict, node=node, table=self.table, callback=callback):
+ _krpc_sender = dict['_krpc_sender']
+ dict = dict['rsp']
sender = dict['sender']
if node.id != const.NULL_ID and node.id != sender['id']:
# whoah, got response from different peer than we were expecting
returns sender dict
"""
sender['host'] = _krpc_sender[0]
+ sender['port'] = _krpc_sender[1]
n = Node().initWithDict(sender)
n.conn = self.airhook.connectionForAddr((n.host, n.port))
self.insertNode(n, contacted=0)
nodes = self.table.findNodes(target)
nodes = map(lambda node: node.senderDict(), nodes)
sender['host'] = _krpc_sender[0]
+ sender['port'] = _krpc_sender[1]
n = Node().initWithDict(sender)
n.conn = self.airhook.connectionForAddr((n.host, n.port))
self.insertNode(n, contacted=0)
s = "update kv set time = '%s' where key = '%s' and value = '%s';" % (t, key.encode("hex"), value.encode("base64"))
c.execute(s)
sender['host'] = _krpc_sender[0]
+ sender['port'] = _krpc_sender[1]
n = Node().initWithDict(sender)
n.conn = self.airhook.connectionForAddr((n.host, n.port))
self.insertNode(n, contacted=0)
def krpc_find_value(self, key, sender, _krpc_sender):
sender['host'] = _krpc_sender[0]
+ sender['port'] = _krpc_sender[1]
n = Node().initWithDict(sender)
n.conn = self.airhook.connectionForAddr((n.host, n.port))
self.insertNode(n, contacted=0)
flag.wait()
def test_find_value(l, quiet=0):
-
+ ff = threading.Event()
fa = threading.Event()
fb = threading.Event()
fc = threading.Event()
key = newID()
value = newID()
if not quiet: print "inserting value..."
- a.storeValueForKey(key, value)
- time.sleep(3)
+ def acb(p, f=ff):
+ f.set()
+ a.storeValueForKey(key, value, acb)
+ ff.wait()
+
if not quiet:
print "finding..."
class cb:
- def __init__(self, flag, value=value):
+ def __init__(self, flag, value=value, port=None):
self.flag = flag
self.val = value
self.found = 0
+ self.port = port
def callback(self, values):
try:
if(len(values) == 0):
if not self.found:
- print "find NOT FOUND"
+ print "find %s NOT FOUND" % self.port
else:
- print "find FOUND"
+ print "find %s FOUND" % self.port
else:
if self.val in values:
self.found = 1
finally:
self.flag.set()
- b.valueForKey(key, cb(fa).callback)
+ b.valueForKey(key, cb(fa, port=b.port).callback, searchlocal=0)
fa.wait()
- c.valueForKey(key, cb(fb).callback)
+ c.valueForKey(key, cb(fb, port=c.port).callback, searchlocal=0)
fb.wait()
- d.valueForKey(key, cb(fc).callback)
+ d.valueForKey(key, cb(fc, port=d.port).callback, searchlocal=0)
fc.wait()
def test_one(host, port, db='/tmp/test'):
l = test_build_net(peers=n)
time.sleep(3)
print "finding nodes..."
- for i in range(10):
+ for i in range(n):
test_find_nodes(l)
print "inserting and fetching values..."
for i in range(10):