From 8f102eb1964db2ac18d4bac9e399c069a4cb616e Mon Sep 17 00:00:00 2001 From: Cameron Dale Date: Thu, 21 Feb 2008 15:18:44 -0800 Subject: [PATCH] Return a token in find_node responses, use it in store_value requests. Also changed the checkpointing frequency to 5 minutes so that storing 3 old tokens will give an age of 10-15 minutes (as per protocol). --- TODO | 2 -- apt-dht.conf | 2 +- apt_dht/apt_dht_conf.py | 2 +- apt_dht_Khashmir/DHT.py | 4 ++-- apt_dht_Khashmir/actions.py | 4 +++- apt_dht_Khashmir/khashmir.py | 19 ++++++++++++++----- apt_dht_Khashmir/knode.py | 4 ++-- apt_dht_Khashmir/node.py | 4 ++++ debian/apt-dht.conf.sgml | 2 +- test.py | 2 +- 10 files changed, 29 insertions(+), 16 deletions(-) diff --git a/TODO b/TODO index a40cdaa..bb5ae3d 100644 --- a/TODO +++ b/TODO @@ -1,8 +1,6 @@ Comply with the newly defined protocol on the web page. Various things need to done to comply with the newly defined protocol: - - add the token to find_node responses - - use the token in store_node requests - standardize the error messages (especially for a bad token) diff --git a/apt-dht.conf b/apt-dht.conf index 6c9b055..0b06e20 100644 --- a/apt-dht.conf +++ b/apt-dht.conf @@ -60,7 +60,7 @@ K = 8 HASH_LENGTH = 160 # interval between saving the running state -CHECKPOINT_INTERVAL = 15m +CHECKPOINT_INTERVAL = 5m # concurrent number of calls per find node/value request! CONCURRENT_REQS = 4 diff --git a/apt_dht/apt_dht_conf.py b/apt_dht/apt_dht_conf.py index fb2be96..42a3d38 100644 --- a/apt_dht/apt_dht_conf.py +++ b/apt_dht/apt_dht_conf.py @@ -62,7 +62,7 @@ DHT_DEFAULTS = { 'HASH_LENGTH': '160', # checkpoint every this many seconds - 'CHECKPOINT_INTERVAL': '15m', # fifteen minutes + 'CHECKPOINT_INTERVAL': '5m', # five minutes ### SEARCHING/STORING # concurrent xmlrpc calls per find node/value request! diff --git a/apt_dht_Khashmir/DHT.py b/apt_dht_Khashmir/DHT.py index 23b2755..e62f304 100644 --- a/apt_dht_Khashmir/DHT.py +++ b/apt_dht_Khashmir/DHT.py @@ -183,7 +183,7 @@ class TestSimpleDHT(unittest.TestCase): timeout = 2 DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160, - 'CHECKPOINT_INTERVAL': 900, 'CONCURRENT_REQS': 4, + 'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4, 'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3, 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600, 'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200, @@ -276,7 +276,7 @@ class TestMultiDHT(unittest.TestCase): timeout = 60 num = 20 DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160, - 'CHECKPOINT_INTERVAL': 900, 'CONCURRENT_REQS': 4, + 'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4, 'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3, 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600, 'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200, diff --git a/apt_dht_Khashmir/actions.py b/apt_dht_Khashmir/actions.py index b8f5c8c..6766cd9 100644 --- a/apt_dht_Khashmir/actions.py +++ b/apt_dht_Khashmir/actions.py @@ -45,6 +45,8 @@ class FindNode(ActionBase): dict = dict['rsp'] n = self.caller.Node(dict["id"], _krpc_sender[0], _krpc_sender[1]) self.caller.insertNode(n) + if dict["id"] in self.found: + self.found[dict["id"]].updateToken(dict.get('token', '')) l = dict["nodes"] if self.finished or self.answered.has_key(dict["id"]): # a day late and a dollar short @@ -239,7 +241,7 @@ class StoreValue(ActionBase): except AttributeError: log.msg("%s doesn't have a %s method!" % (node, self.store)) else: - df = f(self.target, self.value, self.caller.node.id) + df = f(self.target, self.value, node.token, self.caller.node.id) df.addCallback(self.storedValue, node=node) df.addErrback(self.storeFailed, node=node) diff --git a/apt_dht_Khashmir/khashmir.py b/apt_dht_Khashmir/khashmir.py index 5d40dee..3f5327a 100644 --- a/apt_dht_Khashmir/khashmir.py +++ b/apt_dht_Khashmir/khashmir.py @@ -33,6 +33,7 @@ class KhashmirBase(protocol.Factory): self.store = DB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db')) self.node = self._loadSelfNode('', self.port) self.table = KTable(self.node, config) + self.token_secrets = [newID()] #self.app = service.Application("krpc") self.udp = krpc.hostbroker(self, config) self.udp.protocol = krpc.KRPC @@ -59,6 +60,9 @@ class KhashmirBase(protocol.Factory): return self._Node(id, host, port) def checkpoint(self, auto=0): + self.token_secrets.insert(0, newID()) + if len(self.token_secrets) > 3: + self.token_secrets.pop() self.store.saveSelfNode(self.node.id) self.store.dumpRoutingTable(self.table.buckets) self.refreshTable() @@ -211,7 +215,8 @@ class KhashmirBase(protocol.Factory): self.insertNode(n, contacted=0) nodes = self.table.findNodes(target) nodes = map(lambda node: node.contactInfo(), nodes) - return {"nodes" : nodes, "id" : self.node.id} + token = sha(self.token_secrets[0] + _krpc_sender[0]).digest() + return {"nodes" : nodes, "token" : token, "id" : self.node.id} ## This class provides read-only access to the DHT, valueForKey @@ -275,10 +280,14 @@ class KhashmirWrite(KhashmirRead): self.findNode(key, _storeValueForKey) #### Remote Interface - called by remote nodes - def krpc_store_value(self, key, value, id, _krpc_sender): + def krpc_store_value(self, key, value, token, id, _krpc_sender): n = self.Node(id, _krpc_sender[0], _krpc_sender[1]) self.insertNode(n, contacted=0) - self.store.storeValue(key, value) + for secret in self.token_secrets: + this_token = sha(secret + _krpc_sender[0]).digest() + if token == this_token: + self.store.storeValue(key, value) + break; return {"id" : self.node.id} # the whole shebang, for testing @@ -289,7 +298,7 @@ class SimpleTests(unittest.TestCase): timeout = 10 DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160, - 'CHECKPOINT_INTERVAL': 900, 'CONCURRENT_REQS': 4, + 'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4, 'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3, 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600, 'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200, @@ -363,7 +372,7 @@ class MultiTest(unittest.TestCase): timeout = 30 num = 20 DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160, - 'CHECKPOINT_INTERVAL': 900, 'CONCURRENT_REQS': 4, + 'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4, 'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3, 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600, 'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200, diff --git a/apt_dht_Khashmir/knode.py b/apt_dht_Khashmir/knode.py index 984514a..1ced1a0 100644 --- a/apt_dht_Khashmir/knode.py +++ b/apt_dht_Khashmir/knode.py @@ -49,8 +49,8 @@ class KNodeRead(KNodeBase): return df class KNodeWrite(KNodeRead): - def storeValue(self, key, value, id): - df = self.conn.sendRequest('store_value', {"key" : key, "value" : value, "id": id}) + def storeValue(self, key, value, token, id): + df = self.conn.sendRequest('store_value', {"key" : key, "value" : value, "token" : token, "id": id}) df.addErrback(self.errBack) df.addCallback(self.checkSender) return df diff --git a/apt_dht_Khashmir/node.py b/apt_dht_Khashmir/node.py index 580ad9c..3f04ef2 100644 --- a/apt_dht_Khashmir/node.py +++ b/apt_dht_Khashmir/node.py @@ -30,11 +30,15 @@ class Node: self.num = khash.intify(id) self.host = host self.port = int(port) + self.token = '' self._contactInfo = None def updateLastSeen(self): self.lastSeen = datetime.now() self.fails = 0 + + def updateToken(self, token): + self.token = token def msgFailed(self): self.fails = self.fails + 1 diff --git a/debian/apt-dht.conf.sgml b/debian/apt-dht.conf.sgml index b011c51..6911eb8 100644 --- a/debian/apt-dht.conf.sgml +++ b/debian/apt-dht.conf.sgml @@ -179,7 +179,7 @@ The time to wait between saves of the running state. - (Default is 15 minutes.) + (Default is 5 minutes.) diff --git a/test.py b/test.py index d2dece4..f3a4dbc 100755 --- a/test.py +++ b/test.py @@ -354,7 +354,7 @@ K = 8 HASH_LENGTH = 160 # checkpoint every this many seconds -CHECKPOINT_INTERVAL = 15m +CHECKPOINT_INTERVAL = 5m # concurrent xmlrpc calls per find node/value request! CONCURRENT_REQS = 4 -- 2.30.2