Standardize the number of values retrieved from the DHT.
authorCameron Dale <camrdale@gmail.com>
Sun, 24 Feb 2008 22:01:13 +0000 (14:01 -0800)
committerCameron Dale <camrdale@gmail.com>
Sun, 24 Feb 2008 22:01:13 +0000 (14:01 -0800)
Added the new RETRIEVE_VALUES configuration parameter.
The desired number of values can now be negative, indicating
that only the closest STORE_REDUNDANCY nodes are to be queried.

apt-dht.conf
apt_dht/apt_dht_conf.py
apt_dht_Khashmir/DHT.py
apt_dht_Khashmir/actions.py
apt_dht_Khashmir/khashmir.py
debian/apt-dht.conf.sgml
test.py

index 8982669..e9a481b 100644 (file)
@@ -77,6 +77,14 @@ CONCURRENT_REQS = 4
 # how many hosts to post values to
 STORE_REDUNDANCY = 3
 
+# How many values to attempt to retrieve from the DHT.
+# Setting this to 0 will try and get all values (which could take a while if
+# a lot of nodes have values). Setting it negative will try to get that
+# number of results from only the closest STORE_REDUNDANCY nodes to the hash.
+# The default is a large negative number so all values from the closest
+# STORE_REDUNDANCY nodes will be retrieved.
+RETRIEVE_VALUES = -10000
+
 # how many times in a row a node can fail to respond before it's booted from the routing table
 MAX_FAILURES = 3
 
index 3fad625..06709c3 100644 (file)
@@ -80,6 +80,14 @@ DHT_DEFAULTS = {
     # how many hosts to post to
     'STORE_REDUNDANCY': '3',
     
+    # How many values to attempt to retrieve from the DHT.
+    # Setting this to 0 will try and get all values (which could take a while if
+    # a lot of nodes have values). Setting it negative will try to get that
+    # number of results from only the closest STORE_REDUNDANCY nodes to the hash.
+    # The default is a large negative number so all values from the closest
+    # STORE_REDUNDANCY nodes will be retrieved.
+    'RETRIEVE_VALUES': '-10000',
+
     ###  ROUTING TABLE STUFF
     # how many times in a row a node can fail to respond before it's booted from the routing table
     'MAX_FAILURES': '3',
index ab206d2..48ae1f4 100644 (file)
@@ -46,7 +46,7 @@ class DHT:
         self.bootstrap_node = self.config_parser.getboolean(section, 'BOOTSTRAP_NODE')
         for k in self.config_parser.options(section):
             if k in ['K', 'HASH_LENGTH', 'CONCURRENT_REQS', 'STORE_REDUNDANCY', 
-                     'MAX_FAILURES', 'PORT']:
+                     'RETRIEVE_VALUES', 'MAX_FAILURES', 'PORT']:
                 self.config[k] = self.config_parser.getint(section, k)
             elif k in ['CHECKPOINT_INTERVAL', 'MIN_PING_INTERVAL', 
                        'BUCKET_STALENESS', 'KEY_EXPIRE']:
@@ -184,7 +184,8 @@ class TestSimpleDHT(unittest.TestCase):
     timeout = 2
     DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
                     'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
-                    'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
+                    'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
+                    'MAX_FAILURES': 3,
                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
                     'KEY_EXPIRE': 3600, 'SPEW': False, }
 
@@ -276,7 +277,8 @@ class TestMultiDHT(unittest.TestCase):
     num = 20
     DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
                     'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
-                    'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
+                    'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
+                    'MAX_FAILURES': 3,
                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
                     'KEY_EXPIRE': 3600, 'SPEW': False, }
 
index c80591a..e244c49 100644 (file)
@@ -50,13 +50,15 @@ class ActionBase:
     def schedule(self):
         """Schedule requests to be sent to remote nodes."""
         # Check if we are already done
-        if self.desired_results and len(self.results) >= self.desired_results:
+        if self.desired_results and ((len(self.results) >= abs(self.desired_results)) or
+                                     (self.desired_results < 0 and
+                                      len(self.answered) >= self.config['STORE_REDUNDANCY'])):
             self.finished=1
             result = self.generateResult()
             reactor.callLater(0, self.callback, *result)
 
         if self.finished or (self.desired_results and 
-                             len(self.results) + self.outstanding_results >= self.desired_results):
+                             len(self.results) + self.outstanding_results >= abs(self.desired_results)):
             return
         
         for node in self.getNodesToProcess():
