2 """Details of how to perform actions on remote peers."""
4 from datetime import datetime
6 from twisted.internet import reactor, defer
7 from twisted.python import log
9 from khash import intify
11 from util import uncompact
14 """Base class for some long-running asynchronous processes like finding nodes or values.
16 @type caller: L{khashmir.Khashmir}
17 @ivar caller: the DHT instance that is performing the action
18 @type target: C{string}
19 @ivar target: the target of the action, usually a DHT key
20 @type config: C{dictionary}
21 @ivar config: the configuration variables for the DHT
22 @type action: C{string}
23 @ivar action: the name of the action to call on remote nodes
24 @type stats: L{stats.StatsLogger}
25 @ivar stats: the statistics modules to report to
27 @ivar num: the target key in integer form
28 @type queried: C{dictionary}
29 @ivar queried: the nodes that have been queried for this action,
30 keys are node IDs, values are the node itself
31 @type answered: C{dictionary}
32 @ivar answered: the nodes that have answered the queries
33 @type failed: C{dictionary}
34 @ivar failed: the nodes that have failed to answer the queries
35 @type found: C{dictionary}
36 @ivar found: nodes that have been found so far by the action
37 @type sorted_nodes: C{list} of L{node.Node}
38 @ivar sorted_nodes: a list of nodes sorted by their proximity to the key
39 @type results: C{dictionary}
40 @ivar results: keys are the results found so far by the action
41 @type desired_results: C{int}
42 @ivar desired_results: the minimum number of results that are needed
43 before the action should stop
44 @type callback: C{method}
45 @ivar callback: the method to call with the results
46 @type outstanding: C{dictionary}
47 @ivar outstanding: the nodes that have outstanding requests for this action,
48 keys are node IDs, values are the number of outstanding results from the node
49 @type outstanding_results: C{int}
50 @ivar outstanding_results: the total number of results that are expected from
51 the requests that are currently outstanding
52 @type finished: C{boolean}
53 @ivar finished: whether the action is done
54 @type started: C{datetime.datetime}
55 @ivar started: the time the action was started at
57 @ivar sort: used to sort nodes by their proximity to the target
60 def __init__(self, caller, target, callback, config, stats, action, num_results = None):
61 """Initialize the action.
63 @type caller: L{khashmir.Khashmir}
64 @param caller: the DHT instance that is performing the action
65 @type target: C{string}
66 @param target: the target of the action, usually a DHT key
67 @type callback: C{method}
68 @param callback: the method to call with the results
69 @type config: C{dictionary}
70 @param config: the configuration variables for the DHT
71 @type stats: L{stats.StatsLogger}
72 @param stats: the statistics gatherer
73 @type action: C{string}
74 @param action: the name of the action to call on remote nodes
75 @type num_results: C{int}
76 @param num_results: the minimum number of results that are needed before
77 the action should stop (optional, defaults to getting all the results)
86 self.stats.startedAction(action)
87 self.num = intify(target)
92 self.sorted_nodes = []
94 self.desired_results = num_results
95 self.callback = callback
97 self.outstanding_results = 0
99 self.started = datetime.now()
101 def sort(a, b, num=self.num):
102 """Sort nodes relative to the ID we are looking for."""
103 x, y = num ^ a.num, num ^ b.num
112 def goWithNodes(self, nodes):
113 """Start the action's process with a list of nodes to contact."""
114 self.started = datetime.now()
116 self.found[node.id] = node
121 """Schedule requests to be sent to remote nodes."""
125 # Get the nodes to be processed
126 nodes = self.getNodesToProcess()
128 # Check if we are already done
129 if nodes is None or (self.desired_results and
130 ((len(self.results) >= abs(self.desired_results)) or
131 (self.desired_results < 0 and
132 len(self.answered) >= self.config['STORE_REDUNDANCY']))):
134 result = self.generateResult()
135 reactor.callLater(0, self.callback, *result)
138 # Check if we have enough outstanding results coming
139 if (self.desired_results and
140 len(self.results) + self.outstanding_results >= abs(self.desired_results)):
143 # Loop for each node that should be processed
145 # Don't send requests twice or to ourself
146 if node.id not in self.queried:
147 self.queried[node.id] = 1
149 # Get the action to call on the node
150 if node.id == self.caller.node.id:
152 f = getattr(self.caller, 'krpc_' + self.action)
153 except AttributeError:
154 log.msg("%s doesn't have a %s method!" % (node, 'krpc_' + self.action))
158 f = getattr(node, self.action)
159 except AttributeError:
160 log.msg("%s doesn't have a %s method!" % (node, self.action))
163 # Get the arguments to the action's method
165 args, expected_results = self.generateArgs(node)
169 # Call the action on the remote node
170 self.outstanding[node.id] = expected_results
171 self.outstanding_results += expected_results
172 df = defer.maybeDeferred(f, *args)
173 reactor.callLater(0, df.addCallbacks,
174 *(self.gotResponse, self.actionFailed),
175 **{'callbackArgs': (node, ),
176 'errbackArgs': (node, )})
178 # We might have to stop for now
179 if (len(self.outstanding) >= self.config['CONCURRENT_REQS'] or
180 (self.desired_results and
181 len(self.results) + self.outstanding_results >= abs(self.desired_results))):
184 assert self.outstanding_results >= 0
186 # If no requests are outstanding, then we are done
187 if len(self.outstanding) == 0:
189 result = self.generateResult()
190 reactor.callLater(0, self.callback, *result)
192 def gotResponse(self, dict, node):
193 """Receive a response from a remote node."""
194 if node.id != self.caller.node.id:
195 reactor.callLater(0, self.caller.insertNode, node)
196 if self.finished or self.answered.has_key(node.id):
197 # a day late and a dollar short
200 # Process the response
201 self.processResponse(dict)
202 self.answered[node.id] = 1
204 # Unexpected error with the response
205 log.msg("action %s failed on %s/%s: %r" % (self.action, node.host, node.port, e))
206 if node.id != self.caller.node.id:
207 self.caller.nodeFailed(node)
208 self.failed[node.id] = 1
209 if self.outstanding.has_key(node.id):
210 self.outstanding_results -= self.outstanding[node.id]
211 del self.outstanding[node.id]
214 def actionFailed(self, err, node):
215 """Receive an error from a remote node."""
216 log.msg("action %s failed on %s/%s: %s" % (self.action, node.host, node.port, err.getErrorMessage()))
217 if node.id != self.caller.node.id:
218 self.caller.nodeFailed(node)
219 self.failed[node.id] = 1
220 if self.outstanding.has_key(node.id):
221 self.outstanding_results -= self.outstanding[node.id]
222 del self.outstanding[node.id]
225 def handleGotNodes(self, nodes):
226 """Process any received node contact info in the response.
228 Not called by default, but suitable for being called by
229 L{processResponse} in a recursive node search.
231 if nodes and type(nodes) != list:
232 raise ValueError, "got a malformed response, from bittorrent perhaps"
233 for compact_node in nodes:
234 node_contact = uncompact(compact_node)
235 node = self.caller.Node(node_contact)
236 if not self.found.has_key(node.id):
237 self.found[node.id] = node
240 """Sort the nodes, if necessary.
242 Assumes nodes are never removed from the L{found} dictionary.
244 if len(self.sorted_nodes) != len(self.found):
245 self.sorted_nodes = self.found.values()
246 self.sorted_nodes.sort(self.sort)
248 #{ Subclass for specific actions
249 def getNodesToProcess(self):
250 """Generate a list of nodes to process next.
252 This implementation is suitable for a recurring search over all nodes.
253 It will stop the search when the closest K nodes have been queried.
254 It also prematurely drops requests to nodes that have fallen way behind.
256 @return: sorted list of nodes to query, or None if we are done
258 # Find the K closest nodes that haven't failed, count how many answered
262 for node in self.sorted_nodes:
263 if node.id not in self.failed:
264 closest_K.append(node)
265 if node.id in self.answered:
267 if len(closest_K) >= K:
270 # If we have responses from the K closest nodes, then we are done
272 log.msg('Got the answers we need, aborting search')
275 # Check the outstanding requests to see if they are still closest
276 for id in self.outstanding.keys():
277 if self.found[id] not in closest_K:
278 # Request is not important, allow another to go
279 log.msg("Request to %s/%s is taking too long, moving on" %
280 (self.found[id].host, self.found[id].port))
281 self.outstanding_results -= self.outstanding[id]
282 del self.outstanding[id]
286 def generateArgs(self, node):
287 """Generate the arguments to the node's action.
289 Also return the number of results expected from this request.
291 @raise ValueError: if the node should not be queried
293 return (self.caller.node.id, self.target), 0
def processResponse(self, dict):
    """Handle the response dictionary sent back by a remote node.

    This implementation only hands the value stored under the C{'nodes'}
    key over to L{handleGotNodes}.

    @type dict: C{dictionary}
    @param dict: the response received from the remote node
    """
    received_nodes = dict['nodes']
    self.handleGotNodes(received_nodes)
