changed from bsddb3 to pysqlite
authorburris <burris>
Mon, 14 Oct 2002 03:53:01 +0000 (03:53 +0000)
committerburris <burris>
Mon, 14 Oct 2002 03:53:01 +0000 (03:53 +0000)
actions.py
const.py
hash.py
khashmir.py
knode.py
node.py

index d5b2b68..42bec99 100644 (file)
@@ -127,7 +127,7 @@ class GetValue(FindNode):
                    self.found[n.id] = n
        elif l.has_key('values'):
            def x(y, z=self.results):
-               y = y.data
+               y = y.decode('base64')
                if not z.has_key(y):
                    z[y] = 1
                    return y
@@ -178,10 +178,8 @@ class GetValue(FindNode):
 
 
 class KeyExpirer:
-    def __init__(self, store, itime, kw):
+    def __init__(self, store):
        self.store = store
-       self.itime = itime
-       self.kw = kw
        reactor.callLater(const.KEINITIAL_DELAY, self.doExpire)
        
     def doExpire(self):
@@ -189,41 +187,9 @@ class KeyExpirer:
        self._expire()
        
     def _expire(self):
-       ic = self.itime.cursor()
-       sc = self.store.cursor()
-       kc = self.kw.cursor()
-       irec = None
-       try:
-           irec = ic.set_range(self.cut)
-       except db.DBNotFoundError:
-           # everything is expired
-           f = ic.prev
-           irec = f()
-       else:
-           f = ic.next
-       i = 0
-       while irec:
-           it, h = irec
-           try:
-               k, v, lt = loads(self.store[h])
-           except KeyError:
-               ic.delete()
-           else:
-               if lt < self.cut:
-                   try:
-                       kc.set_both(k, h)
-                   except db.DBNotFoundError:
-                       print "Database inconsistency!  No key->value entry when a store entry was found!"
-                   else:
-                       kc.delete()
-                   self.store.delete(h)
-                   ic.delete()
-                   i = i + 1
-               else:
-                   break
-           irec = f()
-           
+       c = self.store.cursor()
+       s = "delete from kv where time < '%s';" % self.cut
+       c.execute(s)
+       self.store.commit()
        reactor.callLater(const.KE_DELAY, self.doExpire)
-       if(i > 0):
-           print ">>>KE: done expiring %d" % i
        
\ No newline at end of file
index c1a7360..f5feed4 100644 (file)
--- a/const.py
+++ b/const.py
@@ -7,6 +7,7 @@ main.installReactor(reactor)
 # magic id to use before we know a peer's id
 NULL_ID =  20 * '\0'
 
+
 ### SEARCHING/STORING
 # concurrent xmlrpc calls per find node/value request!
 CONCURRENT_REQS = 4
@@ -14,6 +15,7 @@ CONCURRENT_REQS = 4
 # how many hosts to post to
 STORE_REDUNDANCY = 3
 
+
 ###  ROUTING TABLE STUFF
 # how many times in a row a node can fail to respond before it's booted from the routing table
 MAX_FAILURES = 3
@@ -27,10 +29,10 @@ BUCKET_STALENESS = 60 # one hour
 
 ###  KEY EXPIRER
 # time before expirer starts running
-KEINITIAL_DELAY = 60 * 60 * 24 # 24 hours
+KEINITIAL_DELAY = 15 # 15 seconds - to clean out old stuff in persistent db
 
 # time between expirer runs
 KE_DELAY = 60 * 60 # 1 hour
 
 # expire entries older than this
-KE_AGE = KEINITIAL_DELAY
+KE_AGE = 60 * 60 * 24 # 24 hours
diff --git a/hash.py b/hash.py
index 4210385..9ab4e0b 100644 (file)
--- a/hash.py
+++ b/hash.py
@@ -1,7 +1,7 @@
 ## Copyright 2002 Andrew Loewenstern, All Rights Reserved
 
 from sha import sha
-from whrandom import randrange
+import whrandom
 
 ## takes a 20 bit hash, big-endian, and returns it expressed a long python integer
 def intify(hstr):
@@ -27,7 +27,7 @@ def distance(a, b):
 def newID():
     h = sha()
     for i in range(20):
-       h.update(chr(randrange(0,256)))
+       h.update(chr(whrandom.randrange(0,256)))
     return h.digest()
 
 def newIDInRange(min, max):
index b2a6246..19bd3d8 100644 (file)
@@ -22,39 +22,51 @@ from twisted.internet.app import Application
 from twisted.web import server
 threadable.init()
 
-from bsddb3 import db ## find this at http://pybsddb.sf.net/
-from bsddb3._db import DBNotFoundError
-
-from xmlrpclib import Binary
+import sqlite  ## find this at http://pysqlite.sourceforge.net/
 
 
+KhashmirDBExcept = "KhashmirDBExcept"
 
 # this is the main class!
 class Khashmir(xmlrpc.XMLRPC):
