From b4ad0e73eced53fb46c41fe1511d03c4f2466dba Mon Sep 17 00:00:00 2001 From: Cameron Dale Date: Sun, 13 Jan 2008 14:02:59 -0800 Subject: [PATCH] khashmir's store value takes the origination date. --- apt_dht_Khashmir/DHT.py | 9 ++++-- apt_dht_Khashmir/actions.py | 5 ++-- apt_dht_Khashmir/db.py | 56 ++++++++++++++++++++++++++++-------- apt_dht_Khashmir/khashmir.py | 16 +++++------ apt_dht_Khashmir/knode.py | 9 ++---- 5 files changed, 63 insertions(+), 32 deletions(-) diff --git a/apt_dht_Khashmir/DHT.py b/apt_dht_Khashmir/DHT.py index 8930ffe..235c8d0 100644 --- a/apt_dht_Khashmir/DHT.py +++ b/apt_dht_Khashmir/DHT.py @@ -1,4 +1,5 @@ +from datetime import datetime import os, sha, random from twisted.internet import defer, reactor @@ -145,7 +146,7 @@ class DHT: d.callback(final_result) del self.retrieving[key] - def storeValue(self, key, value): + def storeValue(self, key, value, originated = None): """See L{apt_dht.interfaces.IDHT}.""" if self.config is None: raise DHTError, "configuration not loaded" @@ -155,8 +156,10 @@ class DHT: if key in self.storing and value in self.storing[key]: raise DHTError, "already storing that key with the same value" + if originated is None: + originated = datetime.utcnow() d = defer.Deferred() - self.khashmir.storeValueForKey(key, value, self._storeValue) + self.khashmir.storeValueForKey(key, value, originated, self._storeValue) self.storing.setdefault(key, {})[value] = d return d @@ -179,7 +182,7 @@ class TestSimpleDHT(unittest.TestCase): 'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3, 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600, 'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200, - 'KE_AGE': 3600, 'SPEW': False, } + 'KE_AGE': 3600, 'SPEW': True, } def setUp(self): self.a = DHT() diff --git a/apt_dht_Khashmir/actions.py b/apt_dht_Khashmir/actions.py index 9bfa2e8..8da4431 100644 --- a/apt_dht_Khashmir/actions.py +++ b/apt_dht_Khashmir/actions.py @@ -184,9 +184,10 @@ class GetValue(FindNode): class StoreValue(ActionBase): - def __init__(self, caller, target, value, callback, config, store="storeValue"): + def __init__(self, caller, target, value, originated, callback, config, store="storeValue"): ActionBase.__init__(self, caller, target, callback, config) self.value = value + self.originated = originated self.stored = [] self.store = store @@ -234,7 +235,7 @@ class StoreValue(ActionBase): except AttributeError: print ">>> %s doesn't have a %s method!" % (node, self.store) else: - df = f(self.target, self.value, self.caller.node.id) + df = f(self.target, self.value, self.originated, self.caller.node.id) df.addCallback(self.storedValue, node=node) df.addErrback(self.storeFailed, node=node) diff --git a/apt_dht_Khashmir/db.py b/apt_dht_Khashmir/db.py index 8bb499e..4b7fcf3 100644 --- a/apt_dht_Khashmir/db.py +++ b/apt_dht_Khashmir/db.py @@ -44,9 +44,10 @@ class DB: def _createNewDB(self, db): self.conn = sqlite.connect(database=db, detect_types=sqlite.PARSE_DECLTYPES) c = self.conn.cursor() - c.execute("CREATE TABLE kv (key KHASH, value TEXT, time TIMESTAMP, PRIMARY KEY (key, value))") + c.execute("CREATE TABLE kv (key KHASH, value TEXT, originated TIMESTAMP, last_refresh TIMESTAMP, PRIMARY KEY (key, value))") c.execute("CREATE INDEX kv_key ON kv(key)") - c.execute("CREATE INDEX kv_timestamp ON kv(time)") + c.execute("CREATE INDEX kv_originated ON kv(originated)") + c.execute("CREATE INDEX kv_last_refresh ON kv(last_refresh)") c.execute("CREATE TABLE nodes (id KHASH PRIMARY KEY, host TEXT, port NUMBER)") c.execute("CREATE TABLE self (num NUMBER PRIMARY KEY, id KHASH)") self.conn.commit() @@ -86,28 +87,46 @@ class DB: return c.fetchall() def retrieveValues(self, key): + """Retrieve values from the database.""" c = self.conn.cursor() c.execute("SELECT value FROM kv WHERE key = ?", (khash(key),)) - t = c.fetchone() l = [] - while t: - l.append(t[0]) - t = c.fetchone() + rows = c.fetchall() + for row in rows: + l.append(row[0]) return l - def storeValue(self, key, value): + def storeValue(self, key, value, originated): """Store or update a key and value.""" c = self.conn.cursor() - c.execute("INSERT OR REPLACE INTO kv VALUES (?, ?, ?)", (khash(key), value, datetime.now())) + c.execute("INSERT OR REPLACE INTO kv VALUES (?, ?, ?, ?)", + (khash(key), value, originated, datetime.now())) self.conn.commit() def expireValues(self, expireAfter): """Expire older values after expireAfter seconds.""" t = datetime.now() - timedelta(seconds=expireAfter) c = self.conn.cursor() - c.execute("DELETE FROM kv WHERE time < ?", (t, )) + c.execute("DELETE FROM kv WHERE originated < ?", (t, )) self.conn.commit() + def refreshValues(self, expireAfter): + """Find older values than expireAfter seconds to refresh. + + @return: a list of the hash keys and a list of dictionaries with + key of the value, value is the origination time + """ + t = datetime.now() - timedelta(seconds=expireAfter) + c = self.conn.cursor() + c.execute("SELECT key, value, originated FROM kv WHERE last_refresh < ?", (t,)) + keys = [] + vals = [] + rows = c.fetchall() + for row in rows: + keys.append(row[0]) + vals.append({row[1]: row[2]}) + return keys, vals + def close(self): self.conn.close() @@ -126,20 +145,33 @@ class TestDB(unittest.TestCase): self.failUnlessEqual(self.store.getSelfNode(), self.key) def test_Value(self): - self.store.storeValue(self.key, 'foobar') + self.store.storeValue(self.key, 'foobar', datetime.now()) val = self.store.retrieveValues(self.key) self.failUnlessEqual(len(val), 1) self.failUnlessEqual(val[0], 'foobar') def test_expireValues(self): - self.store.storeValue(self.key, 'foobar') + self.store.storeValue(self.key, 'foobar', datetime.now()) sleep(2) - self.store.storeValue(self.key, 'barfoo') + self.store.storeValue(self.key, 'barfoo', datetime.now()) self.store.expireValues(1) val = self.store.retrieveValues(self.key) self.failUnlessEqual(len(val), 1) self.failUnlessEqual(val[0], 'barfoo') + def test_refreshValues(self): + self.store.storeValue(self.key, 'foobar', datetime.now()) + sleep(2) + self.store.storeValue(self.key, 'barfoo', datetime.now()) + keys, vals = self.store.refreshValues(1) + self.failUnlessEqual(len(keys), 1) + self.failUnlessEqual(keys[0], self.key) + self.failUnlessEqual(len(vals), 1) + self.failUnlessEqual(len(vals[0].keys()), 1) + self.failUnlessEqual(vals[0].keys()[0], 'foobar') + val = self.store.retrieveValues(self.key) + self.failUnlessEqual(len(val), 2) + def test_RoutingTable(self): class dummy: id = self.key diff --git a/apt_dht_Khashmir/khashmir.py b/apt_dht_Khashmir/khashmir.py index af6f0d8..5d14c7b 100644 --- a/apt_dht_Khashmir/khashmir.py +++ b/apt_dht_Khashmir/khashmir.py @@ -257,28 +257,28 @@ class KhashmirRead(KhashmirBase): class KhashmirWrite(KhashmirRead): _Node = KNodeWrite ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor) - def storeValueForKey(self, key, value, callback=None): - """ stores the value for key in the global table, returns immediately, no status + def storeValueForKey(self, key, value, originated, callback=None): + """ stores the value and origination time for key in the global table, returns immediately, no status in this implementation, peers respond but don't indicate status to storing values a key can have many values """ - def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table): + def _storeValueForKey(nodes, key=key, value=value, originated=originated, response=callback , table=self.table): if not response: # default callback def _storedValueHandler(key, value, sender): pass response=_storedValueHandler - action = StoreValue(self.table, key, value, response, self.config) + action = StoreValue(self.table, key, value, originated, response, self.config) reactor.callLater(0, action.goWithNodes, nodes) # this call is asynch self.findNode(key, _storeValueForKey) #### Remote Interface - called by remote nodes - def krpc_store_value(self, key, value, id, _krpc_sender): + def krpc_store_value(self, key, value, originated, id, _krpc_sender): n = self.Node(id, _krpc_sender[0], _krpc_sender[1]) self.insertNode(n, contacted=0) - self.store.storeValue(key, value) + self.store.storeValue(key, value, originated) return {"id" : self.node.id} # the whole shebang, for testing @@ -335,7 +335,7 @@ class SimpleTests(unittest.TestCase): reactor.iterate() reactor.iterate() self.got = 0 - self.a.storeValueForKey(sha('foo').digest(), 'foobar') + self.a.storeValueForKey(sha('foo').digest(), 'foobar', datetime.utcnow()) reactor.iterate() reactor.iterate() reactor.iterate() @@ -417,7 +417,7 @@ class MultiTest(unittest.TestCase): self.done = 0 def _scb(key, value, result): self.done = 1 - self.l[randrange(0, self.num)].storeValueForKey(K, V, _scb) + self.l[randrange(0, self.num)].storeValueForKey(K, V, datetime.utcnow(), _scb) while not self.done: reactor.iterate() diff --git a/apt_dht_Khashmir/knode.py b/apt_dht_Khashmir/knode.py index 5a9d8f4..eda1b01 100644 --- a/apt_dht_Khashmir/knode.py +++ b/apt_dht_Khashmir/knode.py @@ -47,13 +47,8 @@ class KNodeRead(KNodeBase): return df class KNodeWrite(KNodeRead): - def storeValue(self, key, value, id): - df = self.conn.sendRequest('store_value', {"key" : key, "value" : value, "id": id}) - df.addErrback(self.errBack) - df.addCallback(self.checkSender) - return df - def storeValues(self, key, value, id): - df = self.conn.sendRequest('store_values', {"key" : key, "values" : value, "id": id}) + def storeValue(self, key, value, originated, id): + df = self.conn.sendRequest('store_value', {"key" : key, "value" : value, "originated" : originated, "id": id}) df.addErrback(self.errBack) df.addCallback(self.checkSender) return df -- 2.30.2