From 51f0e78ede4836ab8b1d8c5f389e511d4ec9eac2 Mon Sep 17 00:00:00 2001 From: burris Date: Mon, 14 Oct 2002 03:53:01 +0000 Subject: [PATCH] changed from bsddb3 to pysqlite --- actions.py | 46 +++---------------- const.py | 6 ++- hash.py | 4 +- khashmir.py | 130 ++++++++++++++++++++++++++++------------------------ knode.py | 10 ++-- node.py | 4 +- 6 files changed, 87 insertions(+), 113 deletions(-) diff --git a/actions.py b/actions.py index d5b2b68..42bec99 100644 --- a/actions.py +++ b/actions.py @@ -127,7 +127,7 @@ class GetValue(FindNode): self.found[n.id] = n elif l.has_key('values'): def x(y, z=self.results): - y = y.data + y = y.decode('base64') if not z.has_key(y): z[y] = 1 return y @@ -178,10 +178,8 @@ class GetValue(FindNode): class KeyExpirer: - def __init__(self, store, itime, kw): + def __init__(self, store): self.store = store - self.itime = itime - self.kw = kw reactor.callLater(const.KEINITIAL_DELAY, self.doExpire) def doExpire(self): @@ -189,41 +187,9 @@ class KeyExpirer: self._expire() def _expire(self): - ic = self.itime.cursor() - sc = self.store.cursor() - kc = self.kw.cursor() - irec = None - try: - irec = ic.set_range(self.cut) - except db.DBNotFoundError: - # everything is expired - f = ic.prev - irec = f() - else: - f = ic.next - i = 0 - while irec: - it, h = irec - try: - k, v, lt = loads(self.store[h]) - except KeyError: - ic.delete() - else: - if lt < self.cut: - try: - kc.set_both(k, h) - except db.DBNotFoundError: - print "Database inconsistency! No key->value entry when a store entry was found!" - else: - kc.delete() - self.store.delete(h) - ic.delete() - i = i + 1 - else: - break - irec = f() - + c = self.store.cursor() + s = "delete from kv where time < '%s';" % self.cut + c.execute(s) + self.store.commit() reactor.callLater(const.KE_DELAY, self.doExpire) - if(i > 0): - print ">>>KE: done expiring %d" % i \ No newline at end of file diff --git a/const.py b/const.py index c1a7360..f5feed4 100644 --- a/const.py +++ b/const.py @@ -7,6 +7,7 @@ main.installReactor(reactor) # magic id to use before we know a peer's id NULL_ID = 20 * '\0' + ### SEARCHING/STORING # concurrent xmlrpc calls per find node/value request! CONCURRENT_REQS = 4 @@ -14,6 +15,7 @@ CONCURRENT_REQS = 4 # how many hosts to post to STORE_REDUNDANCY = 3 + ### ROUTING TABLE STUFF # how many times in a row a node can fail to respond before it's booted from the routing table MAX_FAILURES = 3 @@ -27,10 +29,10 @@ BUCKET_STALENESS = 60 # one hour ### KEY EXPIRER # time before expirer starts running -KEINITIAL_DELAY = 60 * 60 * 24 # 24 hours +KEINITIAL_DELAY = 15 # 15 seconds - to clean out old stuff in persistent db # time between expirer runs KE_DELAY = 60 * 60 # 1 hour # expire entries older than this -KE_AGE = KEINITIAL_DELAY +KE_AGE = 60 * 60 * 24 # 24 hours diff --git a/hash.py b/hash.py index 4210385..9ab4e0b 100644 --- a/hash.py +++ b/hash.py @@ -1,7 +1,7 @@ ## Copyright 2002 Andrew Loewenstern, All Rights Reserved from sha import sha -from whrandom import randrange +import whrandom ## takes a 20 bit hash, big-endian, and returns it expressed a long python integer def intify(hstr): @@ -27,7 +27,7 @@ def distance(a, b): def newID(): h = sha() for i in range(20): - h.update(chr(randrange(0,256))) + h.update(chr(whrandom.randrange(0,256))) return h.digest() def newIDInRange(min, max): diff --git a/khashmir.py b/khashmir.py index b2a6246..19bd3d8 100644 --- a/khashmir.py +++ b/khashmir.py @@ -22,39 +22,51 @@ from twisted.internet.app import Application from twisted.web import server threadable.init() -from bsddb3 import db ## find this at http://pybsddb.sf.net/ -from bsddb3._db import DBNotFoundError - -from xmlrpclib import Binary +import sqlite ## find this at http://pysqlite.sourceforge.net/ +KhashmirDBExcept = "KhashmirDBExcept" # this is the main class! class Khashmir(xmlrpc.XMLRPC): - __slots__ = ['listener', 'node', 'table', 'store', 'itime', 'kw', 'app'] - def __init__(self, host, port): + __slots__ = ('listener', 'node', 'table', 'store', 'app', 'last') + def __init__(self, host, port, db='khashmir.db'): self.node = Node().init(newID(), host, port) self.table = KTable(self.node) self.app = Application("xmlrpc") self.app.listenTCP(port, server.Site(self)) - - ## these databases may be more suited to on-disk rather than in-memory - # h((key, value)) -> (key, value, time) mappings - self.store = db.DB() - self.store.open(None, None, db.DB_BTREE) - - # -> h((key, value)) - self.itime = db.DB() - self.itime.set_flags(db.DB_DUP) - self.itime.open(None, None, db.DB_BTREE) - - # key -> h((key, value)) - self.kw = db.DB() - self.kw.set_flags(db.DB_DUP) - self.kw.open(None, None, db.DB_BTREE) - - KeyExpirer(store=self.store, itime=self.itime, kw=self.kw) - + self.findDB(db) + self.last = time.time() + KeyExpirer(store=self.store) + + def findDB(self, db): + import os + try: + os.stat(db) + except OSError: + self.createNewDB(db) + else: + self.loadDB(db) + + def loadDB(self, db): + try: + self.store = sqlite.connect(db=db) + except: + import traceback + raise KhashmirDBExcept, "Couldn't open DB", traceback.exc_traceback + + def createNewDB(self, db): + self.store = sqlite.connect(db=db) + s = """ + create table kv (hkv text primary key, key text, value text, time timestamp); + create index kv_key on kv(key); + + create table nodes (id text primary key, host text, port number); + """ + c = self.store.cursor() + c.execute(s) + self.store.commit() + def render(self, request): """ Override the built in render so we can have access to the request object! @@ -94,13 +106,16 @@ class Khashmir(xmlrpc.XMLRPC): ## also async def valueForKey(self, key, callback): - """ returns the values found for key in global table """ + """ returns the values found for key in global table + callback will be called with a list of values for each peer that returns unique values + final callback will be an empty list - probably should change to 'more coming' arg + """ nodes = self.table.findNodes(key) # get locals l = self.retrieveValues(key) if len(l) > 0: - reactor.callFromThread(callback, l) + reactor.callFromThread(callback, map(lambda a: a.decode('base64'), l)) # create our search state state = GetValue(self, key, callback) @@ -108,16 +123,15 @@ class Khashmir(xmlrpc.XMLRPC): - ## async, but in the current implementation there is no guarantee a store does anything so there is no callback right now + ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor) def storeValueForKey(self, key, value, callback=None): """ stores the value for key in the global table, returns immediately, no status in this implementation, peers respond but don't indicate status to storing values - values are stored in peers on a first-come first-served basis - this will probably change so more than one value can be stored under a key + a key can have many values """ def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table): if not callback: - # default callback - this will get called for each successful store value + # default callback def _storedValueHandler(sender): pass response=_storedValueHandler @@ -171,7 +185,7 @@ class Khashmir(xmlrpc.XMLRPC): ## these are the callbacks we use when we issue a PING def _pongHandler(args, id=node.id, host=node.host, port=node.port, table=self.table): l, sender = args - if id != const.NULL_ID and id != sender['id'].data: + if id != const.NULL_ID and id != sender['id'].decode('base64'): # whoah, got response from different peer than we were expecting pass else: @@ -211,17 +225,15 @@ class Khashmir(xmlrpc.XMLRPC): def retrieveValues(self, key): - if self.kw.has_key(key): - c = self.kw.cursor() - tup = c.set(key) - l = [] - while(tup and tup[0] == key): - h1 = tup[1] - v = loads(self.store[h1])[1] - l.append(v) - tup = c.next() - return l - return [] + s = "select value from kv where key = '%s';" % key.encode('base64') + c = self.store.cursor() + c.execute(s) + t = c.fetchone() + l = [] + while t: + l.append(t['value']) + t = c.fetchone() + return l ##### ##### INCOMING MESSAGE HANDLERS @@ -238,29 +250,26 @@ class Khashmir(xmlrpc.XMLRPC): return (), self.node.senderDict() def xmlrpc_find_node(self, target, sender): - nodes = self.table.findNodes(target.data) + nodes = self.table.findNodes(target.decode('base64')) nodes = map(lambda node: node.senderDict(), nodes) ip = self.crequest.getClientIP() sender['host'] = ip n = Node().initWithDict(sender) self.insertNode(n, contacted=0) return nodes, self.node.senderDict() - + def xmlrpc_store_value(self, key, value, sender): - key = key.data - h1 = sha(key+value.data).digest() + h1 = sha(key+value).digest().encode('base64') t = `time.time()` - if not self.store.has_key(h1): - v = dumps((key, value.data, t)) - self.store.put(h1, v) - self.itime.put(t, h1) - self.kw.put(key, h1) - else: + s = "insert into kv values ('%s', '%s', '%s', '%s')" % (h1, key, value, t) + c = self.store.cursor() + try: + c.execute(s) + except: # update last insert time - tup = loads(self.store[h1]) - self.store[h1] = dumps((tup[0], tup[1], t)) - self.itime.put(t, h1) - + s = "update kv set time = '%s' where hkv = '%s'" % (t, h1) + c.execute(s) + self.store.commit() ip = self.crequest.getClientIP() sender['host'] = ip n = Node().initWithDict(sender) @@ -269,14 +278,13 @@ class Khashmir(xmlrpc.XMLRPC): def xmlrpc_find_value(self, key, sender): ip = self.crequest.getClientIP() - key = key.data + key = key.decode('base64') sender['host'] = ip n = Node().initWithDict(sender) self.insertNode(n, contacted=0) l = self.retrieveValues(key) if len(l) > 0: - l = map(lambda v: Binary(v), l) return {'values' : l}, self.node.senderDict() else: nodes = self.table.findNodes(key) @@ -299,7 +307,7 @@ def test_build_net(quiet=0, peers=24, host='localhost', pause=1): print "Building %s peer table." % peers for i in xrange(peers): - a = Khashmir(host, port + i) + a = Khashmir(host, port + i, db = '/tmp/test'+`i`) l.append(a) @@ -406,9 +414,9 @@ def test_find_value(l, quiet=0): d.valueForKey(key, cb(fc).callback) fc.wait() -def test_one(host, port): +def test_one(host, port, db='/tmp/test'): import thread - k = Khashmir(host, port) + k = Khashmir(host, port, db) thread.start_new_thread(k.app.run, ()) return k diff --git a/knode.py b/knode.py index 30d19fb..a15c568 100644 --- a/knode.py +++ b/knode.py @@ -2,8 +2,6 @@ from node import Node from twisted.internet.defer import Deferred from xmlrpcclient import XMLRPCClientFactory as factory from const import reactor, NULL_ID -from xmlrpclib import Binary - class KNode(Node): def makeResponse(self, df): @@ -13,7 +11,7 @@ class KNode(Node): except: d.callback(args) else: - if self.id != NULL_ID and sender['id'].data != self.id: + if self.id != NULL_ID and sender['id'] != self._senderDict['id']: d.errback() else: d.callback(args) @@ -25,16 +23,16 @@ class KNode(Node): return df def findNode(self, target, sender): df = Deferred() - f = factory('find_node', (Binary(target), sender), self.makeResponse(df), df.errback) + f = factory('find_node', (target.encode('base64'), sender), self.makeResponse(df), df.errback) reactor.connectTCP(self.host, self.port, f) return df def storeValue(self, key, value, sender): df = Deferred() - f = factory('store_value', (Binary(key), Binary(value), sender), self.makeResponse(df), df.errback) + f = factory('store_value', (key.encode('base64'), value.encode('base64'), sender), self.makeResponse(df), df.errback) reactor.connectTCP(self.host, self.port, f) return df def findValue(self, key, sender): df = Deferred() - f = factory('find_value', (Binary(key), sender), self.makeResponse(df), df.errback) + f = factory('find_value', (key.encode('base64'), sender), self.makeResponse(df), df.errback) reactor.connectTCP(self.host, self.port, f) return df diff --git a/node.py b/node.py index 1e1c0bd..078ef3c 100644 --- a/node.py +++ b/node.py @@ -16,12 +16,12 @@ class Node: self.int = hash.intify(id) self.host = host self.port = port - self._senderDict = {'id': Binary(self.id), 'port' : self.port, 'host' : self.host} + self._senderDict = {'id': self.id.encode('base64'), 'port' : self.port, 'host' : self.host} return self def initWithDict(self, dict): self._senderDict = dict - self.id = dict['id'].data + self.id = dict['id'].decode('base64') self.int = hash.intify(self.id) self.port = dict['port'] self.host = dict['host'] -- 2.30.2