--- /dev/null
+
+from time import time
+import sqlite ## find this at http://pysqlite.sourceforge.net/
+import os
+
+class DBExcept(Exception):
+ pass
+
+class DB:
+ """Database access for storing persistent data."""
+
+ def __init__(self, db):
+ self.db = db
+ try:
+ os.stat(db)
+ except OSError:
+ self._createNewDB(db)
+ else:
+ self._loadDB(db)
+
+ def _loadDB(self, db):
+ try:
+ self.store = sqlite.connect(db=db)
+ #self.store.autocommit = 0
+ except:
+ import traceback
+ raise DBExcept, "Couldn't open DB", traceback.format_exc()
+
+ def _createNewDB(self, db):
+ self.store = sqlite.connect(db=db)
+ s = """
+ create table kv (key binary, value binary, time timestamp, primary key (key, value));
+ create index kv_key on kv(key);
+ create index kv_timestamp on kv(time);
+
+ create table nodes (id binary primary key, host text, port number);
+
+ create table self (num number primary key, id binary);
+ """
+ c = self.store.cursor()
+ c.execute(s)
+ self.store.commit()
+
+ def getSelfNode(self):
+ c = self.store.cursor()
+ c.execute('select id from self where num = 0;')
+ if c.rowcount > 0:
+ return c.fetchone()[0]
+ else:
+ return None
+
+ def saveSelfNode(self, id):
+ c = self.store.cursor()
+ c.execute('delete from self where num = 0;')
+ c.execute("insert into self values (0, %s);", sqlite.encode(id))
+ self.store.commit()
+
+ def dumpRoutingTable(self, buckets):
+ """
+ save routing table nodes to the database
+ """
+ c = self.store.cursor()
+ c.execute("delete from nodes where id not NULL;")
+ for bucket in buckets:
+ for node in bucket.l:
+ c.execute("insert into nodes values (%s, %s, %s);", (sqlite.encode(node.id), node.host, node.port))
+ self.store.commit()
+
+ def getRoutingTable(self):
+ """
+ load routing table nodes from database
+ it's usually a good idea to call refreshTable(force=1) after loading the table
+ """
+ c = self.store.cursor()
+ c.execute("select * from nodes;")
+ return c.fetchall()
+
+ def retrieveValues(self, key):
+ c = self.store.cursor()
+ c.execute("select value from kv where key = %s;", sqlite.encode(key))
+ t = c.fetchone()
+ l = []
+ while t:
+ l.append(t['value'])
+ t = c.fetchone()
+ return l
+
+ def storeValue(self, key, value):
+ """Store or update a key and value."""
+ t = "%0.6f" % time()
+ c = self.store.cursor()
+ try:
+ c.execute("insert into kv values (%s, %s, %s);", (sqlite.encode(key), sqlite.encode(value), t))
+ except sqlite.IntegrityError, reason:
+ # update last insert time
+ c.execute("update kv set time = %s where key = %s and value = %s;", (t, sqlite.encode(key), sqlite.encode(value)))
+ self.store.commit()
+
+ def expireValues(self, expireTime):
+ """Expire older values than expireTime."""
+ t = "%0.6f" % expireTime
+ c = self.store.cursor()
+ s = "delete from kv where time < '%s';" % t
+ c.execute(s)
+
+ def close(self):
+ self.store.close()
from random import randrange
from sha import sha
import os
-import sqlite ## find this at http://pysqlite.sourceforge.net/
from twisted.internet.defer import Deferred
from twisted.internet import protocol, reactor
from twisted.trial import unittest
+from db import DB
from ktable import KTable
from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID
from khash import newID, newIDInRange
from actions import FindNode, GetValue, KeyExpirer, StoreValue
import krpc
-class KhashmirDBExcept(Exception):
- pass
-
# this is the base class, has base functionality and find node, no key-value mappings
class KhashmirBase(protocol.Factory):
_Node = KNodeBase
def setup(self, config, cache_dir):
self.config = config
self.port = config['PORT']
- self._findDB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db'))
+ self.store = DB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db'))
self.node = self._loadSelfNode('', self.port)
self.table = KTable(self.node, config)
#self.app = service.Application("krpc")
self.listenport.stopListening()
def _loadSelfNode(self, host, port):
- c = self.store.cursor()
- c.execute('select id from self where num = 0;')
- if c.rowcount > 0:
- id = c.fetchone()[0]
- else:
+ id = self.store.getSelfNode()
+ if not id:
id = newID()
return self._Node().init(id, host, port)
- def _saveSelfNode(self):
- c = self.store.cursor()
- c.execute('delete from self where num = 0;')
- c.execute("insert into self values (0, %s);", sqlite.encode(self.node.id))
- self.store.commit()
-
def checkpoint(self, auto=0):
- self._saveSelfNode()
- self._dumpRoutingTable()
+ self.store.saveSelfNode(self.node.id)
+ self.store.dumpRoutingTable(self.table.buckets)
self.refreshTable()
if auto:
self.next_checkpoint = reactor.callLater(randrange(int(self.config['CHECKPOINT_INTERVAL'] * .9),
int(self.config['CHECKPOINT_INTERVAL'] * 1.1)),
self.checkpoint, (1,))
- def _findDB(self, db):
- self.db = db
- try:
- os.stat(db)
- except OSError:
- self._createNewDB(db)
- else:
- self._loadDB(db)
-
- def _loadDB(self, db):
- try:
- self.store = sqlite.connect(db=db)
- #self.store.autocommit = 0
- except:
- import traceback
- raise KhashmirDBExcept, "Couldn't open DB", traceback.format_exc()
-
- def _createNewDB(self, db):
- self.store = sqlite.connect(db=db)
- s = """
- create table kv (key binary, value binary, time timestamp, primary key (key, value));
- create index kv_key on kv(key);
- create index kv_timestamp on kv(time);
-
- create table nodes (id binary primary key, host text, port number);
-
- create table self (num number primary key, id binary);
- """
- c = self.store.cursor()
- c.execute(s)
- self.store.commit()
-
- def _dumpRoutingTable(self):
- """
- save routing table nodes to the database
- """
- c = self.store.cursor()
- c.execute("delete from nodes where id not NULL;")
- for bucket in self.table.buckets:
- for node in bucket.l:
- c.execute("insert into nodes values (%s, %s, %s);", (sqlite.encode(node.id), node.host, node.port))
- self.store.commit()
-
def _loadRoutingTable(self):
"""
load routing table nodes from database
it's usually a good idea to call refreshTable(force=1) after loading the table
"""
- c = self.store.cursor()
- c.execute("select * from nodes;")
- for rec in c.fetchall():
+ nodes = self.store.getRoutingTable()
+ for rec in nodes:
n = self.Node().initWithDict({'id':rec[0], 'host':rec[1], 'port':int(rec[2])})
n.conn = self.udp.connectionForAddr((n.host, n.port))
self.table.insertNode(n, contacted=0)
## you probably want to use this mixin and provide your own write methods
class KhashmirRead(KhashmirBase):
_Node = KNodeRead
- def retrieveValues(self, key):
- c = self.store.cursor()
- c.execute("select value from kv where key = %s;", sqlite.encode(key))
- t = c.fetchone()
- l = []
- while t:
- l.append(t['value'])
- t = c.fetchone()
- return l
+
## also async
def valueForKey(self, key, callback, searchlocal = 1):
""" returns the values found for key in global table
# get locals
if searchlocal:
- l = self.retrieveValues(key)
+ l = self.store.retrieveValues(key)
if len(l) > 0:
reactor.callLater(0, callback, key, l)
else:
n.conn = self.udp.connectionForAddr((n.host, n.port))
self.insertNode(n, contacted=0)
- l = self.retrieveValues(key)
+ l = self.store.retrieveValues(key)
if len(l) > 0:
return {'values' : l, "id": self.node.id}
else:
self.findNode(key, _storeValueForKey)
def krpc_store_value(self, key, value, id, _krpc_sender):
- t = "%0.6f" % time()
- c = self.store.cursor()
- try:
- c.execute("insert into kv values (%s, %s, %s);", (sqlite.encode(key), sqlite.encode(value), t))
- except sqlite.IntegrityError, reason:
- # update last insert time
- c.execute("update kv set time = %s where key = %s and value = %s;", (t, sqlite.encode(key), sqlite.encode(value)))
- self.store.commit()
+ self.store.storeValue(key, value)
sender = {'id' : id}
sender['host'] = _krpc_sender[0]
sender['port'] = _krpc_sender[1]
def tearDown(self):
self.a.shutdown()
self.b.shutdown()
- os.unlink(self.a.db)
- os.unlink(self.b.db)
+ os.unlink(self.a.store.db)
+ os.unlink(self.b.store.db)
def testAddContact(self):
self.assertEqual(len(self.a.table.buckets), 1)
def tearDown(self):
for i in self.l:
i.shutdown()
- os.unlink(i.db)
+ os.unlink(i.store.db)
reactor.iterate()