khashmir's store value takes the origination date.
authorCameron Dale <camrdale@gmail.com>
Sun, 13 Jan 2008 22:02:59 +0000 (14:02 -0800)
committerCameron Dale <camrdale@gmail.com>
Sun, 13 Jan 2008 22:02:59 +0000 (14:02 -0800)
apt_dht_Khashmir/DHT.py
apt_dht_Khashmir/actions.py
apt_dht_Khashmir/db.py
apt_dht_Khashmir/khashmir.py
apt_dht_Khashmir/knode.py

index 8930ffe9d58c40e83e169d0ceb3b964ed458cb10..235c8d0c7a5264e10f26283352fe6459e964b752 100644 (file)
@@ -1,4 +1,5 @@
 
 
+from datetime import datetime
 import os, sha, random
 
 from twisted.internet import defer, reactor
 import os, sha, random
 
 from twisted.internet import defer, reactor
@@ -145,7 +146,7 @@ class DHT:
                 d.callback(final_result)
             del self.retrieving[key]
 
                 d.callback(final_result)
             del self.retrieving[key]
 
-    def storeValue(self, key, value):
+    def storeValue(self, key, value, originated = None):
         """See L{apt_dht.interfaces.IDHT}."""
         if self.config is None:
             raise DHTError, "configuration not loaded"
         """See L{apt_dht.interfaces.IDHT}."""
         if self.config is None:
             raise DHTError, "configuration not loaded"
@@ -155,8 +156,10 @@ class DHT:
         if key in self.storing and value in self.storing[key]:
             raise DHTError, "already storing that key with the same value"
 
         if key in self.storing and value in self.storing[key]:
             raise DHTError, "already storing that key with the same value"
 
+        if originated is None:
+            originated = datetime.utcnow()
         d = defer.Deferred()
         d = defer.Deferred()
-        self.khashmir.storeValueForKey(key, value, self._storeValue)
+        self.khashmir.storeValueForKey(key, value, originated, self._storeValue)
         self.storing.setdefault(key, {})[value] = d
         return d
     
         self.storing.setdefault(key, {})[value] = d
         return d
     
@@ -179,7 +182,7 @@ class TestSimpleDHT(unittest.TestCase):
                     'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
                     'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
                     'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
                     'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
-                    'KE_AGE': 3600, 'SPEW': False, }
+                    'KE_AGE': 3600, 'SPEW': True, }
 
     def setUp(self):
         self.a = DHT()
 
     def setUp(self):
         self.a = DHT()
index 9bfa2e8a708be3a5726b35fff76f7dad0772d9bf..8da4431dcd1d110e05c9cf429ea58fc4ab3bb435 100644 (file)
@@ -184,9 +184,10 @@ class GetValue(FindNode):
 
 
 class StoreValue(ActionBase):
 
 
 class StoreValue(ActionBase):
-    def __init__(self, caller, target, value, callback, config, store="storeValue"):
+    def __init__(self, caller, target, value, originated, callback, config, store="storeValue"):
         ActionBase.__init__(self, caller, target, callback, config)
         self.value = value
         ActionBase.__init__(self, caller, target, callback, config)
         self.value = value
+        self.originated = originated
         self.stored = []
         self.store = store
         
         self.stored = []
         self.store = store
         
@@ -234,7 +235,7 @@ class StoreValue(ActionBase):
                     except AttributeError:
                         print ">>> %s doesn't have a %s method!" % (node, self.store)
                     else:
                     except AttributeError:
                         print ">>> %s doesn't have a %s method!" % (node, self.store)
                     else:
-                        df = f(self.target, self.value, self.caller.node.id)
+                        df = f(self.target, self.value, self.originated, self.caller.node.id)
                         df.addCallback(self.storedValue, node=node)
                         df.addErrback(self.storeFailed, node=node)
                     
                         df.addCallback(self.storedValue, node=node)
                         df.addErrback(self.storeFailed, node=node)
                     
index 8bb499e14512e85b0218941b22485ba0dac63474..4b7fcf38f56eda66a9ddf9461d3a86fd2d11a99d 100644 (file)
@@ -44,9 +44,10 @@ class DB:
     def _createNewDB(self, db):
         self.conn = sqlite.connect(database=db, detect_types=sqlite.PARSE_DECLTYPES)
         c = self.conn.cursor()
     def _createNewDB(self, db):
         self.conn = sqlite.connect(database=db, detect_types=sqlite.PARSE_DECLTYPES)
         c = self.conn.cursor()
-        c.execute("CREATE TABLE kv (key KHASH, value TEXT, time TIMESTAMP, PRIMARY KEY (key, value))")
+        c.execute("CREATE TABLE kv (key KHASH, value TEXT, originated TIMESTAMP, last_refresh TIMESTAMP, PRIMARY KEY (key, value))")
         c.execute("CREATE INDEX kv_key ON kv(key)")
         c.execute("CREATE INDEX kv_key ON kv(key)")
