2 """Details of how to perform actions on remote peers."""
4 from datetime import datetime
6 from twisted.internet import reactor, defer
7 from twisted.python import log
9 from khash import intify
11 from util import uncompact
14 """Base class for some long-running asynchronous processes, like finding nodes or values.
16 @type caller: L{khashmir.Khashmir}
17 @ivar caller: the DHT instance that is performing the action
18 @type target: C{string}
19 @ivar target: the target of the action, usually a DHT key
20 @type config: C{dictionary}
21 @ivar config: the configuration variables for the DHT
22 @type action: C{string}
23 @ivar action: the name of the action to call on remote nodes
24 @type stats: L{stats.StatsLogger}
25 @ivar stats: the statistics modules to report to
27 @ivar num: the target key in integer form
28 @type queried: C{dictionary}
29 @ivar queried: the nodes that have been queried for this action,
30 keys are node IDs, values are the node itself
31 @type answered: C{dictionary}
32 @ivar answered: the nodes that have answered the queries
33 @type failed: C{dictionary}
34 @ivar failed: the nodes that have failed to answer the queries
35 @type found: C{dictionary}
36 @ivar found: nodes that have been found so far by the action
37 @type sorted_nodes: C{list} of L{node.Node}
38 @ivar sorted_nodes: a list of nodes sorted by their proximity to the key
39 @type results: C{dictionary}
40 @ivar results: keys are the results found so far by the action
41 @type desired_results: C{int}
42 @ivar desired_results: the minimum number of results that are needed
43 before the action should stop
44 @type callback: C{method}
45 @ivar callback: the method to call with the results
46 @type outstanding: C{dictionary}
47 @ivar outstanding: the nodes that have outstanding requests for this action,
48 keys are node IDs, values are the number of outstanding results from the node
49 @type outstanding_results: C{int}
50 @ivar outstanding_results: the total number of results that are expected from
51 the requests that are currently outstanding
52 @type finished: C{boolean}
53 @ivar finished: whether the action is done
54 @type started: C{datetime.datetime}
55 @ivar started: the time the action was started at
57 @ivar sort: used to sort nodes by their proximity to the target
60 def __init__(self, caller, target, callback, config, stats, action, num_results = None):
61 """Initialize the action.
63 @type caller: L{khashmir.Khashmir}
64 @param caller: the DHT instance that is performing the action
65 @type target: C{string}
66 @param target: the target of the action, usually a DHT key
67 @type callback: C{method}
68 @param callback: the method to call with the results
69 @type config: C{dictionary}
70 @param config: the configuration variables for the DHT
71 @type stats: L{stats.StatsLogger}
72 @param stats: the statistics gatherer
73 @type action: C{string}
74 @param action: the name of the action to call on remote nodes
75 @type num_results: C{int}
76 @param num_results: the minimum number of results that are needed before
77 the action should stop (optional, defaults to getting all the results)
86 self.stats.startedAction(action)
87 self.num = intify(target)
92 self.sorted_nodes = []
94 self.desired_results = num_results
95 self.callback = callback
97 self.outstanding_results = 0
99 self.started = datetime.now()
101 def sort(a, b, num=self.num):
102 """Sort nodes relative to the ID we are looking for."""
103 x, y = num ^ a.num, num ^ b.num
112 def goWithNodes(self, nodes):
113 """Start the action's process with a list of nodes to contact."""
114 self.started = datetime.now()
116 self.found[node.id] = node
121 """Schedule requests to be sent to remote nodes."""
125 # Get the nodes to be processed
126 nodes = self.getNodesToProcess()
128 # Check if we are already done
129 if nodes is None or (self.desired_results and
130 ((len(self.results) >= abs(self.desired_results)) or
131 (self.desired_results < 0 and
132 len(self.answered) >= self.config['STORE_REDUNDANCY']))):
134 result = self.generateResult()
135 reactor.callLater(0, self.callback, *result)
138 # Check if we have enough outstanding results coming
139 if (self.desired_results and
140 len(self.results) + self.outstanding_results >= abs(self.desired_results)):
143 # Loop for each node that should be processed
145 # Don't send requests twice or to ourself
146 if node.id not in self.queried:
147 self.queried[node.id] = 1
149 # Get the action to call on the node
150 if node.id == self.caller.node.id:
152 f = getattr(self.caller, 'krpc_' + self.action)
153 except AttributeError:
154 log.msg("%s doesn't have a %s method!" % (node, 'krpc_' + self.action))
158 f = getattr(node, self.action)
159 except AttributeError:
160 log.msg("%s doesn't have a %s method!" % (node, self.action))
163 # Get the arguments to the action's method
165 args, expected_results = self.generateArgs(node)
169 # Call the action on the remote node
170 self.outstanding[node.id] = expected_results
171 self.outstanding_results += expected_results
172 df = defer.maybeDeferred(f, *args)
173 reactor.callLater(0, df.addCallbacks,
174 *(self.gotResponse, self.actionFailed),
175 **{'callbackArgs': (node, ),
176 'errbackArgs': (node, )})
178 # We might have to stop for now
179 if (len(self.outstanding) >= self.config['CONCURRENT_REQS'] or
180 (self.desired_results and
181 len(self.results) + self.outstanding_results >= abs(self.desired_results))):
184 assert self.outstanding_results >= 0
186 # If no requests are outstanding, then we are done
187 if len(self.outstanding) == 0:
189 result = self.generateResult()
190 reactor.callLater(0, self.callback, *result)
192 def gotResponse(self, dict, node):
193 """Receive a response from a remote node."""
194 if node.id != self.caller.node.id:
195 reactor.callLater(0, self.caller.insertNode, node)
196 if self.finished or self.answered.has_key(node.id):
197 # a day late and a dollar short
199 self.answered[node.id] = 1
200 self.processResponse(dict)
201 if self.outstanding.has_key(node.id):
202 self.outstanding_results -= self.outstanding[node.id]
203 del self.outstanding[node.id]
206 def actionFailed(self, err, node):
207 """Receive an error from a remote node."""
208 log.msg("action %s failed on %s/%s: %s" % (self.action, node.host, node.port, err.getErrorMessage()))
209 if node.id != self.caller.node.id:
210 self.caller.table.nodeFailed(node)
211 self.failed[node.id] = 1
212 if self.outstanding.has_key(node.id):
213 self.outstanding_results -= self.outstanding[node.id]
214 del self.outstanding[node.id]
def handleGotNodes(self, nodes):
    """Process any received node contact info in the response.

    Not called by default, but suitable for being called by
    L{processResponse} in a recursive node search.

    Each entry is decoded with C{uncompact} and wrapped in a
    C{self.caller.Node}. A node is added to L{found} only if its ID is not
    already present. The first object recorded for an ID is therefore kept,
    together with any token or value count later stored on it.

    @param nodes: the compact node contact entries from the response, each
        of which is passed to C{uncompact}
    """
    for compact_node in nodes:
        node = self.caller.Node(uncompact(compact_node))
        # `in` replaces the deprecated dict.has_key(); the semantics are identical.
        if node.id not in self.found:
            self.found[node.id] = node