-    __slots__ = ['listener', 'node', 'table', 'store', 'itime', 'kw', 'app']
-    def __init__(self, host, port):
+    __slots__ = ('listener', 'node', 'table', 'store', 'app', 'last')
+    def __init__(self, host, port, db='khashmir.db'):
        self.node = Node().init(newID(), host, port)
        self.table = KTable(self.node)
        self.app = Application("xmlrpc")
        self.app.listenTCP(port, server.Site(self))
-       
-       ## these databases may be more suited to on-disk rather than in-memory
-       # h((key, value)) -> (key, value, time) mappings
-       self.store = db.DB()
-       self.store.open(None, None, db.DB_BTREE)
-       
-       # <insert time> -> h((key, value))
-       self.itime = db.DB()
-       self.itime.set_flags(db.DB_DUP)
-       self.itime.open(None, None, db.DB_BTREE)
-
-       # key -> h((key, value))
-       self.kw = db.DB()
-       self.kw.set_flags(db.DB_DUP)
-       self.kw.open(None, None, db.DB_BTREE)
-
-       KeyExpirer(store=self.store, itime=self.itime, kw=self.kw)
-       
+       self.findDB(db)
+       self.last = time.time()
+       KeyExpirer(store=self.store)
+
+    def findDB(self, db):
+       import os
+       try:
+           os.stat(db)
+       except OSError:
+           self.createNewDB(db)
+       else:
+           self.loadDB(db)
+           
+    def loadDB(self, db):
+       try:
+           self.store = sqlite.connect(db=db)
+       except:
+           import traceback
+           raise KhashmirDBExcept, "Couldn't open DB", traceback.exc_traceback
+           
+    def createNewDB(self, db):
+       self.store = sqlite.connect(db=db)
+       s = """
+           create table kv (hkv text primary key, key text, value text, time timestamp);
+           create index kv_key on kv(key);
+           
+           create table nodes (id text primary key, host text, port number);
+           """
+       c = self.store.cursor()
+       c.execute(s)
+       self.store.commit()
+               
     def render(self, request):
        """
            Override the built in render so we can have access to the request object!
@@ -94,13 +106,16 @@ class Khashmir(xmlrpc.XMLRPC):
     
     ## also async
     def valueForKey(self, key, callback):
-       """ returns the values found for key in global table """
+       """ returns the values found for key in global table
+           callback will be called with a list of values for each peer that returns unique values
+           final callback will be an empty list - probably should change to 'more coming' arg
+       """
        nodes = self.table.findNodes(key)
 
        # get locals
        l = self.retrieveValues(key)
        if len(l) > 0:
-           reactor.callFromThread(callback, l)
+           reactor.callFromThread(callback, map(lambda a: a.decode('base64'), l))
 
        # create our search state
        state = GetValue(self, key, callback)
@@ -108,16 +123,15 @@ class Khashmir(xmlrpc.XMLRPC):
        
 
 
-    ## async, but in the current implementation there is no guarantee a store does anything so there is no callback right now
+    ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
     def storeValueForKey(self, key, value, callback=None):
        """ stores the value for key in the global table, returns immediately, no status 
            in this implementation, peers respond but don't indicate status to storing values