-        c.execute("CREATE INDEX kv_timestamp ON kv(time)")
+        c.execute("CREATE INDEX kv_originated ON kv(originated)")
+        c.execute("CREATE INDEX kv_last_refresh ON kv(last_refresh)")
         c.execute("CREATE TABLE nodes (id KHASH PRIMARY KEY, host TEXT, port NUMBER)")
         c.execute("CREATE TABLE self (num NUMBER PRIMARY KEY, id KHASH)")
         self.conn.commit()
         c.execute("CREATE TABLE nodes (id KHASH PRIMARY KEY, host TEXT, port NUMBER)")
         c.execute("CREATE TABLE self (num NUMBER PRIMARY KEY, id KHASH)")
         self.conn.commit()
@@ -86,28 +87,46 @@ class DB:
         return c.fetchall()
             
     def retrieveValues(self, key):
         return c.fetchall()
             
     def retrieveValues(self, key):
+        """Retrieve values from the database."""
         c = self.conn.cursor()
         c.execute("SELECT value FROM kv WHERE key = ?", (khash(key),))
         c = self.conn.cursor()
         c.execute("SELECT value FROM kv WHERE key = ?", (khash(key),))
-        t = c.fetchone()
         l = []
         l = []
-        while t:
-            l.append(t[0])
-            t = c.fetchone()
+        rows = c.fetchall()
+        for row in rows:
+            l.append(row[0])
         return l
 
         return l
 
-    def storeValue(self, key, value):
+    def storeValue(self, key, value, originated):
         """Store or update a key and value."""
         c = self.conn.cursor()
         """Store or update a key and value."""
         c = self.conn.cursor()
-        c.execute("INSERT OR REPLACE INTO kv VALUES (?, ?, ?)", (khash(key), value, datetime.now()))
+        c.execute("INSERT OR REPLACE INTO kv VALUES (?, ?, ?, ?)", 
+                  (khash(key), value, originated, datetime.now()))
         self.conn.commit()
 
     def expireValues(self, expireAfter):
         """Expire older values after expireAfter seconds."""
         t = datetime.now() - timedelta(seconds=expireAfter)
         c = self.conn.cursor()
         self.conn.commit()
 
     def expireValues(self, expireAfter):
         """Expire older values after expireAfter seconds."""
         t = datetime.now() - timedelta(seconds=expireAfter)
         c = self.conn.cursor()
-        c.execute("DELETE FROM kv WHERE time < ?", (t, ))
+        c.execute("DELETE FROM kv WHERE originated < ?", (t, ))
         self.conn.commit()
         
         self.conn.commit()
         
+    def refreshValues(self, expireAfter):
+        """Find older values than expireAfter seconds to refresh.
+        
+        @return: a list of the hash keys and a list of dictionaries with
+            key of the value, value is the origination time
+        """
+        t = datetime.now() - timedelta(seconds=expireAfter)
+        c = self.conn.cursor()
+        c.execute("SELECT key, value, originated FROM kv WHERE last_refresh < ?", (t,))
+        keys = []
+        vals = []
+        rows = c.fetchall()
+        for row in rows:
+            keys.append(row[0])
+            vals.append({row[1]: row[2]})
+        return keys, vals
+        
     def close(self):
         self.conn.close()
 
     def close(self):
         self.conn.close()
 
@@ -126,20 +145,33 @@ class TestDB(unittest.TestCase):
         self.failUnlessEqual(self.store.getSelfNode(), self.key)
         
     def test_Value(self):
         self.failUnlessEqual(self.store.getSelfNode(), self.key)
         
     def test_Value(self):
-        self.store.storeValue(self.key, 'foobar')
+        self.store.storeValue(self.key, 'foobar', datetime.now())
         val = self.store.retrieveValues(self.key)
         self.failUnlessEqual(len(val), 1)
         self.failUnlessEqual(val[0], 'foobar')
         
     def test_expireValues(self):
         val = self.store.retrieveValues(self.key)
         self.failUnlessEqual(len(val), 1)
         self.failUnlessEqual(val[0], 'foobar')
         
     def test_expireValues(self):
-        self.store.storeValue(self.key, 'foobar')
+        self.store.storeValue(self.key, 'foobar', datetime.now())
         sleep(2)
         sleep(2)
-        self.store.storeValue(self.key, 'barfoo')
+        self.store.storeValue(self.key, 'barfoo', datetime.now())
         self.store.expireValues(1)
         val = self.store.retrieveValues(self.key)
         self.failUnlessEqual(len(val), 1)
         self.failUnlessEqual(val[0], 'barfoo')
         
         self.store.expireValues(1)
         val = self.store.retrieveValues(self.key)
         self.failUnlessEqual(len(val), 1)
         self.failUnlessEqual(val[0], 'barfoo')
         
