From 6df4eb289b9d83bd19dad8de3ca3de7283af7833 Mon Sep 17 00:00:00 2001 From: Cameron Dale Date: Sun, 24 Feb 2008 14:01:13 -0800 Subject: [PATCH] Standardize the number of values retrieved from the DHT. Added the new RETRIEVE_VALUES configuration parameter. The desired number of values can now be negative, indicating that only the closest STORE_REDUNDANCY nodes are to be queried. --- apt-dht.conf | 8 ++++++++ apt_dht/apt_dht_conf.py | 8 ++++++++ apt_dht_Khashmir/DHT.py | 8 +++++--- apt_dht_Khashmir/actions.py | 22 +++++++++++++++------- apt_dht_Khashmir/khashmir.py | 8 +++++--- debian/apt-dht.conf.sgml | 11 +++++++++++ test.py | 8 ++++++++ 7 files changed, 60 insertions(+), 13 deletions(-) diff --git a/apt-dht.conf b/apt-dht.conf index 8982669..e9a481b 100644 --- a/apt-dht.conf +++ b/apt-dht.conf @@ -77,6 +77,14 @@ CONCURRENT_REQS = 4 # how many hosts to post values to STORE_REDUNDANCY = 3 +# How many values to attempt to retrieve from the DHT. +# Setting this to 0 will try and get all values (which could take a while if +# a lot of nodes have values). Setting it negative will try to get that +# number of results from only the closest STORE_REDUNDANCY nodes to the hash. +# The default is a large negative number so all values from the closest +# STORE_REDUNDANCY nodes will be retrieved. +RETRIEVE_VALUES = -10000 + # how many times in a row a node can fail to respond before it's booted from the routing table MAX_FAILURES = 3 diff --git a/apt_dht/apt_dht_conf.py b/apt_dht/apt_dht_conf.py index 3fad625..06709c3 100644 --- a/apt_dht/apt_dht_conf.py +++ b/apt_dht/apt_dht_conf.py @@ -80,6 +80,14 @@ DHT_DEFAULTS = { # how many hosts to post to 'STORE_REDUNDANCY': '3', + # How many values to attempt to retrieve from the DHT. + # Setting this to 0 will try and get all values (which could take a while if + # a lot of nodes have values). Setting it negative will try to get that + # number of results from only the closest STORE_REDUNDANCY nodes to the hash. + # The default is a large negative number so all values from the closest + # STORE_REDUNDANCY nodes will be retrieved. + 'RETRIEVE_VALUES': '-10000', + ### ROUTING TABLE STUFF # how many times in a row a node can fail to respond before it's booted from the routing table 'MAX_FAILURES': '3', diff --git a/apt_dht_Khashmir/DHT.py b/apt_dht_Khashmir/DHT.py index ab206d2..48ae1f4 100644 --- a/apt_dht_Khashmir/DHT.py +++ b/apt_dht_Khashmir/DHT.py @@ -46,7 +46,7 @@ class DHT: self.bootstrap_node = self.config_parser.getboolean(section, 'BOOTSTRAP_NODE') for k in self.config_parser.options(section): if k in ['K', 'HASH_LENGTH', 'CONCURRENT_REQS', 'STORE_REDUNDANCY', - 'MAX_FAILURES', 'PORT']: + 'RETRIEVE_VALUES', 'MAX_FAILURES', 'PORT']: self.config[k] = self.config_parser.getint(section, k) elif k in ['CHECKPOINT_INTERVAL', 'MIN_PING_INTERVAL', 'BUCKET_STALENESS', 'KEY_EXPIRE']: @@ -184,7 +184,8 @@ class TestSimpleDHT(unittest.TestCase): timeout = 2 DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160, 'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4, - 'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3, + 'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000, + 'MAX_FAILURES': 3, 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600, 'KEY_EXPIRE': 3600, 'SPEW': False, } @@ -276,7 +277,8 @@ class TestMultiDHT(unittest.TestCase): num = 20 DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160, 'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4, - 'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3, + 'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000, + 'MAX_FAILURES': 3, 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600, 'KEY_EXPIRE': 3600, 'SPEW': False, } diff --git a/apt_dht_Khashmir/actions.py b/apt_dht_Khashmir/actions.py index c80591a..e244c49 100644 --- a/apt_dht_Khashmir/actions.py +++ b/apt_dht_Khashmir/actions.py @@ -50,13 +50,15 @@ class ActionBase: def schedule(self): """Schedule requests to be sent to remote nodes.""" # Check if we are already done - if self.desired_results and len(self.results) >= self.desired_results: + if self.desired_results and ((len(self.results) >= abs(self.desired_results)) or + (self.desired_results < 0 and + len(self.answered) >= self.config['STORE_REDUNDANCY'])): self.finished=1 result = self.generateResult() reactor.callLater(0, self.callback, *result) if self.finished or (self.desired_results and - len(self.results) + self.outstanding_results >= self.desired_results): + len(self.results) + self.outstanding_results >= abs(self.desired_results)): return for node in self.getNodesToProcess(): @@ -86,11 +88,13 @@ class ActionBase: # We might have to stop for now if (self.outstanding >= self.config['CONCURRENT_REQS'] or (self.desired_results and - self.outstanding_results >= self.desired_results)): + len(self.results) + self.outstanding_results >= abs(self.desired_results))): break + assert self.outstanding >= 0 + assert self.outstanding_results >= 0 + # If no requests are outstanding, then we are done - assert self.outstanding >=0 if self.outstanding == 0: self.finished = 1 result = self.generateResult() @@ -113,7 +117,6 @@ class ActionBase: log.msg("action %s failed (%s) %s/%s" % (self.action, self.config['PORT'], node.host, node.port)) log.err(err) self.caller.table.nodeFailed(node) - self.answered[node.id] = 1 self.outstanding -= 1 self.outstanding_results -= expected_results self.schedule() @@ -156,7 +159,7 @@ class ActionBase: def processResponse(self, dict): """Process the response dictionary received from the remote node.""" - pass + self.handleGotNodes(dict['nodes']) def generateResult(self, nodes): """Create the result to return to the callback function.""" @@ -213,7 +216,12 @@ class GetValue(ActionBase): def generateArgs(self, node): """Args include the number of values to request.""" if node.num_values > 0: - return (self.target, 0), node.num_values + # Request all desired results from each node, just to be sure. + num_values = abs(self.desired_results) - len(self.results) + assert num_values > 0 + if num_values > node.num_values: + num_values = 0 + return (self.target, num_values), node.num_values else: raise ValueError, "Don't try and get values from this node because it doesn't have any" diff --git a/apt_dht_Khashmir/khashmir.py b/apt_dht_Khashmir/khashmir.py index 5895669..eeaab0a 100644 --- a/apt_dht_Khashmir/khashmir.py +++ b/apt_dht_Khashmir/khashmir.py @@ -253,7 +253,7 @@ class KhashmirRead(KhashmirBase): def _getValueForKey(nodes, key=key, local_values=l, response=callback, self=self): # create our search state - state = GetValue(self, key, local_values, 50, response, self.config) + state = GetValue(self, key, local_values, self.config['RETRIEVE_VALUES'], response, self.config) reactor.callLater(0, state.goWithNodes, nodes) # this call is asynch @@ -322,7 +322,8 @@ class SimpleTests(unittest.TestCase): timeout = 10 DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160, 'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4, - 'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3, + 'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000, + 'MAX_FAILURES': 3, 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600, 'KEY_EXPIRE': 3600, 'SPEW': False, } @@ -395,7 +396,8 @@ class MultiTest(unittest.TestCase): num = 20 DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160, 'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4, - 'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3, + 'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000, + 'MAX_FAILURES': 3, 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600, 'KEY_EXPIRE': 3600, 'SPEW': False, } diff --git a/debian/apt-dht.conf.sgml b/debian/apt-dht.conf.sgml index 6ded80c..e1da2a3 100644 --- a/debian/apt-dht.conf.sgml +++ b/debian/apt-dht.conf.sgml @@ -212,6 +212,17 @@ (Default is 3.) + + + + The number of values to attempt to retrieve from the DHT. + Setting this to 0 will try and get all values (which could take a while if + a lot of nodes have values). Setting it negative will try to get that + number of results from only the closest STORE_REDUNDANCY nodes to the hash. + (Default is -10000, which is a large negative number so all values from the closest + STORE_REDUNDANCY nodes will be retrieved.) + + diff --git a/test.py b/test.py index 5468a1d..d697b37 100755 --- a/test.py +++ b/test.py @@ -371,6 +371,14 @@ CONCURRENT_REQS = 4 # how many hosts to post to STORE_REDUNDANCY = 3 +# How many values to attempt to retrieve from the DHT. +# Setting this to 0 will try and get all values (which could take a while if +# a lot of nodes have values). Setting it negative will try to get that +# number of results from only the closest STORE_REDUNDANCY nodes to the hash. +# The default is a large negative number so all values from the closest +# STORE_REDUNDANCY nodes will be retrieved. +RETRIEVE_VALUES = -10000 + # how many times in a row a node can fail to respond before it's booted from the routing table MAX_FAILURES = 3 -- 2.30.2