1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
4 """Details of how to perform actions on remote peers."""
6 from twisted.internet import reactor, defer
7 from twisted.python import log
9 from khash import intify
10 from util import uncompact
13 """Base class for some long running asynchronous processes like finding nodes or values.
15 @type caller: L{khashmir.Khashmir}
16 @ivar caller: the DHT instance that is performing the action
17 @type target: C{string}
18 @ivar target: the target of the action, usually a DHT key
19 @type config: C{dictionary}
20 @ivar config: the configuration variables for the DHT
21 @type action: C{string}
22 @ivar action: the name of the action to call on remote nodes
24 @ivar num: the target key in integer form
25 @type queried: C{dictionary}
26 @ivar queried: the nodes that have been queried for this action,
27 keys are node IDs, values are the node itself
28 @type answered: C{dictionary}
29 @ivar answered: the nodes that have answered the queries
30 @type found: C{dictionary}
31 @ivar found: nodes that have been found so far by the action
32 @type sorted_nodes: C{list} of L{node.Node}
33 @ivar sorted_nodes: a list of nodes sorted by their proximity to the key
34 @type results: C{dictionary}
35 @ivar results: keys are the results found so far by the action
36 @type desired_results: C{int}
37 @ivar desired_results: the minimum number of results that are needed
38 before the action should stop
39 @type callback: C{method}
40 @ivar callback: the method to call with the results
41 @type outstanding: C{int}
42 @ivar outstanding: the number of requests currently outstanding
43 @type outstanding_results: C{int}
44 @ivar outstanding_results: the number of results that are expected from
45 the requests that are currently outstanding
46 @type finished: C{boolean}
47 @ivar finished: whether the action is done
49 @ivar sort: used to sort nodes by their proximity to the target
def __init__(self, caller, target, callback, config, stats, action, num_results = None):
    """Initialize the action.

    @type caller: L{khashmir.Khashmir}
    @param caller: the DHT instance that is performing the action
    @type target: C{string}
    @param target: the target of the action, usually a DHT key
    @type callback: C{method}
    @param callback: the method to call with the results
    @type config: C{dictionary}
    @param config: the configuration variables for the DHT
    @type stats: L{stats.StatsLogger}
    @param stats: the statistics gatherer
    @type action: C{string}
    @param action: the name of the action to call on remote nodes
    @type num_results: C{int}
    @param num_results: the minimum number of results that are needed before
        the action should stop (optional, defaults to getting all the results)
    """
    # NOTE(review): the listing this was rebuilt from skips lines inside this
    # method. Every attribute set here is one the class's other methods read;
    # confirm the exact originals against version control.
    self.caller = caller
    self.target = target
    self.config = config
    self.action = action
    stats.startedAction(action)

    # Integer form of the key, used for XOR distance comparisons.
    self.num = intify(target)

    # Book-keeping for the nodes involved in this action, keyed by node ID.
    self.queried = {}
    self.answered = {}
    self.found = {}
    self.sorted_nodes = []

    self.results = {}
    self.desired_results = num_results
    self.callback = callback

    # Counters for the requests that are currently in flight.
    self.outstanding = 0
    self.outstanding_results = 0
    self.finished = False

    def sort(a, b, num=self.num):
        """Sort nodes relative to the ID we are looking for."""
        x, y = num ^ a.num, num ^ b.num
        # cmp-style result (-1, 0 or 1): the smaller XOR distance sorts first.
        return (x > y) - (x < y)
    self.sort = sort
def goWithNodes(self, nodes):
    """Start the action's process with a list of nodes to contact.

    @type nodes: C{list}
    @param nodes: the nodes to seed the action with, each must have an
        C{id} attribute
    """
    for node in nodes:
        self.found[node.id] = node
    # NOTE(review): the listing this was rebuilt from skips lines after the
    # loop; whatever kicks off the first round of requests is not visible
    # here and must be restored from version control.
109 """Schedule requests to be sent to remote nodes."""
110 # Check if we are already done
111 if self.desired_results and ((len(self.results) >= abs(self.desired_results)) or
112 (self.desired_results < 0 and
113 len(self.answered) >= self.config['STORE_REDUNDANCY'])):
115 result = self.generateResult()
116 reactor.callLater(0, self.callback, *result)
118 if self.finished or (self.desired_results and
119 len(self.results) + self.outstanding_results >= abs(self.desired_results)):
122 # Loop for each node that should be processed
123 for node in self.getNodesToProcess():
124 # Don't send requests twice or to ourself
125 if node.id not in self.queried:
126 self.queried[node.id] = 1
128 # Get the action to call on the node
129 if node.id == self.caller.node.id:
131 f = getattr(self.caller, 'krpc_' + self.action)
132 except AttributeError:
133 log.msg("%s doesn't have a %s method!" % (node, 'krpc_' + self.action))
137 f = getattr(node, self.action)
138 except AttributeError:
139 log.msg("%s doesn't have a %s method!" % (node, self.action))
142 # Get the arguments to the action's method
144 args, expected_results = self.generateArgs(node)
148 # Call the action on the remote node
149 self.outstanding += 1
150 self.outstanding_results += expected_results
151 df = defer.maybeDeferred(f, *args)
152 reactor.callLater(0, df.addCallbacks,
153 *(self.gotResponse, self.actionFailed),
154 **{'callbackArgs': (node, expected_results, df),
155 'errbackArgs': (node, expected_results, df)})
157 # We might have to stop for now
158 if (self.outstanding >= self.config['CONCURRENT_REQS'] or
159 (self.desired_results and
160 len(self.results) + self.outstanding_results >= abs(self.desired_results))):
163 assert self.outstanding >= 0
164 assert self.outstanding_results >= 0
166 # If no requests are outstanding, then we are done
167 if self.outstanding == 0:
169 result = self.generateResult()
170 reactor.callLater(0, self.callback, *result)
def gotResponse(self, dict, node, expected_results, df):
    """Receive a response from a remote node.

    @type dict: C{dictionary}
    @param dict: the response received from the remote node
    @param node: the node that sent the response
    @type expected_results: C{int}
    @param expected_results: the number of results this request was
        expected to produce
    @param df: the deferred of the request (not used here)
    """
    # Any node that responds is worth telling the DHT instance about, even
    # if the response itself arrives too late to be used.
    self.caller.insertNode(node)
    if self.finished or node.id in self.answered:
        # a day late and a dollar short
        return

    # The request is no longer in flight.
    self.outstanding -= 1
    self.outstanding_results -= expected_results
    self.answered[node.id] = 1
    self.processResponse(dict)
    # NOTE(review): the listing this was rebuilt from skips lines after this
    # call; restore any follow-up step from version control.
def actionFailed(self, err, node, expected_results, df):
    """Receive an error from a remote node.

    @param err: the failure describing why the request failed
    @param node: the node the request was sent to
    @type expected_results: C{int}
    @param expected_results: the number of results this request was
        expected to produce
    @param df: the deferred of the request (not used here)
    """
    log.msg("action %s failed (%s) %s/%s" % (self.action, self.config['PORT'], node.host, node.port))
    # Record why the request failed instead of silently dropping the failure.
    log.err(err)
    self.caller.table.nodeFailed(node)

    # The request is no longer in flight.
    self.outstanding -= 1
    self.outstanding_results -= expected_results
    # NOTE(review): the listing this was rebuilt from skips lines after this
    # point; restore any follow-up step from version control.
def handleGotNodes(self, nodes):
    """Process any received node contact info in the response.

    Called by the L{processResponse} implementations that take part in a
    recursive node search.

    @param nodes: the compact node contact entries from the response, each
        is expanded with C{uncompact} before a node is built from it
    """
    for compact_node in nodes:
        node_contact = uncompact(compact_node)
        node = self.caller.Node(node_contact)
        # Keep the node object we already hold for an ID we have seen before.
        if node.id not in self.found:
            self.found[node.id] = node
206 """Sort the nodes, if necessary.
208 Assumes nodes are never removed from the L{found} dictionary.
210 if len(self.sorted_nodes) != len(self.found):
211 self.sorted_nodes = self.found.values()
212 self.sorted_nodes.sort(self.sort)
214 #{ Subclass for specific actions
def getNodesToProcess(self):
    """Generate a list of nodes to process next.

    This implementation is suitable for a recurring search over all nodes.

    @rtype: C{list}
    @return: the first C{config['K']} entries of L{sorted_nodes}
    """
    limit = self.config['K']
    candidates = self.sorted_nodes[:limit]
    return candidates
