X-Git-Url: https://git.mxchange.org/?p=quix0rs-apt-p2p.git;a=blobdiff_plain;f=khashmir.py;h=19bd3d8537af3cb2d884a487ad6f1de82a6558c0;hp=b2a624614abde817f1f1caf05fe4f9ed49c6adeb;hb=51f0e78ede4836ab8b1d8c5f389e511d4ec9eac2;hpb=4ce86361073035f184d541b52f23a3bc07aae952 diff --git a/khashmir.py b/khashmir.py index b2a6246..19bd3d8 100644 --- a/khashmir.py +++ b/khashmir.py @@ -22,39 +22,51 @@ from twisted.internet.app import Application from twisted.web import server threadable.init() -from bsddb3 import db ## find this at http://pybsddb.sf.net/ -from bsddb3._db import DBNotFoundError - -from xmlrpclib import Binary +import sqlite ## find this at http://pysqlite.sourceforge.net/ +KhashmirDBExcept = "KhashmirDBExcept" # this is the main class! class Khashmir(xmlrpc.XMLRPC): - __slots__ = ['listener', 'node', 'table', 'store', 'itime', 'kw', 'app'] - def __init__(self, host, port): + __slots__ = ('listener', 'node', 'table', 'store', 'app', 'last') + def __init__(self, host, port, db='khashmir.db'): self.node = Node().init(newID(), host, port) self.table = KTable(self.node) self.app = Application("xmlrpc") self.app.listenTCP(port, server.Site(self)) - - ## these databases may be more suited to on-disk rather than in-memory - # h((key, value)) -> (key, value, time) mappings - self.store = db.DB() - self.store.open(None, None, db.DB_BTREE) - - # -> h((key, value)) - self.itime = db.DB() - self.itime.set_flags(db.DB_DUP) - self.itime.open(None, None, db.DB_BTREE) - - # key -> h((key, value)) - self.kw = db.DB() - self.kw.set_flags(db.DB_DUP) - self.kw.open(None, None, db.DB_BTREE) - - KeyExpirer(store=self.store, itime=self.itime, kw=self.kw) - + self.findDB(db) + self.last = time.time() + KeyExpirer(store=self.store) + + def findDB(self, db): + import os + try: + os.stat(db) + except OSError: + self.createNewDB(db) + else: + self.loadDB(db) + + def loadDB(self, db): + try: + self.store = sqlite.connect(db=db) + except: + import traceback + raise KhashmirDBExcept, "Couldn't open DB", traceback.exc_traceback + + def createNewDB(self, db): + self.store = sqlite.connect(db=db) + s = """ + create table kv (hkv text primary key, key text, value text, time timestamp); + create index kv_key on kv(key); + + create table nodes (id text primary key, host text, port number); + """ + c = self.store.cursor() + c.execute(s) + self.store.commit() + def render(self, request): """ Override the built in render so we can have access to the request object! @@ -94,13 +106,16 @@ class Khashmir(xmlrpc.XMLRPC): ## also async def valueForKey(self, key, callback): - """ returns the values found for key in global table """ + """ returns the values found for key in global table + callback will be called with a list of values for each peer that returns unique values + final callback will be an empty list - probably should change to 'more coming' arg + """ nodes = self.table.findNodes(key) # get locals l = self.retrieveValues(key) if len(l) > 0: - reactor.callFromThread(callback, l) + reactor.callFromThread(callback, map(lambda a: a.decode('base64'), l)) # create our search state state = GetValue(self, key, callback) @@ -108,16 +123,15 @@ class Khashmir(xmlrpc.XMLRPC): - ## async, but in the current implementation there is no guarantee a store does anything so there is no callback right now + ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor) def storeValueForKey(self, key, value, callback=None): """ stores the value for key in the global table, returns immediately, no status in this implementation, peers respond but don't indicate status to storing values - values are stored in peers on a first-come first-served basis - this will probably change so more than one value can be stored under a key + a key can have many values """ def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table): if not callback: - # default callback - this will get called for each successful store value + # default callback def _storedValueHandler(sender): pass response=_storedValueHandler @@ -171,7 +185,7 @@ class Khashmir(xmlrpc.XMLRPC): ## these are the callbacks we use when we issue a PING def _pongHandler(args, id=node.id, host=node.host, port=node.port, table=self.table): l, sender = args - if id != const.NULL_ID and id != sender['id'].data: + if id != const.NULL_ID and id != sender['id'].decode('base64'): # whoah, got response from different peer than we were expecting pass else: @@ -211,17 +225,15 @@ class Khashmir(xmlrpc.XMLRPC): def retrieveValues(self, key): - if self.kw.has_key(key): - c = self.kw.cursor() - tup = c.set(key) - l = [] - while(tup and tup[0] == key): - h1 = tup[1] - v = loads(self.store[h1])[1] - l.append(v) - tup = c.next() - return l - return [] + s = "select value from kv where key = '%s';" % key.encode('base64') + c = self.store.cursor() + c.execute(s) + t = c.fetchone() + l = [] + while t: + l.append(t['value']) + t = c.fetchone() + return l ##### ##### INCOMING MESSAGE HANDLERS @@ -238,29 +250,26 @@ class Khashmir(xmlrpc.XMLRPC): return (), self.node.senderDict() def xmlrpc_find_node(self, target, sender): - nodes = self.table.findNodes(target.data) + nodes = self.table.findNodes(target.decode('base64')) nodes = map(lambda node: node.senderDict(), nodes) ip = self.crequest.getClientIP() sender['host'] = ip n = Node().initWithDict(sender) self.insertNode(n, contacted=0) return nodes, self.node.senderDict() - + def xmlrpc_store_value(self, key, value, sender): - key = key.data - h1 = sha(key+value.data).digest() + h1 = sha(key+value).digest().encode('base64') t = `time.time()` - if not self.store.has_key(h1): - v = dumps((key, value.data, t)) - self.store.put(h1, v) - self.itime.put(t, h1) - self.kw.put(key, h1) - else: + s = "insert into kv values ('%s', '%s', '%s', '%s')" % (h1, key, value, t) + c = self.store.cursor() + try: + c.execute(s) + except: # update last insert time - tup = loads(self.store[h1]) - self.store[h1] = dumps((tup[0], tup[1], t)) - self.itime.put(t, h1) - + s = "update kv set time = '%s' where hkv = '%s'" % (t, h1) + c.execute(s) + self.store.commit() ip = self.crequest.getClientIP() sender['host'] = ip n = Node().initWithDict(sender) @@ -269,14 +278,13 @@ class Khashmir(xmlrpc.XMLRPC): def xmlrpc_find_value(self, key, sender): ip = self.crequest.getClientIP() - key = key.data + key = key.decode('base64') sender['host'] = ip n = Node().initWithDict(sender) self.insertNode(n, contacted=0) l = self.retrieveValues(key) if len(l) > 0: - l = map(lambda v: Binary(v), l) return {'values' : l}, self.node.senderDict() else: nodes = self.table.findNodes(key) @@ -299,7 +307,7 @@ def test_build_net(quiet=0, peers=24, host='localhost', pause=1): print "Building %s peer table." % peers for i in xrange(peers): - a = Khashmir(host, port + i) + a = Khashmir(host, port + i, db = '/tmp/test'+`i`) l.append(a) @@ -406,9 +414,9 @@ def test_find_value(l, quiet=0): d.valueForKey(key, cb(fc).callback) fc.wait() -def test_one(host, port): +def test_one(host, port, db='/tmp/test'): import thread - k = Khashmir(host, port) + k = Khashmir(host, port, db) thread.start_new_thread(k.app.run, ()) return k