From 4bf5774eb039f6064b830c0a65a9d62a1e4bc62f Mon Sep 17 00:00:00 2001 From: Cameron Dale Date: Tue, 8 Jan 2008 12:33:18 -0800 Subject: [PATCH] Move all the khashmir database operations to a separate module. --- apt_dht_Khashmir/DHT.py | 8 +-- apt_dht_Khashmir/actions.py | 8 +-- apt_dht_Khashmir/db.py | 107 +++++++++++++++++++++++++++++++++++ apt_dht_Khashmir/khashmir.py | 101 +++++---------------------------- 4 files changed, 127 insertions(+), 97 deletions(-) create mode 100644 apt_dht_Khashmir/db.py diff --git a/apt_dht_Khashmir/DHT.py b/apt_dht_Khashmir/DHT.py index 42e2e9a..3d6c9f9 100644 --- a/apt_dht_Khashmir/DHT.py +++ b/apt_dht_Khashmir/DHT.py @@ -253,12 +253,12 @@ class TestSimpleDHT(unittest.TestCase): def tearDown(self): self.a.leave() try: - os.unlink(self.a.khashmir.db) + os.unlink(self.a.khashmir.store.db) except: pass self.b.leave() try: - os.unlink(self.b.khashmir.db) + os.unlink(self.b.khashmir.store.db) except: pass @@ -322,7 +322,7 @@ class TestMultiDHT(unittest.TestCase): def get_values(self): self.checked = 0 for i in range(len(self.l)): - for j in random.sample(xrange(len(self.l)), 4): + for j in random.sample(xrange(len(self.l)), max(len(self.l), 4)): self.checked += 1 d = self.l[i].getValue(sha.new(str(self.startport+j)).digest()) check = [] @@ -349,6 +349,6 @@ class TestMultiDHT(unittest.TestCase): for i in self.l: try: i.leave() - os.unlink(i.khashmir.db) + os.unlink(i.khashmir.store.db) except: pass diff --git a/apt_dht_Khashmir/actions.py b/apt_dht_Khashmir/actions.py index 7f8f911..4214e4e 100644 --- a/apt_dht_Khashmir/actions.py +++ b/apt_dht_Khashmir/actions.py @@ -263,13 +263,7 @@ class KeyExpirer: self.next_expire = reactor.callLater(self.config['KEINITIAL_DELAY'], self.doExpire) def doExpire(self): - self.cut = "%0.6f" % (time() - self.config['KE_AGE']) - self._expire() - - def _expire(self): - c = self.store.cursor() - s = "delete from kv where time < '%s';" % self.cut - c.execute(s) + self.store.expireValues(time() - self.config['KE_AGE']) self.next_expire = reactor.callLater(self.config['KE_DELAY'], self.doExpire) def shutdown(self): diff --git a/apt_dht_Khashmir/db.py b/apt_dht_Khashmir/db.py new file mode 100644 index 0000000..c29abd8 --- /dev/null +++ b/apt_dht_Khashmir/db.py @@ -0,0 +1,107 @@ + +from time import time +import sqlite ## find this at http://pysqlite.sourceforge.net/ +import os + +class DBExcept(Exception): + pass + +class DB: + """Database access for storing persistent data.""" + + def __init__(self, db): + self.db = db + try: + os.stat(db) + except OSError: + self._createNewDB(db) + else: + self._loadDB(db) + + def _loadDB(self, db): + try: + self.store = sqlite.connect(db=db) + #self.store.autocommit = 0 + except: + import traceback + raise DBExcept, "Couldn't open DB", traceback.format_exc() + + def _createNewDB(self, db): + self.store = sqlite.connect(db=db) + s = """ + create table kv (key binary, value binary, time timestamp, primary key (key, value)); + create index kv_key on kv(key); + create index kv_timestamp on kv(time); + + create table nodes (id binary primary key, host text, port number); + + create table self (num number primary key, id binary); + """ + c = self.store.cursor() + c.execute(s) + self.store.commit() + + def getSelfNode(self): + c = self.store.cursor() + c.execute('select id from self where num = 0;') + if c.rowcount > 0: + return c.fetchone()[0] + else: + return None + + def saveSelfNode(self, id): + c = self.store.cursor() + c.execute('delete from self where num = 0;') + c.execute("insert into self values (0, %s);", sqlite.encode(id)) + self.store.commit() + + def dumpRoutingTable(self, buckets): + """ + save routing table nodes to the database + """ + c = self.store.cursor() + c.execute("delete from nodes where id not NULL;") + for bucket in buckets: + for node in bucket.l: + c.execute("insert into nodes values (%s, %s, %s);", (sqlite.encode(node.id), node.host, node.port)) + self.store.commit() + + def getRoutingTable(self): + """ + load routing table nodes from database + it's usually a good idea to call refreshTable(force=1) after loading the table + """ + c = self.store.cursor() + c.execute("select * from nodes;") + return c.fetchall() + + def retrieveValues(self, key): + c = self.store.cursor() + c.execute("select value from kv where key = %s;", sqlite.encode(key)) + t = c.fetchone() + l = [] + while t: + l.append(t['value']) + t = c.fetchone() + return l + + def storeValue(self, key, value): + """Store or update a key and value.""" + t = "%0.6f" % time() + c = self.store.cursor() + try: + c.execute("insert into kv values (%s, %s, %s);", (sqlite.encode(key), sqlite.encode(value), t)) + except sqlite.IntegrityError, reason: + # update last insert time + c.execute("update kv set time = %s where key = %s and value = %s;", (t, sqlite.encode(key), sqlite.encode(value))) + self.store.commit() + + def expireValues(self, expireTime): + """Expire older values than expireTime.""" + t = "%0.6f" % expireTime + c = self.store.cursor() + s = "delete from kv where time < '%s';" % t + c.execute(s) + + def close(self): + self.store.close() diff --git a/apt_dht_Khashmir/khashmir.py b/apt_dht_Khashmir/khashmir.py index d184329..be60243 100644 --- a/apt_dht_Khashmir/khashmir.py +++ b/apt_dht_Khashmir/khashmir.py @@ -8,21 +8,18 @@ from time import time from random import randrange from sha import sha import os -import sqlite ## find this at http://pysqlite.sourceforge.net/ from twisted.internet.defer import Deferred from twisted.internet import protocol, reactor from twisted.trial import unittest +from db import DB from ktable import KTable from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID from khash import newID, newIDInRange from actions import FindNode, GetValue, KeyExpirer, StoreValue import krpc -class KhashmirDBExcept(Exception): - pass - # this is the base class, has base functionality and find node, no key-value mappings class KhashmirBase(protocol.Factory): _Node = KNodeBase @@ -33,7 +30,7 @@ class KhashmirBase(protocol.Factory): def setup(self, config, cache_dir): self.config = config self.port = config['PORT'] - self._findDB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db')) + self.store = DB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db')) self.node = self._loadSelfNode('', self.port) self.table = KTable(self.node, config) #self.app = service.Application("krpc") @@ -55,80 +52,27 @@ class KhashmirBase(protocol.Factory): self.listenport.stopListening() def _loadSelfNode(self, host, port): - c = self.store.cursor() - c.execute('select id from self where num = 0;') - if c.rowcount > 0: - id = c.fetchone()[0] - else: + id = self.store.getSelfNode() + if not id: id = newID() return self._Node().init(id, host, port) - def _saveSelfNode(self): - c = self.store.cursor() - c.execute('delete from self where num = 0;') - c.execute("insert into self values (0, %s);", sqlite.encode(self.node.id)) - self.store.commit() - def checkpoint(self, auto=0): - self._saveSelfNode() - self._dumpRoutingTable() + self.store.saveSelfNode(self.node.id) + self.store.dumpRoutingTable(self.table.buckets) self.refreshTable() if auto: self.next_checkpoint = reactor.callLater(randrange(int(self.config['CHECKPOINT_INTERVAL'] * .9), int(self.config['CHECKPOINT_INTERVAL'] * 1.1)), self.checkpoint, (1,)) - def _findDB(self, db): - self.db = db - try: - os.stat(db) - except OSError: - self._createNewDB(db) - else: - self._loadDB(db) - - def _loadDB(self, db): - try: - self.store = sqlite.connect(db=db) - #self.store.autocommit = 0 - except: - import traceback - raise KhashmirDBExcept, "Couldn't open DB", traceback.format_exc() - - def _createNewDB(self, db): - self.store = sqlite.connect(db=db) - s = """ - create table kv (key binary, value binary, time timestamp, primary key (key, value)); - create index kv_key on kv(key); - create index kv_timestamp on kv(time); - - create table nodes (id binary primary key, host text, port number); - - create table self (num number primary key, id binary); - """ - c = self.store.cursor() - c.execute(s) - self.store.commit() - - def _dumpRoutingTable(self): - """ - save routing table nodes to the database - """ - c = self.store.cursor() - c.execute("delete from nodes where id not NULL;") - for bucket in self.table.buckets: - for node in bucket.l: - c.execute("insert into nodes values (%s, %s, %s);", (sqlite.encode(node.id), node.host, node.port)) - self.store.commit() - def _loadRoutingTable(self): """ load routing table nodes from database it's usually a good idea to call refreshTable(force=1) after loading the table """ - c = self.store.cursor() - c.execute("select * from nodes;") - for rec in c.fetchall(): + nodes = self.store.getRoutingTable() + for rec in nodes: n = self.Node().initWithDict({'id':rec[0], 'host':rec[1], 'port':int(rec[2])}) n.conn = self.udp.connectionForAddr((n.host, n.port)) self.table.insertNode(n, contacted=0) @@ -278,15 +222,7 @@ class KhashmirBase(protocol.Factory): ## you probably want to use this mixin and provide your own write methods class KhashmirRead(KhashmirBase): _Node = KNodeRead - def retrieveValues(self, key): - c = self.store.cursor() - c.execute("select value from kv where key = %s;", sqlite.encode(key)) - t = c.fetchone() - l = [] - while t: - l.append(t['value']) - t = c.fetchone() - return l + ## also async def valueForKey(self, key, callback, searchlocal = 1): """ returns the values found for key in global table @@ -297,7 +233,7 @@ class KhashmirRead(KhashmirBase): # get locals if searchlocal: - l = self.retrieveValues(key) + l = self.store.retrieveValues(key) if len(l) > 0: reactor.callLater(0, callback, key, l) else: @@ -315,7 +251,7 @@ class KhashmirRead(KhashmirBase): n.conn = self.udp.connectionForAddr((n.host, n.port)) self.insertNode(n, contacted=0) - l = self.retrieveValues(key) + l = self.store.retrieveValues(key) if len(l) > 0: return {'values' : l, "id": self.node.id} else: @@ -346,14 +282,7 @@ class KhashmirWrite(KhashmirRead): self.findNode(key, _storeValueForKey) def krpc_store_value(self, key, value, id, _krpc_sender): - t = "%0.6f" % time() - c = self.store.cursor() - try: - c.execute("insert into kv values (%s, %s, %s);", (sqlite.encode(key), sqlite.encode(value), t)) - except sqlite.IntegrityError, reason: - # update last insert time - c.execute("update kv set time = %s where key = %s and value = %s;", (t, sqlite.encode(key), sqlite.encode(value))) - self.store.commit() + self.store.storeValue(key, value) sender = {'id' : id} sender['host'] = _krpc_sender[0] sender['port'] = _krpc_sender[1] @@ -388,8 +317,8 @@ class SimpleTests(unittest.TestCase): def tearDown(self): self.a.shutdown() self.b.shutdown() - os.unlink(self.a.db) - os.unlink(self.b.db) + os.unlink(self.a.store.db) + os.unlink(self.b.store.db) def testAddContact(self): self.assertEqual(len(self.a.table.buckets), 1) @@ -485,7 +414,7 @@ class MultiTest(unittest.TestCase): def tearDown(self): for i in self.l: i.shutdown() - os.unlink(i.db) + os.unlink(i.store.db) reactor.iterate() -- 2.39.2