X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=khashmir.py;h=92619fdb2e2c4627042a2df1cef0f3e93d5a33e6;hb=3a8164f9c878f1c8af2958bc932ddfdb9f799a09;hp=48319132b6a848b38fe5407164119b21e37c78dd;hpb=4606f209e1086c85a3578baa18842327ec99e265;p=quix0rs-apt-p2p.git diff --git a/khashmir.py b/khashmir.py index 4831913..92619fd 100644 --- a/khashmir.py +++ b/khashmir.py @@ -1,4 +1,5 @@ -## Copyright 2002 Andrew Loewenstern, All Rights Reserved +## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved +# see LICENSE.txt for license information from const import reactor import const @@ -10,424 +11,335 @@ from sha import sha from ktable import KTable, K from knode import KNode as Node -from hash import newID, newIDInRange +from khash import newID, newIDInRange + +from actions import FindNode, GetValue, KeyExpirer, StoreValue +import krpc -from actions import FindNode, GetValue, KeyExpirer -from twisted.web import xmlrpc from twisted.internet.defer import Deferred +from twisted.internet import protocol from twisted.python import threadable -from twisted.internet.app import Application +from twisted.application import service, internet from twisted.web import server threadable.init() +import sys import sqlite ## find this at http://pysqlite.sourceforge.net/ -import pysqlite_exceptions -KhashmirDBExcept = "KhashmirDBExcept" +class KhashmirDBExcept(Exception): + pass # this is the main class! -class Khashmir(xmlrpc.XMLRPC): - __slots__ = ('listener', 'node', 'table', 'store', 'app', 'last') +class Khashmir(protocol.Factory): + __slots__ = ('listener', 'node', 'table', 'store', 'app', 'last', 'protocol') def __init__(self, host, port, db='khashmir.db'): - self.node = Node().init(newID(), host, port) - self.table = KTable(self.node) - self.app = Application("xmlrpc") - self.app.listenTCP(port, server.Site(self)) - self.findDB(db) - self.last = time.time() - KeyExpirer(store=self.store) - - def findDB(self, db): - import os - try: - os.stat(db) - except OSError: - self.createNewDB(db) - else: - self.loadDB(db) - - def loadDB(self, db): - try: - self.store = sqlite.connect(db=db) - except: - import traceback - raise KhashmirDBExcept, "Couldn't open DB", traceback.exc_traceback - - def createNewDB(self, db): - self.store = sqlite.connect(db=db) - s = """ - create table kv (key text, value text, time timestamp, primary key (key, value)); - create index kv_key on kv(key); - create index kv_timestamp on kv(time); - - create table nodes (id text primary key, host text, port number); - """ - c = self.store.cursor() - c.execute(s) - self.store.commit() - - def render(self, request): - """ - Override the built in render so we can have access to the request object! - note, crequest is probably only valid on the initial call (not after deferred!) - """ - self.crequest = request - return xmlrpc.XMLRPC.render(self, request) + self.setup(host, port, db) + + def setup(self, host, port, db='khashmir.db'): + self._findDB(db) + self.port = port + self.node = self._loadSelfNode(host, port) + self.table = KTable(self.node) + self.app = service.Application("krpc") + self.udp = krpc.hostbroker(self) + self.udp.protocol = krpc.KRPC + self.listenport = reactor.listenUDP(port, self.udp) + self.last = time.time() + self._loadRoutingTable() + KeyExpirer(store=self.store) + #self.refreshTable(force=1) + reactor.callLater(60, self.checkpoint, (1,)) + + def __del__(self): + self.listenport.stopListening() + + def _loadSelfNode(self, host, port): + c = self.store.cursor() + c.execute('select id from self where num = 0;') + if c.rowcount > 0: + id = c.fetchone()[0] + else: + id = newID() + return Node().init(id, host, port) + + def _saveSelfNode(self): + self.store.autocommit = 0 + c = self.store.cursor() + c.execute('delete from self where num = 0;') + c.execute("insert into self values (0, %s);", sqlite.encode(self.node.id)) + self.store.commit() + self.store.autocommit = 1 + + def checkpoint(self, auto=0): + self._saveSelfNode() + self._dumpRoutingTable() + if auto: + reactor.callLater(const.CHECKPOINT_INTERVAL, self.checkpoint) + + def _findDB(self, db): + import os + try: + os.stat(db) + except OSError: + self._createNewDB(db) + else: + self._loadDB(db) + + def _loadDB(self, db): + try: + self.store = sqlite.connect(db=db) + self.store.autocommit = 1 + except: + import traceback + raise KhashmirDBExcept, "Couldn't open DB", traceback.exc_traceback + + def _createNewDB(self, db): + self.store = sqlite.connect(db=db) + self.store.autocommit = 1 + s = """ + create table kv (key binary, value binary, time timestamp, primary key (key, value)); + create index kv_key on kv(key); + create index kv_timestamp on kv(time); + + create table nodes (id binary primary key, host text, port number); + + create table self (num number primary key, id binary); + """ + c = self.store.cursor() + c.execute(s) + + def _dumpRoutingTable(self): + """ + save routing table nodes to the database + """ + self.store.autocommit = 0; + c = self.store.cursor() + c.execute("delete from nodes where id not NULL;") + for bucket in self.table.buckets: + for node in bucket.l: + c.execute("insert into nodes values (%s, %s, %s);", (sqlite.encode(node.id), node.host, node.port)) + self.store.commit() + self.store.autocommit = 1; + + def _loadRoutingTable(self): + """ + load routing table nodes from database + it's usually a good idea to call refreshTable(force=1) after loading the table + """ + c = self.store.cursor() + c.execute("select * from nodes;") + for rec in c.fetchall(): + n = Node().initWithDict({'id':rec[0], 'host':rec[1], 'port':int(rec[2])}) + n.conn = self.udp.connectionForAddr((n.host, n.port)) + self.table.insertNode(n, contacted=0) + - ####### ####### LOCAL INTERFACE - use these methods! - def addContact(self, host, port): - """ - ping this node and add the contact info to the table on pong! - """ - n =Node().init(const.NULL_ID, host, port) # note, we - self.sendPing(n) - + def addContact(self, host, port, callback=None): + """ + ping this node and add the contact info to the table on pong! + """ + n =Node().init(const.NULL_ID, host, port) + n.conn = self.udp.connectionForAddr((n.host, n.port)) + self.sendPing(n, callback=callback) ## this call is async! def findNode(self, id, callback, errback=None): - """ returns the contact info for node, or the k closest nodes, from the global table """ - # get K nodes out of local table/cache, or the node we want - nodes = self.table.findNodes(id) - d = Deferred() - if errback: - d.addCallbacks(callback, errback) - else: - d.addCallback(callback) - if len(nodes) == 1 and nodes[0].id == id : - d.callback(nodes) - else: - # create our search state - state = FindNode(self, id, d.callback) - reactor.callFromThread(state.goWithNodes, nodes) + """ returns the contact info for node, or the k closest nodes, from the global table """ + # get K nodes out of local table/cache, or the node we want + nodes = self.table.findNodes(id) + d = Deferred() + if errback: + d.addCallbacks(callback, errback) + else: + d.addCallback(callback) + if len(nodes) == 1 and nodes[0].id == id : + d.callback(nodes) + else: + # create our search state + state = FindNode(self, id, d.callback) + reactor.callFromThread(state.goWithNodes, nodes) ## also async - def valueForKey(self, key, callback): - """ returns the values found for key in global table - callback will be called with a list of values for each peer that returns unique values - final callback will be an empty list - probably should change to 'more coming' arg - """ - nodes = self.table.findNodes(key) - - # get locals - l = self.retrieveValues(key) - if len(l) > 0: - reactor.callFromThread(callback, map(lambda a: a.decode('base64'), l)) - - # create our search state - state = GetValue(self, key, callback) - reactor.callFromThread(state.goWithNodes, nodes, l) - - + def valueForKey(self, key, callback, searchlocal = 1): + """ returns the values found for key in global table + callback will be called with a list of values for each peer that returns unique values + final callback will be an empty list - probably should change to 'more coming' arg + """ + nodes = self.table.findNodes(key) + + # get locals + if searchlocal: + l = self.retrieveValues(key) + if len(l) > 0: + reactor.callLater(0, callback, (l)) + else: + l = [] + + # create our search state + state = GetValue(self, key, callback) + reactor.callFromThread(state.goWithNodes, nodes, l) ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor) def storeValueForKey(self, key, value, callback=None): - """ stores the value for key in the global table, returns immediately, no status - in this implementation, peers respond but don't indicate status to storing values - a key can have many values - """ - def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table): - if not callback: - # default callback - def _storedValueHandler(sender): - pass - response=_storedValueHandler - - for node in nodes[:const.STORE_REDUNDANCY]: - def cb(t, table = table, node=node, resp=response): - self.table.insertNode(node) - response(t) - if node.id != self.node.id: - def default(err, node=node, table=table): - table.nodeFailed(node) - df = node.storeValue(key, value, self.node.senderDict()) - df.addCallbacks(cb, lambda x: None) - # this call is asynch - self.findNode(key, _storeValueForKey) - - + """ stores the value for key in the global table, returns immediately, no status + in this implementation, peers respond but don't indicate status to storing values + a key can have many values + """ + def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table): + if not response: + # default callback + def _storedValueHandler(sender): + pass + response=_storedValueHandler + action = StoreValue(self.table, key, value, response) + reactor.callFromThread(action.goWithNodes, nodes) + + # this call is asynch + self.findNode(key, _storeValueForKey) + + def insertNode(self, n, contacted=1): - """ - insert a node in our local table, pinging oldest contact in bucket, if necessary - - If all you have is a host/port, then use addContact, which calls this method after - receiving the PONG from the remote node. The reason for the seperation is we can't insert - a node into the table without it's peer-ID. That means of course the node passed into this - method needs to be a properly formed Node object with a valid ID. - """ - old = self.table.insertNode(n, contacted=contacted) - if old and (time.time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id: - # the bucket is full, check to see if old node is still around and if so, replace it - - ## these are the callbacks used when we ping the oldest node in a bucket - def _staleNodeHandler(oldnode=old, newnode = n): - """ called if the pinged node never responds """ - self.table.replaceStaleNode(old, newnode) - - def _notStaleNodeHandler(sender, old=old): - """ called when we get a pong from the old node """ - sender = Node().initWithDict(sender) - if sender.id == old.id: - self.table.justSeenNode(old) - - df = old.ping(self.node.senderDict()) - df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler) - - - def sendPing(self, node): - """ - ping a node - """ - df = node.ping(self.node.senderDict()) - ## these are the callbacks we use when we issue a PING - def _pongHandler(args, id=node.id, host=node.host, port=node.port, table=self.table): - l, sender = args - if id != const.NULL_ID and id != sender['id'].decode('base64'): - # whoah, got response from different peer than we were expecting - pass - else: - sender['host'] = host - sender['port'] = port - n = Node().initWithDict(sender) - table.insertNode(n) - return - def _defaultPong(err, node=node, table=self.table): - table.nodeFailed(node) - - df.addCallbacks(_pongHandler,_defaultPong) - + """ + insert a node in our local table, pinging oldest contact in bucket, if necessary + + If all you have is a host/port, then use addContact, which calls this method after + receiving the PONG from the remote node. The reason for the seperation is we can't insert + a node into the table without it's peer-ID. That means of course the node passed into this + method needs to be a properly formed Node object with a valid ID. + """ + old = self.table.insertNode(n, contacted=contacted) + if old and (time.time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id: + # the bucket is full, check to see if old node is still around and if so, replace it + + ## these are the callbacks used when we ping the oldest node in a bucket + def _staleNodeHandler(oldnode=old, newnode = n): + """ called if the pinged node never responds """ + self.table.replaceStaleNode(old, newnode) + + def _notStaleNodeHandler(dict, old=old): + """ called when we get a pong from the old node """ + dict = dict['rsp'] + if dict['id'] == old.id: + self.table.justSeenNode(old.id) + + df = old.ping(self.node.id) + df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler) + + def sendPing(self, node, callback=None): + """ + ping a node + """ + df = node.ping(self.node.id) + ## these are the callbacks we use when we issue a PING + def _pongHandler(dict, node=node, table=self.table, callback=callback): + _krpc_sender = dict['_krpc_sender'] + dict = dict['rsp'] + sender = {'id' : dict['id']} + if node.id != const.NULL_ID and node.id != sender['id']: + # whoah, got response from different peer than we were expecting + self.table.invalidateNode(node) + else: + sender['host'] = _krpc_sender[0] + sender['port'] = _krpc_sender[1] + n = Node().initWithDict(sender) + n.conn = self.udp.connectionForAddr((n.host, n.port)) + table.insertNode(n) + if callback: + callback() + def _defaultPong(err, node=node, table=self.table, callback=callback): + table.nodeFailed(node) + if callback: + callback() + + df.addCallbacks(_pongHandler,_defaultPong) def findCloseNodes(self, callback=lambda a: None): - """ - This does a findNode on the ID one away from our own. - This will allow us to populate our table with nodes on our network closest to our own. - This is called as soon as we start up with an empty table - """ - id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256) - self.findNode(id, callback) + """ + This does a findNode on the ID one away from our own. + This will allow us to populate our table with nodes on our network closest to our own. + This is called as soon as we start up with an empty table + """ + id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256) + self.findNode(id, callback) + + def refreshTable(self, force=0): + """ + force=1 will refresh table regardless of last bucket access time + """ + def callback(nodes): + pass + + for bucket in self.table.buckets: + if force or (time.time() - bucket.lastAccessed >= const.BUCKET_STALENESS): + id = newIDInRange(bucket.min, bucket.max) + self.findNode(id, callback) - def refreshTable(self): - """ - - """ - def callback(nodes): - pass - for bucket in self.table.buckets: - if time.time() - bucket.lastAccessed >= const.BUCKET_STALENESS: - id = newIDInRange(bucket.min, bucket.max) - self.findNode(id, callback) - - def retrieveValues(self, key): - s = "select value from kv where key = '%s';" % key.encode('base64') - c = self.store.cursor() - c.execute(s) - t = c.fetchone() - l = [] - while t: - l.append(t['value']) - t = c.fetchone() - return l - + c = self.store.cursor() + c.execute("select value from kv where key = %s;", sqlite.encode(key)) + t = c.fetchone() + l = [] + while t: + l.append(t['value']) + t = c.fetchone() + return l + ##### ##### INCOMING MESSAGE HANDLERS - def xmlrpc_ping(self, sender): - """ - takes sender dict = {'id', , 'port', port} optional keys = 'ip' - returns sender dict - """ - ip = self.crequest.getClientIP() - sender['host'] = ip - n = Node().initWithDict(sender) - self.insertNode(n, contacted=0) - return (), self.node.senderDict() - - def xmlrpc_find_node(self, target, sender): - nodes = self.table.findNodes(target.decode('base64')) - nodes = map(lambda node: node.senderDict(), nodes) - ip = self.crequest.getClientIP() - sender['host'] = ip - n = Node().initWithDict(sender) - self.insertNode(n, contacted=0) - return nodes, self.node.senderDict() - - def xmlrpc_store_value(self, key, value, sender): - t = "%0.6f" % time.time() - s = "insert into kv values ('%s', '%s', '%s');" % (key, value, t) - c = self.store.cursor() - try: - c.execute(s) - except pysqlite_exceptions.IntegrityError, reason: - # update last insert time - s = "update kv set time = '%s' where key = '%s' and value = '%s';" % (t, key, value) - c.execute(s) - self.store.commit() - ip = self.crequest.getClientIP() - sender['host'] = ip - n = Node().initWithDict(sender) - self.insertNode(n, contacted=0) - return (), self.node.senderDict() - - def xmlrpc_find_value(self, key, sender): - ip = self.crequest.getClientIP() - key = key.decode('base64') - sender['host'] = ip - n = Node().initWithDict(sender) - self.insertNode(n, contacted=0) - - l = self.retrieveValues(key) - if len(l) > 0: - return {'values' : l}, self.node.senderDict() - else: - nodes = self.table.findNodes(key) - nodes = map(lambda node: node.senderDict(), nodes) - return {'nodes' : nodes}, self.node.senderDict() - - - - - -#------ testing - -def test_build_net(quiet=0, peers=24, host='localhost', pause=1): - from whrandom import randrange - import threading - import thread - port = 2001 - l = [] + def krpc_ping(self, id, _krpc_sender): + sender = {'id' : id} + sender['host'] = _krpc_sender[0] + sender['port'] = _krpc_sender[1] + n = Node().initWithDict(sender) + n.conn = self.udp.connectionForAddr((n.host, n.port)) + self.insertNode(n, contacted=0) + return {"id" : self.node.id} - if not quiet: - print "Building %s peer table." % peers - - for i in xrange(peers): - a = Khashmir(host, port + i, db = '/tmp/test'+`i`) - l.append(a) + def krpc_find_node(self, target, id, _krpc_sender): + nodes = self.table.findNodes(target) + nodes = map(lambda node: node.senderDict(), nodes) + sender = {'id' : id} + sender['host'] = _krpc_sender[0] + sender['port'] = _krpc_sender[1] + n = Node().initWithDict(sender) + n.conn = self.udp.connectionForAddr((n.host, n.port)) + self.insertNode(n, contacted=0) + return {"nodes" : nodes, "id" : self.node.id} + + def krpc_store_value(self, key, value, id, _krpc_sender): + t = "%0.6f" % time.time() + c = self.store.cursor() + try: + c.execute("insert into kv values (%s, %s, %s);", (sqlite.encode(key), sqlite.encode(value), t)) + except sqlite.IntegrityError, reason: + # update last insert time + c.execute("update kv set time = %s where key = %s and value = %s;", (t, sqlite.encode(key), sqlite.encode(value))) + sender = {'id' : id} + sender['host'] = _krpc_sender[0] + sender['port'] = _krpc_sender[1] + n = Node().initWithDict(sender) + n.conn = self.udp.connectionForAddr((n.host, n.port)) + self.insertNode(n, contacted=0) + return {"id" : self.node.id} - - thread.start_new_thread(l[0].app.run, ()) - time.sleep(1) - for peer in l[1:]: - peer.app.run() - time.sleep(10) - - print "adding contacts...." - - for peer in l[1:]: - n = l[randrange(0, len(l))].node - peer.addContact(host, n.port) - n = l[randrange(0, len(l))].node - peer.addContact(host, n.port) - n = l[randrange(0, len(l))].node - peer.addContact(host, n.port) - if pause: - time.sleep(.33) - - time.sleep(10) - print "finding close nodes...." - - for peer in l: - flag = threading.Event() - def cb(nodes, f=flag): - f.set() - peer.findCloseNodes(cb) - flag.wait() - -# for peer in l: -# peer.refreshTable() - return l - -def test_find_nodes(l, quiet=0): - import threading, sys - from whrandom import randrange - flag = threading.Event() - - n = len(l) - - a = l[randrange(0,n)] - b = l[randrange(0,n)] - - def callback(nodes, flag=flag, id = b.node.id): - if (len(nodes) >0) and (nodes[0].id == id): - print "test_find_nodes PASSED" - else: - print "test_find_nodes FAILED" - flag.set() - a.findNode(b.node.id, callback) - flag.wait() - -def test_find_value(l, quiet=0): - from whrandom import randrange - from sha import sha - from hash import newID - import time, threading, sys + def krpc_find_value(self, key, id, _krpc_sender): + sender = {'id' : id} + sender['host'] = _krpc_sender[0] + sender['port'] = _krpc_sender[1] + n = Node().initWithDict(sender) + n.conn = self.udp.connectionForAddr((n.host, n.port)) + self.insertNode(n, contacted=0) - fa = threading.Event() - fb = threading.Event() - fc = threading.Event() - - n = len(l) - a = l[randrange(0,n)] - b = l[randrange(0,n)] - c = l[randrange(0,n)] - d = l[randrange(0,n)] - - key = newID() - value = newID() - if not quiet: - print "inserting value..." - sys.stdout.flush() - a.storeValueForKey(key, value) - time.sleep(3) - print "finding..." - sys.stdout.flush() - - class cb: - def __init__(self, flag, value=value): - self.flag = flag - self.val = value - self.found = 0 - def callback(self, values): - try: - if(len(values) == 0): - if not self.found: - print "find NOT FOUND" - else: - print "find FOUND" - sys.stdout.flush() + l = self.retrieveValues(key) + if len(l) > 0: + return {'values' : l, "id": self.node.id} + else: + nodes = self.table.findNodes(key) + nodes = map(lambda node: node.senderDict(), nodes) + return {'nodes' : nodes, "id": self.node.id} - else: - if self.val in values: - self.found = 1 - finally: - self.flag.set() - - b.valueForKey(key, cb(fa).callback) - fa.wait() - c.valueForKey(key, cb(fb).callback) - fb.wait() - d.valueForKey(key, cb(fc).callback) - fc.wait() - -def test_one(host, port, db='/tmp/test'): - import thread - k = Khashmir(host, port, db) - thread.start_new_thread(k.app.run, ()) - return k - -if __name__ == "__main__": - import sys - n = 8 - if len(sys.argv) > 1: - n = int(sys.argv[1]) - l = test_build_net(peers=n) - time.sleep(3) - print "finding nodes..." - for i in range(10): - test_find_nodes(l) - print "inserting and fetching values..." - for i in range(10): - test_find_value(l)