1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
4 """Details of how to perform actions on remote peers."""
6 from twisted.internet import reactor
7 from twisted.python import log
9 from khash import intify
10 from util import uncompact
13 """Base class for some long-running asynchronous processes like finding nodes or values.
15 @type caller: L{khashmir.Khashmir}
16 @ivar caller: the DHT instance that is performing the action
17 @type target: C{string}
18 @ivar target: the target of the action, usually a DHT key
19 @type config: C{dictionary}
20 @ivar config: the configuration variables for the DHT
21 @type action: C{string}
22 @ivar action: the name of the action to call on remote nodes
24 @ivar num: the target key in integer form
25 @type queried: C{dictionary}
26 @ivar queried: the nodes that have been queried for this action,
27 keys are node IDs, values are the node itself
28 @type answered: C{dictionary}
29 @ivar answered: the nodes that have answered the queries
30 @type found: C{dictionary}
31 @ivar found: nodes that have been found so far by the action
32 @type sorted_nodes: C{list} of L{node.Node}
33 @ivar sorted_nodes: a list of nodes sorted by their proximity to the key
34 @type results: C{dictionary}
35 @ivar results: keys are the results found so far by the action
36 @type desired_results: C{int}
37 @ivar desired_results: the minimum number of results that are needed
38 before the action should stop
39 @type callback: C{method}
40 @ivar callback: the method to call with the results
41 @type outstanding: C{int}
42 @ivar outstanding: the number of requests currently outstanding
43 @type outstanding_results: C{int}
44 @ivar outstanding_results: the number of results that are expected from
45 the requests that are currently outstanding
46 @type finished: C{boolean}
47 @ivar finished: whether the action is done
49 @ivar sort: used to sort nodes by their proximity to the target
def __init__(self, caller, target, callback, config, stats, action, num_results = None):
    """Initialize the action.

    @type caller: L{khashmir.Khashmir}
    @param caller: the DHT instance that is performing the action
    @type target: C{string}
    @param target: the target of the action, usually a DHT key
    @type callback: C{method}
    @param callback: the method to call with the results
    @type config: C{dictionary}
    @param config: the configuration variables for the DHT
    @type stats: L{stats.StatsLogger}
    @param stats: the statistics gatherer
    @type action: C{string}
    @param action: the name of the action to call on remote nodes
    @type num_results: C{int}
    @param num_results: the minimum number of results that are needed before
        the action should stop (optional, defaults to getting all the results)
    """
    # NOTE(review): the source as received lacked the assignments for
    # caller, target, config, action, queried, answered, found, results,
    # outstanding and finished. They are restored here from the class
    # documentation and from how the other methods use them -- confirm
    # the initial values against the upstream file.
    self.caller = caller
    self.target = target
    self.config = config
    self.action = action
    stats.startedAction(action)
    self.num = intify(target)

    # Bookkeeping of the nodes seen so far, all keyed by node ID.
    self.queried = {}
    self.answered = {}
    self.found = {}
    self.sorted_nodes = []

    self.results = {}
    self.desired_results = num_results
    self.callback = callback

    # Counters for the requests currently in flight.
    self.outstanding = 0
    self.outstanding_results = 0
    self.finished = False

    def sort(a, b, num=self.num):
        """Sort nodes relative to the ID we are looking for."""
        x, y = num ^ a.num, num ^ b.num
        # Three-way result (-1, 0 or 1) on the XOR distances, as expected
        # from a comparison function.
        return (x > y) - (x < y)
    self.sort = sort
def goWithNodes(self, nodes):
    """Start the action's process with a list of nodes to contact.

    Every node whose ID differs from our own is recorded in L{found},
    keyed by its ID.

    @param nodes: the nodes to begin with; each must have an C{id}
    """
    own_id = self.caller.node.id
    for node in nodes:
        # We never contact ourselves, so don't record our own node.
        if node.id != own_id:
            self.found[node.id] = node
    # NOTE(review): the source as received has gaps in this method. The
    # loop header and the skip branch above are restored from context; the
    # statement that actually kicks off the first round of requests is not
    # visible and must be restored here.
112 """Schedule requests to be sent to remote nodes."""
113 # Check if we are already done
114 if self.desired_results and ((len(self.results) >= abs(self.desired_results)) or
115 (self.desired_results < 0 and
116 len(self.answered) >= self.config['STORE_REDUNDANCY'])):
118 result = self.generateResult()
119 reactor.callLater(0, self.callback, *result)
121 if self.finished or (self.desired_results and
122 len(self.results) + self.outstanding_results >= abs(self.desired_results)):
125 # Loop for each node that should be processed
126 for node in self.getNodesToProcess():
127 # Don't send requests twice or to ourself
128 if node.id not in self.queried and node.id != self.caller.node.id:
129 self.queried[node.id] = 1
131 # Get the action to call on the node
133 f = getattr(node, self.action)
134 except AttributeError:
135 log.msg("%s doesn't have a %s method!" % (node, self.action))
137 # Get the arguments to the action's method
139 args, expected_results = self.generateArgs(node)
143 # Call the action on the remote node
144 self.outstanding += 1
145 self.outstanding_results += expected_results
146 df = f(self.caller.node.id, *args)
147 df.addCallbacks(self.gotResponse, self.actionFailed,
148 callbackArgs = (node, expected_results),
149 errbackArgs = (node, expected_results))
151 # We might have to stop for now
152 if (self.outstanding >= self.config['CONCURRENT_REQS'] or
153 (self.desired_results and
154 len(self.results) + self.outstanding_results >= abs(self.desired_results))):
157 assert self.outstanding >= 0
158 assert self.outstanding_results >= 0
160 # If no requests are outstanding, then we are done
161 if self.outstanding == 0:
163 result = self.generateResult()
164 reactor.callLater(0, self.callback, *result)
def gotResponse(self, dict, node, expected_results):
    """Receive a response from a remote node.

    @param dict: the response message; its C{'rsp'} entry is handed to
        L{processResponse}
    @param node: the node that sent the response
    @type expected_results: C{int}
    @param expected_results: the number of results that were expected from
        the request this response answers
    """
    self.caller.insertNode(node)
    if self.finished or node.id in self.answered:
        # a day late and a dollar short
        return

    # The request is no longer in flight.
    self.outstanding -= 1
    self.outstanding_results -= expected_results
    self.answered[node.id] = 1
    self.processResponse(dict['rsp'])
    # NOTE(review): nothing visible here sends further requests once the
    # counters drop; the source as received has a gap after this point --
    # confirm whether a rescheduling call belongs here.
def actionFailed(self, err, node, expected_results):
    """Receive an error from a remote node.

    Logs the failure, reports the node to the routing table as failed and
    removes the request from the outstanding counters.

    @param err: the failure that occurred (not inspected here)
    @param node: the node the request was sent to
    @type expected_results: C{int}
    @param expected_results: the number of results that were expected from
        the failed request
    """
    details = (self.action, self.config['PORT'], node.host, node.port)
    log.msg("action %s failed (%s) %s/%s" % details)

    self.caller.table.nodeFailed(node)

    # The request is no longer in flight.
    self.outstanding_results -= expected_results
    self.outstanding -= 1
    # NOTE(review): nothing visible here sends further requests after a
    # failure; the source as received has a gap after this point --
    # confirm whether a rescheduling call belongs here.
def handleGotNodes(self, nodes):
    """Process any received node contact info in the response.

    Called by L{processResponse} to collect newly learned nodes for a
    recursive node search.

    @param nodes: the compact node contacts from the response, each one is
        expanded with C{uncompact} before a node is built from it
    """
    for compact_node in nodes:
        node = self.caller.Node(uncompact(compact_node))
        # Nodes we already know are left untouched; only new IDs are added.
        if node.id not in self.found:
            self.found[node.id] = node
200 """Sort the nodes, if necessary.
202 Assumes nodes are never removed from the L{found} dictionary.
204 if len(self.sorted_nodes) != len(self.found):
205 self.sorted_nodes = self.found.values()
206 self.sorted_nodes.sort(self.sort)
208 #{ Subclass for specific actions
def getNodesToProcess(self):
    """Generate a list of nodes to process next.

    This implementation is suitable for a recurring search over all nodes.

    @rtype: C{list}
    @return: the first C{config['K']} entries of L{sorted_nodes}
    """
    # NOTE(review): this relies on sorted_nodes being up to date; the source
    # as received has a gap right before the return -- confirm whether the
    # list is meant to be refreshed here first.
    limit = self.config['K']
    return self.sorted_nodes[:limit]
