X-Git-Url: https://git.mxchange.org/?p=quix0rs-apt-p2p.git;a=blobdiff_plain;f=khashmir.py;fp=khashmir.py;h=e9e53a76ebd60a2f574f38c743e5dd95ced378f0;hp=0c0ed52467896e88ba2e77d6dcc26793ea828ad3;hb=9048402f56c24474c79920ab849748223ed339cf;hpb=ed8094830c2c2ab4785d10b3d562d609a42741f0 diff --git a/khashmir.py b/khashmir.py index 0c0ed52..e9e53a7 100644 --- a/khashmir.py +++ b/khashmir.py @@ -11,22 +11,20 @@ from sha import sha from ktable import KTable, K from knode import KNode as Node -from hash import newID, newIDInRange +from khash import newID, newIDInRange from actions import FindNode, GetValue, KeyExpirer, StoreValue import krpc -import airhook from twisted.internet.defer import Deferred from twisted.internet import protocol from twisted.python import threadable -from twisted.internet.app import Application +from twisted.application import service, internet from twisted.web import server threadable.init() import sys import sqlite ## find this at http://pysqlite.sourceforge.net/ -import pysqlite_exceptions class KhashmirDBExcept(Exception): pass @@ -34,7 +32,6 @@ class KhashmirDBExcept(Exception): # this is the main class! class Khashmir(protocol.Factory): __slots__ = ('listener', 'node', 'table', 'store', 'app', 'last', 'protocol') - protocol = krpc.KRPC def __init__(self, host, port, db='khashmir.db'): self.setup(host, port, db) @@ -43,19 +40,24 @@ class Khashmir(protocol.Factory): self.port = port self.node = self._loadSelfNode(host, port) self.table = KTable(self.node) - self.app = Application("krpc") - self.airhook = airhook.listenAirhookStream(port, self) + self.app = service.Application("krpc") + self.udp = krpc.hostbroker(self) + self.udp.protocol = krpc.KRPC + self.listenport = reactor.listenUDP(port, self.udp) self.last = time.time() self._loadRoutingTable() KeyExpirer(store=self.store) #self.refreshTable(force=1) reactor.callLater(60, self.checkpoint, (1,)) + def __del__(self): + self.listenport.stopListening() + def _loadSelfNode(self, host, port): c = self.store.cursor() c.execute('select id from self where num = 0;') if c.rowcount > 0: - id = c.fetchone()[0].decode('hex') + id = c.fetchone()[0] else: id = newID() return Node().init(id, host, port) @@ -64,7 +66,7 @@ class Khashmir(protocol.Factory): self.store.autocommit = 0 c = self.store.cursor() c.execute('delete from self where num = 0;') - c.execute("insert into self values (0, '%s');" % self.node.id.encode('hex')) + c.execute("insert into self values (0, %s);", sqlite.encode(self.node.id)) self.store.commit() self.store.autocommit = 1 @@ -95,13 +97,13 @@ class Khashmir(protocol.Factory): self.store = sqlite.connect(db=db) self.store.autocommit = 1 s = """ - create table kv (key text, value text, time timestamp, primary key (key, value)); + create table kv (key binary, value binary, time timestamp, primary key (key, value)); create index kv_key on kv(key); create index kv_timestamp on kv(time); - create table nodes (id text primary key, host text, port number); + create table nodes (id binary primary key, host text, port number); - create table self (num number primary key, id text); + create table self (num number primary key, id binary); """ c = self.store.cursor() c.execute(s) @@ -116,7 +118,7 @@ class Khashmir(protocol.Factory): for bucket in self.table.buckets: for node in bucket.l: d = node.senderDict() - c.execute("insert into nodes values ('%s', '%s', '%s');" % (d['id'].encode('hex'), d['host'], d['port'])) + c.execute("insert into nodes values (%s, %s, %s);", (sqlite.encode(d['id']), d['host'], d['port'])) self.store.commit() self.store.autocommit = 1; @@ -128,8 +130,8 @@ class Khashmir(protocol.Factory): c = self.store.cursor() c.execute("select * from nodes;") for rec in c.fetchall(): - n = Node().initWithDict({'id':rec[0].decode('hex'), 'host':rec[1], 'port':int(rec[2])}) - n.conn = self.airhook.connectionForAddr((n.host, n.port)) + n = Node().initWithDict({'id':rec[0], 'host':rec[1], 'port':int(rec[2])}) + n.conn = self.udp.connectionForAddr((n.host, n.port)) self.table.insertNode(n, contacted=0) @@ -140,7 +142,7 @@ class Khashmir(protocol.Factory): ping this node and add the contact info to the table on pong! """ n =Node().init(const.NULL_ID, host, port) - n.conn = self.airhook.connectionForAddr((n.host, n.port)) + n.conn = self.udp.connectionForAddr((n.host, n.port)) self.sendPing(n, callback=callback) ## this call is async! @@ -246,7 +248,7 @@ class Khashmir(protocol.Factory): sender['host'] = node.host sender['port'] = node.port n = Node().initWithDict(sender) - n.conn = self.airhook.connectionForAddr((n.host, n.port)) + n.conn = self.udp.connectionForAddr((n.host, n.port)) table.insertNode(n) if callback: callback() @@ -280,13 +282,12 @@ class Khashmir(protocol.Factory): def retrieveValues(self, key): - s = "select value from kv where key = '%s';" % key.encode('hex') c = self.store.cursor() - c.execute(s) + c.execute("select value from kv where key = %s;", sqlite.encode(key)) t = c.fetchone() l = [] while t: - l.append(t['value'].decode('base64')) + l.append(t['value']) t = c.fetchone() return l @@ -301,7 +302,7 @@ class Khashmir(protocol.Factory): sender['host'] = _krpc_sender[0] sender['port'] = _krpc_sender[1] n = Node().initWithDict(sender) - n.conn = self.airhook.connectionForAddr((n.host, n.port)) + n.conn = self.udp.connectionForAddr((n.host, n.port)) self.insertNode(n, contacted=0) return {"sender" : self.node.senderDict()} @@ -311,24 +312,22 @@ class Khashmir(protocol.Factory): sender['host'] = _krpc_sender[0] sender['port'] = _krpc_sender[1] n = Node().initWithDict(sender) - n.conn = self.airhook.connectionForAddr((n.host, n.port)) + n.conn = self.udp.connectionForAddr((n.host, n.port)) self.insertNode(n, contacted=0) return {"nodes" : nodes, "sender" : self.node.senderDict()} def krpc_store_value(self, key, value, sender, _krpc_sender): t = "%0.6f" % time.time() - s = "insert into kv values ('%s', '%s', '%s');" % (key.encode("hex"), value.encode("base64"), t) c = self.store.cursor() try: - c.execute(s) - except pysqlite_exceptions.IntegrityError, reason: + c.execute("insert into kv values (%s, %s, %s);", (sqlite.encode(key), sqlite.encode(value), t)) + except sqlite.IntegrityError, reason: # update last insert time - s = "update kv set time = '%s' where key = '%s' and value = '%s';" % (t, key.encode("hex"), value.encode("base64")) - c.execute(s) + c.execute("update kv set time = %s where key = %s and value = %s;", (t, sqlite.encode(key), sqlite.encode(value))) sender['host'] = _krpc_sender[0] sender['port'] = _krpc_sender[1] n = Node().initWithDict(sender) - n.conn = self.airhook.connectionForAddr((n.host, n.port)) + n.conn = self.udp.connectionForAddr((n.host, n.port)) self.insertNode(n, contacted=0) return {"sender" : self.node.senderDict()} @@ -336,7 +335,7 @@ class Khashmir(protocol.Factory): sender['host'] = _krpc_sender[0] sender['port'] = _krpc_sender[1] n = Node().initWithDict(sender) - n.conn = self.airhook.connectionForAddr((n.host, n.port)) + n.conn = self.udp.connectionForAddr((n.host, n.port)) self.insertNode(n, contacted=0) l = self.retrieveValues(key) @@ -347,171 +346,3 @@ class Khashmir(protocol.Factory): nodes = map(lambda node: node.senderDict(), nodes) return {'nodes' : nodes, "sender": self.node.senderDict()} -### TESTING ### -from random import randrange -import threading, thread, sys, time -from sha import sha -from hash import newID - - -def test_net(host='127.0.0.1', peers=24, startport=2001, dbprefix='/tmp/test'): - import thread - l = [] - for i in xrange(peers): - a = Khashmir(host, startport + i, db = dbprefix+`i`) - l.append(a) - thread.start_new_thread(l[0].app.run, ()) - for peer in l[1:]: - peer.app.run(installSignalHandlers=0) - return l - -def test_build_net(quiet=0, peers=24, host='127.0.0.1', pause=0, startport=2001, dbprefix='/tmp/test'): - from whrandom import randrange - import threading - import thread - import sys - port = startport - l = [] - if not quiet: - print "Building %s peer table." % peers - - for i in xrange(peers): - a = Khashmir(host, port + i, db = dbprefix +`i`) - l.append(a) - - - thread.start_new_thread(l[0].app.run, ()) - time.sleep(1) - for peer in l[1:]: - peer.app.run(installSignalHandlers=0) - #time.sleep(3) - - def spewer(frame, s, ignored): - from twisted.python import reflect - if frame.f_locals.has_key('self'): - se = frame.f_locals['self'] - print 'method %s of %s at %s' % ( - frame.f_code.co_name, reflect.qual(se.__class__), id(se) - ) - #sys.settrace(spewer) - - print "adding contacts...." - def makecb(flag): - def cb(f=flag): - f.set() - return cb - - for peer in l: - p = l[randrange(0, len(l))] - if p != peer: - n = p.node - flag = threading.Event() - peer.addContact(host, n.port, makecb(flag)) - flag.wait() - p = l[randrange(0, len(l))] - if p != peer: - n = p.node - flag = threading.Event() - peer.addContact(host, n.port, makecb(flag)) - flag.wait() - p = l[randrange(0, len(l))] - if p != peer: - n = p.node - flag = threading.Event() - peer.addContact(host, n.port, makecb(flag)) - flag.wait() - - print "finding close nodes...." - - for peer in l: - flag = threading.Event() - def cb(nodes, f=flag): - f.set() - peer.findCloseNodes(cb) - flag.wait() - # for peer in l: - # peer.refreshTable() - return l - -def test_find_nodes(l, quiet=0): - flag = threading.Event() - - n = len(l) - - a = l[randrange(0,n)] - b = l[randrange(0,n)] - - def callback(nodes, flag=flag, id = b.node.id): - if (len(nodes) >0) and (nodes[0].id == id): - print "test_find_nodes PASSED" - else: - print "test_find_nodes FAILED" - flag.set() - a.findNode(b.node.id, callback) - flag.wait() - -def test_find_value(l, quiet=0): - ff = threading.Event() - fa = threading.Event() - fb = threading.Event() - fc = threading.Event() - - n = len(l) - a = l[randrange(0,n)] - b = l[randrange(0,n)] - c = l[randrange(0,n)] - d = l[randrange(0,n)] - - key = newID() - value = newID() - if not quiet: print "inserting value..." - def acb(p, f=ff): - f.set() - a.storeValueForKey(key, value, acb) - ff.wait() - - if not quiet: - print "finding..." - - class cb: - def __init__(self, flag, value=value, port=None): - self.flag = flag - self.val = value - self.found = 0 - self.port = port - def callback(self, values): - if(len(values) == 0): - if not self.found: - print "find %s NOT FOUND" % self.port - else: - print "find %s FOUND" % self.port - self.flag.set() - else: - if self.val in values: - self.found = 1 - - b.valueForKey(key, cb(fa, port=b.port).callback, searchlocal=0) - fa.wait() - c.valueForKey(key, cb(fb, port=c.port).callback, searchlocal=0) - fb.wait() - d.valueForKey(key, cb(fc, port=d.port).callback, searchlocal=0) - fc.wait() - -def test_one(host, port, db='/tmp/test'): - import thread - k = Khashmir(host, port, db) - thread.start_new_thread(reactor.run, ()) - return k - -if __name__ == "__main__": - import sys - n = 8 - if len(sys.argv) > 1: n = int(sys.argv[1]) - l = test_build_net(peers=n) - time.sleep(3) - print "finding nodes..." - for i in range(n): - test_find_nodes(l) - print "inserting and fetching values..." - for i in range(10): - test_find_value(l)