Added the new RETRIEVE_VALUES configuration parameter.
The desired number of values can now be negative, indicating
that only the closest STORE_REDUNDANCY nodes are to be queried.
# how many hosts to post values to
STORE_REDUNDANCY = 3
+# How many values to attempt to retrieve from the DHT.
+# Setting this to 0 will try to get all values (which could take a while if
+# a lot of nodes have values). Setting it to a negative number will try to get
+# that many results (the absolute value) from only the closest
+# STORE_REDUNDANCY nodes to the hash. The default is a large negative number
+# so that all values from the closest STORE_REDUNDANCY nodes will be retrieved.
+RETRIEVE_VALUES = -10000
+
# how many times in a row a node can fail to respond before it's booted from the routing table
MAX_FAILURES = 3
# how many hosts to post to
'STORE_REDUNDANCY': '3',
+ # How many values to attempt to retrieve from the DHT.
+ # Setting this to 0 will try to get all values (which could take a while if
+ # a lot of nodes have values). Setting it to a negative number will try to get
+ # that many results (the absolute value) from only the closest
+ # STORE_REDUNDANCY nodes to the hash. The default is a large negative number
+ # so that all values from the closest STORE_REDUNDANCY nodes will be retrieved.
+ 'RETRIEVE_VALUES': '-10000',
+
### ROUTING TABLE STUFF
# how many times in a row a node can fail to respond before it's booted from the routing table
'MAX_FAILURES': '3',
self.bootstrap_node = self.config_parser.getboolean(section, 'BOOTSTRAP_NODE')
for k in self.config_parser.options(section):
if k in ['K', 'HASH_LENGTH', 'CONCURRENT_REQS', 'STORE_REDUNDANCY',
- 'MAX_FAILURES', 'PORT']:
+ 'RETRIEVE_VALUES', 'MAX_FAILURES', 'PORT']:
self.config[k] = self.config_parser.getint(section, k)
elif k in ['CHECKPOINT_INTERVAL', 'MIN_PING_INTERVAL',
'BUCKET_STALENESS', 'KEY_EXPIRE']:
timeout = 2
DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
- 'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
+ 'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
+ 'MAX_FAILURES': 3,
'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
'KEY_EXPIRE': 3600, 'SPEW': False, }
num = 20
DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
- 'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
+ 'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
+ 'MAX_FAILURES': 3,
'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
'KEY_EXPIRE': 3600, 'SPEW': False, }
def schedule(self):
"""Schedule requests to be sent to remote nodes."""
# Check if we are already done
- if self.desired_results and len(self.results) >= self.desired_results:
+ if self.desired_results and ((len(self.results) >= abs(self.desired_results)) or
+ (self.desired_results < 0 and
+ len(self.answered) >= self.config['STORE_REDUNDANCY'])):
self.finished=1
result = self.generateResult()
reactor.callLater(0, self.callback, *result)
if self.finished or (self.desired_results and
- len(self.results) + self.outstanding_results >= self.desired_results):
+ len(self.results) + self.outstanding_results >= abs(self.desired_results)):
return
for node in self.getNodesToProcess():
# We might have to stop for now
if (self.outstanding >= self.config['CONCURRENT_REQS'] or
(self.desired_results and
- self.outstanding_results >= self.desired_results)):
+ len(self.results) + self.outstanding_results >= abs(self.desired_results))):
break
+ assert self.outstanding >= 0
+ assert self.outstanding_results >= 0
+
# If no requests are outstanding, then we are done
- assert self.outstanding >=0
if self.outstanding == 0:
self.finished = 1
result = self.generateResult()
log.msg("action %s failed (%s) %s/%s" % (self.action, self.config['PORT'], node.host, node.port))
log.err(err)
self.caller.table.nodeFailed(node)
- self.answered[node.id] = 1
self.outstanding -= 1
self.outstanding_results -= expected_results
self.schedule()
def processResponse(self, dict):
"""Process the response dictionary received from the remote node."""
- pass
+ self.handleGotNodes(dict['nodes'])
def generateResult(self, nodes):
"""Create the result to return to the callback function."""
def generateArgs(self, node):
"""Args include the number of values to request."""
if node.num_values > 0:
- return (self.target, 0), node.num_values
+ # Request all desired results from each node, just to be sure.
+ num_values = abs(self.desired_results) - len(self.results)
+ assert num_values > 0
+ if num_values > node.num_values:
+ num_values = 0
+ return (self.target, num_values), node.num_values
else:
raise ValueError, "Don't try and get values from this node because it doesn't have any"
def _getValueForKey(nodes, key=key, local_values=l, response=callback, self=self):
# create our search state
- state = GetValue(self, key, local_values, 50, response, self.config)
+ state = GetValue(self, key, local_values, self.config['RETRIEVE_VALUES'], response, self.config)
reactor.callLater(0, state.goWithNodes, nodes)
# this call is asynch
timeout = 10
DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
- 'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
+ 'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
+ 'MAX_FAILURES': 3,
'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
'KEY_EXPIRE': 3600, 'SPEW': False, }
num = 20
DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
- 'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
+ 'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
+ 'MAX_FAILURES': 3,
'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
'KEY_EXPIRE': 3600, 'SPEW': False, }
(Default is 3.)</para>
</listitem>
</varlistentry>
+ <varlistentry>
+ <term><option>RETRIEVE_VALUES = <replaceable>number</replaceable></option></term>
+ <listitem>
+ <para>The <replaceable>number</replaceable> of values to attempt to retrieve from the DHT.
+ Setting this to 0 will try to get all values (which could take a while if
+ a lot of nodes have values). Setting it to a negative number will try to get
+ that many results (the absolute value) from only the closest STORE_REDUNDANCY nodes to the hash.
+ (Default is -10000, a large negative number, so that all values from the closest
+ STORE_REDUNDANCY nodes will be retrieved.)</para>
+ </listitem>
+ </varlistentry>
<varlistentry>
<term><option>MAX_FAILURES = <replaceable>number</replaceable></option></term>
<listitem>
# how many hosts to post to
STORE_REDUNDANCY = 3
+# How many values to attempt to retrieve from the DHT.
+# Setting this to 0 will try to get all values (which could take a while if
+# a lot of nodes have values). Setting it to a negative number will try to get
+# that many results (the absolute value) from only the closest
+# STORE_REDUNDANCY nodes to the hash. The default is a large negative number
+# so that all values from the closest STORE_REDUNDANCY nodes will be retrieved.
+RETRIEVE_VALUES = -10000
+
# how many times in a row a node can fail to respond before it's booted from the routing table
MAX_FAILURES = 3