def generateArgs(self, node):
    """Generate the arguments to the node's action.

    These arguments will be appended to our node ID when calling the action.
    Also return the number of results expected from this request.

    @param node: the node that is about to be queried (unused here)
    @rtype: C{tuple}
    @return: the argument tuple, holding only the target, and an expected
        result count of zero
    @raise ValueError: if the node should not be queried
    """
    args = (self.target, )
    expected_results = 0
    return args, expected_results
def processResponse(self, dict):
    """Process the response dictionary received from the remote node.

    @param dict: the response; its C{'nodes'} entry is passed to
        L{handleGotNodes}
    """
    contacts = dict['nodes']
    self.handleGotNodes(contacts)
def generateResult(self, nodes=None):
    """Create the final result to return to the L{callback} function.

    Subclasses must override this method and return a sequence, since the
    result is unpacked into the callback's arguments.

    @param nodes: unused; optional so that calling with no arguments, as
        the overriding implementations are called, reaches the error below
        instead of failing on a missing argument
    @raise NotImplementedError: always, as there is no generic result
    """
    raise NotImplementedError("Subclasses must override this function")
class FindNode(ActionBase):
    """Find the closest nodes to the key."""

    def __init__(self, caller, target, callback, config, stats, action="find_node"):
        """Set up the search; the arguments are passed on to L{ActionBase.__init__}."""
        ActionBase.__init__(self, caller, target, callback, config, stats, action)

    def processResponse(self, dict):
        """Save the token received from each node.

        @param dict: the response; C{'id'} names the responding node,
            C{'token'} is optional and C{'nodes'} goes to L{handleGotNodes}
        """
        responder = dict["id"]
        if responder in self.found:
            # A response without a token stores the empty string.
            token = dict.get('token', '')
            self.found[responder].updateToken(token)
        self.handleGotNodes(dict['nodes'])

    def generateResult(self):
        """Result is the K closest nodes to the target."""
        # NOTE(review): this relies on sorted_nodes being up to date; the
        # source as received has a gap right before the return -- confirm
        # whether the list is meant to be refreshed here first.
        closest = self.sorted_nodes[:self.config['K']]
        return (closest, )
