X-Git-Url: https://git.mxchange.org/?p=quix0rs-apt-p2p.git;a=blobdiff_plain;f=apt_dht_Khashmir%2Fdb.py;h=4b7fcf38f56eda66a9ddf9461d3a86fd2d11a99d;hp=8bb499e14512e85b0218941b22485ba0dac63474;hb=b4ad0e73eced53fb46c41fe1511d03c4f2466dba;hpb=295c9e582441c0ca5b8459b8a13692afee8b2e3f diff --git a/apt_dht_Khashmir/db.py b/apt_dht_Khashmir/db.py index 8bb499e..4b7fcf3 100644 --- a/apt_dht_Khashmir/db.py +++ b/apt_dht_Khashmir/db.py @@ -44,9 +44,10 @@ class DB: def _createNewDB(self, db): self.conn = sqlite.connect(database=db, detect_types=sqlite.PARSE_DECLTYPES) c = self.conn.cursor() - c.execute("CREATE TABLE kv (key KHASH, value TEXT, time TIMESTAMP, PRIMARY KEY (key, value))") + c.execute("CREATE TABLE kv (key KHASH, value TEXT, originated TIMESTAMP, last_refresh TIMESTAMP, PRIMARY KEY (key, value))") c.execute("CREATE INDEX kv_key ON kv(key)") - c.execute("CREATE INDEX kv_timestamp ON kv(time)") + c.execute("CREATE INDEX kv_originated ON kv(originated)") + c.execute("CREATE INDEX kv_last_refresh ON kv(last_refresh)") c.execute("CREATE TABLE nodes (id KHASH PRIMARY KEY, host TEXT, port NUMBER)") c.execute("CREATE TABLE self (num NUMBER PRIMARY KEY, id KHASH)") self.conn.commit() @@ -86,28 +87,46 @@ class DB: return c.fetchall() def retrieveValues(self, key): + """Retrieve values from the database.""" c = self.conn.cursor() c.execute("SELECT value FROM kv WHERE key = ?", (khash(key),)) - t = c.fetchone() l = [] - while t: - l.append(t[0]) - t = c.fetchone() + rows = c.fetchall() + for row in rows: + l.append(row[0]) return l - def storeValue(self, key, value): + def storeValue(self, key, value, originated): """Store or update a key and value.""" c = self.conn.cursor() - c.execute("INSERT OR REPLACE INTO kv VALUES (?, ?, ?)", (khash(key), value, datetime.now())) + c.execute("INSERT OR REPLACE INTO kv VALUES (?, ?, ?, ?)", + (khash(key), value, originated, datetime.now())) self.conn.commit() def expireValues(self, expireAfter): """Expire older values after expireAfter seconds.""" t = datetime.now() - timedelta(seconds=expireAfter) c = self.conn.cursor() - c.execute("DELETE FROM kv WHERE time < ?", (t, )) + c.execute("DELETE FROM kv WHERE originated < ?", (t, )) self.conn.commit() + def refreshValues(self, expireAfter): + """Find older values than expireAfter seconds to refresh. + + @return: a list of the hash keys and a list of dictionaries with + key of the value, value is the origination time + """ + t = datetime.now() - timedelta(seconds=expireAfter) + c = self.conn.cursor() + c.execute("SELECT key, value, originated FROM kv WHERE last_refresh < ?", (t,)) + keys = [] + vals = [] + rows = c.fetchall() + for row in rows: + keys.append(row[0]) + vals.append({row[1]: row[2]}) + return keys, vals + def close(self): self.conn.close() @@ -126,20 +145,33 @@ class TestDB(unittest.TestCase): self.failUnlessEqual(self.store.getSelfNode(), self.key) def test_Value(self): - self.store.storeValue(self.key, 'foobar') + self.store.storeValue(self.key, 'foobar', datetime.now()) val = self.store.retrieveValues(self.key) self.failUnlessEqual(len(val), 1) self.failUnlessEqual(val[0], 'foobar') def test_expireValues(self): - self.store.storeValue(self.key, 'foobar') + self.store.storeValue(self.key, 'foobar', datetime.now()) sleep(2) - self.store.storeValue(self.key, 'barfoo') + self.store.storeValue(self.key, 'barfoo', datetime.now()) self.store.expireValues(1) val = self.store.retrieveValues(self.key) self.failUnlessEqual(len(val), 1) self.failUnlessEqual(val[0], 'barfoo') + def test_refreshValues(self): + self.store.storeValue(self.key, 'foobar', datetime.now()) + sleep(2) + self.store.storeValue(self.key, 'barfoo', datetime.now()) + keys, vals = self.store.refreshValues(1) + self.failUnlessEqual(len(keys), 1) + self.failUnlessEqual(keys[0], self.key) + self.failUnlessEqual(len(vals), 1) + self.failUnlessEqual(len(vals[0].keys()), 1) + self.failUnlessEqual(vals[0].keys()[0], 'foobar') + val = self.store.retrieveValues(self.key) + self.failUnlessEqual(len(val), 2) + def test_RoutingTable(self): class dummy: id = self.key