X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=khashmir.py;h=8321cf7ce08d3d83a22c92c2e343871e2176ec6b;hb=975ed9c978aedbf58356b4216388fda9bacbfece;hp=9e5a26cc17e7c13e592081e00bdf5fa26f8942a9;hpb=349bb1f96bc7053e28739639eced3f5a784fc87e;p=quix0rs-apt-p2p.git diff --git a/khashmir.py b/khashmir.py index 9e5a26c..8321cf7 100644 --- a/khashmir.py +++ b/khashmir.py @@ -21,6 +21,8 @@ threadable.init() from bsddb3 import db ## find this at http://pybsddb.sf.net/ from bsddb3._db import DBNotFoundError +from xmlrpclib import Binary + # don't ping unless it's been at least this many seconds since we've heard from a peer MAX_PING_INTERVAL = 60 * 15 # fifteen minutes @@ -30,7 +32,7 @@ MAX_PING_INTERVAL = 60 * 15 # fifteen minutes class Khashmir(xmlrpc.XMLRPC): __slots__ = ['listener', 'node', 'table', 'store', 'itime', 'kw', 'app'] def __init__(self, host, port): - self.node = Node(newID(), host, port) + self.node = Node().init(newID(), host, port) self.table = KTable(self.node) self.app = Application("xmlrpc") self.app.listenTCP(port, server.Site(self)) @@ -67,7 +69,7 @@ class Khashmir(xmlrpc.XMLRPC): """ ping this node and add the contact info to the table on pong! """ - n =Node(" "*20, host, port) # note, we + n =Node().init(" "*20, host, port) # note, we self.sendPing(n) @@ -90,6 +92,7 @@ class Khashmir(xmlrpc.XMLRPC): def valueForKey(self, key, callback): """ returns the values found for key in global table """ nodes = self.table.findNodes(key) + # decode values, they will be base64 encoded # create our search state state = GetValue(self, key, callback) reactor.callFromThread(state.goWithNodes, nodes) @@ -137,7 +140,8 @@ class Khashmir(xmlrpc.XMLRPC): def _notStaleNodeHandler(sender, old=old): """ called when we get a ping from the remote node """ - if sender['id'] == old.id: + sender = Node().initWithSenderDict(sender) + if sender.id == old.id: self.table.insertNode(old) df = old.ping(self.node.senderDict()) @@ -151,12 +155,15 @@ class Khashmir(xmlrpc.XMLRPC): df = node.ping(self.node.senderDict()) ## these are the callbacks we use when we issue a PING def _pongHandler(sender, id=node.id, host=node.host, port=node.port, table=self.table): - if id != 20 * ' ' and id != sender['id']: + sender = Node().initWithSenderDict(sender) + if id != 20 * ' ' and id != sender.id: # whoah, got response from different peer than we were expecting pass else: #print "Got PONG from %s at %s:%s" % (`msg['id']`, t.target.host, t.target.port) - n = Node(sender['id'], host, port) + sender['host'] = host + sender['port'] = port + n = Node().initWithDict(sender) table.insertNode(n) return def _defaultPong(err): @@ -199,7 +206,8 @@ class Khashmir(xmlrpc.XMLRPC): returns sender dict """ ip = self.crequest.getClientIP() - n = Node(sender['id'], ip, sender['port']) + sender['host'] = ip + n = Node().initWithDict(sender) self.insertNode(n) return self.node.senderDict() @@ -207,15 +215,17 @@ class Khashmir(xmlrpc.XMLRPC): nodes = self.table.findNodes(target) nodes = map(lambda node: node.senderDict(), nodes) ip = self.crequest.getClientIP() - n = Node(sender['id'], ip, sender['port']) + sender['host'] = ip + n = Node().initWithDict(sender) self.insertNode(n) return nodes, self.node.senderDict() def xmlrpc_store_value(self, key, value, sender): - h1 = sha(key+value).digest() + key = key.data + h1 = sha(key+value.data).digest() t = `time.time()` if not self.store.has_key(h1): - v = dumps((key, value, t)) + v = dumps((key, value.data, t)) self.store.put(h1, v) self.itime.put(t, h1) self.kw.put(key, h1) @@ -226,14 +236,18 @@ class Khashmir(xmlrpc.XMLRPC): self.itime.put(t, h1) ip = self.crequest.getClientIP() - n = Node(sender['id'], ip, sender['port']) + sender['host'] = ip + n = Node().initWithDict(sender) self.insertNode(n) return self.node.senderDict() def xmlrpc_find_value(self, key, sender): ip = self.crequest.getClientIP() - n = Node(sender['id'], ip, sender['port']) + key = key.data + sender['host'] = ip + n = Node().initWithDict(sender) self.insertNode(n) + if self.kw.has_key(key): c = self.kw.cursor() tup = c.set(key) @@ -243,6 +257,7 @@ class Khashmir(xmlrpc.XMLRPC): v = loads(self.store[h1])[1] l.append(v) tup = c.next() + l = map(lambda v: Binary(v), l) return {'values' : l}, self.node.senderDict() else: nodes = self.table.findNodes(key) @@ -255,7 +270,7 @@ class Khashmir(xmlrpc.XMLRPC): #------ testing -def test_build_net(quiet=0, peers=8, pause=1): +def test_build_net(quiet=0, peers=24, host='localhost', pause=1): from whrandom import randrange import thread port = 2001 @@ -265,7 +280,7 @@ def test_build_net(quiet=0, peers=8, pause=1): print "Building %s peer table." % peers for i in xrange(peers): - a = Khashmir('localhost', port + i) + a = Khashmir(host, port + i) l.append(a) @@ -279,11 +294,11 @@ def test_build_net(quiet=0, peers=8, pause=1): for peer in l[1:]: n = l[randrange(0, len(l))].node - peer.addContact(n.host, n.port) + peer.addContact(host, n.port) n = l[randrange(0, len(l))].node - peer.addContact(n.host, n.port) + peer.addContact(host, n.port) n = l[randrange(0, len(l))].node - peer.addContact(n.host, n.port) + peer.addContact(host, n.port) if pause: time.sleep(.30) @@ -310,8 +325,8 @@ def test_find_nodes(l, quiet=0): a = l[randrange(0,n)] b = l[randrange(0,n)] - def callback(nodes, flag=flag): - if (len(nodes) >0) and (nodes[0].id == b.node.id): + def callback(nodes, flag=flag, id = b.node.id): + if (len(nodes) >0) and (nodes[0].id == id): print "test_find_nodes PASSED" else: print "test_find_nodes FAILED"