class FindValue(ActionBase):
    """Find the closest nodes to the key and check for values."""

    def __init__(self, caller, target, callback, config, stats, action="find_value"):
        """Set up the search; the arguments are passed on to L{ActionBase.__init__}."""
        ActionBase.__init__(self, caller, target, callback, config, stats, action)

    def processResponse(self, dict):
        """Save the number of values each node has.

        @param dict: the response; C{'id'} names the responding node,
            C{'num'} is optional and C{'nodes'} goes to L{handleGotNodes}
        """
        responder = dict["id"]
        if responder in self.found:
            # A response without a count is recorded as zero values.
            count = dict.get('num', 0)
            self.found[responder].updateNumValues(count)
        self.handleGotNodes(dict['nodes'])

    def generateResult(self):
        """Result is the nodes that have values, sorted by proximity to the key."""
        # NOTE(review): the ordering is whatever sorted_nodes currently
        # holds; the source as received has a gap right before the return --
        # confirm whether the list is meant to be refreshed here first.
        with_values = [candidate for candidate in self.sorted_nodes
                       if candidate.num_values > 0]
        return (with_values, )
class GetValue(ActionBase):
    """Retrieve values from a list of nodes."""

    def __init__(self, caller, target, local_results, num_results, callback, config, stats, action="get_value"):
        """Initialize the action with the locally available results.

        The remaining arguments are passed on to L{ActionBase.__init__}.

        @type local_results: C{list} of C{string}
        @param local_results: the values that were available in this node
            (may be empty or C{None})
        """
        ActionBase.__init__(self, caller, target, callback, config, stats, action, num_results)
        # Values we already hold count as results that have been found.
        for result in local_results or ():
            self.results[result] = 1

    def getNodesToProcess(self):
        """Nodes are never added, always return the same sorted node list."""
        return self.sorted_nodes

    def generateArgs(self, node):
        """Arguments include the number of values to request.

        @param node: the node about to be queried
        @rtype: C{tuple}
        @return: the arguments (target and number of values to request)
            and the number of values the node is expected to return
        @raise ValueError: if the node has no values to offer
        """
        if node.num_values <= 0:
            raise ValueError("Don't try and get values from this node because it doesn't have any")

        # Request all desired results from each node, just to be sure.
        num_values = abs(self.desired_results) - len(self.results)
        assert num_values > 0
        # NOTE(review): the statement under this condition is missing from
        # the source as received; capping the request at what the node
        # holds is the presumed intent -- confirm.
        if num_values > node.num_values:
            num_values = node.num_values
        return (self.target, num_values), node.num_values

    def processResponse(self, dict):
        """Save the returned values, calling the L{callback} each time there are new ones.

        @param dict: the response; its optional C{'values'} entry holds the
            values the node returned
        """
        if 'values' not in dict:
            return

        # NOTE(review): the helper that filtered the values is missing from
        # the source as received; this keeps only values not seen before,
        # as the summary above describes -- confirm against upstream.
        fresh = []
        for value in dict['values']:
            if value not in self.results:
                self.results[value] = 1
                fresh.append(value)

        if fresh:
            reactor.callLater(0, self.callback, self.target, fresh)

    def generateResult(self):
        """Results have all been returned, now send the empty list to end the action."""
        return (self.target, [])
