Return a token in find_node responses, use it in store_value requests.
authorCameron Dale <camrdale@gmail.com>
Thu, 21 Feb 2008 23:18:44 +0000 (15:18 -0800)
committerCameron Dale <camrdale@gmail.com>
Thu, 21 Feb 2008 23:18:44 +0000 (15:18 -0800)
Also changed the checkpointing frequency to 5 minutes so that storing 3
old tokens will give an age of 10-15 minutes (as per protocol).

TODO
apt-dht.conf
apt_dht/apt_dht_conf.py
apt_dht_Khashmir/DHT.py
apt_dht_Khashmir/actions.py
apt_dht_Khashmir/khashmir.py
apt_dht_Khashmir/knode.py
apt_dht_Khashmir/node.py
debian/apt-dht.conf.sgml
test.py

diff --git a/TODO b/TODO
index a40cdaa..bb5ae3d 100644 (file)
--- a/TODO
+++ b/TODO
@@ -1,8 +1,6 @@
 Comply with the newly defined protocol on the web page.
 
 Various things need to done to comply with the newly defined protocol:
- - add the token to find_node responses
- - use the token in store_node requests
  - standardize the error messages (especially for a bad token)
 
 
index 6c9b055..0b06e20 100644 (file)
@@ -60,7 +60,7 @@ K = 8
 HASH_LENGTH = 160
 
 # interval between saving the running state
-CHECKPOINT_INTERVAL = 15m
+CHECKPOINT_INTERVAL = 5m
 
 # concurrent number of calls per find node/value request!
 CONCURRENT_REQS = 4