299 def generateResult(self, nodes):
300 """Create the final result to return to the L{callback} function."""
301 self.stats.completedAction(self.action, self.started)
305 class FindNode(ActionBase):
306 """Find the closest nodes to the key."""
def __init__(self, caller, target, callback, config, stats, action="find_node"):
    """Initialize the action, delegating everything to L{ActionBase}.

    All arguments are forwarded unchanged to C{ActionBase.__init__};
    only the default name of the remote action is specific to this class.

    @type action: C{string}
    @param action: the name of the action to call on remote nodes
        (optional, defaults to "find_node")
    """
    ActionBase.__init__(
        self,
        caller,
        target,
        callback,
        config,
        stats,
        action=action,
    )
def processResponse(self, dict):
    """Record the responding node's token, then handle the nodes it returned.

    The token is only saved when the responder (the C{"id"} entry of the
    response) is already present in L{found}; a response without a
    C{'token'} entry stores the empty string.

    @type dict: C{dictionary}
    @param dict: the response received from the remote node
    """
    responder_id = dict["id"]
    known_nodes = self.found
    if responder_id in known_nodes:
        token = dict.get('token', '')
        known_nodes[responder_id].updateToken(token)
    self.handleGotNodes(dict['nodes'])
317 def generateResult(self):
318 """Result is the K closest nodes to the target."""
320 self.stats.completedAction(self.action, self.started)
322 for node in self.sorted_nodes:
323 if node.id not in self.failed:
324 closest_K.append(node)
325 if len(closest_K) >= K:
330 class FindValue(ActionBase):
331 """Find the closest nodes to the key and check for values."""
def __init__(self, caller, target, callback, config, stats, action="find_value"):
    """Initialize the action, delegating everything to L{ActionBase}.

    All arguments are forwarded unchanged to C{ActionBase.__init__};
    only the default name of the remote action is specific to this class.

    @type action: C{string}
    @param action: the name of the action to call on remote nodes
        (optional, defaults to "find_value")
    """
    ActionBase.__init__(
        self,
        caller,
        target,
        callback,
        config,
        stats,
        action=action,
    )