230 """Sort the nodes, if necessary.
232 Assumes nodes are never removed from the L{found} dictionary.
234 if len(self.sorted_nodes) != len(self.found):
235 self.sorted_nodes = self.found.values()
236 self.sorted_nodes.sort(self.sort)
238 #{ Subclass for specific actions
239 def getNodesToProcess(self):
240 """Generate a list of nodes to process next.
242 This implementation is suitable for a recurring search over all nodes.
243 It will stop the search when the closest K nodes have been queried.
244 It also prematurely drops requests to nodes that have fallen way behind.
246 @return: sorted list of nodes to query, or None if we are done
248 # Find the K closest nodes that haven't failed, count how many answered
252 for node in self.sorted_nodes:
253 if node.id not in self.failed:
254 closest_K.append(node)
255 if node.id in self.answered:
257 if len(closest_K) >= K:
260 # If we have responses from the K closest nodes, then we are done
262 log.msg('Got the answers we need, aborting search')
265 # Check the oustanding requests to see if they are still closest
266 for id in self.outstanding.keys():
267 if self.found[id] not in closest_K:
268 # Request is not important, allow another to go
269 log.msg("Request to %s/%s is taking too long, moving on" %
270 (self.found[id].host, self.found[id].port))
271 self.outstanding_results -= self.outstanding[id]
272 del self.outstanding[id]
def generateArgs(self, node):
    """Build the positional arguments for the remote action on C{node}.

    Also reports how many results the request is expected to produce.
    This default sends our own node ID and the action's target, ignores
    C{node} itself, and expects no results. Subclasses may raise
    C{ValueError} to signal that C{node} should not be queried.

    @return: a C{(args, expected_results)} pair
    """
    our_id = self.caller.node.id
    call_args = (our_id, self.target)
    expected = 0
    return call_args, expected
def processResponse(self, dict):
    """Handle one response dictionary from a remote node.

    By default the response is treated as part of a recursive node search:
    the compact contact info stored under C{'nodes'} is handed to
    L{handleGotNodes}.
    """
    compact_nodes = dict['nodes']
    self.handleGotNodes(compact_nodes)
289 def generateResult(self, nodes):
290 """Create the final result to return to the L{callback} function."""
291 self.stats.completedAction(self.action, self.started)
295 class FindNode(ActionBase):
296 """Find the closest nodes to the key."""
def __init__(self, caller, target, callback, config, stats, action="find_node"):
    """Set up a search for the nodes closest to C{target}.

    Every argument goes straight to L{ActionBase.__init__}. No
    C{num_results} is passed, so the base class default (C{None}) applies.
    """
    ActionBase.__init__(self, caller, target, callback, config, stats,
                        action=action)
