self._createNewDB(db)
else:
self._loadDB(db)
- self.conn.text_factory = str
+ if sqlite.version_info < (2, 1):
+ sqlite.register_converter("TEXT", str)
+ sqlite.register_converter("text", str)
+ else:
+ self.conn.text_factory = str
def _loadDB(self, db):
try:
def _createNewDB(self, db):
self.conn = sqlite.connect(database=db, detect_types=sqlite.PARSE_DECLTYPES)
c = self.conn.cursor()
- c.execute("CREATE TABLE kv (key KHASH, value TEXT, time TIMESTAMP, PRIMARY KEY (key, value))")
+ c.execute("CREATE TABLE kv (key KHASH, value TEXT, originated TIMESTAMP, last_refresh TIMESTAMP, PRIMARY KEY (key, value))")
c.execute("CREATE INDEX kv_key ON kv(key)")
- c.execute("CREATE INDEX kv_timestamp ON kv(time)")
+ c.execute("CREATE INDEX kv_originated ON kv(originated)")
+ c.execute("CREATE INDEX kv_last_refresh ON kv(last_refresh)")
c.execute("CREATE TABLE nodes (id KHASH PRIMARY KEY, host TEXT, port NUMBER)")
c.execute("CREATE TABLE self (num NUMBER PRIMARY KEY, id KHASH)")
self.conn.commit()
return c.fetchall()
def retrieveValues(self, key):
+ """Retrieve values from the database."""
c = self.conn.cursor()
c.execute("SELECT value FROM kv WHERE key = ?", (khash(key),))
- t = c.fetchone()
l = []
- while t:
- l.append(t[0])
- t = c.fetchone()
+ rows = c.fetchall()
+ for row in rows:
+ l.append(row[0])
return l
- def storeValue(self, key, value):
+ def storeValue(self, key, value, originated):
"""Store or update a key and value."""
c = self.conn.cursor()
- c.execute("INSERT OR REPLACE INTO kv VALUES (?, ?, ?)", (khash(key), value, datetime.now()))
+ c.execute("INSERT OR REPLACE INTO kv VALUES (?, ?, ?, ?)",
+ (khash(key), value, originated, datetime.now()))
self.conn.commit()
def expireValues(self, expireAfter):
"""Expire older values after expireAfter seconds."""
t = datetime.now() - timedelta(seconds=expireAfter)
c = self.conn.cursor()
- c.execute("DELETE FROM kv WHERE time < ?", (t, ))
+ c.execute("DELETE FROM kv WHERE originated < ?", (t, ))
self.conn.commit()
+ def refreshValues(self, expireAfter):
+ """Find older values than expireAfter seconds to refresh.
+
+ @return: a list of the hash keys and a list of dictionaries with
+ key of the value, value is the origination time
+ """
+ t = datetime.now() - timedelta(seconds=expireAfter)
+ c = self.conn.cursor()
+ c.execute("SELECT key, value, originated FROM kv WHERE last_refresh < ?", (t,))
+ keys = []
+ vals = []
+ rows = c.fetchall()
+ for row in rows:
+ keys.append(row[0])
+ vals.append({row[1]: row[2]})
+ return keys, vals
+
def close(self):
self.conn.close()
self.failUnlessEqual(self.store.getSelfNode(), self.key)
def test_Value(self):
- self.store.storeValue(self.key, 'foobar')
+ self.store.storeValue(self.key, 'foobar', datetime.now())
val = self.store.retrieveValues(self.key)
self.failUnlessEqual(len(val), 1)
self.failUnlessEqual(val[0], 'foobar')
def test_expireValues(self):
- self.store.storeValue(self.key, 'foobar')
+ self.store.storeValue(self.key, 'foobar', datetime.now())
sleep(2)
- self.store.storeValue(self.key, 'barfoo')
+ self.store.storeValue(self.key, 'barfoo', datetime.now())
self.store.expireValues(1)
val = self.store.retrieveValues(self.key)
self.failUnlessEqual(len(val), 1)
self.failUnlessEqual(val[0], 'barfoo')
+ def test_refreshValues(self):
+ self.store.storeValue(self.key, 'foobar', datetime.now())
+ sleep(2)
+ self.store.storeValue(self.key, 'barfoo', datetime.now())
+ keys, vals = self.store.refreshValues(1)
+ self.failUnlessEqual(len(keys), 1)
+ self.failUnlessEqual(keys[0], self.key)
+ self.failUnlessEqual(len(vals), 1)
+ self.failUnlessEqual(len(vals[0].keys()), 1)
+ self.failUnlessEqual(vals[0].keys()[0], 'foobar')
+ val = self.store.retrieveValues(self.key)
+ self.failUnlessEqual(len(val), 2)
+
def test_RoutingTable(self):
class dummy:
id = self.key