X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=khashmir.py;h=ca6c3d0d1d5cd7c15e0b56f84b127615503d308b;hb=2f1a76beca50cc9600df404c927753b11f94c87e;hp=20cc062f0b9a2128ba1d358634666510329e7295;hpb=e04df494d939e3bb644f788a02123ec05f4c8df4;p=quix0rs-apt-p2p.git diff --git a/khashmir.py b/khashmir.py index 20cc062..ca6c3d0 100644 --- a/khashmir.py +++ b/khashmir.py @@ -1,6 +1,8 @@ ## Copyright 2002 Andrew Loewenstern, All Rights Reserved from const import reactor +import const + import time from pickle import loads, dumps from sha import sha @@ -21,10 +23,7 @@ threadable.init() from bsddb3 import db ## find this at http://pybsddb.sf.net/ from bsddb3._db import DBNotFoundError -from base64 import decodestring as decode - -# don't ping unless it's been at least this many seconds since we've heard from a peer -MAX_PING_INTERVAL = 60 * 15 # fifteen minutes +from xmlrpclib import Binary @@ -92,14 +91,16 @@ class Khashmir(xmlrpc.XMLRPC): def valueForKey(self, key, callback): """ returns the values found for key in global table """ nodes = self.table.findNodes(key) - # decode values, they will be base64 encoded - def cbwrap(values, cb=callback): - values = map(lambda x: decode(x), values) - callback(values) - # create our search state - state = GetValue(self, key, cbwrap) - reactor.callFromThread(state.goWithNodes, nodes) + # get locals + l = self.retrieveValues(key) + if len(l) > 0: + reactor.callFromThread(callback, l) + + # create our search state + state = GetValue(self, key, callback) + reactor.callFromThread(state.goWithNodes, nodes, l) + ## async, but in the current implementation there is no guarantee a store does anything so there is no callback right now @@ -109,21 +110,27 @@ class Khashmir(xmlrpc.XMLRPC): values are stored in peers on a first-come first-served basis this will probably change so more than one value can be stored under a key """ - def _storeValueForKey(nodes, key=key, value=value, response=callback , default= lambda t: "didn't respond"): + def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table): if not callback: # default callback - this will get called for each successful store value def _storedValueHandler(sender): pass response=_storedValueHandler + for node in nodes: + def cb(t, table = table, node=node, resp=response): + self.table.insertNode(node) + response(t) if node.id != self.node.id: + def default(err, node=node, table=table): + table.nodeFailed(node) df = node.storeValue(key, value, self.node.senderDict()) - df.addCallbacks(response, default) + df.addCallback(cb) # this call is asynch self.findNode(key, _storeValueForKey) - def insertNode(self, n): + def insertNode(self, n, contacted=1): """ insert a node in our local table, pinging oldest contact in bucket, if necessary @@ -132,8 +139,8 @@ class Khashmir(xmlrpc.XMLRPC): a node into the table without it's peer-ID. That means of course the node passed into this method needs to be a properly formed Node object with a valid ID. """ - old = self.table.insertNode(n) - if old and (time.time() - old.lastSeen) > MAX_PING_INTERVAL and old.id != self.node.id: + old = self.table.insertNode(n, contacted=contacted) + if old and (time.time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id: # the bucket is full, check to see if old node is still around and if so, replace it ## these are the callbacks used when we ping the oldest node in a bucket @@ -142,9 +149,12 @@ class Khashmir(xmlrpc.XMLRPC): self.table.replaceStaleNode(old, newnode) def _notStaleNodeHandler(sender, old=old): - """ called when we get a ping from the remote node """ - if sender['id'] == old.id: - self.table.insertNode(old) + """ called when we get a pong from the old node """ + sender, conn = sender + sender['host'] = conn['host'] + sender = Node().initWithDict(sender) + if sender.id == old.id: + self.table.justSeenNode(old) df = old.ping(self.node.senderDict()) df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler) @@ -157,19 +167,18 @@ class Khashmir(xmlrpc.XMLRPC): df = node.ping(self.node.senderDict()) ## these are the callbacks we use when we issue a PING def _pongHandler(sender, id=node.id, host=node.host, port=node.port, table=self.table): - if id != 20 * ' ' and id != sender['id']: + sender = sender[0] + if id != 20 * ' ' and id != sender['id'].data: # whoah, got response from different peer than we were expecting pass else: - #print "Got PONG from %s at %s:%s" % (`msg['id']`, t.target.host, t.target.port) sender['host'] = host sender['port'] = port n = Node().initWithDict(sender) table.insertNode(n) return - def _defaultPong(err): - # this should probably increment a failed message counter and dump the node if it gets over a threshold - return + def _defaultPong(err, node=node, table=self.table): + table.nodeFailed(node) df.addCallbacks(_pongHandler,_defaultPong) @@ -198,6 +207,19 @@ class Khashmir(xmlrpc.XMLRPC): self.findNode(id, callback) + def retrieveValues(self, key): + if self.kw.has_key(key): + c = self.kw.cursor() + tup = c.set(key) + l = [] + while(tup and tup[0] == key): + h1 = tup[1] + v = loads(self.store[h1])[1] + l.append(v) + tup = c.next() + return l + return [] + ##### ##### INCOMING MESSAGE HANDLERS @@ -209,24 +231,24 @@ class Khashmir(xmlrpc.XMLRPC): ip = self.crequest.getClientIP() sender['host'] = ip n = Node().initWithDict(sender) - self.insertNode(n) + self.insertNode(n, contacted=0) return self.node.senderDict() def xmlrpc_find_node(self, target, sender): - nodes = self.table.findNodes(target) + nodes = self.table.findNodes(target.data) nodes = map(lambda node: node.senderDict(), nodes) ip = self.crequest.getClientIP() sender['host'] = ip n = Node().initWithDict(sender) - self.insertNode(n) + self.insertNode(n, contacted=0) return nodes, self.node.senderDict() def xmlrpc_store_value(self, key, value, sender): - key = decode(key) - h1 = sha(key+value).digest() + key = key.data + h1 = sha(key+value.data).digest() t = `time.time()` if not self.store.has_key(h1): - v = dumps((key, value, t)) + v = dumps((key, value.data, t)) self.store.put(h1, v) self.itime.put(t, h1) self.kw.put(key, h1) @@ -239,25 +261,19 @@ class Khashmir(xmlrpc.XMLRPC): ip = self.crequest.getClientIP() sender['host'] = ip n = Node().initWithDict(sender) - self.insertNode(n) + self.insertNode(n, contacted=0) return self.node.senderDict() def xmlrpc_find_value(self, key, sender): ip = self.crequest.getClientIP() - key = decode(key) + key = key.data sender['host'] = ip n = Node().initWithDict(sender) - self.insertNode(n) + self.insertNode(n, contacted=0) - if self.kw.has_key(key): - c = self.kw.cursor() - tup = c.set(key) - l = [] - while(tup): - h1 = tup[1] - v = loads(self.store[h1])[1] - l.append(v) - tup = c.next() + l = self.retrieveValues(key) + if len(l) > 0: + l = map(lambda v: Binary(v), l) return {'values' : l}, self.node.senderDict() else: nodes = self.table.findNodes(key) @@ -369,7 +385,7 @@ def test_find_value(l, quiet=0): try: if(len(values) == 0): if not self.found: - print "find FAILED" + print "find NOT FOUND" else: print "find FOUND" sys.stdout.flush() @@ -387,14 +403,18 @@ def test_find_value(l, quiet=0): d.valueForKey(key, cb(fc).callback) fc.wait() -def test_one(port): +def test_one(host, port): import thread - k = Khashmir('localhost', port) + k = Khashmir(host, port) thread.start_new_thread(k.app.run, ()) return k if __name__ == "__main__": - l = test_build_net() + import sys + n = 8 + if len(sys.argv) > 1: + n = int(sys.argv[1]) + l = test_build_net(peers=n) time.sleep(3) print "finding nodes..." for i in range(10):