+++ /dev/null
-## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
-# see LICENSE.txt for license information
-
-"""Details of how to perform actions on remote peers."""
-
-from twisted.internet import reactor
-from twisted.python import log
-
-from khash import intify
-from util import uncompact
-
-class ActionBase:
- """Base class for some long running asynchronous proccesses like finding nodes or values.
-
- @type caller: L{khashmir.Khashmir}
- @ivar caller: the DHT instance that is performing the action
- @type target: C{string}
- @ivar target: the target of the action, usually a DHT key
- @type config: C{dictionary}
- @ivar config: the configuration variables for the DHT
- @type action: C{string}
- @ivar action: the name of the action to call on remote nodes
- @type num: C{long}
- @ivar num: the target key in integer form
- @type queried: C{dictionary}
- @ivar queried: the nodes that have been queried for this action,
- keys are node IDs, values are the node itself
- @type answered: C{dictionary}
- @ivar answered: the nodes that have answered the queries
- @type found: C{dictionary}
- @ivar found: nodes that have been found so far by the action
- @type sorted_nodes: C{list} of L{node.Node}
- @ivar sorted_nodes: a sorted list of nodes by there proximity to the key
- @type results: C{dictionary}
- @ivar results: keys are the results found so far by the action
- @type desired_results: C{int}
- @ivar desired_results: the minimum number of results that are needed
- before the action should stop
- @type callback: C{method}
- @ivar callback: the method to call with the results
- @type outstanding: C{int}
- @ivar outstanding: the number of requests currently outstanding
- @type outstanding_results: C{int}
- @ivar outstanding_results: the number of results that are expected from
- the requests that are currently outstanding
- @type finished: C{boolean}
- @ivar finished: whether the action is done
- @type sort: C{method}
- @ivar sort: used to sort nodes by their proximity to the target
- """
-
- def __init__(self, caller, target, callback, config, action, num_results = None):
- """Initialize the action.
-
- @type caller: L{khashmir.Khashmir}
- @param caller: the DHT instance that is performing the action
- @type target: C{string}
- @param target: the target of the action, usually a DHT key
- @type callback: C{method}
- @param callback: the method to call with the results
- @type config: C{dictionary}
- @param config: the configuration variables for the DHT
- @type action: C{string}
- @param action: the name of the action to call on remote nodes
- @type num_results: C{int}
- @param num_results: the minimum number of results that are needed before
- the action should stop (optional, defaults to getting all the results)
-
- """
-
- self.caller = caller
- self.target = target
- self.config = config
- self.action = action
- self.num = intify(target)
- self.queried = {}
- self.answered = {}
- self.found = {}
- self.sorted_nodes = []
- self.results = {}
- self.desired_results = num_results
- self.callback = callback
- self.outstanding = 0
- self.outstanding_results = 0
- self.finished = False
-
- def sort(a, b, num=self.num):
- """Sort nodes relative to the ID we are looking for."""
- x, y = num ^ a.num, num ^ b.num
- if x > y:
- return 1
- elif x < y:
- return -1
- return 0
- self.sort = sort
-
- #{ Main operation
- def goWithNodes(self, nodes):
- """Start the action's process with a list of nodes to contact."""
- for node in nodes:
- if node.id == self.caller.node.id:
- continue
- else:
- self.found[node.id] = node
- self.sortNodes()
- self.schedule()
-
- def schedule(self):
- """Schedule requests to be sent to remote nodes."""
- # Check if we are already done
- if self.desired_results and ((len(self.results) >= abs(self.desired_results)) or
- (self.desired_results < 0 and
- len(self.answered) >= self.config['STORE_REDUNDANCY'])):
- self.finished = True
- result = self.generateResult()
- reactor.callLater(0, self.callback, *result)
-
- if self.finished or (self.desired_results and
- len(self.results) + self.outstanding_results >= abs(self.desired_results)):
- return
-
- # Loop for each node that should be processed
- for node in self.getNodesToProcess():
- # Don't send requests twice or to ourself
- if node.id not in self.queried and node.id != self.caller.node.id:
- self.queried[node.id] = 1
-
- # Get the action to call on the node
- try:
- f = getattr(node, self.action)
- except AttributeError:
- log.msg("%s doesn't have a %s method!" % (node, self.action))
- else:
- # Get the arguments to the action's method
- try:
- args, expected_results = self.generateArgs(node)
- except ValueError:
- pass
- else:
- # Call the action on the remote node
- self.outstanding += 1
- self.outstanding_results += expected_results
- df = f(self.caller.node.id, *args)
- df.addCallbacks(self.gotResponse, self.actionFailed,
- callbackArgs = (node, expected_results),
- errbackArgs = (node, expected_results))
-
- # We might have to stop for now
- if (self.outstanding >= self.config['CONCURRENT_REQS'] or
- (self.desired_results and
- len(self.results) + self.outstanding_results >= abs(self.desired_results))):
- break
-
- assert self.outstanding >= 0
- assert self.outstanding_results >= 0
-
- # If no requests are outstanding, then we are done
- if self.outstanding == 0:
- self.finished = True
- result = self.generateResult()
- reactor.callLater(0, self.callback, *result)
-
- def gotResponse(self, dict, node, expected_results):
- """Receive a response from a remote node."""
- self.caller.insertNode(node)
- if self.finished or self.answered.has_key(node.id):
- # a day late and a dollar short
- return
- self.outstanding -= 1
- self.outstanding_results -= expected_results
- self.answered[node.id] = 1
- self.processResponse(dict['rsp'])
- self.schedule()
-
- def actionFailed(self, err, node, expected_results):
- """Receive an error from a remote node."""
- log.msg("action %s failed (%s) %s/%s" % (self.action, self.config['PORT'], node.host, node.port))
- log.err(err)
- self.caller.table.nodeFailed(node)
- self.outstanding -= 1
- self.outstanding_results -= expected_results
- self.schedule()
-
- def handleGotNodes(self, nodes):
- """Process any received node contact info in the response.
-
- Not called by default, but suitable for being called by
- L{processResponse} in a recursive node search.
- """
- for compact_node in nodes:
- node_contact = uncompact(compact_node)
- node = self.caller.Node(node_contact)
- if not self.found.has_key(node.id):
- self.found[node.id] = node
-
- def sortNodes(self):
- """Sort the nodes, if necessary.
-
- Assumes nodes are never removed from the L{found} dictionary.
- """
- if len(self.sorted_nodes) != len(self.found):
- self.sorted_nodes = self.found.values()
- self.sorted_nodes.sort(self.sort)
-
- #{ Subclass for specific actions
- def getNodesToProcess(self):
- """Generate a list of nodes to process next.
-
- This implementation is suitable for a recurring search over all nodes.
- """
- self.sortNodes()
- return self.sorted_nodes[:self.config['K']]
-
- def generateArgs(self, node):
- """Generate the arguments to the node's action.
-
- These arguments will be appended to our node ID when calling the action.
- Also return the number of results expected from this request.
-
- @raise ValueError: if the node should not be queried
- """
- return (self.target, ), 0
-
- def processResponse(self, dict):
- """Process the response dictionary received from the remote node."""
- self.handleGotNodes(dict['nodes'])
-
- def generateResult(self, nodes):
- """Create the final result to return to the L{callback} function."""
- return []
-
-
-class FindNode(ActionBase):
- """Find the closest nodes to the key."""
-
- def __init__(self, caller, target, callback, config, action="findNode"):
- ActionBase.__init__(self, caller, target, callback, config, action)
-
- def processResponse(self, dict):
- """Save the token received from each node."""
- if dict["id"] in self.found:
- self.found[dict["id"]].updateToken(dict.get('token', ''))
- self.handleGotNodes(dict['nodes'])
-
- def generateResult(self):
- """Result is the K closest nodes to the target."""
- self.sortNodes()
- return (self.sorted_nodes[:self.config['K']], )
-
-
-class FindValue(ActionBase):
- """Find the closest nodes to the key and check for values."""
-
- def __init__(self, caller, target, callback, config, action="findValue"):
- ActionBase.__init__(self, caller, target, callback, config, action)
-
- def processResponse(self, dict):
- """Save the number of values each node has."""
- if dict["id"] in self.found:
- self.found[dict["id"]].updateNumValues(dict.get('num', 0))
- self.handleGotNodes(dict['nodes'])
-
- def generateResult(self):
- """Result is the nodes that have values, sorted by proximity to the key."""
- self.sortNodes()
- return ([node for node in self.sorted_nodes if node.num_values > 0], )
-
-
-class GetValue(ActionBase):
- """Retrieve values from a list of nodes."""
-
- def __init__(self, caller, target, local_results, num_results, callback, config, action="getValue"):
- """Initialize the action with the locally available results.
-
- @type local_results: C{list} of C{string}
- @param local_results: the values that were available in this node
- """
- ActionBase.__init__(self, caller, target, callback, config, action, num_results)
- if local_results:
- for result in local_results:
- self.results[result] = 1
-
- def getNodesToProcess(self):
- """Nodes are never added, always return the same sorted node list."""
- return self.sorted_nodes
-
- def generateArgs(self, node):
- """Arguments include the number of values to request."""
- if node.num_values > 0:
- # Request all desired results from each node, just to be sure.
- num_values = abs(self.desired_results) - len(self.results)
- assert num_values > 0
- if num_values > node.num_values:
- num_values = 0
- return (self.target, num_values), node.num_values
- else:
- raise ValueError, "Don't try and get values from this node because it doesn't have any"
-
- def processResponse(self, dict):
- """Save the returned values, calling the L{callback} each time there are new ones."""
- if dict.has_key('values'):
- def x(y, z=self.results):
- if not z.has_key(y):
- z[y] = 1
- return y
- else:
- return None
- z = len(dict['values'])
- v = filter(None, map(x, dict['values']))
- if len(v):
- reactor.callLater(0, self.callback, self.target, v)
-
- def generateResult(self):
- """Results have all been returned, now send the empty list to end the action."""
- return (self.target, [])
-
-
-class StoreValue(ActionBase):
- """Store a value in a list of nodes."""
-
- def __init__(self, caller, target, value, num_results, callback, config, action="storeValue"):
- """Initialize the action with the value to store.
-
- @type value: C{string}
- @param value: the value to store in the nodes
- """
- ActionBase.__init__(self, caller, target, callback, config, action, num_results)
- self.value = value
-
- def getNodesToProcess(self):
- """Nodes are never added, always return the same sorted list."""
- return self.sorted_nodes
-
- def generateArgs(self, node):
- """Args include the value to store and the node's token."""
- if node.token:
- return (self.target, self.value, node.token), 1
- else:
- raise ValueError, "Don't store at this node since we don't know it's token"
-
- def processResponse(self, dict):
- """Save the response, though it should be nothin but the ID."""
- self.results[dict["id"]] = dict
-
- def generateResult(self):
- """Return all the response IDs received."""
- return (self.target, self.value, self.results.values())