-## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
-# see LICENSE.txt for license information
"""Details of how to perform actions on remote peers."""
-from twisted.internet import reactor
+from datetime import datetime
+
+from twisted.internet import reactor, defer
from twisted.python import log
from khash import intify
+from ktable import K
from util import uncompact
class ActionBase:
@ivar config: the configuration variables for the DHT
@type action: C{string}
@ivar action: the name of the action to call on remote nodes
+ @type stats: L{stats.StatsLogger}
+ @ivar stats: the statistics modules to report to
@type num: C{long}
@ivar num: the target key in integer form
@type queried: C{dictionary}
keys are node IDs, values are the node itself
@type answered: C{dictionary}
@ivar answered: the nodes that have answered the queries
+ @type failed: C{dictionary}
+ @ivar failed: the nodes that have failed to answer the queries
@type found: C{dictionary}
@ivar found: nodes that have been found so far by the action
@type sorted_nodes: C{list} of L{node.Node}
before the action should stop
@type callback: C{method}
@ivar callback: the method to call with the results
- @type outstanding: C{int}
- @ivar outstanding: the number of requests currently outstanding
+ @type outstanding: C{dictionary}
+ @ivar outstanding: the nodes that have outstanding requests for this action,
+ keys are node IDs, values are the number of outstanding results from the node
@type outstanding_results: C{int}
- @ivar outstanding_results: the number of results that are expected from
+ @ivar outstanding_results: the total number of results that are expected from
the requests that are currently outstanding
@type finished: C{boolean}
@ivar finished: whether the action is done
+ @type started: C{datetime.datetime}
+ @ivar started: the time the action was started at
@type sort: C{method}
@ivar sort: used to sort nodes by their proximity to the target
"""
self.target = target
self.config = config
self.action = action
- stats.startedAction(action)
+ self.stats = stats
+ self.stats.startedAction(action)
self.num = intify(target)
self.queried = {}
self.answered = {}
+ self.failed = {}
self.found = {}
self.sorted_nodes = []
self.results = {}
self.desired_results = num_results
self.callback = callback
- self.outstanding = 0
+ self.outstanding = {}
self.outstanding_results = 0
self.finished = False
+ self.started = datetime.now()
def sort(a, b, num=self.num):
"""Sort nodes relative to the ID we are looking for."""
#{ Main operation
def goWithNodes(self, nodes):
"""Start the action's process with a list of nodes to contact."""
+ self.started = datetime.now()
for node in nodes:
- if node.id == self.caller.node.id:
- continue
- else:
- self.found[node.id] = node
+ self.found[node.id] = node
self.sortNodes()
self.schedule()
def schedule(self):
"""Schedule requests to be sent to remote nodes."""
+ if self.finished:
+ return
+
+ # Get the nodes to be processed
+ nodes = self.getNodesToProcess()
+
# Check if we are already done
- if self.desired_results and ((len(self.results) >= abs(self.desired_results)) or
- (self.desired_results < 0 and
- len(self.answered) >= self.config['STORE_REDUNDANCY'])):
+ if nodes is None or (self.desired_results and
+ ((len(self.results) >= abs(self.desired_results)) or
+ (self.desired_results < 0 and
+ len(self.answered) >= self.config['STORE_REDUNDANCY']))):
self.finished = True
result = self.generateResult()
reactor.callLater(0, self.callback, *result)
+ return
- if self.finished or (self.desired_results and
- len(self.results) + self.outstanding_results >= abs(self.desired_results)):
+ # Check if we have enough outstanding results coming
+ if (self.desired_results and
+ len(self.results) + self.outstanding_results >= abs(self.desired_results)):
return
# Loop for each node that should be processed
- for node in self.getNodesToProcess():
+ for node in nodes:
# Don't send requests twice or to ourself
- if node.id not in self.queried and node.id != self.caller.node.id:
+ if node.id not in self.queried:
self.queried[node.id] = 1
# Get the action to call on the node
- try:
- f = getattr(node, self.action)
- except AttributeError:
- log.msg("%s doesn't have a %s method!" % (node, self.action))
+ if node.id == self.caller.node.id:
+ try:
+ f = getattr(self.caller, 'krpc_' + self.action)
+ except AttributeError:
+ log.msg("%s doesn't have a %s method!" % (node, 'krpc_' + self.action))
+ continue
else:
- # Get the arguments to the action's method
try:
- args, expected_results = self.generateArgs(node)
- except ValueError:
- pass
- else:
- # Call the action on the remote node
- self.outstanding += 1
- self.outstanding_results += expected_results
- df = f(self.caller.node.id, *args)
- df.addCallbacks(self.gotResponse, self.actionFailed,
- callbackArgs = (node, expected_results),
- errbackArgs = (node, expected_results))
+ f = getattr(node, self.action)
+ except AttributeError:
+ log.msg("%s doesn't have a %s method!" % (node, self.action))
+ continue
+
+ # Get the arguments to the action's method
+ try:
+ args, expected_results = self.generateArgs(node)
+ except ValueError:
+ continue
+
+ # Call the action on the remote node
+ self.outstanding[node.id] = expected_results
+ self.outstanding_results += expected_results
+ df = defer.maybeDeferred(f, *args)
+ reactor.callLater(0, df.addCallbacks,
+ *(self.gotResponse, self.actionFailed),
+ **{'callbackArgs': (node, ),
+ 'errbackArgs': (node, )})
# We might have to stop for now
- if (self.outstanding >= self.config['CONCURRENT_REQS'] or
+ if (len(self.outstanding) >= self.config['CONCURRENT_REQS'] or
(self.desired_results and
len(self.results) + self.outstanding_results >= abs(self.desired_results))):
break
- assert self.outstanding >= 0
assert self.outstanding_results >= 0
# If no requests are outstanding, then we are done
- if self.outstanding == 0:
+ if len(self.outstanding) == 0:
self.finished = True
result = self.generateResult()
reactor.callLater(0, self.callback, *result)
- def gotResponse(self, dict, node, expected_results):
+ def gotResponse(self, dict, node):
"""Receive a response from a remote node."""
- self.caller.insertNode(node)
+ if node.id != self.caller.node.id:
+ reactor.callLater(0, self.caller.insertNode, node)
if self.finished or self.answered.has_key(node.id):
# a day late and a dollar short
return
- self.outstanding -= 1
- self.outstanding_results -= expected_results
self.answered[node.id] = 1
- self.processResponse(dict['rsp'])
+ self.processResponse(dict)
+ if self.outstanding.has_key(node.id):
+ self.outstanding_results -= self.outstanding[node.id]
+ del self.outstanding[node.id]
self.schedule()
- def actionFailed(self, err, node, expected_results):
+ def actionFailed(self, err, node):
"""Receive an error from a remote node."""
- log.msg("action %s failed (%s) %s/%s" % (self.action, self.config['PORT'], node.host, node.port))
- log.err(err)
- self.caller.table.nodeFailed(node)
- self.outstanding -= 1
- self.outstanding_results -= expected_results
+ log.msg("action %s failed on %s/%s: %s" % (self.action, node.host, node.port, err.getErrorMessage()))
+ if node.id != self.caller.node.id:
+ self.caller.table.nodeFailed(node)
+ self.failed[node.id] = 1
+ if self.outstanding.has_key(node.id):
+ self.outstanding_results -= self.outstanding[node.id]
+ del self.outstanding[node.id]
self.schedule()
def handleGotNodes(self, nodes):
"""Generate a list of nodes to process next.
This implementation is suitable for a recurring search over all nodes.
+ It will stop the search when the closest K nodes have been queried.
+ It also prematurely drops requests to nodes that have fallen way behind.
+
+ @return: sorted list of nodes to query, or None if we are done
"""
+ # Find the K closest nodes that haven't failed, count how many answered
self.sortNodes()
- return self.sorted_nodes[:self.config['K']]
+ closest_K = []
+ ans = 0
+ for node in self.sorted_nodes:
+ if node.id not in self.failed:
+ closest_K.append(node)
+ if node.id in self.answered:
+ ans += 1
+ if len(closest_K) >= K:
+ break
+
+ # If we have responses from the K closest nodes, then we are done
+ if ans >= K:
+ log.msg('Got the answers we need, aborting search')
+ return None
+
+ # Check the oustanding requests to see if they are still closest
+ for id in self.outstanding.keys():
+ if self.found[id] not in closest_K:
+ # Request is not important, allow another to go
+ log.msg("Request to %s/%s is taking too long, moving on" %
+ (self.found[id].host, self.found[id].port))
+ self.outstanding_results -= self.outstanding[id]
+ del self.outstanding[id]
+
+ return closest_K
def generateArgs(self, node):
"""Generate the arguments to the node's action.
- These arguments will be appended to our node ID when calling the action.
Also return the number of results expected from this request.
@raise ValueError: if the node should not be queried
"""
- return (self.target, ), 0
+ return (self.caller.node.id, self.target), 0
def processResponse(self, dict):
"""Process the response dictionary received from the remote node."""
def generateResult(self, nodes):
"""Create the final result to return to the L{callback} function."""
+ self.stats.completedAction(self.action, self.started)
return []
class FindNode(ActionBase):
"""Find the closest nodes to the key."""
- def __init__(self, caller, target, callback, config, stats, action="findNode"):
+ def __init__(self, caller, target, callback, config, stats, action="find_node"):
ActionBase.__init__(self, caller, target, callback, config, stats, action)
def processResponse(self, dict):
def generateResult(self):
"""Result is the K closest nodes to the target."""
self.sortNodes()
- return (self.sorted_nodes[:self.config['K']], )
+ self.stats.completedAction(self.action, self.started)
+ closest_K = []
+ for node in self.sorted_nodes:
+ if node.id not in self.failed:
+ closest_K.append(node)
+ if len(closest_K) >= K:
+ break
+ return (closest_K, )
class FindValue(ActionBase):
"""Find the closest nodes to the key and check for values."""
- def __init__(self, caller, target, callback, config, stats, action="findValue"):
+ def __init__(self, caller, target, callback, config, stats, action="find_value"):
ActionBase.__init__(self, caller, target, callback, config, stats, action)
def processResponse(self, dict):
def generateResult(self):
"""Result is the nodes that have values, sorted by proximity to the key."""
self.sortNodes()
+ self.stats.completedAction(self.action, self.started)
return ([node for node in self.sorted_nodes if node.num_values > 0], )
class GetValue(ActionBase):
"""Retrieve values from a list of nodes."""
- def __init__(self, caller, target, local_results, num_results, callback, config, stats, action="getValue"):
- """Initialize the action with the locally available results.
-
- @type local_results: C{list} of C{string}
- @param local_results: the values that were available in this node
- """
+ def __init__(self, caller, target, num_results, callback, config, stats, action="get_value"):
ActionBase.__init__(self, caller, target, callback, config, stats, action, num_results)
- if local_results:
- for result in local_results:
- self.results[result] = 1
def getNodesToProcess(self):
"""Nodes are never added, always return the same sorted node list."""
assert num_values > 0
if num_values > node.num_values:
num_values = 0
- return (self.target, num_values), node.num_values
+ return (self.caller.node.id, self.target, num_values), node.num_values
else:
raise ValueError, "Don't try and get values from this node because it doesn't have any"
def generateResult(self):
"""Results have all been returned, now send the empty list to end the action."""
+ self.stats.completedAction(self.action, self.started)
return (self.target, [])
class StoreValue(ActionBase):
"""Store a value in a list of nodes."""
- def __init__(self, caller, target, value, num_results, callback, config, stats, action="storeValue"):
+ def __init__(self, caller, target, value, num_results, callback, config, stats, action="store_value"):
"""Initialize the action with the value to store.
@type value: C{string}
def generateArgs(self, node):
"""Args include the value to store and the node's token."""
if node.token:
- return (self.target, self.value, node.token), 1
+ return (self.caller.node.id, self.target, self.value, node.token), 1
else:
raise ValueError, "Don't store at this node since we don't know it's token"
def generateResult(self):
"""Return all the response IDs received."""
+ self.stats.completedAction(self.action, self.started)
return (self.target, self.value, self.results.values())