def processResponse(self, dict):
    """Remember the responder's token, then collect the nodes it returned.

    If the responding node (looked up by the response's C{'id'}) is already
    among the found nodes, its token is updated. A missing C{'token'} counts
    as the empty string. The response's C{'nodes'} are then passed to
    L{handleGotNodes}.
    """
    responder_id = dict["id"]
    try:
        responder = self.found[responder_id]
    except KeyError:
        pass  # responder not found yet: nothing to annotate
    else:
        responder.updateToken(dict.get('token', ''))
    self.handleGotNodes(dict['nodes'])
307 def generateResult(self):
308 """Result is the K closest nodes to the target."""
310 self.stats.completedAction(self.action, self.started)
312 for node in self.sorted_nodes:
313 if node.id not in self.failed:
314 closest_K.append(node)
315 if len(closest_K) >= K:
320 class FindValue(ActionBase):
321 """Find the closest nodes to the key and check for values."""
def __init__(self, caller, target, callback, config, stats, action="find_value"):
    """Set up a search for nodes near C{target} that may hold values.

    Every argument goes straight to L{ActionBase.__init__}. No
    C{num_results} is passed, so the base class default (C{None}) applies.
    """
    ActionBase.__init__(self, caller, target, callback, config, stats,
                        action=action)
def processResponse(self, dict):
    """Note how many values the responder holds, then collect its nodes.

    If the responding node (looked up by the response's C{'id'}) is already
    among the found nodes, its value count is updated. A missing C{'num'}
    counts as 0. The response's C{'nodes'} are then passed to
    L{handleGotNodes}.
    """
    responder_id = dict["id"]
    try:
        responder = self.found[responder_id]
    except KeyError:
        pass  # responder not found yet: nothing to annotate
    else:
        responder.updateNumValues(dict.get('num', 0))
    self.handleGotNodes(dict['nodes'])
332 def generateResult(self):
333 """Result is the nodes that have values, sorted by proximity to the key."""
335 self.stats.completedAction(self.action, self.started)
336 return ([node for node in self.sorted_nodes if node.num_values > 0], )
339 class GetValue(ActionBase):
340 """Retrieve values from a list of nodes."""
def __init__(self, caller, target, num_results, callback, config, stats, action="get_value"):
    """Set up retrieval of values for C{target} from a list of nodes.

    Every argument goes straight to L{ActionBase.__init__}. Here
    C{num_results} becomes the base class's desired-results threshold.
    """
    ActionBase.__init__(self, caller, target, callback, config, stats,
                        action=action, num_results=num_results)
def getNodesToProcess(self):
    """Hand back the pre-sorted node list unchanged.

    Nodes are never added during this action, so the very same
    L{sorted_nodes} list object is returned on every call. The scheduling
    loop skips nodes that have already been queried.
    """
    candidates = self.sorted_nodes
    return candidates
349 def generateArgs(self, node):
350 """Arguments include the number of values to request."""
351 if node.num_values > 0:
352 # Request all desired results from each node, just to be sure.
353 num_values = abs(self.desired_results) - len(self.results)
354 assert num_values > 0
355 if num_values > node.num_values:
357 return (self.caller.node.id, self.target, num_values), node.num_values
359 raise ValueError, "Don't try and get values from this node because it doesn't have any"
361 def processResponse(self, dict):
362 """Save the returned values, calling the L{callback} each time there are new ones."""
363 if dict.has_key('values'):
364 def x(y, z=self.results):
370 z = len(dict['values'])
371 v = filter(None, map(x, dict['values']))
373 reactor.callLater(0, self.callback, self.target, v)
def generateResult(self):
    """Finish the action by signalling that no more values will follow.

    Values were already delivered to the callback as they arrived. The
    completion is reported to the stats logger, and the final result pairs
    the target with an empty list.
    """
    self.stats.completedAction(self.action, self.started)
    no_more_values = []
    return self.target, no_more_values
381 class StoreValue(ActionBase):
382 """Store a value in a list of nodes."""
384 def __init__(self, caller, target, value, num_results, callback, config, stats, action="store_value"):
385 """Initialize the action with the value to store.
387 @type value: C{string}
388 @param value: the value to store in the nodes
390 ActionBase.__init__(self, caller, target, callback, config, stats, action, num_results)
def getNodesToProcess(self):
    """Hand back the pre-sorted node list unchanged.

    Nodes are never added during this action, so the very same
    L{sorted_nodes} list object is returned on every call. The scheduling
    loop skips nodes that have already been queried.
    """
    candidates = self.sorted_nodes
    return candidates
397 def generateArgs(self, node):
398 """Args include the value to store and the node's token."""
400 return (self.caller.node.id, self.target, self.value, node.token), 1
402 raise ValueError, "Don't store at this node since we don't know it's token"
def processResponse(self, dict):
    """Record a store acknowledgement under the responding node's ID.

    The response should contain little more than that ID. The whole
    response dictionary is kept as the result for the node.
    """
    responder_id = dict["id"]
    self.results[responder_id] = dict
def generateResult(self):
    """Report completion and return every store acknowledgement received.

    @return: a C{(target, value, responses)} triple, where C{responses}
        holds the values of the L{results} dictionary
    """
    self.stats.completedAction(self.action, self.started)
    responses = self.results.values()
    return self.target, self.value, responses