def generateArgs(self, node):
    """Generate the arguments to the node's action.

    Also return the number of results expected from this request.

    @param node: the node that is about to be queried (not used by this
        implementation)
    @rtype: C{tuple}
    @return: the arguments for the action, and the number of results expected
    @raise ValueError: if the node should not be queried
    """
    own_id = self.caller.node.id
    call_args = (own_id, self.target)
    # No results are expected from this kind of request.
    return call_args, 0
def processResponse(self, dict):
    """Process the response dictionary received from the remote node.

    @type dict: C{dictionary}
    @param dict: the response, must contain a C{'nodes'} entry
    """
    contacts = dict['nodes']
    self.handleGotNodes(contacts)
def generateResult(self, nodes=None):
    """Create the final result to return to the L{callback} function.

    Must be overridden by each specific action. The result is unpacked into
    the arguments of the L{callback}, so it has to be a tuple.

    @param nodes: unused, kept only so that existing callers that pass it
        keep working (the scheduling code calls this with no arguments)
    @rtype: C{tuple}
    @return: the arguments to call the L{callback} with
    @raise NotImplementedError: always, in this base implementation
    """
    raise NotImplementedError("generateResult must be implemented by the specific action")
class FindNode(ActionBase):
    """Find the closest nodes to the key."""

    def __init__(self, caller, target, callback, config, stats, action="find_node"):
        """Initialize the action, see L{ActionBase.__init__} for the parameters."""
        ActionBase.__init__(self, caller, target, callback, config, stats, action)

    def processResponse(self, dict):
        """Save the token received from each node."""
        responder = dict["id"]
        if responder in self.found:
            # A response without a token resets the token to the empty string.
            token = dict.get('token', '')
            self.found[responder].updateToken(token)
        self.handleGotNodes(dict['nodes'])

    def generateResult(self):
        """Result is the K closest nodes to the target."""
        closest = self.sorted_nodes[:self.config['K']]
        return (closest, )
