X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=khashmir.py;h=3ca5d7250d0cfb82c0529114606e7e62ed04ebd8;hb=2ce2ba6146457d6c321e8ece8116d422cda39a70;hp=2b716e62937d7bf65a68ba0bea90ddba6fae95b0;hpb=75febfbd4cda5f4953f5b93d355444671e853551;p=quix0rs-apt-p2p.git diff --git a/khashmir.py b/khashmir.py index 2b716e6..3ca5d72 100644 --- a/khashmir.py +++ b/khashmir.py @@ -1,16 +1,18 @@ ## Copyright 2002 Andrew Loewenstern, All Rights Reserved from const import reactor +import const + import time -from pickle import loads, dumps + from sha import sha from ktable import KTable, K from knode import KNode as Node -from hash import newID +from hash import newID, newIDInRange -from actions import FindNode, GetValue +from actions import FindNode, GetValue, KeyExpirer from twisted.web import xmlrpc from twisted.internet.defer import Deferred from twisted.python import threadable @@ -18,371 +20,466 @@ from twisted.internet.app import Application from twisted.web import server threadable.init() -from bsddb3 import db ## find this at http://pybsddb.sf.net/ -from bsddb3._db import DBNotFoundError - -# don't ping unless it's been at least this many seconds since we've heard from a peer -MAX_PING_INTERVAL = 60 * 15 # fifteen minutes - +import sqlite ## find this at http://pysqlite.sourceforge.net/ +import pysqlite_exceptions +KhashmirDBExcept = "KhashmirDBExcept" # this is the main class! class Khashmir(xmlrpc.XMLRPC): - __slots__ = ['listener', 'node', 'table', 'store', 'itime', 'kw', 'app'] - def __init__(self, host, port): - self.node = Node(newID(), host, port) - self.table = KTable(self.node) - self.app = Application("xmlrpc") - self.app.listenTCP(port, server.Site(self)) + __slots__ = ('listener', 'node', 'table', 'store', 'app', 'last') + def __init__(self, host, port, db='khashmir.db'): + self.setup(host, port, db) + + def setup(self, host, port, db='khashmir.db'): + self._findDB(db) + self.node = self._loadSelfNode(host, port) + self.table = KTable(self.node) + self._loadRoutingTable() + self.app = Application("xmlrpc") + self.app.listenTCP(port, server.Site(self)) + self.last = time.time() + KeyExpirer(store=self.store) + #self.refreshTable(force=1) + reactor.callLater(60, self.checkpoint, (1,)) + + def _loadSelfNode(self, host, port): + c = self.store.cursor() + c.execute('select id from self where num = 0;') + if c.rowcount > 0: + id = c.fetchone()[0].decode('base64') + else: + id = newID() + return Node().init(id, host, port) + + def _saveSelfNode(self): + self.store.autocommit = 0 + c = self.store.cursor() + c.execute('delete from self where num = 0;') + c.execute("insert into self values (0, '%s');" % self.node.id.encode('base64')) + self.store.commit() + self.store.autocommit = 1 + + def checkpoint(self, auto=0): + self._saveSelfNode() + self._dumpRoutingTable() + if auto: + reactor.callLater(const.CHECKPOINT_INTERVAL, self.checkpoint) + + def _findDB(self, db): + import os + try: + os.stat(db) + except OSError: + self._createNewDB(db) + else: + self._loadDB(db) + + def _loadDB(self, db): + try: + self.store = sqlite.connect(db=db) + self.store.autocommit = 1 + except: + import traceback + raise KhashmirDBExcept, "Couldn't open DB", traceback.exc_traceback + + def _createNewDB(self, db): + self.store = sqlite.connect(db=db) + self.store.autocommit = 1 + s = """ + create table kv (key text, value text, time timestamp, primary key (key, value)); + create index kv_key on kv(key); + create index kv_timestamp on kv(time); + + create table nodes (id text primary key, host text, port number); + + create table self (num number primary key, id text); + """ + c = self.store.cursor() + c.execute(s) + + def _dumpRoutingTable(self): + """ + save routing table nodes to the database + """ + self.store.autocommit = 0; + c = self.store.cursor() + c.execute("delete from nodes where id not NULL;") + for bucket in self.table.buckets: + for node in bucket.l: + d = node.senderDict() + c.execute("insert into nodes values ('%s', '%s', '%s');" % (d['id'], d['host'], d['port'])) + self.store.commit() + self.store.autocommit = 1; + + def _loadRoutingTable(self): + """ + load routing table nodes from database + it's usually a good idea to call refreshTable(force=1) after loading the table + """ + c = self.store.cursor() + c.execute("select * from nodes;") + for rec in c.fetchall(): + n = Node().initWithDict({'id':rec[0], 'host':rec[1], 'port':int(rec[2])}) + self.table.insertNode(n, contacted=0) + + def render(self, request): + """ + Override the built in render so we can have access to the request object! + note, crequest is probably only valid on the initial call (not after deferred!) + """ + self.crequest = request + return xmlrpc.XMLRPC.render(self, request) + + + ####### + ####### LOCAL INTERFACE - use these methods! + def addContact(self, host, port): + """ + ping this node and add the contact info to the table on pong! + """ + n =Node().init(const.NULL_ID, host, port) # note, we + self.sendPing(n) + + ## this call is async! + def findNode(self, id, callback, errback=None): + """ returns the contact info for node, or the k closest nodes, from the global table """ + # get K nodes out of local table/cache, or the node we want + nodes = self.table.findNodes(id) + d = Deferred() + if errback: + d.addCallbacks(callback, errback) + else: + d.addCallback(callback) + if len(nodes) == 1 and nodes[0].id == id : + d.callback(nodes) + else: + # create our search state + state = FindNode(self, id, d.callback) + reactor.callFromThread(state.goWithNodes, nodes) - ## these databases may be more suited to on-disk rather than in-memory - # h((key, value)) -> (key, value, time) mappings - self.store = db.DB() - self.store.open(None, None, db.DB_BTREE) - # -> h((key, value)) - self.itime = db.DB() - self.itime.set_flags(db.DB_DUP) - self.itime.open(None, None, db.DB_BTREE) - - # key -> h((key, value)) - self.kw = db.DB() - self.kw.set_flags(db.DB_DUP) - self.kw.open(None, None, db.DB_BTREE) - - - def render(self, request): - """ - Override the built in render so we can have access to the request object! - note, crequest is probably only valid on the initial call (not after deferred!) - """ - self.crequest = request - return xmlrpc.XMLRPC.render(self, request) - + ## also async + def valueForKey(self, key, callback): + """ returns the values found for key in global table + callback will be called with a list of values for each peer that returns unique values + final callback will be an empty list - probably should change to 'more coming' arg + """ + nodes = self.table.findNodes(key) + + # get locals + l = self.retrieveValues(key) + if len(l) > 0: + reactor.callFromThread(callback, map(lambda a: a.decode('base64'), l)) + + # create our search state + state = GetValue(self, key, callback) + reactor.callFromThread(state.goWithNodes, nodes, l) + + ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor) + def storeValueForKey(self, key, value, callback=None): + """ stores the value for key in the global table, returns immediately, no status + in this implementation, peers respond but don't indicate status to storing values + a key can have many values + """ + def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table): + if not response: + # default callback + def _storedValueHandler(sender): + pass + response=_storedValueHandler + + for node in nodes[:const.STORE_REDUNDANCY]: + def cb(t, table = table, node=node, resp=response): + self.table.insertNode(node) + response(t) + if node.id != self.node.id: + def default(err, node=node, table=table): + table.nodeFailed(node) + df = node.storeValue(key, value, self.node.senderDict()) + df.addCallbacks(cb, default) + # this call is asynch + self.findNode(key, _storeValueForKey) - ####### - ####### LOCAL INTERFACE - use these methods! - def addContact(self, host, port): - """ - ping this node and add the contact info to the table on pong! - """ - n =Node(" "*20, host, port) # note, we - self.sendPing(n) - - - ## this call is async! - def findNode(self, id, callback, errback=None): - """ returns the contact info for node, or the k closest nodes, from the global table """ - # get K nodes out of local table/cache, or the node we want - nodes = self.table.findNodes(id) - d = Deferred() - d.addCallbacks(callback, errback) - if len(nodes) == 1 and nodes[0].id == id : - d.callback(nodes) - else: - # create our search state - state = FindNode(self, id, d.callback) - reactor.callFromThread(state.goWithNodes, nodes) - - - ## also async - def valueForKey(self, key, callback): - """ returns the values found for key in global table """ - nodes = self.table.findNodes(key) - # create our search state - state = GetValue(self, key, callback) - reactor.callFromThread(state.goWithNodes, nodes) - + + def insertNode(self, n, contacted=1): + """ + insert a node in our local table, pinging oldest contact in bucket, if necessary + + If all you have is a host/port, then use addContact, which calls this method after + receiving the PONG from the remote node. The reason for the seperation is we can't insert + a node into the table without it's peer-ID. That means of course the node passed into this + method needs to be a properly formed Node object with a valid ID. + """ + old = self.table.insertNode(n, contacted=contacted) + if old and (time.time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id: + # the bucket is full, check to see if old node is still around and if so, replace it + + ## these are the callbacks used when we ping the oldest node in a bucket + def _staleNodeHandler(oldnode=old, newnode = n): + """ called if the pinged node never responds """ + self.table.replaceStaleNode(old, newnode) + + def _notStaleNodeHandler(sender, old=old): + """ called when we get a pong from the old node """ + args, sender = sender + sender = Node().initWithDict(sender) + if sender.id == old.id: + self.table.justSeenNode(old) + + df = old.ping(self.node.senderDict()) + df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler) + + def sendPing(self, node): + """ + ping a node + """ + df = node.ping(self.node.senderDict()) + ## these are the callbacks we use when we issue a PING + def _pongHandler(args, node=node, table=self.table): + l, sender = args + if node.id != const.NULL_ID and node.id != sender['id'].decode('base64'): + # whoah, got response from different peer than we were expecting + self.table.invalidateNode(node) + else: + sender['host'] = node.host + sender['port'] = node.port + n = Node().initWithDict(sender) + table.insertNode(n) + return + def _defaultPong(err, node=node, table=self.table): + table.nodeFailed(node) + + df.addCallbacks(_pongHandler,_defaultPong) + + def findCloseNodes(self, callback=lambda a: None): + """ + This does a findNode on the ID one away from our own. + This will allow us to populate our table with nodes on our network closest to our own. + This is called as soon as we start up with an empty table + """ + id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256) + self.findNode(id, callback) - ## async, but in the current implementation there is no guarantee a store does anything so there is no callback right now - def storeValueForKey(self, key, value): - """ stores the value for key in the global table, returns immediately, no status - in this implementation, peers respond but don't indicate status to storing values - values are stored in peers on a first-come first-served basis - this will probably change so more than one value can be stored under a key - """ - def _storeValueForKey(nodes, key=key, value=value, response= self._storedValueHandler, default= lambda t: "didn't respond"): - for node in nodes: - if node.id != self.node.id: - df = node.storeValue(key, value, self.node.senderDict()) - df.addCallbacks(response, default) - # this call is asynch - self.findNode(key, _storeValueForKey) + def refreshTable(self, force=0): + """ + force=1 will refresh table regardless of last bucket access time + """ + def callback(nodes): + pass + for bucket in self.table.buckets: + if force or (time.time() - bucket.lastAccessed >= const.BUCKET_STALENESS): + id = newIDInRange(bucket.min, bucket.max) + self.findNode(id, callback) + + + def retrieveValues(self, key): + s = "select value from kv where key = '%s';" % key.encode('base64') + c = self.store.cursor() + c.execute(s) + t = c.fetchone() + l = [] + while t: + l.append(t['value']) + t = c.fetchone() + return l - def insertNode(self, n): - """ - insert a node in our local table, pinging oldest contact in bucket, if necessary + ##### + ##### INCOMING MESSAGE HANDLERS - If all you have is a host/port, then use addContact, which calls this method after - receiving the PONG from the remote node. The reason for the seperation is we can't insert - a node into the table without it's peer-ID. That means of course the node passed into this - method needs to be a properly formed Node object with a valid ID. - """ - old = self.table.insertNode(n) - if old and (time.time() - old.lastSeen) > MAX_PING_INTERVAL and old.id != self.node.id: - # the bucket is full, check to see if old node is still around and if so, replace it - - ## these are the callbacks used when we ping the oldest node in a bucket - def _staleNodeHandler(oldnode=old, newnode = n): - """ called if the pinged node never responds """ - self.table.replaceStaleNode(old, newnode) + def xmlrpc_ping(self, sender): + """ + takes sender dict = {'id', , 'port', port} optional keys = 'ip' + returns sender dict + """ + ip = self.crequest.getClientIP() + sender['host'] = ip + n = Node().initWithDict(sender) + self.insertNode(n, contacted=0) + return (), self.node.senderDict() + + def xmlrpc_find_node(self, target, sender): + nodes = self.table.findNodes(target.decode('base64')) + nodes = map(lambda node: node.senderDict(), nodes) + ip = self.crequest.getClientIP() + sender['host'] = ip + n = Node().initWithDict(sender) + self.insertNode(n, contacted=0) + return nodes, self.node.senderDict() + + def xmlrpc_store_value(self, key, value, sender): + t = "%0.6f" % time.time() + s = "insert into kv values ('%s', '%s', '%s');" % (key, value, t) + c = self.store.cursor() + try: + c.execute(s) + except pysqlite_exceptions.IntegrityError, reason: + # update last insert time + s = "update kv set time = '%s' where key = '%s' and value = '%s';" % (t, key, value) + c.execute(s) + ip = self.crequest.getClientIP() + sender['host'] = ip + n = Node().initWithDict(sender) + self.insertNode(n, contacted=0) + return (), self.node.senderDict() - def _notStaleNodeHandler(sender, old=old): - """ called when we get a ping from the remote node """ - if sender['id'] == old.id: - self.table.insertNode(old) - - df = old.ping(self.node.senderDict()) - df.addCallbacks(_notStaleNodeHandler, self._staleNodeHandler) - - - def sendPing(self, node): - """ - ping a node - """ - df = node.ping(self.node.senderDict()) - ## these are the callbacks we use when we issue a PING - def _pongHandler(sender, id=node.id, host=node.host, port=node.port, table=self.table): - if id != 20 * ' ' and id != sender['id']: - # whoah, got response from different peer than we were expecting - pass - else: - #print "Got PONG from %s at %s:%s" % (`msg['id']`, t.target.host, t.target.port) - n = Node(sender['id'], host, port) - table.insertNode(n) - return - def _defaultPong(err): - # this should probably increment a failed message counter and dump the node if it gets over a threshold - return - - df.addCallbacks(_pongHandler,_defaultPong) - + def xmlrpc_find_value(self, key, sender): + ip = self.crequest.getClientIP() + key = key.decode('base64') + sender['host'] = ip + n = Node().initWithDict(sender) + self.insertNode(n, contacted=0) + + l = self.retrieveValues(key) + if len(l) > 0: + return {'values' : l}, self.node.senderDict() + else: + nodes = self.table.findNodes(key) + nodes = map(lambda node: node.senderDict(), nodes) + return {'nodes' : nodes}, self.node.senderDict() - def findCloseNodes(self): - """ - This does a findNode on the ID one away from our own. - This will allow us to populate our table with nodes on our network closest to our own. - This is called as soon as we start up with an empty table - """ - id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256) - def callback(nodes): - pass - self.findNode(id, callback) +### TESTING ### +from random import randrange +import threading, thread, sys, time +from sha import sha +from hash import newID - def refreshTable(self): - """ - - """ - def callback(nodes): - pass - for bucket in self.table.buckets: - if time.time() - bucket.lastAccessed >= 60 * 60: - id = randRange(bucket.min, bucket.max) - self.findNode(id, callback) +def test_net(peers=24, startport=2001, dbprefix='/tmp/test'): + import thread + l = [] + for i in xrange(peers): + a = Khashmir('localhost', startport + i, db = dbprefix+`i`) + l.append(a) + thread.start_new_thread(l[0].app.run, ()) + for peer in l[1:]: + peer.app.run() + return l - - ##### - ##### INCOMING MESSAGE HANDLERS - - def xmlrpc_ping(self, sender): - """ - takes sender dict = {'id', , 'port', port} optional keys = 'ip' - returns sender dict - """ - ip = self.crequest.getClientIP() - n = Node(sender['id'], ip, sender['port']) - self.insertNode(n) - return self.node.senderDict() - - def xmlrpc_find_node(self, target, sender): - nodes = self.table.findNodes(target) - nodes = map(lambda node: node.senderDict(), nodes) - ip = self.crequest.getClientIP() - n = Node(sender['id'], ip, sender['port']) - self.insertNode(n) - return nodes, self.node.senderDict() - - def xmlrpc_store_value(self, key, value, sender): - h1 = sha(key+value).digest() - t = `time.time()` - if not self.store.has_key(h1): - v = dumps((key, value, t)) - self.store.put(h1, v) - self.itime.put(t, h1) - self.kw.put(key, h1) - else: - # update last insert time - tup = loads(self.store[h1]) - self.store[h1] = dumps((tup[0], tup[1], t)) - self.itime.put(t, h1) - - ip = self.crequest.getClientIP() - n = Node(sender['id'], ip, sender['port']) - self.insertNode(n) - return self.node.senderDict() +def test_build_net(quiet=0, peers=24, host='localhost', pause=0, startport=2001, dbprefix='/tmp/test'): + from whrandom import randrange + import threading + import thread + import sys + port = startport + l = [] + if not quiet: + print "Building %s peer table." % peers - def xmlrpc_find_value(self, key, sender): - ip = self.crequest.getClientIP() - n = Node(sender['id'], ip, sender['port']) - self.insertNode(n) - if self.kw.has_key(key): - c = self.kw.cursor() - tup = c.set_range(key) - l = [] - while(tup): - h1 = tup[1] - v = loads(self.store[h1])[1] - l.append(v) - tup = c.next() - return {'values' : l}, self.node.senderDict() - else: - nodes = self.table.findNodes(key) - nodes = map(lambda node: node.senderDict(), nodes) - return {'nodes' : nodes}, self.node.senderDict() - - ### - ### message response callbacks - # called when we get a response to store value - def _storedValueHandler(self, sender): - pass - - - - - - -#------ testing - -def test_build_net(quiet=0, peers=64, pause=1): - from whrandom import randrange - import thread - port = 2001 - l = [] - - if not quiet: - print "Building %s peer table." % peers + for i in xrange(peers): + a = Khashmir(host, port + i, db = dbprefix +`i`) + l.append(a) - for i in xrange(peers): - a = Khashmir('localhost', port + i) - l.append(a) - - - thread.start_new_thread(l[0].app.run, ()) - time.sleep(1) - for peer in l[1:]: - peer.app.run() - #time.sleep(.25) - - print "adding contacts...." - - for peer in l[1:]: - n = l[randrange(0, len(l))].node - peer.addContact(n.host, n.port) - n = l[randrange(0, len(l))].node - peer.addContact(n.host, n.port) - n = l[randrange(0, len(l))].node - peer.addContact(n.host, n.port) - if pause: - time.sleep(.30) - - time.sleep(1) - print "finding close nodes...." - - for peer in l: - peer.findCloseNodes() - if pause: - time.sleep(.5) - if pause: - time.sleep(10) -# for peer in l: -# peer.refreshTable() - return l + + thread.start_new_thread(l[0].app.run, ()) + time.sleep(1) + for peer in l[1:]: + peer.app.run() + time.sleep(3) + + print "adding contacts...." + + for peer in l: + n = l[randrange(0, len(l))].node + peer.addContact(host, n.port) + n = l[randrange(0, len(l))].node + peer.addContact(host, n.port) + n = l[randrange(0, len(l))].node + peer.addContact(host, n.port) + if pause: + time.sleep(.33) + + time.sleep(10) + print "finding close nodes...." + + for peer in l: + flag = threading.Event() + def cb(nodes, f=flag): + f.set() + peer.findCloseNodes(cb) + flag.wait() + # for peer in l: + # peer.refreshTable() + return l def test_find_nodes(l, quiet=0): - import threading, sys - from whrandom import randrange - flag = threading.Event() - - n = len(l) - - a = l[randrange(0,n)] - b = l[randrange(0,n)] - - def callback(nodes, flag=flag): - if (len(nodes) >0) and (nodes[0].id == b.node.id): - print "test_find_nodes PASSED" - else: - print "test_find_nodes FAILED" - flag.set() - a.findNode(b.node.id, callback) - flag.wait() + flag = threading.Event() + + n = len(l) + + a = l[randrange(0,n)] + b = l[randrange(0,n)] + + def callback(nodes, flag=flag, id = b.node.id): + if (len(nodes) >0) and (nodes[0].id == id): + print "test_find_nodes PASSED" + else: + print "test_find_nodes FAILED" + flag.set() + a.findNode(b.node.id, callback) + flag.wait() def test_find_value(l, quiet=0): - from whrandom import randrange - from sha import sha - import time, threading, sys - - fa = threading.Event() - fb = threading.Event() - fc = threading.Event() - - n = len(l) - a = l[randrange(0,n)] - b = l[randrange(0,n)] - c = l[randrange(0,n)] - d = l[randrange(0,n)] - - key = sha(`randrange(0,100000)`).digest() - value = sha(`randrange(0,100000)`).digest() - if not quiet: - print "inserting value..." - sys.stdout.flush() - a.storeValueForKey(key, value) - time.sleep(3) - print "finding..." - sys.stdout.flush() - - class cb: - def __init__(self, flag, value=value): - self.flag = flag - self.val = value - self.found = 0 - def callback(self, values): - try: - if(len(values) == 0): - if not self.found: - print "find FAILED" - else: - print "find FOUND" - sys.stdout.flush() - - else: - if self.val in values: - self.found = 1 - finally: - self.flag.set() - - b.valueForKey(key, cb(fa).callback) - fa.wait() - c.valueForKey(key, cb(fb).callback) - fb.wait() - d.valueForKey(key, cb(fc).callback) - fc.wait() + + fa = threading.Event() + fb = threading.Event() + fc = threading.Event() + + n = len(l) + a = l[randrange(0,n)] + b = l[randrange(0,n)] + c = l[randrange(0,n)] + d = l[randrange(0,n)] + + key = newID() + value = newID() + if not quiet: print "inserting value..." + a.storeValueForKey(key, value) + time.sleep(3) + if not quiet: + print "finding..." + + class cb: + def __init__(self, flag, value=value): + self.flag = flag + self.val = value + self.found = 0 + def callback(self, values): + try: + if(len(values) == 0): + if not self.found: + print "find NOT FOUND" + else: + print "find FOUND" + else: + if self.val in values: + self.found = 1 + finally: + self.flag.set() + + b.valueForKey(key, cb(fa).callback) + fa.wait() + c.valueForKey(key, cb(fb).callback) + fb.wait() + d.valueForKey(key, cb(fc).callback) + fc.wait() -def test_one(port): - import thread - k = Khashmir('localhost', port) - thread.start_new_thread(k.app.run, ()) - return k +def test_one(host, port, db='/tmp/test'): + import thread + k = Khashmir(host, port, db) + thread.start_new_thread(k.app.run, ()) + return k if __name__ == "__main__": - l = test_build_net() + import sys + n = 8 + if len(sys.argv) > 1: n = int(sys.argv[1]) + l = test_build_net(peers=n) time.sleep(3) print "finding nodes..." for i in range(10): - test_find_nodes(l) + test_find_nodes(l) print "inserting and fetching values..." for i in range(10): - test_find_value(l) + test_find_value(l)