-           values are stored in peers on a first-come first-served basis
-           this will probably change so more than one value can be stored under a key
+           a key can have many values
        """
        def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
            if not callback:
-               # default callback - this will get called for each successful store value
+               # default callback
                def _storedValueHandler(sender):
                    pass
                response=_storedValueHandler
@@ -171,7 +185,7 @@ class Khashmir(xmlrpc.XMLRPC):
        ## these are the callbacks we use when we issue a PING
        def _pongHandler(args, id=node.id, host=node.host, port=node.port, table=self.table):
            l, sender = args
-           if id != const.NULL_ID and id != sender['id'].data:
+           if id != const.NULL_ID and id != sender['id'].decode('base64'):
                # whoah, got response from different peer than we were expecting
                pass
            else:
@@ -211,17 +225,15 @@ class Khashmir(xmlrpc.XMLRPC):
        
  
     def retrieveValues(self, key):
-       if self.kw.has_key(key):
-           c = self.kw.cursor()
-           tup = c.set(key)
-           l = []
-           while(tup and tup[0] == key):
-               h1 = tup[1]
-               v = loads(self.store[h1])[1]
-               l.append(v)
-               tup = c.next()
-           return l
-       return []
+       s = "select value from kv where key = '%s';" % key.encode('base64')
+       c = self.store.cursor()
+       c.execute(s)
+       t = c.fetchone()
+       l = []
+       while t:
+           l.append(t['value'])
+           t = c.fetchone()
+       return l
        
     #####
     ##### INCOMING MESSAGE HANDLERS
@@ -238,29 +250,26 @@ class Khashmir(xmlrpc.XMLRPC):
        return (), self.node.senderDict()
                
     def xmlrpc_find_node(self, target, sender):
-       nodes = self.table.findNodes(target.data)
+       nodes = self.table.findNodes(target.decode('base64'))
        nodes = map(lambda node: node.senderDict(), nodes)
        ip = self.crequest.getClientIP()
        sender['host'] = ip
        n = Node().initWithDict(sender)
        self.insertNode(n, contacted=0)
        return nodes, self.node.senderDict()
-    
+           
     def xmlrpc_store_value(self, key, value, sender):
-       key = key.data
-       h1 = sha(key+value.data).digest()
+       h1 = sha(key+value).digest().encode('base64')
        t = `time.time()`
-       if not self.store.has_key(h1):
-           v = dumps((key, value.data, t))
-           self.store.put(h1, v)
-           self.itime.put(t, h1)
-           self.kw.put(key, h1)
-       else:
+       s = "insert into kv values ('%s', '%s', '%s', '%s')" % (h1, key, value, t)
+       c = self.store.cursor()
+       try:
+           c.execute(s)
+       except:
            # update last insert time
-           tup = loads(self.store[h1])
-           self.store[h1] = dumps((tup[0], tup[1], t))
-           self.itime.put(t, h1)
-
+           s = "update kv set time = '%s' where hkv = '%s'" % (t, h1)
+           c.execute(s)
+       self.store.commit()
        ip = self.crequest.getClientIP()
        sender['host'] = ip
        n = Node().initWithDict(sender)
@@ -269,14 +278,13 @@ class Khashmir(xmlrpc.XMLRPC):
        
     def xmlrpc_find_value(self, key, sender):
        ip = self.crequest.getClientIP()
-       key = key.data
+       key = key.decode('base64')
        sender['host'] = ip
        n = Node().initWithDict(sender)
        self.insertNode(n, contacted=0)
 
        l = self.retrieveValues(key)
        if len(l) > 0:
-           l = map(lambda v: Binary(v), l)
            return {'values' : l}, self.node.senderDict()
        else:
            nodes = self.table.findNodes(key)
@@ -299,7 +307,7 @@ def test_build_net(quiet=0, peers=24, host='localhost',  pause=1):
        print "Building %s peer table." % peers
        
     for i in xrange(peers):
-       a = Khashmir(host, port + i)
+       a = Khashmir(host, port + i, db = '/tmp/test'+`i`)
        l.append(a)
     
 
@@ -406,9 +414,9 @@ def test_find_value(l, quiet=0):
     d.valueForKey(key, cb(fc).callback)    
     fc.wait()
     
-def test_one(host, port):
+def test_one(host, port, db='/tmp/test'):
     import thread
-    k = Khashmir(host, port)
+    k = Khashmir(host, port, db)
     thread.start_new_thread(k.app.run, ())
     return k
     
index 30d19fb..a15c568 100644 (file)
--- a/knode.py
+++ b/knode.py
@@ -2,8 +2,6 @@ from node import Node
 from twisted.internet.defer import Deferred
 from xmlrpcclient import XMLRPCClientFactory as factory
 from const import reactor, NULL_ID
-from xmlrpclib import Binary
-
 
 class KNode(Node):
     def makeResponse(self, df):
@@ -13,7 +11,7 @@ class KNode(Node):
            except:
                d.callback(args)
            else:
-               if self.id != NULL_ID and sender['id'].data != self.id:
+               if self.id != NULL_ID and sender['id'] != self._senderDict['id']:
                    d.errback()
                else:
                    d.callback(args)
@@ -25,16 +23,16 @@ class KNode(Node):
        return df
     def findNode(self, target, sender):
        df = Deferred()
-       f = factory('find_node', (Binary(target), sender), self.makeResponse(df), df.errback)
+       f = factory('find_node', (target.encode('base64'), sender), self.makeResponse(df), df.errback)
        reactor.connectTCP(self.host, self.port, f)
        return df
     def storeValue(self, key, value, sender):
        df = Deferred()
-       f = factory('store_value', (Binary(key), Binary(value), sender), self.makeResponse(df), df.errback)
+       f = factory('store_value', (key.encode('base64'), value.encode('base64'), sender), self.makeResponse(df), df.errback)
        reactor.connectTCP(self.host, self.port, f)
        return df
     def findValue(self, key, sender):
        df = Deferred()
-       f = factory('find_value', (Binary(key), sender), self.makeResponse(df), df.errback)
+       f = factory('find_value', (key.encode('base64'), sender), self.makeResponse(df), df.errback)
        reactor.connectTCP(self.host, self.port, f)
        return df
diff --git a/node.py b/node.py
index 1e1c0bd..078ef3c 100644 (file)
--- a/node.py
+++ b/node.py
@@ -16,12 +16,12 @@ class Node:
        self.int = hash.intify(id)
        self.host = host
        self.port = port
-       self._senderDict = {'id': Binary(self.id), 'port' : self.port, 'host' : self.host}
+       self._senderDict = {'id': self.id.encode('base64'), 'port' : self.port, 'host' : self.host}
        return self
        
     def initWithDict(self, dict):
        self._senderDict = dict
-       self.id = dict['id'].data
+       self.id = dict['id'].decode('base64')
        self.int = hash.intify(self.id)
        self.port = dict['port']
        self.host = dict['host']