index fb2be96..42a3d38 100644 (file)
@@ -62,7 +62,7 @@ DHT_DEFAULTS = {
     'HASH_LENGTH': '160',
     
     # checkpoint every this many seconds
-    'CHECKPOINT_INTERVAL': '15m', # fifteen minutes
+    'CHECKPOINT_INTERVAL': '5m', # five minutes
     
     ### SEARCHING/STORING
     # concurrent xmlrpc calls per find node/value request!
index 23b2755..e62f304 100644 (file)
@@ -183,7 +183,7 @@ class TestSimpleDHT(unittest.TestCase):
     
     timeout = 2
     DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
-                    'CHECKPOINT_INTERVAL': 900, 'CONCURRENT_REQS': 4,
+                    'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
                     'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
                     'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
@@ -276,7 +276,7 @@ class TestMultiDHT(unittest.TestCase):
     timeout = 60
     num = 20
     DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
-                    'CHECKPOINT_INTERVAL': 900, 'CONCURRENT_REQS': 4,
+                    'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
                     'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
                     'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
index b8f5c8c..6766cd9 100644 (file)
@@ -45,6 +45,8 @@ class FindNode(ActionBase):
         dict = dict['rsp']
         n = self.caller.Node(dict["id"], _krpc_sender[0], _krpc_sender[1])
         self.caller.insertNode(n)
+        if dict["id"] in self.found:
+            self.found[dict["id"]].updateToken(dict.get('token', ''))
         l = dict["nodes"]
         if self.finished or self.answered.has_key(dict["id"]):
             # a day late and a dollar short
@@ -239,7 +241,7 @@ class StoreValue(ActionBase):
                     except AttributeError:
                         log.msg("%s doesn't have a %s method!" % (node, self.store))
                     else:
-                        df = f(self.target, self.value, self.caller.node.id)
+                        df = f(self.target, self.value, node.token, self.caller.node.id)
                         df.addCallback(self.storedValue, node=node)
                         df.addErrback(self.storeFailed, node=node)
                     
index 5d40dee..3f5327a 100644 (file)
@@ -33,6 +33,7 @@ class KhashmirBase(protocol.Factory):
         self.store = DB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db'))
         self.node = self._loadSelfNode('', self.port)
         self.table = KTable(self.node, config)
+        self.token_secrets = [newID()]
         #self.app = service.Application("krpc")
         self.udp = krpc.hostbroker(self, config)
         self.udp.protocol = krpc.KRPC
@@ -59,6 +60,9 @@ class KhashmirBase(protocol.Factory):
         return self._Node(id, host, port)
         
     def checkpoint(self, auto=0):
+        self.token_secrets.insert(0, newID())
+        if len(self.token_secrets) > 3:
+            self.token_secrets.pop()
         self.store.saveSelfNode(self.node.id)
         self.store.dumpRoutingTable(self.table.buckets)
         self.refreshTable()
@@ -211,7 +215,8 @@ class KhashmirBase(protocol.Factory):
         self.insertNode(n, contacted=0)
         nodes = self.table.findNodes(target)
         nodes = map(lambda node: node.contactInfo(), nodes)
-        return {"nodes" : nodes, "id" : self.node.id}
+        token = sha(self.token_secrets[0] + _krpc_sender[0]).digest()
+        return {"nodes" : nodes, "token" : token, "id" : self.node.id}
 
 
 ## This class provides read-only access to the DHT, valueForKey
@@ -275,10 +280,14 @@ class KhashmirWrite(KhashmirRead):
         self.findNode(key, _storeValueForKey)
                     
     #### Remote Interface - called by remote nodes
-    def krpc_store_value(self, key, value, id, _krpc_sender):
+    def krpc_store_value(self, key, value, token, id, _krpc_sender):
         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
         self.insertNode(n, contacted=0)
-        self.store.storeValue(key, value)
+        for secret in self.token_secrets:
+            this_token = sha(secret + _krpc_sender[0]).digest()
+            if token == this_token:
+                self.store.storeValue(key, value)
+                break;
         return {"id" : self.node.id}
 
 # the whole shebang, for testing
@@ -289,7 +298,7 @@ class SimpleTests(unittest.TestCase):
     
     timeout = 10
     DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
-                    'CHECKPOINT_INTERVAL': 900, 'CONCURRENT_REQS': 4,
+                    'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
                     'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
                     'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
@@ -363,7 +372,7 @@ class MultiTest(unittest.TestCase):
     timeout = 30
     num = 20
     DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
-                    'CHECKPOINT_INTERVAL': 900, 'CONCURRENT_REQS': 4,
+                    'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
                     'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
                     'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
index 984514a..1ced1a0 100644 (file)
@@ -49,8 +49,8 @@ class KNodeRead(KNodeBase):
         return df
 
 class KNodeWrite(KNodeRead):
-    def storeValue(self, key, value, id):
-        df = self.conn.sendRequest('store_value', {"key" : key, "value" : value, "id": id})
+    def storeValue(self, key, value, token, id):
+        df = self.conn.sendRequest('store_value', {"key" : key, "value" : value, "token" : token, "id": id})
         df.addErrback(self.errBack)
         df.addCallback(self.checkSender)
         return df
index 580ad9c..3f04ef2 100644 (file)
@@ -30,11 +30,15 @@ class Node:
         self.num = khash.intify(id)
         self.host = host
         self.port = int(port)
+        self.token = ''
         self._contactInfo = None
     
     def updateLastSeen(self):
         self.lastSeen = datetime.now()
         self.fails = 0
+        
+    def updateToken(self, token):
+        self.token = token
     
     def msgFailed(self):
         self.fails = self.fails + 1
index b011c51..6911eb8 100644 (file)
            <term><option>CHECKPOINT_INTERVAL = <replaceable>time</replaceable></option></term>
             <listitem>
              <para>The <replaceable>time</replaceable> to wait between saves of the running state.
-                 (Default is 15 minutes.)</para>
+                 (Default is 5 minutes.)</para>
            </listitem>
          </varlistentry>
          <varlistentry>
diff --git a/test.py b/test.py
index d2dece4..f3a4dbc 100755 (executable)
--- a/test.py
+++ b/test.py
@@ -354,7 +354,7 @@ K = 8
 HASH_LENGTH = 160
 
 # checkpoint every this many seconds
-CHECKPOINT_INTERVAL = 15m
+CHECKPOINT_INTERVAL = 5m
 
 # concurrent xmlrpc calls per find node/value request!
 CONCURRENT_REQS = 4