class StoreValue(ActionBase):
    """Store a value in a list of nodes."""

    def __init__(self, caller, target, value, num_results, callback, config, stats, action="store_value"):
        """Initialize the action with the value to store.

        The remaining arguments are passed on to L{ActionBase.__init__}.

        @type value: C{string}
        @param value: the value to store in the nodes
        """
        ActionBase.__init__(self, caller, target, callback, config, stats, action, num_results)
        # Read back by generateArgs and generateResult.
        self.value = value

    def getNodesToProcess(self):
        """Nodes are never added, always return the same sorted list."""
        return self.sorted_nodes

    def generateArgs(self, node):
        """Args include the value to store and the node's token.

        @param node: the node about to be asked to store the value
        @rtype: C{tuple}
        @return: the arguments (target, value and the node's token) and an
            expected result count of one
        @raise ValueError: if no token is known for the node
        """
        # NOTE(review): the condition is missing from the source as
        # received; an empty token is presumed to mean "unknown" -- confirm.
        if not node.token:
            raise ValueError("Don't store at this node since we don't know it's token")
        return (self.target, self.value, node.token), 1

    def processResponse(self, dict):
        """Save the response, though it should be nothing but the ID."""
        self.results[dict["id"]] = dict

    def generateResult(self):
        """Return all the response IDs received.

        @rtype: C{tuple}
        @return: the target, the stored value and a list of the saved
            responses
        """
        # Hand out a list snapshot rather than a live view of the dictionary.
        return (self.target, self.value, list(self.results.values()))