class FindValue(ActionBase):
    """Find the closest nodes to the key and check for values."""

    def __init__(self, caller, target, callback, config, stats, action="find_value"):
        """Initialize the action, see L{ActionBase.__init__} for the parameters."""
        ActionBase.__init__(self, caller, target, callback, config, stats, action)

    def processResponse(self, dict):
        """Save the number of values each node has."""
        responder = dict["id"]
        if responder in self.found:
            # A response without a count is treated as having no values.
            count = dict.get('num', 0)
            self.found[responder].updateNumValues(count)
        self.handleGotNodes(dict['nodes'])

    def generateResult(self):
        """Result is the nodes that have values, sorted by proximity to the key."""
        holders = []
        for candidate in self.sorted_nodes:
            if candidate.num_values > 0:
                holders.append(candidate)
        return (holders, )
class GetValue(ActionBase):
    """Retrieve values from a list of nodes."""

    def __init__(self, caller, target, num_results, callback, config, stats, action="get_value"):
        """Initialize the action with the number of values wanted.

        @type num_results: C{int}
        @param num_results: the minimum number of results that are needed
            before the action should stop
        """
        ActionBase.__init__(self, caller, target, callback, config, stats, action, num_results)

    def getNodesToProcess(self):
        """Nodes are never added, always return the same sorted node list."""
        return self.sorted_nodes

    def generateArgs(self, node):
        """Arguments include the number of values to request.

        @raise ValueError: if the node has no values to return
        """
        if node.num_values <= 0:
            raise ValueError("Don't try and get values from this node because it doesn't have any")

        # Request all desired results from each node, just to be sure.
        num_values = abs(self.desired_results) - len(self.results)
        assert num_values > 0
        if num_values > node.num_values:
            # NOTE(review): the body of this branch is missing from the
            # listing this was rebuilt from. Clamping to what the node says
            # it has is an assumption, confirm against version control.
            num_values = node.num_values
        return (self.caller.node.id, self.target, num_values), node.num_values

    def processResponse(self, dict):
        """Save the returned values, calling the L{callback} each time there are new ones."""
        if 'values' not in dict:
            return

        fresh = []
        for value in dict['values']:
            if value in self.results:
                continue
            self.results[value] = 1
            # Empty values are never passed on to the callback.
            if value:
                fresh.append(value)

        # An empty list is the end-of-action signal (see generateResult), so
        # only call back when there really is something new to report.
        if fresh:
            reactor.callLater(0, self.callback, self.target, fresh)

    def generateResult(self):
        """Results have all been returned, now send the empty list to end the action."""
        return (self.target, [])
class StoreValue(ActionBase):
    """Store a value in a list of nodes."""

    def __init__(self, caller, target, value, num_results, callback, config, stats, action="store_value"):
        """Initialize the action with the value to store.

        @type value: C{string}
        @param value: the value to store in the nodes
        """
        ActionBase.__init__(self, caller, target, callback, config, stats, action, num_results)
        # Read back by generateArgs and generateResult.
        self.value = value

    def getNodesToProcess(self):
        """Nodes are never added, always return the same sorted list."""
        return self.sorted_nodes

    def generateArgs(self, node):
        """Args include the value to store and the node's token.

        @raise ValueError: if no token is known for the node
        """
        if not node.token:
            raise ValueError("Don't store at this node since we don't know it's token")
        # Each store request is expected to produce exactly one result.
        return (self.caller.node.id, self.target, self.value, node.token), 1

    def processResponse(self, dict):
        """Save the response, though it should be nothing but the ID."""
        self.results[dict["id"]] = dict

    def generateResult(self):
        """Return all the response IDs received."""
        return (self.target, self.value, self.results.values())