## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
# see LICENSE.txt for license information
+"""Details of how to perform actions on remote peers."""
+
from twisted.internet import reactor
+from twisted.python import log
from khash import intify
+from util import uncompact
class ActionBase:
- """ base class for some long running asynchronous proccesses like finding nodes or values """
- def __init__(self, caller, target, callback, config):
+ """Base class for some long running asynchronous proccesses like finding nodes or values.
+
+ @type caller: L{khashmir.Khashmir}
+ @ivar caller: the DHT instance that is performing the action
+ @type target: C{string}
+ @ivar target: the target of the action, usually a DHT key
+ @type config: C{dictionary}
+ @ivar config: the configuration variables for the DHT
+ @type action: C{string}
+ @ivar action: the name of the action to call on remote nodes
+ @type num: C{long}
+ @ivar num: the target key in integer form
+ @type queried: C{dictionary}
+ @ivar queried: the nodes that have been queried for this action,
+ keys are node IDs, values are the node itself
+ @type answered: C{dictionary}
+ @ivar answered: the nodes that have answered the queries
+ @type found: C{dictionary}
+ @ivar found: nodes that have been found so far by the action
+ @type sorted_nodes: C{list} of L{node.Node}
+ @ivar sorted_nodes: a sorted list of nodes by there proximity to the key
+ @type results: C{dictionary}
+ @ivar results: keys are the results found so far by the action
+ @type desired_results: C{int}
+ @ivar desired_results: the minimum number of results that are needed
+ before the action should stop
+ @type callback: C{method}
+ @ivar callback: the method to call with the results
+ @type outstanding: C{int}
+ @ivar outstanding: the number of requests currently outstanding
+ @type outstanding_results: C{int}
+ @ivar outstanding_results: the number of results that are expected from
+ the requests that are currently outstanding
+ @type finished: C{boolean}
+ @ivar finished: whether the action is done
+ @type sort: C{method}
+ @ivar sort: used to sort nodes by their proximity to the target
+ """
+
+ def __init__(self, caller, target, callback, config, action, num_results = None):
+ """Initialize the action.
+
+ @type caller: L{khashmir.Khashmir}
+ @param caller: the DHT instance that is performing the action
+ @type target: C{string}
+ @param target: the target of the action, usually a DHT key
+ @type callback: C{method}
+ @param callback: the method to call with the results
+ @type config: C{dictionary}
+ @param config: the configuration variables for the DHT
+ @type action: C{string}
+ @param action: the name of the action to call on remote nodes
+ @type num_results: C{int}
+ @param num_results: the minimum number of results that are needed before
+ the action should stop (optional, defaults to getting all the results)
+
+ """
+
self.caller = caller
self.target = target
self.config = config
+ self.action = action
self.num = intify(target)
- self.found = {}
self.queried = {}
self.answered = {}
+ self.found = {}
+ self.sorted_nodes = []
+ self.results = {}
+ self.desired_results = num_results
self.callback = callback
self.outstanding = 0
+ self.outstanding_results = 0
self.finished = 0
def sort(a, b, num=self.num):
- """ this function is for sorting nodes relative to the ID we are looking for """
+ """Sort nodes relative to the ID we are looking for."""
x, y = num ^ a.num, num ^ b.num
if x > y:
return 1
return -1
return 0
self.sort = sort
-
- def goWithNodes(self, t):
- pass
-
+
+ #{ Main operation
+ def goWithNodes(self, nodes):
+ """Start the action's process with a list of nodes to contact."""
+ for node in nodes:
+ if node.id == self.caller.node.id:
+ continue
+ else:
+ self.found[node.id] = node
+ self.sortNodes()
+ self.schedule()
+ def schedule(self):
+ """Schedule requests to be sent to remote nodes."""
+ # Check if we are already done
+ if self.desired_results and ((len(self.results) >= abs(self.desired_results)) or
+ (self.desired_results < 0 and
+ len(self.answered) >= self.config['STORE_REDUNDANCY'])):
+ self.finished=1
+ result = self.generateResult()
+ reactor.callLater(0, self.callback, *result)
+
+ if self.finished or (self.desired_results and
+ len(self.results) + self.outstanding_results >= abs(self.desired_results)):
+ return
+
+ # Loop for each node that should be processed
+ for node in self.getNodesToProcess():
+ # Don't send requests twice or to ourself
+ if node.id not in self.queried and node.id != self.caller.node.id:
+ self.queried[node.id] = 1
+
+ # Get the action to call on the node
+ try:
+ f = getattr(node, self.action)
+ except AttributeError:
+ log.msg("%s doesn't have a %s method!" % (node, self.action))
+ else:
+ # Get the arguments to the action's method
+ try:
+ args, expected_results = self.generateArgs(node)
+ except ValueError:
+ pass
+ else:
+ # Call the action on the remote node
+ self.outstanding += 1
+ self.outstanding_results += expected_results
+ df = f(self.caller.node.id, *args)
+ df.addCallbacks(self.gotResponse, self.actionFailed,
+ callbackArgs = (node, expected_results),
+ errbackArgs = (node, expected_results))
+
+ # We might have to stop for now
+ if (self.outstanding >= self.config['CONCURRENT_REQS'] or
+ (self.desired_results and
+ len(self.results) + self.outstanding_results >= abs(self.desired_results))):
+ break
+
+ assert self.outstanding >= 0
+ assert self.outstanding_results >= 0
-FIND_NODE_TIMEOUT = 15
+ # If no requests are outstanding, then we are done
+ if self.outstanding == 0:
+ self.finished = 1
+ result = self.generateResult()
+ reactor.callLater(0, self.callback, *result)
-class FindNode(ActionBase):
- """ find node action merits it's own class as it is a long running stateful process """
- def handleGotNodes(self, dict):
- _krpc_sender = dict['_krpc_sender']
- dict = dict['rsp']
- n = self.caller.Node(dict["id"], _krpc_sender[0], _krpc_sender[1])
- self.caller.insertNode(n)
- l = dict["nodes"]
- if self.finished or self.answered.has_key(dict["id"]):
+ def gotResponse(self, dict, node, expected_results):
+ """Receive a response from a remote node."""
+ self.caller.insertNode(node)
+ if self.finished or self.answered.has_key(node.id):
# a day late and a dollar short
return
- self.outstanding = self.outstanding - 1
- self.answered[dict["id"]] = 1
- for node in l:
- n = self.caller.Node(node)
- if not self.found.has_key(n.id):
- self.found[n.id] = n
+ self.outstanding -= 1
+ self.outstanding_results -= expected_results
+ self.answered[node.id] = 1
+ self.processResponse(dict['rsp'])
+ self.schedule()
+
+ def actionFailed(self, err, node, expected_results):
+ """Receive an error from a remote node."""
+ log.msg("action %s failed (%s) %s/%s" % (self.action, self.config['PORT'], node.host, node.port))
+ log.err(err)
+ self.caller.table.nodeFailed(node)
+ self.outstanding -= 1
+ self.outstanding_results -= expected_results
self.schedule()
+
+ def handleGotNodes(self, nodes):
+ """Process any received node contact info in the response.
- def schedule(self):
+ Not called by default, but suitable for being called by
+ L{processResponse} in a recursive node search.
"""
- send messages to new peers, if necessary
+ for compact_node in nodes:
+ node_contact = uncompact(compact_node)
+ node = self.caller.Node(node_contact)
+ if not self.found.has_key(node.id):
+ self.found[node.id] = node
+
+ def sortNodes(self):
+ """Sort the nodes, if necessary.
+
+ Assumes nodes are never removed from the L{found} dictionary.
"""
- if self.finished:
- return
- l = self.found.values()
- l.sort(self.sort)
- for node in l[:self.config['K']]:
- if node.id == self.target:
- self.finished=1
- return self.callback([node])
- if (not self.queried.has_key(node.id)) and node.id != self.caller.node.id:
- #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
- df = node.findNode(self.target, self.caller.node.id)
- df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
- self.outstanding = self.outstanding + 1
- self.queried[node.id] = 1
- if self.outstanding >= self.config['CONCURRENT_REQS']:
- break
- assert self.outstanding >=0
- if self.outstanding == 0:
- ## all done!!
- self.finished=1
- reactor.callLater(0, self.callback, l[:self.config['K']])
-
- def makeMsgFailed(self, node):
- def defaultGotNodes(err, self=self, node=node):
- print ">>> find failed (%s) %s/%s" % (self.config['PORT'], node.host, node.port), err
- self.caller.table.nodeFailed(node)
- self.outstanding = self.outstanding - 1
- self.schedule()
- return defaultGotNodes
-
- def goWithNodes(self, nodes):
+ if len(self.sorted_nodes) != len(self.found):
+ self.sorted_nodes = self.found.values()
+ self.sorted_nodes.sort(self.sort)
+
+ #{ Subclass for specific actions
+ def getNodesToProcess(self):
+ """Generate a list of nodes to process next.
+
+ This implementation is suitable for a recurring search over all nodes.
"""
- this starts the process, our argument is a transaction with t.extras being our list of nodes
- it's a transaction since we got called from the dispatcher
+ self.sortNodes()
+ return self.sorted_nodes[:self.config['K']]
+
+ def generateArgs(self, node):
+ """Generate the arguments to the node's action.
+
+ These arguments will be appended to our node ID when calling the action.
+ Also return the number of results expected from this request.
+
+ @raise ValueError: if the node should not be queried
"""
- for node in nodes:
- if node.id == self.caller.node.id:
- continue
- else:
- self.found[node.id] = node
+ return (self.target, ), 0
+
+ def processResponse(self, dict):
+ """Process the response dictionary received from the remote node."""
+ self.handleGotNodes(dict['nodes'])
+
+ def generateResult(self, nodes):
+ """Create the final result to return to the L{callback} function."""
+ return []
- self.schedule()
+
+class FindNode(ActionBase):
+ """Find the closest nodes to the key."""
+
+ def __init__(self, caller, target, callback, config, action="findNode"):
+ ActionBase.__init__(self, caller, target, callback, config, action)
+
+ def processResponse(self, dict):
+ """Save the token received from each node."""
+ if dict["id"] in self.found:
+ self.found[dict["id"]].updateToken(dict.get('token', ''))
+ self.handleGotNodes(dict['nodes'])
+
+ def generateResult(self):
+ """Result is the K closest nodes to the target."""
+ self.sortNodes()
+ return (self.sorted_nodes[:self.config['K']], )
-get_value_timeout = 15
-class GetValue(FindNode):
- def __init__(self, caller, target, callback, config, find="findValue"):
- FindNode.__init__(self, caller, target, callback, config)
- self.findValue = find
-
- """ get value task """
- def handleGotNodes(self, dict):
- _krpc_sender = dict['_krpc_sender']
- dict = dict['rsp']
- n = self.caller.Node(dict["id"], _krpc_sender[0], _krpc_sender[1])
- self.caller.insertNode(n)
- if self.finished or self.answered.has_key(dict["id"]):
- # a day late and a dollar short
- return
- self.outstanding = self.outstanding - 1
- self.answered[dict["id"]] = 1
- # go through nodes
- # if we have any closer than what we already got, query them
- if dict.has_key('nodes'):
- for node in dict['nodes']:
- n = self.caller.Node(node)
- if not self.found.has_key(n.id):
- self.found[n.id] = n
- elif dict.has_key('values'):
+class FindValue(ActionBase):
+ """Find the closest nodes to the key and check for values."""
+
+ def __init__(self, caller, target, callback, config, action="findValue"):
+ ActionBase.__init__(self, caller, target, callback, config, action)
+
+ def processResponse(self, dict):
+ """Save the number of values each node has."""
+ if dict["id"] in self.found:
+ self.found[dict["id"]].updateNumValues(dict.get('num', 0))
+ self.handleGotNodes(dict['nodes'])
+
+ def generateResult(self):
+ """Result is the nodes that have values, sorted by proximity to the key."""
+ self.sortNodes()
+ return ([node for node in self.sorted_nodes if node.num_values > 0], )
+
+
+class GetValue(ActionBase):
+ """Retrieve values from a list of nodes."""
+
+ def __init__(self, caller, target, local_results, num_results, callback, config, action="getValue"):
+ """Initialize the action with the locally available results.
+
+ @type local_results: C{list} of C{string}
+ @param local_results: the values that were available in this node
+ """
+ ActionBase.__init__(self, caller, target, callback, config, action, num_results)
+ if local_results:
+ for result in local_results:
+ self.results[result] = 1
+
+ def getNodesToProcess(self):
+ """Nodes are never added, always return the same sorted node list."""
+ return self.sorted_nodes
+
+ def generateArgs(self, node):
+ """Arguments include the number of values to request."""
+ if node.num_values > 0:
+ # Request all desired results from each node, just to be sure.
+ num_values = abs(self.desired_results) - len(self.results)
+ assert num_values > 0
+ if num_values > node.num_values:
+ num_values = 0
+ return (self.target, num_values), node.num_values
+ else:
+ raise ValueError, "Don't try and get values from this node because it doesn't have any"
+
+ def processResponse(self, dict):
+ """Save the returned values, calling the L{callback} each time there are new ones."""
+ if dict.has_key('values'):
def x(y, z=self.results):
if not z.has_key(y):
z[y] = 1
return None
z = len(dict['values'])
v = filter(None, map(x, dict['values']))
- if(len(v)):
+ if len(v):
reactor.callLater(0, self.callback, self.target, v)
- self.schedule()
-
- ## get value
- def schedule(self):
- if self.finished:
- return
- l = self.found.values()
- l.sort(self.sort)
-
- for node in l[:self.config['K']]:
- if (not self.queried.has_key(node.id)) and node.id != self.caller.node.id:
- #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
- try:
- f = getattr(node, self.findValue)
- except AttributeError:
- print ">>> findValue %s doesn't have a %s method!" % (node, self.findValue)
- else:
- df = f(self.target, self.caller.node.id)
- df.addCallback(self.handleGotNodes)
- df.addErrback(self.makeMsgFailed(node))
- self.outstanding = self.outstanding + 1
- self.queried[node.id] = 1
- if self.outstanding >= self.config['CONCURRENT_REQS']:
- break
- assert self.outstanding >=0
- if self.outstanding == 0:
- ## all done, didn't find it!!
- self.finished=1
- reactor.callLater(0, self.callback, self.target, [])
-
- ## get value
- def goWithNodes(self, nodes, found=None):
- self.results = {}
- if found:
- for n in found:
- self.results[n] = 1
- for node in nodes:
- if node.id == self.caller.node.id:
- continue
- else:
- self.found[node.id] = node
-
- self.schedule()
+ def generateResult(self):
+ """Results have all been returned, now send the empty list to end the action."""
+ return (self.target, [])
+
class StoreValue(ActionBase):
- def __init__(self, caller, target, value, originated, callback, config, store="storeValue"):
- ActionBase.__init__(self, caller, target, callback, config)
+ """Store a value in a list of nodes."""
+
+ def __init__(self, caller, target, value, num_results, callback, config, action="storeValue"):
+ """Initialize the action with the value to store.
+
+ @type value: C{string}
+ @param value: the value to store in the nodes
+ """
+ ActionBase.__init__(self, caller, target, callback, config, action, num_results)
self.value = value
- self.originated = originated
- self.stored = []
- self.store = store
- def storedValue(self, t, node):
- self.outstanding -= 1
- self.caller.insertNode(node)
- if self.finished:
- return
- self.stored.append(t)
- if len(self.stored) >= self.config['STORE_REDUNDANCY']:
- self.finished=1
- self.callback(self.target, self.value, self.stored)
- else:
- if not len(self.stored) + self.outstanding >= self.config['STORE_REDUNDANCY']:
- self.schedule()
- return t
-
- def storeFailed(self, t, node):
- print ">>> store failed %s/%s" % (node.host, node.port)
- self.caller.nodeFailed(node)
- self.outstanding -= 1
- if self.finished:
- return t
- self.schedule()
- return t
-
- def schedule(self):
- if self.finished:
- return
- num = self.config['CONCURRENT_REQS'] - self.outstanding
- if num > self.config['STORE_REDUNDANCY']:
- num = self.config['STORE_REDUNDANCY']
- for i in range(num):
- try:
- node = self.nodes.pop()
- except IndexError:
- if self.outstanding == 0:
- self.finished = 1
- self.callback(self.target, self.value, self.stored)
- else:
- if not node.id == self.caller.node.id:
- self.outstanding += 1
- try:
- f = getattr(node, self.store)
- except AttributeError:
- print ">>> %s doesn't have a %s method!" % (node, self.store)
- else:
- df = f(self.target, self.value, self.originated, self.caller.node.id)
- df.addCallback(self.storedValue, node=node)
- df.addErrback(self.storeFailed, node=node)
-
- def goWithNodes(self, nodes):
- self.nodes = nodes
- self.nodes.sort(self.sort)
- self.schedule()
+ def getNodesToProcess(self):
+ """Nodes are never added, always return the same sorted list."""
+ return self.sorted_nodes
+ def generateArgs(self, node):
+ """Args include the value to store and the node's token."""
+ if node.token:
+ return (self.target, self.value, node.token), 1
+ else:
+ raise ValueError, "Don't store at this node since we don't know it's token"
-class KeyExpirer:
- def __init__(self, store, config):
- self.store = store
- self.config = config
- self.next_expire = reactor.callLater(self.config['KEINITIAL_DELAY'], self.doExpire)
+ def processResponse(self, dict):
+ """Save the response, though it should be nothin but the ID."""
+ self.results[dict["id"]] = dict
- def doExpire(self):
- self.store.expireValues(self.config['KE_AGE'])
- self.next_expire = reactor.callLater(self.config['KE_DELAY'], self.doExpire)
-
- def shutdown(self):
- try:
- self.next_expire.cancel()
- except:
- pass
+ def generateResult(self):
+ """Return all the response IDs received."""
+ return (self.target, self.value, self.results.values())