khashmir's store value takes the origination date.
authorCameron Dale <camrdale@gmail.com>
Sun, 13 Jan 2008 22:02:59 +0000 (14:02 -0800)
committerCameron Dale <camrdale@gmail.com>
Sun, 13 Jan 2008 22:02:59 +0000 (14:02 -0800)
apt_dht_Khashmir/DHT.py
apt_dht_Khashmir/actions.py
apt_dht_Khashmir/db.py
apt_dht_Khashmir/khashmir.py
apt_dht_Khashmir/knode.py

index 8930ffe..235c8d0 100644 (file)
@@ -1,4 +1,5 @@
 
+from datetime import datetime
 import os, sha, random
 
 from twisted.internet import defer, reactor
@@ -145,7 +146,7 @@ class DHT:
                 d.callback(final_result)
             del self.retrieving[key]
 
-    def storeValue(self, key, value):
+    def storeValue(self, key, value, originated = None):
         """See L{apt_dht.interfaces.IDHT}."""
         if self.config is None:
             raise DHTError, "configuration not loaded"
@@ -155,8 +156,10 @@ class DHT:
         if key in self.storing and value in self.storing[key]:
             raise DHTError, "already storing that key with the same value"
 
+        if originated is None:
+            originated = datetime.utcnow()
         d = defer.Deferred()
-        self.khashmir.storeValueForKey(key, value, self._storeValue)
+        self.khashmir.storeValueForKey(key, value, originated, self._storeValue)
         self.storing.setdefault(key, {})[value] = d
         return d
     
@@ -179,7 +182,7 @@ class TestSimpleDHT(unittest.TestCase):
                     'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
                     'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
-                    'KE_AGE': 3600, 'SPEW': False, }
+                    'KE_AGE': 3600, 'SPEW': True, }
 
     def setUp(self):
         self.a = DHT()
index 9bfa2e8..8da4431 100644 (file)
@@ -184,9 +184,10 @@ class GetValue(FindNode):
 
 
 class StoreValue(ActionBase):
-    def __init__(self, caller, target, value, callback, config, store="storeValue"):
+    def __init__(self, caller, target, value, originated, callback, config, store="storeValue"):
         ActionBase.__init__(self, caller, target, callback, config)
         self.value = value
+        self.originated = originated
         self.stored = []
         self.store = store
         
@@ -234,7 +235,7 @@ class StoreValue(ActionBase):
                     except AttributeError:
                         print ">>> %s doesn't have a %s method!" % (node, self.store)
                     else:
-                        df = f(self.target, self.value, self.caller.node.id)
+                        df = f(self.target, self.value, self.originated, self.caller.node.id)
                         df.addCallback(self.storedValue, node=node)
                         df.addErrback(self.storeFailed, node=node)
                     
index 8bb499e..4b7fcf3 100644 (file)
@@ -44,9 +44,10 @@ class DB:
     def _createNewDB(self, db):
         self.conn = sqlite.connect(database=db, detect_types=sqlite.PARSE_DECLTYPES)
         c = self.conn.cursor()
-        c.execute("CREATE TABLE kv (key KHASH, value TEXT, time TIMESTAMP, PRIMARY KEY (key, value))")
+        c.execute("CREATE TABLE kv (key KHASH, value TEXT, originated TIMESTAMP, last_refresh TIMESTAMP, PRIMARY KEY (key, value))")
         c.execute("CREATE INDEX kv_key ON kv(key)")
-        c.execute("CREATE INDEX kv_timestamp ON kv(time)")
+        c.execute("CREATE INDEX kv_originated ON kv(originated)")
+        c.execute("CREATE INDEX kv_last_refresh ON kv(last_refresh)")
         c.execute("CREATE TABLE nodes (id KHASH PRIMARY KEY, host TEXT, port NUMBER)")
         c.execute("CREATE TABLE self (num NUMBER PRIMARY KEY, id KHASH)")
         self.conn.commit()
@@ -86,28 +87,46 @@ class DB:
         return c.fetchall()
             
     def retrieveValues(self, key):
+        """Retrieve values from the database."""
         c = self.conn.cursor()
         c.execute("SELECT value FROM kv WHERE key = ?", (khash(key),))
-        t = c.fetchone()
         l = []
-        while t:
-            l.append(t[0])
-            t = c.fetchone()
+        rows = c.fetchall()
+        for row in rows:
+            l.append(row[0])
         return l
 
-    def storeValue(self, key, value):
+    def storeValue(self, key, value, originated):
         """Store or update a key and value."""
         c = self.conn.cursor()
-        c.execute("INSERT OR REPLACE INTO kv VALUES (?, ?, ?)", (khash(key), value, datetime.now()))
+        c.execute("INSERT OR REPLACE INTO kv VALUES (?, ?, ?, ?)", 
+                  (khash(key), value, originated, datetime.now()))
         self.conn.commit()
 
     def expireValues(self, expireAfter):
         """Expire older values after expireAfter seconds."""
         t = datetime.now() - timedelta(seconds=expireAfter)
         c = self.conn.cursor()
-        c.execute("DELETE FROM kv WHERE time < ?", (t, ))
+        c.execute("DELETE FROM kv WHERE originated < ?", (t, ))
         self.conn.commit()
         
