1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
4 """Details of how to perform actions on remote peers."""
6 from datetime import datetime
8 from twisted.internet import reactor, defer
9 from twisted.python import log
11 from khash import intify
13 from util import uncompact
16 """Base class for some long running asynchronous proccesses like finding nodes or values.
18 @type caller: L{khashmir.Khashmir}
19 @ivar caller: the DHT instance that is performing the action
20 @type target: C{string}
21 @ivar target: the target of the action, usually a DHT key
22 @type config: C{dictionary}
23 @ivar config: the configuration variables for the DHT
24 @type action: C{string}
25 @ivar action: the name of the action to call on remote nodes
26 @type stats: L{stats.StatsLogger}
27 @ivar stats: the statistics modules to report to
29 @ivar num: the target key in integer form
30 @type queried: C{dictionary}
31 @ivar queried: the nodes that have been queried for this action,
32 keys are node IDs, values are the node itself
33 @type answered: C{dictionary}
34 @ivar answered: the nodes that have answered the queries
35 @type found: C{dictionary}
36 @ivar found: nodes that have been found so far by the action
37 @type sorted_nodes: C{list} of L{node.Node}
38 @ivar sorted_nodes: a sorted list of nodes by there proximity to the key
39 @type results: C{dictionary}
40 @ivar results: keys are the results found so far by the action
41 @type desired_results: C{int}
42 @ivar desired_results: the minimum number of results that are needed
43 before the action should stop
44 @type callback: C{method}
45 @ivar callback: the method to call with the results
46 @type outstanding: C{int}
47 @ivar outstanding: the number of requests currently outstanding
48 @type outstanding_results: C{int}
49 @ivar outstanding_results: the number of results that are expected from
50 the requests that are currently outstanding
51 @type finished: C{boolean}
52 @ivar finished: whether the action is done
53 @type started: C{datetime.datetime}
54 @ivar started: the time the action was started at
56 @ivar sort: used to sort nodes by their proximity to the target
59 def __init__(self, caller, target, callback, config, stats, action, num_results = None):
60 """Initialize the action.
62 @type caller: L{khashmir.Khashmir}
63 @param caller: the DHT instance that is performing the action
64 @type target: C{string}
65 @param target: the target of the action, usually a DHT key
66 @type callback: C{method}
67 @param callback: the method to call with the results
68 @type config: C{dictionary}
69 @param config: the configuration variables for the DHT
70 @type stats: L{stats.StatsLogger}
71 @param stats: the statistics gatherer
72 @type action: C{string}
73 @param action: the name of the action to call on remote nodes
74 @type num_results: C{int}
75 @param num_results: the minimum number of results that are needed before
76 the action should stop (optional, defaults to getting all the results)
85 self.stats.startedAction(action)
86 self.num = intify(target)
90 self.sorted_nodes = []
92 self.desired_results = num_results
93 self.callback = callback
95 self.outstanding_results = 0
97 self.started = datetime.now()
99 def sort(a, b, num=self.num):
100 """Sort nodes relative to the ID we are looking for."""
101 x, y = num ^ a.num, num ^ b.num
110 def goWithNodes(self, nodes):
111 """Start the action's process with a list of nodes to contact."""
112 self.started = datetime.now()
114 self.found[node.id] = node
119 """Schedule requests to be sent to remote nodes."""
123 # Check if we are already done
124 if self.desired_results and ((len(self.results) >= abs(self.desired_results)) or
125 (self.desired_results < 0 and
126 len(self.answered) >= self.config['STORE_REDUNDANCY'])):
128 result = self.generateResult()
129 reactor.callLater(0, self.callback, *result)
132 # Check if we have enough outstanding results coming
133 if (self.desired_results and
134 len(self.results) + self.outstanding_results >= abs(self.desired_results)):
137 # Loop for each node that should be processed
138 for node in self.getNodesToProcess():
139 # Don't send requests twice or to ourself
140 if node.id not in self.queried:
141 self.queried[node.id] = 1
143 # Get the action to call on the node
144 if node.id == self.caller.node.id:
146 f = getattr(self.caller, 'krpc_' + self.action)
147 except AttributeError:
148 log.msg("%s doesn't have a %s method!" % (node, 'krpc_' + self.action))
152 f = getattr(node, self.action)
153 except AttributeError:
154 log.msg("%s doesn't have a %s method!" % (node, self.action))
157 # Get the arguments to the action's method
159 args, expected_results = self.generateArgs(node)
163 # Call the action on the remote node
164 self.outstanding += 1
165 self.outstanding_results += expected_results
166 df = defer.maybeDeferred(f, *args)
167 reactor.callLater(0, df.addCallbacks,
168 *(self.gotResponse, self.actionFailed),
169 **{'callbackArgs': (node, expected_results, df),
170 'errbackArgs': (node, expected_results, df)})
172 # We might have to stop for now
173 if (self.outstanding >= self.config['CONCURRENT_REQS'] or
174 (self.desired_results and
175 len(self.results) + self.outstanding_results >= abs(self.desired_results))):
178 assert self.outstanding >= 0
179 assert self.outstanding_results >= 0
181 # If no requests are outstanding, then we are done
182 if self.outstanding == 0:
184 result = self.generateResult()
185 reactor.callLater(0, self.callback, *result)
187 def gotResponse(self, dict, node, expected_results, df):
188 """Receive a response from a remote node."""
189 self.caller.insertNode(node)
190 if self.finished or self.answered.has_key(node.id):
191 # a day late and a dollar short
193 self.outstanding -= 1
194 self.outstanding_results -= expected_results
195 self.answered[node.id] = 1
196 self.processResponse(dict)
199 def actionFailed(self, err, node, expected_results, df):
200 """Receive an error from a remote node."""
201 log.msg("action %s failed on %s/%s: %s" % (self.action, node.host, node.port, err.getErrorMessage()))
202 self.caller.table.nodeFailed(node)
203 self.outstanding -= 1
204 self.outstanding_results -= expected_results
207 def handleGotNodes(self, nodes):
208 """Process any received node contact info in the response.
210 Not called by default, but suitable for being called by
211 L{processResponse} in a recursive node search.
213 for compact_node in nodes:
214 node_contact = uncompact(compact_node)
215 node = self.caller.Node(node_contact)
216 if not self.found.has_key(node.id):
217 self.found[node.id] = node
220 """Sort the nodes, if necessary.
222 Assumes nodes are never removed from the L{found} dictionary.
224 if len(self.sorted_nodes) != len(self.found):
225 self.sorted_nodes = self.found.values()
226 self.sorted_nodes.sort(self.sort)
228 #{ Subclass for specific actions
229 def getNodesToProcess(self):
230 """Generate a list of nodes to process next.
232 This implementation is suitable for a recurring search over all nodes.
235 return self.sorted_nodes[:K]
237 def generateArgs(self, node):
238 """Generate the arguments to the node's action.
240 Also return the number of results expected from this request.
242 @raise ValueError: if the node should not be queried
244 return (self.caller.node.id, self.target), 0
246 def processResponse(self, dict):
247 """Process the response dictionary received from the remote node."""
248 self.handleGotNodes(dict['nodes'])
250 def generateResult(self, nodes):
251 """Create the final result to return to the L{callback} function."""
252 self.stats.completedAction(self.action, self.started)
256 class FindNode(ActionBase):
257 """Find the closest nodes to the key."""
259 def __init__(self, caller, target, callback, config, stats, action="find_node"):
260 ActionBase.__init__(self, caller, target, callback, config, stats, action)
262 def processResponse(self, dict):
263 """Save the token received from each node."""
264 if dict["id"] in self.found:
265 self.found[dict["id"]].updateToken(dict.get('token', ''))
266 self.handleGotNodes(dict['nodes'])
268 def generateResult(self):
269 """Result is the K closest nodes to the target."""
271 self.stats.completedAction(self.action, self.started)
272 return (self.sorted_nodes[:K], )
275 class FindValue(ActionBase):
276 """Find the closest nodes to the key and check for values."""
278 def __init__(self, caller, target, callback, config, stats, action="find_value"):
279 ActionBase.__init__(self, caller, target, callback, config, stats, action)
281 def processResponse(self, dict):
282 """Save the number of values each node has."""
283 if dict["id"] in self.found:
284 self.found[dict["id"]].updateNumValues(dict.get('num', 0))
285 self.handleGotNodes(dict['nodes'])
287 def generateResult(self):
288 """Result is the nodes that have values, sorted by proximity to the key."""
290 self.stats.completedAction(self.action, self.started)
291 return ([node for node in self.sorted_nodes if node.num_values > 0], )
294 class GetValue(ActionBase):
295 """Retrieve values from a list of nodes."""
297 def __init__(self, caller, target, num_results, callback, config, stats, action="get_value"):
298 ActionBase.__init__(self, caller, target, callback, config, stats, action, num_results)
300 def getNodesToProcess(self):
301 """Nodes are never added, always return the same sorted node list."""
302 return self.sorted_nodes
304 def generateArgs(self, node):
305 """Arguments include the number of values to request."""
306 if node.num_values > 0:
307 # Request all desired results from each node, just to be sure.
308 num_values = abs(self.desired_results) - len(self.results)
309 assert num_values > 0
310 if num_values > node.num_values:
312 return (self.caller.node.id, self.target, num_values), node.num_values
314 raise ValueError, "Don't try and get values from this node because it doesn't have any"
316 def processResponse(self, dict):
317 """Save the returned values, calling the L{callback} each time there are new ones."""
318 if dict.has_key('values'):
319 def x(y, z=self.results):
325 z = len(dict['values'])
326 v = filter(None, map(x, dict['values']))
328 reactor.callLater(0, self.callback, self.target, v)
330 def generateResult(self):
331 """Results have all been returned, now send the empty list to end the action."""
332 self.stats.completedAction(self.action, self.started)
333 return (self.target, [])
336 class StoreValue(ActionBase):
337 """Store a value in a list of nodes."""
339 def __init__(self, caller, target, value, num_results, callback, config, stats, action="store_value"):
340 """Initialize the action with the value to store.
342 @type value: C{string}
343 @param value: the value to store in the nodes
345 ActionBase.__init__(self, caller, target, callback, config, stats, action, num_results)
348 def getNodesToProcess(self):
349 """Nodes are never added, always return the same sorted list."""
350 return self.sorted_nodes
352 def generateArgs(self, node):
353 """Args include the value to store and the node's token."""
355 return (self.caller.node.id, self.target, self.value, node.token), 1
357 raise ValueError, "Don't store at this node since we don't know it's token"
359 def processResponse(self, dict):
360 """Save the response, though it should be nothin but the ID."""
361 self.results[dict["id"]] = dict
363 def generateResult(self):
364 """Return all the response IDs received."""
365 self.stats.completedAction(self.action, self.started)
366 return (self.target, self.value, self.results.values())