X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=khashmir.py;h=2b716e62937d7bf65a68ba0bea90ddba6fae95b0;hb=e0d99ef1d31cef0cf5aec945030bb348ad1ab3d7;hp=748ce2196b8b93d3e792f1875cb3a4771d1b1275;hpb=eab6f7ede51301780bcc43f0ee17c717538edd16;p=quix0rs-apt-p2p.git diff --git a/khashmir.py b/khashmir.py index 748ce21..2b716e6 100644 --- a/khashmir.py +++ b/khashmir.py @@ -2,6 +2,8 @@ from const import reactor import time +from pickle import loads, dumps +from sha import sha from ktable import KTable, K from knode import KNode as Node @@ -12,6 +14,8 @@ from actions import FindNode, GetValue from twisted.web import xmlrpc from twisted.internet.defer import Deferred from twisted.python import threadable +from twisted.internet.app import Application +from twisted.web import server threadable.init() from bsddb3 import db ## find this at http://pybsddb.sf.net/ @@ -24,17 +28,28 @@ MAX_PING_INTERVAL = 60 * 15 # fifteen minutes # this is the main class! class Khashmir(xmlrpc.XMLRPC): - __slots__ = ['listener', 'node', 'table', 'store', 'app'] + __slots__ = ['listener', 'node', 'table', 'store', 'itime', 'kw', 'app'] def __init__(self, host, port): self.node = Node(newID(), host, port) self.table = KTable(self.node) - from twisted.internet.app import Application - from twisted.web import server self.app = Application("xmlrpc") self.app.listenTCP(port, server.Site(self)) + + ## these databases may be more suited to on-disk rather than in-memory + # h((key, value)) -> (key, value, time) mappings self.store = db.DB() self.store.open(None, None, db.DB_BTREE) + # -> h((key, value)) + self.itime = db.DB() + self.itime.set_flags(db.DB_DUP) + self.itime.open(None, None, db.DB_BTREE) + + # key -> h((key, value)) + self.kw = db.DB() + self.kw.set_flags(db.DB_DUP) + self.kw.open(None, None, db.DB_BTREE) + def render(self, request): """ @@ -118,7 +133,7 @@ class Khashmir(xmlrpc.XMLRPC): if sender['id'] == old.id: self.table.insertNode(old) - df = old.ping() + df = old.ping(self.node.senderDict()) df.addCallbacks(_notStaleNodeHandler, self._staleNodeHandler) @@ -190,8 +205,19 @@ class Khashmir(xmlrpc.XMLRPC): return nodes, self.node.senderDict() def xmlrpc_store_value(self, key, value, sender): - if not self.store.has_key(key): - self.store.put(key, value) + h1 = sha(key+value).digest() + t = `time.time()` + if not self.store.has_key(h1): + v = dumps((key, value, t)) + self.store.put(h1, v) + self.itime.put(t, h1) + self.kw.put(key, h1) + else: + # update last insert time + tup = loads(self.store[h1]) + self.store[h1] = dumps((tup[0], tup[1], t)) + self.itime.put(t, h1) + ip = self.crequest.getClientIP() n = Node(sender['id'], ip, sender['port']) self.insertNode(n) @@ -201,10 +227,18 @@ class Khashmir(xmlrpc.XMLRPC): ip = self.crequest.getClientIP() n = Node(sender['id'], ip, sender['port']) self.insertNode(n) - if self.store.has_key(key): - return {'values' : self.store[key]}, self.node.senderDict() + if self.kw.has_key(key): + c = self.kw.cursor() + tup = c.set_range(key) + l = [] + while(tup): + h1 = tup[1] + v = loads(self.store[h1])[1] + l.append(v) + tup = c.next() + return {'values' : l}, self.node.senderDict() else: - nodes = self.table.findNodes(msg['key']) + nodes = self.table.findNodes(key) nodes = map(lambda node: node.senderDict(), nodes) return {'nodes' : nodes}, self.node.senderDict() @@ -215,15 +249,18 @@ class Khashmir(xmlrpc.XMLRPC): pass -#------ -def test_build_net(quiet=0): + + + +#------ testing + +def test_build_net(quiet=0, peers=64, pause=1): from whrandom import randrange import thread port = 2001 l = [] - peers = 16 - + if not quiet: print "Building %s peer table." % peers @@ -231,18 +268,15 @@ def test_build_net(quiet=0): a = Khashmir('localhost', port + i) l.append(a) - def run(l=l): - while(1): - events = 0 - for peer in l: - events = events + peer.dispatcher.runOnce() - if events == 0: - time.sleep(.25) thread.start_new_thread(l[0].app.run, ()) + time.sleep(1) for peer in l[1:]: peer.app.run() - + #time.sleep(.25) + + print "adding contacts...." + for peer in l[1:]: n = l[randrange(0, len(l))].node peer.addContact(n.host, n.port) @@ -250,14 +284,20 @@ def test_build_net(quiet=0): peer.addContact(n.host, n.port) n = l[randrange(0, len(l))].node peer.addContact(n.host, n.port) - - time.sleep(5) + if pause: + time.sleep(.30) + + time.sleep(1) + print "finding close nodes...." for peer in l: peer.findCloseNodes() - time.sleep(5) - for peer in l: - peer.refreshTable() + if pause: + time.sleep(.5) + if pause: + time.sleep(10) +# for peer in l: +# peer.refreshTable() return l def test_find_nodes(l, quiet=0): @@ -270,7 +310,7 @@ def test_find_nodes(l, quiet=0): a = l[randrange(0,n)] b = l[randrange(0,n)] - def callback(nodes, l=l, flag=flag): + def callback(nodes, flag=flag): if (len(nodes) >0) and (nodes[0].id == b.node.id): print "test_find_nodes PASSED" else: @@ -297,44 +337,52 @@ def test_find_value(l, quiet=0): key = sha(`randrange(0,100000)`).digest() value = sha(`randrange(0,100000)`).digest() if not quiet: - print "inserting value...", + print "inserting value..." sys.stdout.flush() a.storeValueForKey(key, value) time.sleep(3) print "finding..." + sys.stdout.flush() - def mc(flag, value=value): - def callback(values, f=flag, val=value): + class cb: + def __init__(self, flag, value=value): + self.flag = flag + self.val = value + self.found = 0 + def callback(self, values): try: if(len(values) == 0): - print "find FAILED" - else: - if values[0]['value'] != val: + if not self.found: print "find FAILED" else: print "find FOUND" + sys.stdout.flush() + + else: + if self.val in values: + self.found = 1 finally: - f.set() - return callback - b.valueForKey(key, mc(fa)) - c.valueForKey(key, mc(fb)) - d.valueForKey(key, mc(fc)) - + self.flag.set() + + b.valueForKey(key, cb(fa).callback) fa.wait() + c.valueForKey(key, cb(fb).callback) fb.wait() + d.valueForKey(key, cb(fc).callback) fc.wait() +def test_one(port): + import thread + k = Khashmir('localhost', port) + thread.start_new_thread(k.app.run, ()) + return k + if __name__ == "__main__": l = test_build_net() time.sleep(3) print "finding nodes..." - test_find_nodes(l) - test_find_nodes(l) - test_find_nodes(l) + for i in range(10): + test_find_nodes(l) print "inserting and fetching values..." - test_find_value(l) - test_find_value(l) - test_find_value(l) - test_find_value(l) - test_find_value(l) - test_find_value(l) + for i in range(10): + test_find_value(l)