From: burris Date: Fri, 13 Sep 2002 19:31:21 +0000 (+0000) Subject: updated for base64 encoding of hashes and values X-Git-Url: https://git.mxchange.org/?a=commitdiff_plain;h=e04df494d939e3bb644f788a02123ec05f4c8df4;p=quix0rs-apt-p2p.git updated for base64 encoding of hashes and values --- diff --git a/actions.py b/actions.py index c3cea32..5825294 100644 --- a/actions.py +++ b/actions.py @@ -46,14 +46,15 @@ class FindNode(ActionBase): """ find node action merits it's own class as it is a long running stateful process """ def handleGotNodes(self, args): l, sender = args - if self.finished or self.answered.has_key(sender['id']): + sender = Node().initWithDict(sender) + if self.finished or self.answered.has_key(sender.id): # a day late and a dollar short return self.outstanding = self.outstanding - 1 - self.answered[sender['id']] = 1 + self.answered[sender.id] = 1 for node in l: - if not self.found.has_key(node['id']): - n = Node(node['id'], node['host'], node['port']) + n = Node().initWithDict(node) + if not self.found.has_key(n.id): self.found[n.id] = n self.table.insertNode(n) self.schedule() @@ -115,17 +116,18 @@ class GetValue(FindNode): """ get value task """ def handleGotNodes(self, args): l, sender = args - if self.finished or self.answered.has_key(sender['id']): + sender = Node().initWithDict(sender) + if self.finished or self.answered.has_key(sender.id): # a day late and a dollar short return self.outstanding = self.outstanding - 1 - self.answered[sender['id']] = 1 + self.answered[sender.id] = 1 # go through nodes # if we have any closer than what we already got, query them if l.has_key('nodes'): for node in l['nodes']: - if not self.found.has_key(node['id']): - n = Node(node['id'], node['host'], node['port']) + n = Node().initWithDict(node) + if not self.found.has_key(n.id): self.found[n.id] = n self.table.insertNode(n) elif l.has_key('values'): diff --git a/khashmir.py b/khashmir.py index 9e5a26c..20cc062 100644 --- a/khashmir.py +++ b/khashmir.py @@ -21,6 +21,8 @@ threadable.init() from bsddb3 import db ## find this at http://pybsddb.sf.net/ from bsddb3._db import DBNotFoundError +from base64 import decodestring as decode + # don't ping unless it's been at least this many seconds since we've heard from a peer MAX_PING_INTERVAL = 60 * 15 # fifteen minutes @@ -30,7 +32,7 @@ MAX_PING_INTERVAL = 60 * 15 # fifteen minutes class Khashmir(xmlrpc.XMLRPC): __slots__ = ['listener', 'node', 'table', 'store', 'itime', 'kw', 'app'] def __init__(self, host, port): - self.node = Node(newID(), host, port) + self.node = Node().init(newID(), host, port) self.table = KTable(self.node) self.app = Application("xmlrpc") self.app.listenTCP(port, server.Site(self)) @@ -67,7 +69,7 @@ class Khashmir(xmlrpc.XMLRPC): """ ping this node and add the contact info to the table on pong! """ - n =Node(" "*20, host, port) # note, we + n =Node().init(" "*20, host, port) # note, we self.sendPing(n) @@ -90,8 +92,12 @@ class Khashmir(xmlrpc.XMLRPC): def valueForKey(self, key, callback): """ returns the values found for key in global table """ nodes = self.table.findNodes(key) + # decode values, they will be base64 encoded + def cbwrap(values, cb=callback): + values = map(lambda x: decode(x), values) + callback(values) # create our search state - state = GetValue(self, key, callback) + state = GetValue(self, key, cbwrap) reactor.callFromThread(state.goWithNodes, nodes) @@ -156,7 +162,9 @@ class Khashmir(xmlrpc.XMLRPC): pass else: #print "Got PONG from %s at %s:%s" % (`msg['id']`, t.target.host, t.target.port) - n = Node(sender['id'], host, port) + sender['host'] = host + sender['port'] = port + n = Node().initWithDict(sender) table.insertNode(n) return def _defaultPong(err): @@ -199,7 +207,8 @@ class Khashmir(xmlrpc.XMLRPC): returns sender dict """ ip = self.crequest.getClientIP() - n = Node(sender['id'], ip, sender['port']) + sender['host'] = ip + n = Node().initWithDict(sender) self.insertNode(n) return self.node.senderDict() @@ -207,11 +216,13 @@ class Khashmir(xmlrpc.XMLRPC): nodes = self.table.findNodes(target) nodes = map(lambda node: node.senderDict(), nodes) ip = self.crequest.getClientIP() - n = Node(sender['id'], ip, sender['port']) + sender['host'] = ip + n = Node().initWithDict(sender) self.insertNode(n) return nodes, self.node.senderDict() def xmlrpc_store_value(self, key, value, sender): + key = decode(key) h1 = sha(key+value).digest() t = `time.time()` if not self.store.has_key(h1): @@ -226,14 +237,18 @@ class Khashmir(xmlrpc.XMLRPC): self.itime.put(t, h1) ip = self.crequest.getClientIP() - n = Node(sender['id'], ip, sender['port']) + sender['host'] = ip + n = Node().initWithDict(sender) self.insertNode(n) return self.node.senderDict() def xmlrpc_find_value(self, key, sender): ip = self.crequest.getClientIP() - n = Node(sender['id'], ip, sender['port']) + key = decode(key) + sender['host'] = ip + n = Node().initWithDict(sender) self.insertNode(n) + if self.kw.has_key(key): c = self.kw.cursor() tup = c.set(key) @@ -255,7 +270,7 @@ class Khashmir(xmlrpc.XMLRPC): #------ testing -def test_build_net(quiet=0, peers=8, pause=1): +def test_build_net(quiet=0, peers=24, host='localhost', pause=1): from whrandom import randrange import thread port = 2001 @@ -265,7 +280,7 @@ def test_build_net(quiet=0, peers=8, pause=1): print "Building %s peer table." % peers for i in xrange(peers): - a = Khashmir('localhost', port + i) + a = Khashmir(host, port + i) l.append(a) @@ -279,11 +294,11 @@ def test_build_net(quiet=0, peers=8, pause=1): for peer in l[1:]: n = l[randrange(0, len(l))].node - peer.addContact(n.host, n.port) + peer.addContact(host, n.port) n = l[randrange(0, len(l))].node - peer.addContact(n.host, n.port) + peer.addContact(host, n.port) n = l[randrange(0, len(l))].node - peer.addContact(n.host, n.port) + peer.addContact(host, n.port) if pause: time.sleep(.30) @@ -310,8 +325,8 @@ def test_find_nodes(l, quiet=0): a = l[randrange(0,n)] b = l[randrange(0,n)] - def callback(nodes, flag=flag): - if (len(nodes) >0) and (nodes[0].id == b.node.id): + def callback(nodes, flag=flag, id = b.node.id): + if (len(nodes) >0) and (nodes[0].id == id): print "test_find_nodes PASSED" else: print "test_find_nodes FAILED" diff --git a/knode.py b/knode.py index aecf14e..52abbed 100644 --- a/knode.py +++ b/knode.py @@ -2,6 +2,7 @@ from node import Node from twisted.internet.defer import Deferred from xmlrpcclient import XMLRPCClientFactory as factory from const import reactor +from base64 import encodestring as encode class KNode(Node): def ping(self, sender): @@ -16,11 +17,11 @@ class KNode(Node): return df def storeValue(self, key, value, sender): df = Deferred() - f = factory('store_value', (key, value, sender), df.callback, df.errback) + f = factory('store_value', (encode(key), encode(value), sender), df.callback, df.errback) reactor.connectTCP(self.host, self.port, f) return df def findValue(self, key, sender): df = Deferred() - f = factory('find_value', (key, sender), df.callback, df.errback) + f = factory('find_value', (encode(key), sender), df.callback, df.errback) reactor.connectTCP(self.host, self.port, f) return df diff --git a/node.py b/node.py index 6bc2ef2..815f00c 100644 --- a/node.py +++ b/node.py @@ -1,21 +1,33 @@ import hash import time from types import * +from xmlrpclib import Binary class Node: """encapsulate contact info""" - def __init__(self, id, host, port): + def init(self, id, host, port): self.id = id self.int = hash.intify(id) self.host = host self.port = port self.lastSeen = time.time() + self._senderDict = {'id': Binary(self.id), 'port' : self.port, 'host' : self.host} + return self + + def initWithDict(self, dict): + self._senderDict = dict + self.id = dict['id'].data + self.int = hash.intify(self.id) + self.port = dict['port'] + self.host = dict['host'] + self.lastSeen = time.time() + return self def updateLastSeen(self): self.lastSeen = time.time() def senderDict(self): - return {'id': self.id, 'port' : self.port, 'host' : self.host} + return self._senderDict def __repr__(self): return `(self.id, self.host, self.port)` diff --git a/xmlrpcclient.py b/xmlrpcclient.py index 230f3ad..5c8f3cf 100644 --- a/xmlrpcclient.py +++ b/xmlrpcclient.py @@ -17,9 +17,13 @@ class XMLRPCClient(HTTPClient): self.transport.write('\r\n') def handleResponse(self, buf): - args, name = loads(buf) - apply(self.d.callback, args) - + try: + args, name = loads(buf) + except Exception, e: + print "response decode error: " + `e` + self.d.errback() + else: + apply(self.d.callback, args) class XMLRPCClientFactory(ClientFactory): def __init__(self, method, args, callback=None, errback=None):