+from datetime import datetime
import os, sha, random
from twisted.internet import defer, reactor
d.callback(final_result)
del self.retrieving[key]
- def storeValue(self, key, value):
+ def storeValue(self, key, value, originated = None):
"""See L{apt_dht.interfaces.IDHT}."""
if self.config is None:
raise DHTError, "configuration not loaded"
if key in self.storing and value in self.storing[key]:
raise DHTError, "already storing that key with the same value"
+ if originated is None:
+ originated = datetime.utcnow()
d = defer.Deferred()
- self.khashmir.storeValueForKey(key, value, self._storeValue)
+ self.khashmir.storeValueForKey(key, value, originated, self._storeValue)
self.storing.setdefault(key, {})[value] = d
return d
'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
- 'KE_AGE': 3600, 'SPEW': False, }
+ 'KE_AGE': 3600, 'SPEW': True, }
def setUp(self):
self.a = DHT()
class StoreValue(ActionBase):
- def __init__(self, caller, target, value, callback, config, store="storeValue"):
+ def __init__(self, caller, target, value, originated, callback, config, store="storeValue"):
ActionBase.__init__(self, caller, target, callback, config)
self.value = value
+ self.originated = originated
self.stored = []
self.store = store
except AttributeError:
print ">>> %s doesn't have a %s method!" % (node, self.store)
else:
- df = f(self.target, self.value, self.caller.node.id)
+ df = f(self.target, self.value, self.originated, self.caller.node.id)
df.addCallback(self.storedValue, node=node)
df.addErrback(self.storeFailed, node=node)
def _createNewDB(self, db):
self.conn = sqlite.connect(database=db, detect_types=sqlite.PARSE_DECLTYPES)
c = self.conn.cursor()
- c.execute("CREATE TABLE kv (key KHASH, value TEXT, time TIMESTAMP, PRIMARY KEY (key, value))")
+ c.execute("CREATE TABLE kv (key KHASH, value TEXT, originated TIMESTAMP, last_refresh TIMESTAMP, PRIMARY KEY (key, value))")
c.execute("CREATE INDEX kv_key ON kv(key)")
- c.execute("CREATE INDEX kv_timestamp ON kv(time)")
+ c.execute("CREATE INDEX kv_originated ON kv(originated)")
+ c.execute("CREATE INDEX kv_last_refresh ON kv(last_refresh)")
c.execute("CREATE TABLE nodes (id KHASH PRIMARY KEY, host TEXT, port NUMBER)")
c.execute("CREATE TABLE self (num NUMBER PRIMARY KEY, id KHASH)")
self.conn.commit()
return c.fetchall()
def retrieveValues(self, key):
+ """Retrieve values from the database."""
c = self.conn.cursor()
c.execute("SELECT value FROM kv WHERE key = ?", (khash(key),))
- t = c.fetchone()
l = []
- while t:
- l.append(t[0])
- t = c.fetchone()
+ rows = c.fetchall()
+ for row in rows:
+ l.append(row[0])
return l
- def storeValue(self, key, value):
+ def storeValue(self, key, value, originated):
"""Store or update a key and value."""
c = self.conn.cursor()
- c.execute("INSERT OR REPLACE INTO kv VALUES (?, ?, ?)", (khash(key), value, datetime.now()))
+ c.execute("INSERT OR REPLACE INTO kv VALUES (?, ?, ?, ?)",
+ (khash(key), value, originated, datetime.now()))
self.conn.commit()
def expireValues(self, expireAfter):
"""Expire older values after expireAfter seconds."""
t = datetime.now() - timedelta(seconds=expireAfter)
c = self.conn.cursor()
- c.execute("DELETE FROM kv WHERE time < ?", (t, ))
+ c.execute("DELETE FROM kv WHERE originated < ?", (t, ))
self.conn.commit()
+ def refreshValues(self, expireAfter):
+ """Find older values than expireAfter seconds to refresh.
+
+ @return: a list of the hash keys and a list of dictionaries with
+ key of the value, value is the origination time
+ """
+ t = datetime.now() - timedelta(seconds=expireAfter)
+ c = self.conn.cursor()
+ c.execute("SELECT key, value, originated FROM kv WHERE last_refresh < ?", (t,))
+ keys = []
+ vals = []
+ rows = c.fetchall()
+ for row in rows:
+ keys.append(row[0])
+ vals.append({row[1]: row[2]})
+ return keys, vals
+
def close(self):
self.conn.close()
self.failUnlessEqual(self.store.getSelfNode(), self.key)
def test_Value(self):
- self.store.storeValue(self.key, 'foobar')
+ self.store.storeValue(self.key, 'foobar', datetime.now())
val = self.store.retrieveValues(self.key)
self.failUnlessEqual(len(val), 1)
self.failUnlessEqual(val[0], 'foobar')
def test_expireValues(self):
- self.store.storeValue(self.key, 'foobar')
+ self.store.storeValue(self.key, 'foobar', datetime.now())
sleep(2)
- self.store.storeValue(self.key, 'barfoo')
+ self.store.storeValue(self.key, 'barfoo', datetime.now())
self.store.expireValues(1)
val = self.store.retrieveValues(self.key)
self.failUnlessEqual(len(val), 1)
self.failUnlessEqual(val[0], 'barfoo')
+ def test_refreshValues(self):
+ self.store.storeValue(self.key, 'foobar', datetime.now())
+ sleep(2)
+ self.store.storeValue(self.key, 'barfoo', datetime.now())
+ keys, vals = self.store.refreshValues(1)
+ self.failUnlessEqual(len(keys), 1)
+ self.failUnlessEqual(keys[0], self.key)
+ self.failUnlessEqual(len(vals), 1)
+ self.failUnlessEqual(len(vals[0].keys()), 1)
+ self.failUnlessEqual(vals[0].keys()[0], 'foobar')
+ val = self.store.retrieveValues(self.key)
+ self.failUnlessEqual(len(val), 2)
+
def test_RoutingTable(self):
class dummy:
id = self.key
class KhashmirWrite(KhashmirRead):
_Node = KNodeWrite
## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
- def storeValueForKey(self, key, value, callback=None):
- """ stores the value for key in the global table, returns immediately, no status
+ def storeValueForKey(self, key, value, originated, callback=None):
+ """ stores the value and origination time for key in the global table, returns immediately, no status
in this implementation, peers respond but don't indicate status to storing values
a key can have many values
"""
- def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
+ def _storeValueForKey(nodes, key=key, value=value, originated=originated, response=callback , table=self.table):
if not response:
# default callback
def _storedValueHandler(key, value, sender):
pass
response=_storedValueHandler
- action = StoreValue(self.table, key, value, response, self.config)
+ action = StoreValue(self.table, key, value, originated, response, self.config)
reactor.callLater(0, action.goWithNodes, nodes)
# this call is asynch
self.findNode(key, _storeValueForKey)
#### Remote Interface - called by remote nodes
- def krpc_store_value(self, key, value, id, _krpc_sender):
+ def krpc_store_value(self, key, value, originated, id, _krpc_sender):
n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
self.insertNode(n, contacted=0)
- self.store.storeValue(key, value)
+ self.store.storeValue(key, value, originated)
return {"id" : self.node.id}
# the whole shebang, for testing
reactor.iterate()
reactor.iterate()
self.got = 0
- self.a.storeValueForKey(sha('foo').digest(), 'foobar')
+ self.a.storeValueForKey(sha('foo').digest(), 'foobar', datetime.utcnow())
reactor.iterate()
reactor.iterate()
reactor.iterate()
self.done = 0
def _scb(key, value, result):
self.done = 1
- self.l[randrange(0, self.num)].storeValueForKey(K, V, _scb)
+ self.l[randrange(0, self.num)].storeValueForKey(K, V, datetime.utcnow(), _scb)
while not self.done:
reactor.iterate()
return df
class KNodeWrite(KNodeRead):
- def storeValue(self, key, value, id):
- df = self.conn.sendRequest('store_value', {"key" : key, "value" : value, "id": id})
- df.addErrback(self.errBack)
- df.addCallback(self.checkSender)
- return df
- def storeValues(self, key, value, id):
- df = self.conn.sendRequest('store_values', {"key" : key, "values" : value, "id": id})
+ def storeValue(self, key, value, originated, id):
+ df = self.conn.sendRequest('store_value', {"key" : key, "value" : value, "originated" : originated, "id": id})
df.addErrback(self.errBack)
df.addCallback(self.checkSender)
return df