def processResponse(self, dict):
    """Record how many values the responder holds, then handle its nodes.

    The count is only saved when the responder (the C{"id"} entry of the
    response) is already present in L{found}; a response without a
    C{'num'} entry is recorded as 0 values.

    @type dict: C{dictionary}
    @param dict: the response received from the remote node
    """
    responder_id = dict["id"]
    known_nodes = self.found
    if responder_id in known_nodes:
        value_count = dict.get('num', 0)
        known_nodes[responder_id].updateNumValues(value_count)
    self.handleGotNodes(dict['nodes'])
342 def generateResult(self):
343 """Result is the nodes that have values, sorted by proximity to the key."""
345 self.stats.completedAction(self.action, self.started)
346 return ([node for node in self.sorted_nodes if node.num_values > 0], )
349 class GetValue(ActionBase):
350 """Retrieve values from a list of nodes."""
def __init__(self, caller, target, num_results, callback, config, stats, action="get_value"):
    """Initialize the action, delegating everything to L{ActionBase}.

    Note that C{num_results} comes third in this signature but is passed
    to C{ActionBase.__init__} as its C{num_results} argument.

    @type num_results: C{int}
    @param num_results: the number of results handed on to the base class
    @type action: C{string}
    @param action: the name of the action to call on remote nodes
        (optional, defaults to "get_value")
    """
    ActionBase.__init__(
        self,
        caller,
        target,
        callback,
        config,
        stats,
        action=action,
        num_results=num_results,
    )
