Various things need to done to comply with the newly defined protocol:
- use the compact encoding of contact information
- - remove the originated time from the value storage
- add the token to find_node responses
- use the token in store_node requests
- standardize the error messages (especially for a bad token)
d.callback(final_result)
del self.retrieving[key]
- def storeValue(self, key, value, originated = None):
+ def storeValue(self, key, value):
"""See L{apt_dht.interfaces.IDHT}."""
if self.config is None:
raise DHTError, "configuration not loaded"
if key in self.storing and value in self.storing[key]:
raise DHTError, "already storing that key with the same value"
- if originated is None:
- originated = datetime.utcnow()
d = defer.Deferred()
- self.khashmir.storeValueForKey(key, value, originated, self._storeValue)
+ self.khashmir.storeValueForKey(key, value, self._storeValue)
self.storing.setdefault(key, {})[value] = d
return d
class StoreValue(ActionBase):
- def __init__(self, caller, target, value, originated, callback, config, store="storeValue"):
+ def __init__(self, caller, target, value, callback, config, store="storeValue"):
ActionBase.__init__(self, caller, target, callback, config)
self.value = value
- self.originated = originated
self.stored = []
self.store = store
except AttributeError:
log.msg("%s doesn't have a %s method!" % (node, self.store))
else:
- df = f(self.target, self.value, self.originated, self.caller.node.id)
+ df = f(self.target, self.value, self.caller.node.id)
df.addCallback(self.storedValue, node=node)
df.addErrback(self.storeFailed, node=node)
def _createNewDB(self, db):
self.conn = sqlite.connect(database=db, detect_types=sqlite.PARSE_DECLTYPES)
c = self.conn.cursor()
- c.execute("CREATE TABLE kv (key KHASH, value TEXT, originated TIMESTAMP, last_refresh TIMESTAMP, PRIMARY KEY (key, value))")
+ c.execute("CREATE TABLE kv (key KHASH, value TEXT, last_refresh TIMESTAMP, PRIMARY KEY (key, value))")
c.execute("CREATE INDEX kv_key ON kv(key)")
- c.execute("CREATE INDEX kv_originated ON kv(originated)")
c.execute("CREATE INDEX kv_last_refresh ON kv(last_refresh)")
c.execute("CREATE TABLE nodes (id KHASH PRIMARY KEY, host TEXT, port NUMBER)")
c.execute("CREATE TABLE self (num NUMBER PRIMARY KEY, id KHASH)")
l.append(row[0])
return l
- def storeValue(self, key, value, originated):
+ def storeValue(self, key, value):
"""Store or update a key and value."""
c = self.conn.cursor()
- c.execute("INSERT OR REPLACE INTO kv VALUES (?, ?, ?, ?)",
- (khash(key), value, originated, datetime.now()))
+ c.execute("INSERT OR REPLACE INTO kv VALUES (?, ?, ?)",
+ (khash(key), value, datetime.now()))
self.conn.commit()
def expireValues(self, expireAfter):
"""Expire older values after expireAfter seconds."""
t = datetime.now() - timedelta(seconds=expireAfter)
c = self.conn.cursor()
- c.execute("DELETE FROM kv WHERE originated < ?", (t, ))
+ c.execute("DELETE FROM kv WHERE last_refresh < ?", (t, ))
self.conn.commit()
def refreshValues(self, expireAfter):
"""
t = datetime.now() - timedelta(seconds=expireAfter)
c = self.conn.cursor()
- c.execute("SELECT key, value, originated FROM kv WHERE last_refresh < ?", (t,))
+ c.execute("SELECT key, value, FROM kv WHERE last_refresh < ?", (t,))
keys = []
vals = []
rows = c.fetchall()
class KhashmirWrite(KhashmirRead):
_Node = KNodeWrite
## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
- def storeValueForKey(self, key, value, originated, callback=None):
+ def storeValueForKey(self, key, value, callback=None):
""" stores the value and origination time for key in the global table, returns immediately, no status
in this implementation, peers respond but don't indicate status to storing values
a key can have many values
"""
- def _storeValueForKey(nodes, key=key, value=value, originated=originated, response=callback , table=self.table):
+ def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
if not response:
# default callback
def _storedValueHandler(key, value, sender):
pass
response=_storedValueHandler
- action = StoreValue(self.table, key, value, originated, response, self.config)
+ action = StoreValue(self.table, key, value, response, self.config)
reactor.callLater(0, action.goWithNodes, nodes)
# this call is asynch
self.findNode(key, _storeValueForKey)
#### Remote Interface - called by remote nodes
- def krpc_store_value(self, key, value, originated, id, _krpc_sender):
+ def krpc_store_value(self, key, value, id, _krpc_sender):
n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
self.insertNode(n, contacted=0)
- self.store.storeValue(key, value, originated)
+ self.store.storeValue(key, value)
return {"id" : self.node.id}
# the whole shebang, for testing
return df
class KNodeWrite(KNodeRead):
- def storeValue(self, key, value, originated, id):
- df = self.conn.sendRequest('store_value', {"key" : key, "value" : value, "originated" : originated, "id": id})
+ def storeValue(self, key, value, id):
+ df = self.conn.sendRequest('store_value', {"key" : key, "value" : value, "id": id})
df.addErrback(self.errBack)
df.addCallback(self.checkSender)
return df