+    def refreshValues(self, expireAfter):
+        """Find older values than expireAfter seconds to refresh.
+        
+        @return: a list of the hash keys and a list of dictionaries with
+            key of the value, value is the origination time
+        """
+        t = datetime.now() - timedelta(seconds=expireAfter)
+        c = self.conn.cursor()
+        c.execute("SELECT key, value, originated FROM kv WHERE last_refresh < ?", (t,))
+        keys = []
+        vals = []
+        rows = c.fetchall()
+        for row in rows:
+            keys.append(row[0])
+            vals.append({row[1]: row[2]})
+        return keys, vals
+        
     def close(self):
         self.conn.close()
 
@@ -126,20 +145,33 @@ class TestDB(unittest.TestCase):
         self.failUnlessEqual(self.store.getSelfNode(), self.key)
         
     def test_Value(self):
-        self.store.storeValue(self.key, 'foobar')
+        self.store.storeValue(self.key, 'foobar', datetime.now())
         val = self.store.retrieveValues(self.key)
         self.failUnlessEqual(len(val), 1)
         self.failUnlessEqual(val[0], 'foobar')
         
     def test_expireValues(self):
-        self.store.storeValue(self.key, 'foobar')
+        self.store.storeValue(self.key, 'foobar', datetime.now())
         sleep(2)
-        self.store.storeValue(self.key, 'barfoo')
+        self.store.storeValue(self.key, 'barfoo', datetime.now())
         self.store.expireValues(1)
         val = self.store.retrieveValues(self.key)
         self.failUnlessEqual(len(val), 1)
         self.failUnlessEqual(val[0], 'barfoo')
         
+    def test_refreshValues(self):
+        self.store.storeValue(self.key, 'foobar', datetime.now())
+        sleep(2)
+        self.store.storeValue(self.key, 'barfoo', datetime.now())
+        keys, vals = self.store.refreshValues(1)
+        self.failUnlessEqual(len(keys), 1)
+        self.failUnlessEqual(keys[0], self.key)
+        self.failUnlessEqual(len(vals), 1)
+        self.failUnlessEqual(len(vals[0].keys()), 1)
+        self.failUnlessEqual(vals[0].keys()[0], 'foobar')
+        val = self.store.retrieveValues(self.key)
+        self.failUnlessEqual(len(val), 2)
+        
     def test_RoutingTable(self):
         class dummy:
             id = self.key
index af6f0d8..5d14c7b 100644 (file)
@@ -257,28 +257,28 @@ class KhashmirRead(KhashmirBase):
 class KhashmirWrite(KhashmirRead):
     _Node = KNodeWrite
     ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
-    def storeValueForKey(self, key, value, callback=None):
-        """ stores the value for key in the global table, returns immediately, no status 
+    def storeValueForKey(self, key, value, originated, callback=None):
+        """ stores the value and origination time for key in the global table, returns immediately, no status 
             in this implementation, peers respond but don't indicate status to storing values
             a key can have many values
         """
-        def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
+        def _storeValueForKey(nodes, key=key, value=value, originated=originated, response=callback , table=self.table):
             if not response:
                 # default callback
                 def _storedValueHandler(key, value, sender):
                     pass
                 response=_storedValueHandler
-            action = StoreValue(self.table, key, value, response, self.config)
+            action = StoreValue(self.table, key, value, originated, response, self.config)
             reactor.callLater(0, action.goWithNodes, nodes)
             
         # this call is asynch
         self.findNode(key, _storeValueForKey)
                     
     #### Remote Interface - called by remote nodes
-    def krpc_store_value(self, key, value, id, _krpc_sender):
+    def krpc_store_value(self, key, value, originated, id, _krpc_sender):
         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
         self.insertNode(n, contacted=0)
-        self.store.storeValue(key, value)
+        self.store.storeValue(key, value, originated)
         return {"id" : self.node.id}
 
 # the whole shebang, for testing
@@ -335,7 +335,7 @@ class SimpleTests(unittest.TestCase):
         reactor.iterate()
         reactor.iterate()
         self.got = 0
-        self.a.storeValueForKey(sha('foo').digest(), 'foobar')
+        self.a.storeValueForKey(sha('foo').digest(), 'foobar', datetime.utcnow())
         reactor.iterate()
         reactor.iterate()
         reactor.iterate()
@@ -417,7 +417,7 @@ class MultiTest(unittest.TestCase):
                 self.done = 0
                 def _scb(key, value, result):
                     self.done = 1
-                self.l[randrange(0, self.num)].storeValueForKey(K, V, _scb)
+                self.l[randrange(0, self.num)].storeValueForKey(K, V, datetime.utcnow(), _scb)
                 while not self.done:
                     reactor.iterate()
 
index 5a9d8f4..eda1b01 100644 (file)
@@ -47,13 +47,8 @@ class KNodeRead(KNodeBase):
         return df
 
 class KNodeWrite(KNodeRead):
-    def storeValue(self, key, value, id):
-        df = self.conn.sendRequest('store_value', {"key" : key, "value" : value, "id": id})
-        df.addErrback(self.errBack)
-        df.addCallback(self.checkSender)
-        return df
-    def storeValues(self, key, value, id):
-        df = self.conn.sendRequest('store_values', {"key" : key, "values" : value, "id": id})
+    def storeValue(self, key, value, originated, id):
+        df = self.conn.sendRequest('store_value', {"key" : key, "value" : value, "originated" : originated, "id": id})
         df.addErrback(self.errBack)
         df.addCallback(self.checkSender)
         return df