+    def test_refreshValues(self):
+        self.store.storeValue(self.key, 'foobar', datetime.now())
+        sleep(2)
+        self.store.storeValue(self.key, 'barfoo', datetime.now())
+        keys, vals = self.store.refreshValues(1)
+        self.failUnlessEqual(len(keys), 1)
+        self.failUnlessEqual(keys[0], self.key)
+        self.failUnlessEqual(len(vals), 1)
+        self.failUnlessEqual(len(vals[0].keys()), 1)
+        self.failUnlessEqual(vals[0].keys()[0], 'foobar')
+        val = self.store.retrieveValues(self.key)
+        self.failUnlessEqual(len(val), 2)
+        
     def test_RoutingTable(self):
         class dummy:
             id = self.key
     def test_RoutingTable(self):
         class dummy:
             id = self.key
index af6f0d8817156d0eba8af78feb2c4e320af2346f..5d14c7bd34a85ba1c76c772695e731d38f06e6a8 100644 (file)
@@ -257,28 +257,28 @@ class KhashmirRead(KhashmirBase):
 class KhashmirWrite(KhashmirRead):
     _Node = KNodeWrite
     ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
 class KhashmirWrite(KhashmirRead):
     _Node = KNodeWrite
     ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
-    def storeValueForKey(self, key, value, callback=None):
-        """ stores the value for key in the global table, returns immediately, no status 
+    def storeValueForKey(self, key, value, originated, callback=None):
+        """ stores the value and origination time for key in the global table, returns immediately, no status 
             in this implementation, peers respond but don't indicate status to storing values
             a key can have many values
         """
             in this implementation, peers respond but don't indicate status to storing values
             a key can have many values
         """
-        def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
+        def _storeValueForKey(nodes, key=key, value=value, originated=originated, response=callback , table=self.table):
             if not response:
                 # default callback
                 def _storedValueHandler(key, value, sender):
                     pass
                 response=_storedValueHandler
             if not response:
                 # default callback
                 def _storedValueHandler(key, value, sender):
                     pass
                 response=_storedValueHandler
-            action = StoreValue(self.table, key, value, response, self.config)
+            action = StoreValue(self.table, key, value, originated, response, self.config)
             reactor.callLater(0, action.goWithNodes, nodes)
             
         # this call is asynch
         self.findNode(key, _storeValueForKey)
                     
     #### Remote Interface - called by remote nodes
             reactor.callLater(0, action.goWithNodes, nodes)
             
         # this call is asynch
         self.findNode(key, _storeValueForKey)
                     
     #### Remote Interface - called by remote nodes
-    def krpc_store_value(self, key, value, id, _krpc_sender):
+    def krpc_store_value(self, key, value, originated, id, _krpc_sender):
         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
         self.insertNode(n, contacted=0)
         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
         self.insertNode(n, contacted=0)
-        self.store.storeValue(key, value)
+        self.store.storeValue(key, value, originated)
         return {"id" : self.node.id}
 
 # the whole shebang, for testing
         return {"id" : self.node.id}
 
 # the whole shebang, for testing
@@ -335,7 +335,7 @@ class SimpleTests(unittest.TestCase):
         reactor.iterate()
         reactor.iterate()
         self.got = 0
         reactor.iterate()
         reactor.iterate()
         self.got = 0
-        self.a.storeValueForKey(sha('foo').digest(), 'foobar')
+        self.a.storeValueForKey(sha('foo').digest(), 'foobar', datetime.utcnow())
         reactor.iterate()
         reactor.iterate()
         reactor.iterate()
         reactor.iterate()
         reactor.iterate()
         reactor.iterate()
@@ -417,7 +417,7 @@ class MultiTest(unittest.TestCase):
                 self.done = 0
                 def _scb(key, value, result):
                     self.done = 1
                 self.done = 0
                 def _scb(key, value, result):
                     self.done = 1
-                self.l[randrange(0, self.num)].storeValueForKey(K, V, _scb)
+                self.l[randrange(0, self.num)].storeValueForKey(K, V, datetime.utcnow(), _scb)
                 while not self.done:
                     reactor.iterate()
 
                 while not self.done:
                     reactor.iterate()
 
index 5a9d8f43898e1997c137585d49363d14ef468620..eda1b01645bc109e737c2a837063825233d535a5 100644 (file)
@@ -47,13 +47,8 @@ class KNodeRead(KNodeBase):
         return df
 
 class KNodeWrite(KNodeRead):
         return df
 
 class KNodeWrite(KNodeRead):
-    def storeValue(self, key, value, id):
-        df = self.conn.sendRequest('store_value', {"key" : key, "value" : value, "id": id})
-        df.addErrback(self.errBack)
-        df.addCallback(self.checkSender)
-        return df
-    def storeValues(self, key, value, id):
-        df = self.conn.sendRequest('store_values', {"key" : key, "values" : value, "id": id})
+    def storeValue(self, key, value, originated, id):
+        df = self.conn.sendRequest('store_value', {"key" : key, "value" : value, "originated" : originated, "id": id})
         df.addErrback(self.errBack)
         df.addCallback(self.checkSender)
         return df
         df.addErrback(self.errBack)
         df.addCallback(self.checkSender)
         return df