2 from datetime import datetime, timedelta
3 from pysqlite2 import dbapi2 as sqlite
4 from binascii import a2b_base64, b2a_base64
8 from twisted.trial import unittest
10 class DBExcept(Exception):
14 """Dummy class to convert all hashes to base64 for storing in the DB."""
# Reading: columns whose declared type is KHASH (either spelling) are decoded
# from base64 when the connection is opened with PARSE_DECLTYPES.
sqlite.register_converter("khash", a2b_base64)
sqlite.register_converter("KHASH", a2b_base64)

# Writing: any khash instance bound as a query parameter is stored as its
# base64 encoding.
sqlite.register_adapter(khash, b2a_base64)
21 """Database access for storing persistent data."""
23 def __init__(self, db):
31 self.conn.text_factory = str
def _loadDB(self, db):
    """Open an existing database and keep the connection on self.conn.

    The connection is opened with PARSE_DECLTYPES so that the registered
    converters are applied according to each column's declared type.

    Raises DBExcept if the connection cannot be opened. The formatted
    traceback of the underlying failure is carried as the second
    exception argument.
    """
    try:
        self.conn = sqlite.connect(database=db, detect_types=sqlite.PARSE_DECLTYPES)
    except Exception:
        import traceback
        # The third operand of the three-expression raise must be a
        # traceback object; passing the string from format_exc() makes the
        # raise itself fail with TypeError.  Carry the text as an exception
        # argument instead so callers really do receive a DBExcept.
        raise DBExcept("Couldn't open DB", traceback.format_exc())
40 def _createNewDB(self, db):
41 self.conn = sqlite.connect(database=db, detect_types=sqlite.PARSE_DECLTYPES)
42 c = self.conn.cursor()
43 c.execute("CREATE TABLE kv (key KHASH, value TEXT, time TIMESTAMP, PRIMARY KEY (key, value))")
44 c.execute("CREATE INDEX kv_key ON kv(key)")
45 c.execute("CREATE INDEX kv_timestamp ON kv(time)")
46 c.execute("CREATE TABLE nodes (id KHASH PRIMARY KEY, host TEXT, port NUMBER)")
47 c.execute("CREATE TABLE self (num NUMBER PRIMARY KEY, id KHASH)")
50 def getSelfNode(self):
51 c = self.conn.cursor()
52 c.execute('SELECT id FROM self WHERE num = 0')
59 def saveSelfNode(self, id):
60 c = self.conn.cursor()
61 c.execute("INSERT OR REPLACE INTO self VALUES (0, ?)", (khash(id),))
64 def dumpRoutingTable(self, buckets):
66 save routing table nodes to the database
68 c = self.conn.cursor()
69 c.execute("DELETE FROM nodes WHERE id NOT NULL")
70 for bucket in buckets:
72 c.execute("INSERT INTO nodes VALUES (?, ?, ?)", (khash(node.id), node.host, node.port))
75 def getRoutingTable(self):
77 load routing table nodes from database
78 it's usually a good idea to call refreshTable(force=1) after loading the table
80 c = self.conn.cursor()
81 c.execute("SELECT * FROM nodes")
84 def retrieveValues(self, key):
85 c = self.conn.cursor()
86 c.execute("SELECT value FROM kv WHERE key = ?", (khash(key),))
94 def storeValue(self, key, value):
95 """Store or update a key and value."""
96 c = self.conn.cursor()
97 c.execute("INSERT OR REPLACE INTO kv VALUES (?, ?, ?)", (khash(key), value, datetime.now()))
100 def expireValues(self, expireAfter):
101 """Expire older values after expireAfter seconds."""
102 t = datetime.now() - timedelta(seconds=expireAfter)
103 c = self.conn.cursor()
104 c.execute("DELETE FROM kv WHERE time < ?", (t, ))
110 class TestDB(unittest.TestCase):
111 """Tests for the khashmir database."""
114 db = '/tmp/khashmir.db'
115 key = '\xca\xec\xb8\x0c\x00\xe7\x07\xf8~])\x8f\x9d\xe5_B\xff\x1a\xc4!'
118 self.store = DB(self.db)
def test_selfNode(self):
    """The node ID saved with saveSelfNode is returned by getSelfNode."""
    store = self.store
    store.saveSelfNode(self.key)
    loaded_id = store.getSelfNode()
    self.failUnlessEqual(loaded_id, self.key)
def test_Value(self):
    """Storing one value under a key yields exactly that value on lookup."""
    store, key = self.store, self.key
    store.storeValue(key, 'foobar')
    found = store.retrieveValues(key)
    # Check the count first so a wrong result size is reported as such.
    self.failUnlessEqual(len(found), 1)
    self.failUnlessEqual(found[0], 'foobar')
130 def test_expireValues(self):
131 self.store.storeValue(self.key, 'foobar')
133 self.store.storeValue(self.key, 'barfoo')
134 self.store.expireValues(1)
135 val = self.store.retrieveValues(self.key)
136 self.failUnlessEqual(len(val), 1)
137 self.failUnlessEqual(val[0], 'barfoo')
139 def test_RoutingTable(self):
145 return (self.id, self.host, self.port)
147 dummy2.id = '\xaa\xbb\xcc\x0c\x00\xe7\x07\xf8~])\x8f\x9d\xe5_B\xff\x1a\xc4!'
148 dummy2.host = '205.23.67.124'
154 bl1.l.append(dummy())
158 self.store.dumpRoutingTable(buckets)
159 rt = self.store.getRoutingTable()
160 self.failUnlessIn(dummy().contents(), rt)
161 self.failUnlessIn(dummy2.contents(), rt)