@@ -86,11 +88,13 @@ class ActionBase:
             # We might have to stop for now
             if (self.outstanding >= self.config['CONCURRENT_REQS'] or
                 (self.desired_results and
-                 self.outstanding_results >= self.desired_results)):
+                 len(self.results) + self.outstanding_results >= abs(self.desired_results))):
                 break
             
+        assert self.outstanding >= 0
+        assert self.outstanding_results >= 0
+
         # If no requests are outstanding, then we are done
-        assert self.outstanding >=0
         if self.outstanding == 0:
             self.finished = 1
             result = self.generateResult()
@@ -113,7 +117,6 @@ class ActionBase:
         log.msg("action %s failed (%s) %s/%s" % (self.action, self.config['PORT'], node.host, node.port))
         log.err(err)
         self.caller.table.nodeFailed(node)
-        self.answered[node.id] = 1
         self.outstanding -= 1
         self.outstanding_results -= expected_results
         self.schedule()
@@ -156,7 +159,7 @@ class ActionBase:
     
     def processResponse(self, dict):
         """Process the response dictionary received from the remote node."""
-        pass
+        self.handleGotNodes(dict['nodes'])
 
     def generateResult(self, nodes):
         """Create the result to return to the callback function."""
@@ -213,7 +216,12 @@ class GetValue(ActionBase):
     def generateArgs(self, node):
         """Args include the number of values to request."""
         if node.num_values > 0:
-            return (self.target, 0), node.num_values
+            # Request all desired results from each node, just to be sure.
+            num_values = abs(self.desired_results) - len(self.results)
+            assert num_values > 0
+            if num_values > node.num_values:
+                num_values = 0
+            return (self.target, num_values), node.num_values
         else:
             raise ValueError, "Don't try and get values from this node because it doesn't have any"
 
index 5895669..eeaab0a 100644 (file)
@@ -253,7 +253,7 @@ class KhashmirRead(KhashmirBase):
 
         def _getValueForKey(nodes, key=key, local_values=l, response=callback, self=self):
             # create our search state
-            state = GetValue(self, key, local_values, 50, response, self.config)
+            state = GetValue(self, key, local_values, self.config['RETRIEVE_VALUES'], response, self.config)
             reactor.callLater(0, state.goWithNodes, nodes)
             
         # this call is asynch
@@ -322,7 +322,8 @@ class SimpleTests(unittest.TestCase):
     timeout = 10
     DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
                     'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
-                    'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
+                    'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
+                    'MAX_FAILURES': 3,
                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
                     'KEY_EXPIRE': 3600, 'SPEW': False, }
 
@@ -395,7 +396,8 @@ class MultiTest(unittest.TestCase):
     num = 20
     DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
                     'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
-                    'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
+                    'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
+                    'MAX_FAILURES': 3,
                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
                     'KEY_EXPIRE': 3600, 'SPEW': False, }
 
index 6ded80c..e1da2a3 100644 (file)
            </listitem>
          </varlistentry>
          <varlistentry>
+           <term><option>RETRIEVE_VALUES = <replaceable>number</replaceable></option></term>
+            <listitem>
+             <para>The <replaceable>number</replaceable> of values to attempt to retrieve from the DHT.
+                 Setting this to 0 will try and get all values (which could take a while if
+                 a lot of nodes have values). Setting it negative will try to get that
+                 number of results from only the closest STORE_REDUNDANCY nodes to the hash.
+                 (Default is -10000, which is a large negative number so all values from the closest
+                 STORE_REDUNDANCY nodes will be retrieved.)</para>
+           </listitem>
+         </varlistentry>
+         <varlistentry>
            <term><option>MAX_FAILURES = <replaceable>number</replaceable></option></term>
             <listitem>
              <para>The <replaceable>number</replaceable> of times in a row a node can fail to
diff --git a/test.py b/test.py
index 5468a1d..d697b37 100755 (executable)
--- a/test.py
+++ b/test.py
@@ -371,6 +371,14 @@ CONCURRENT_REQS = 4
 # how many hosts to post to
 STORE_REDUNDANCY = 3
 
+# How many values to attempt to retrieve from the DHT.
+# Setting this to 0 will try and get all values (which could take a while if
+# a lot of nodes have values). Setting it negative will try to get that
+# number of results from only the closest STORE_REDUNDANCY nodes to the hash.
+# The default is a large negative number so all values from the closest
+# STORE_REDUNDANCY nodes will be retrieved.
+RETRIEVE_VALUES = -10000
+
 # how many times in a row a node can fail to respond before it's booted from the routing table
 MAX_FAILURES = 3