def getNodesToProcess(self):
    """Return the nodes to query next.

    No nodes are added by this action, so the existing L{sorted_nodes}
    list is handed back as-is (the same list object, not a copy).

    @return: the current sorted list of nodes
    """
    nodes_to_query = self.sorted_nodes
    return nodes_to_query
359 def generateArgs(self, node):
360 """Arguments include the number of values to request."""
361 if node.num_values > 0:
362 # Request all desired results from each node, just to be sure.
363 num_values = abs(self.desired_results) - len(self.results)
364 assert num_values > 0
365 if num_values > node.num_values:
367 return (self.caller.node.id, self.target, num_values), node.num_values
369 raise ValueError, "Don't try and get values from this node because it doesn't have any"
371 def processResponse(self, dict):
372 """Save the returned values, calling the L{callback} each time there are new ones."""
373 if dict.has_key('values'):
374 def x(y, z=self.results):
380 z = len(dict['values'])
381 v = filter(None, map(x, dict['values']))
383 reactor.callLater(0, self.callback, self.target, v)
def generateResult(self):
    """Build the final callback arguments that signal the end of the action.

    Reports the completed action to the statistics object first, then
    returns the target together with an empty list.

    @return: a tuple of the target and an empty list
    """
    stats = self.stats
    stats.completedAction(self.action, self.started)
    no_more_values = []
    return (self.target, no_more_values)
391 class StoreValue(ActionBase):
392 """Store a value in a list of nodes."""
394 def __init__(self, caller, target, value, num_results, callback, config, stats, action="store_value"):
395 """Initialize the action with the value to store.
397 @type value: C{string}
398 @param value: the value to store in the nodes
400 ActionBase.__init__(self, caller, target, callback, config, stats, action, num_results)
def getNodesToProcess(self):
    """Return the nodes to store at next.

    No nodes are added by this action, so the existing L{sorted_nodes}
    list is handed back as-is (the same list object, not a copy).

    @return: the current sorted list of nodes
    """
    nodes_to_query = self.sorted_nodes
    return nodes_to_query
407 def generateArgs(self, node):
408 """Args include the value to store and the node's token."""
410 return (self.caller.node.id, self.target, self.value, node.token), 1
412 raise ValueError, "Don't store at this node since we don't know it's token"
def processResponse(self, dict):
    """Save the response, though it should be nothing but the ID.

    The whole response is stored in L{results} under the responder's
    C{"id"}, so a later response with the same ID replaces the earlier one.

    @type dict: C{dictionary}
    @param dict: the response received from the remote node
    """
    responder_id = dict["id"]
    self.results[responder_id] = dict
def generateResult(self):
    """Build the final callback arguments from the responses received.

    Reports the completed action to the statistics object first.

    @return: a tuple of the target, the value, and the values of the
        L{results} dictionary (the saved responses)
    """
    stats = self.stats
    stats.completedAction(self.action, self.started)
    responses = self.results.values()
    return (self.target, self.value, responses)