1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
4 """Details of how to perform actions on remote peers."""
6 from twisted.internet import reactor
7 from twisted.python import log
9 from khash import intify
10 from util import uncompact
13 """Base class for some long running asynchronous processes like finding nodes or values.
15 @type caller: L{khashmir.Khashmir}
16 @ivar caller: the DHT instance that is performing the action
17 @type target: C{string}
18 @ivar target: the target of the action, usually a DHT key
19 @type config: C{dictionary}
20 @ivar config: the configuration variables for the DHT
21 @type action: C{string}
22 @ivar action: the name of the action to call on remote nodes
24 @ivar num: the target key in integer form
25 @type queried: C{dictionary}
26 @ivar queried: the nodes that have been queried for this action,
27 keys are node IDs, values are the node itself
28 @type answered: C{dictionary}
29 @ivar answered: the nodes that have answered the queries
30 @type found: C{dictionary}
31 @ivar found: nodes that have been found so far by the action
32 @type sorted_nodes: C{list} of L{node.Node}
33 @ivar sorted_nodes: a sorted list of nodes by their proximity to the key
34 @type results: C{dictionary}
35 @ivar results: keys are the results found so far by the action
36 @type desired_results: C{int}
37 @ivar desired_results: the minimum number of results that are needed
38 before the action should stop
39 @type callback: C{method}
40 @ivar callback: the method to call with the results
41 @type outstanding: C{int}
42 @ivar outstanding: the number of requests currently outstanding
43 @type outstanding_results: C{int}
44 @ivar outstanding_results: the number of results that are expected from
45 the requests that are currently outstanding
46 @type finished: C{boolean}
47 @ivar finished: whether the action is done
49 @ivar sort: used to sort nodes by their proximity to the target
52 def __init__(self, caller, target, callback, config, action, num_results = None):
53 """Initialize the action.
55 @type caller: L{khashmir.Khashmir}
56 @param caller: the DHT instance that is performing the action
57 @type target: C{string}
58 @param target: the target of the action, usually a DHT key
59 @type callback: C{method}
60 @param callback: the method to call with the results
61 @type config: C{dictionary}
62 @param config: the configuration variables for the DHT
63 @type action: C{string}
64 @param action: the name of the action to call on remote nodes
65 @type num_results: C{int}
66 @param num_results: the minimum number of results that are needed before
67 the action should stop (optional, defaults to getting all the results)
# NOTE(review): this listing has lost its indentation and several lines (the
# embedded numbering jumps 67 -> 75 -> 79 -> 81 -> 84 -> 87). The docstring
# terminator and the initialisation of the other attributes that the rest of
# the class reads (caller, target, config, action, queried, answered, found,
# results, outstanding, finished) are not visible here -- restore them from
# the original file rather than from this listing.
# Integer form of the target key; used by the proximity comparison below.
75 self.num = intify(target)
# List of found nodes ordered by proximity to the key; starts out empty.
79 self.sorted_nodes = []
# None (the default) means no minimum was requested -- see num_results above.
81 self.desired_results = num_results
82 self.callback = callback
# Nothing has been requested yet, so no results are expected.
84 self.outstanding_results = 0
# Comparison helper for ordering nodes; the target's integer form is bound
# once as a default argument.
87 def sort(a, b, num=self.num):
88 """Sort nodes relative to the ID we are looking for."""
# XOR of each node's integer ID with the target.
89 x, y = num ^ a.num, num ^ b.num
# NOTE(review): the rest of this helper (how x and y are compared, and where
# the helper is stored) is missing from the listing -- confirm against the
# original file.
98 def goWithNodes(self, nodes):
99 """Start the action's process with a list of nodes to contact."""
# NOTE(review): lines are missing from this listing (the embedded numbering
# jumps 99 -> 101 -> 104 and stops); the loop header that binds `node`, the
# statement under the `if`, and whatever follows line 104 are not visible --
# confirm against the original file.
# Each candidate is compared against our own node ID; presumably our own
# node is skipped (TODO confirm).
101 if node.id == self.caller.node.id:
# Remember the node, keyed by its ID.
104 self.found[node.id] = node
109 """Schedule requests to be sent to remote nodes."""
110 # Check if we are already done
111 if self.desired_results and ((len(self.results) >= abs(self.desired_results)) or
112 (self.desired_results < 0 and
113 len(self.answered) >= self.config['STORE_REDUNDANCY'])):
115 result = self.generateResult()
116 reactor.callLater(0, self.callback, *result)
118 if self.finished or (self.desired_results and
119 len(self.results) + self.outstanding_results >= abs(self.desired_results)):
122 # Loop for each node that should be processed
123 for node in self.getNodesToProcess():
124 # Don't send requests twice or to ourself
125 if node.id not in self.queried and node.id != self.caller.node.id:
126 self.queried[node.id] = 1
128 # Get the action to call on the node
130 f = getattr(node, self.action)
131 except AttributeError:
132 log.msg("%s doesn't have a %s method!" % (node, self.action))
134 # Get the arguments to the action's method
136 args, expected_results = self.generateArgs(node)
140 # Call the action on the remote node
141 self.outstanding += 1
142 self.outstanding_results += expected_results
143 df = f(self.caller.node.id, *args)
144 df.addCallbacks(self.gotResponse, self.actionFailed,
145 callbackArgs = (node, expected_results),
146 errbackArgs = (node, expected_results))
148 # We might have to stop for now
149 if (self.outstanding >= self.config['CONCURRENT_REQS'] or
150 (self.desired_results and
151 len(self.results) + self.outstanding_results >= abs(self.desired_results))):
154 assert self.outstanding >= 0
155 assert self.outstanding_results >= 0
157 # If no requests are outstanding, then we are done
158 if self.outstanding == 0:
160 result = self.generateResult()
161 reactor.callLater(0, self.callback, *result)
163 def gotResponse(self, dict, node, expected_results):
164 """Receive a response from a remote node."""
# NOTE(review): this listing has dropped lines (the embedded numbering skips
# 168 and stops at 172); the statement guarded by the `if` below and anything
# after line 172 are not visible -- confirm against the original file.
# The node answered, so hand it to the DHT instance for insertion.
165 self.caller.insertNode(node)
# Detects responses that arrive after the action finished, or a repeat answer
# from a node already counted; presumably these are ignored (TODO confirm).
166 if self.finished or self.answered.has_key(node.id):
167 # a day late and a dollar short
# This request, and the results it was expected to bring, are no longer
# outstanding.
169 self.outstanding -= 1
170 self.outstanding_results -= expected_results
# Mark the node as having answered, then let the action process the 'rsp'
# entry of the response.
171 self.answered[node.id] = 1
172 self.processResponse(dict['rsp'])
175 def actionFailed(self, err, node, expected_results):
176 """Receive an error from a remote node."""
# NOTE(review): this listing has dropped lines (the embedded numbering skips
# 178 and stops at 181); `err` is not used by any visible line -- confirm
# against the original file.
# Log the action name, the configured PORT value and the remote node's address.
177 log.msg("action %s failed (%s) %s/%s" % (self.action, self.config['PORT'], node.host, node.port))
# Report the failure to the caller's table.
179 self.caller.table.nodeFailed(node)
# The failed request, and the results it was expected to bring, are no longer
# outstanding.
180 self.outstanding -= 1
181 self.outstanding_results -= expected_results
def handleGotNodes(self, nodes):
    """Process any received node contact info in the response.

    Suitable for being called by L{processResponse} in a recursive node
    search. Nodes that are already known are left untouched, so the first
    instance seen of each node ID is the one that is kept.

    @param nodes: the compact node contact descriptions received in a
        response, each in the form accepted by C{uncompact}
    """
    for compact_node in nodes:
        # Expand the compact form and build a node object from the contact info.
        node_contact = uncompact(compact_node)
        node = self.caller.Node(node_contact)
        # Only add nodes that were not found before.
        if node.id not in self.found:
            self.found[node.id] = node
197 """Sort the nodes, if necessary.
199 Assumes nodes are never removed from the L{found} dictionary.
201 if len(self.sorted_nodes) != len(self.found):
202 self.sorted_nodes = self.found.values()
203 self.sorted_nodes.sort(self.sort)
205 #{ Subclass for specific actions
206 def getNodesToProcess(self):
207 """Generate a list of nodes to process next.
209 This implementation is suitable for a recurring search over all nodes.
# NOTE(review): the docstring terminator and original line 211 are missing
# from this listing; presumably the node list is refreshed before it is
# sliced -- confirm against the original file.
# Only the first K entries (config['K']) of the sorted node list are offered.
212 return self.sorted_nodes[:self.config['K']]
def generateArgs(self, node):
    """Generate the arguments to the node's action.

    These arguments will be appended to our node ID when calling the action.
    Also return the number of results expected from this request.

    @param node: the node the action is about to be called on (not used by
        this default implementation)
    @rtype: C{tuple}
    @return: the tuple of arguments for the action, and the number of
        results expected from the request
    @raise ValueError: if the node should not be queried
    """
    # By default only the target key is sent, and no results are expected back.
    return (self.target, ), 0
def processResponse(self, dict):
    """Process the response dictionary received from the remote node.

    @type dict: C{dictionary}
    @param dict: the response from the remote node; its 'nodes' entry is
        handed to L{handleGotNodes}
    """
    self.handleGotNodes(dict['nodes'])
def generateResult(self, nodes=None):
    """Create the final result to return to the L{callback} function.

    This is a hook for subclasses: it is called without arguments, and the
    value it returns is unpacked into the positional arguments of the
    L{callback}, so overrides must return a sequence.

    @param nodes: unused; kept, now optional, only so that existing callers
        passing it keep working
    @raise NotImplementedError: always, since the base class has no result
        of its own to produce
    """
    raise NotImplementedError
233 class FindNode(ActionBase):
234 """Find the closest nodes to the key."""
# NOTE(review): this listing has lost its indentation, and original line 247
# is missing (see generateResult below) -- confirm against the original file.
236 def __init__(self, caller, target, callback, config, action="findNode"):
# No num_results is passed on, so the base class default (None) applies.
237 ActionBase.__init__(self, caller, target, callback, config, action)
239 def processResponse(self, dict):
240 """Save the token received from each node."""
# Only a responder that is already in `found` gets its token updated; a
# response without a token stores the empty string.
241 if dict["id"] in self.found:
242 self.found[dict["id"]].updateToken(dict.get('token', ''))
# Node contact info carried in the response is processed as well.
243 self.handleGotNodes(dict['nodes'])
245 def generateResult(self):
246 """Result is the K closest nodes to the target."""
# NOTE(review): original line 247 is missing here; presumably it refreshes
# sorted_nodes before slicing -- confirm.
# One-element tuple, because the result is unpacked into the callback's
# positional arguments.
248 return (self.sorted_nodes[:self.config['K']], )
251 class FindValue(ActionBase):
252 """Find the closest nodes to the key and check for values."""
# NOTE(review): this listing has lost its indentation, and original line 265
# is missing (see generateResult below) -- confirm against the original file.
254 def __init__(self, caller, target, callback, config, action="findValue"):
# No num_results is passed on, so the base class default (None) applies.
255 ActionBase.__init__(self, caller, target, callback, config, action)
257 def processResponse(self, dict):
258 """Save the number of values each node has."""
# Only a responder that is already in `found` is updated; a response without
# a 'num' entry counts as zero values.
259 if dict["id"] in self.found:
260 self.found[dict["id"]].updateNumValues(dict.get('num', 0))
# Node contact info carried in the response is processed as well.
261 self.handleGotNodes(dict['nodes'])
263 def generateResult(self):
264 """Result is the nodes that have values, sorted by proximity to the key."""
# NOTE(review): original line 265 is missing here; presumably it refreshes
# sorted_nodes before filtering -- confirm.
# One-element tuple, because the result is unpacked into the callback's
# positional arguments; nodes reporting no values are filtered out.
266 return ([node for node in self.sorted_nodes if node.num_values > 0], )
269 class GetValue(ActionBase):
270 """Retrieve values from a list of nodes."""
# NOTE(review): this listing has lost its indentation and a number of lines
# (by the embedded numbering: 277, 294, 296, 303-307 and 310 among others).
# The notes below mark each gap -- restore the code from the original file.
272 def __init__(self, caller, target, local_results, num_results, callback, config, action="getValue"):
273 """Initialize the action with the locally available results.
275 @type local_results: C{list} of C{string}
276 @param local_results: the values that were available in this node
278 ActionBase.__init__(self, caller, target, callback, config, action, num_results)
# Seed the results with the locally available values, keyed by the value itself.
280 for result in local_results:
281 self.results[result] = 1
283 def getNodesToProcess(self):
284 """Nodes are never added, always return the same sorted node list."""
285 return self.sorted_nodes
287 def generateArgs(self, node):
288 """Arguments include the number of values to request."""
# Nodes that reported having values are asked for them; presumably all other
# nodes reach the ValueError below (the branch keyword is missing, TODO confirm).
289 if node.num_values > 0:
290 # Request all desired results from each node, just to be sure.
291 num_values = abs(self.desired_results) - len(self.results)
292 assert num_values > 0
# NOTE(review): the statement under this `if` (original line 294) is missing;
# presumably it limits num_values to what the node has -- confirm.
293 if num_values > node.num_values:
# The second element is the number of results expected from this request.
295 return (self.target, num_values), node.num_values
297 raise ValueError, "Don't try and get values from this node because it doesn't have any"
299 def processResponse(self, dict):
300 """Save the returned values, calling the L{callback} each time there are new ones."""
301 if dict.has_key('values'):
# NOTE(review): the body of this helper (original lines 303-307) is missing
# from the listing; the current results dictionary is bound as its default `z`.
302 def x(y, z=self.results):
308 z = len(dict['values'])
# Apply the helper to every returned value and keep only the truthy outcomes.
309 v = filter(None, map(x, dict['values']))
# NOTE(review): original line 310 is missing; the docstring suggests the
# callback is only scheduled when there are new values -- confirm.
311 reactor.callLater(0, self.callback, self.target, v)
313 def generateResult(self):
314 """Results have all been returned, now send the empty list to end the action."""
315 return (self.target, [])
318 class StoreValue(ActionBase):
319 """Store a value in a list of nodes."""
# NOTE(review): this listing has lost its indentation and several lines (by
# the embedded numbering: 326, 328, 336 and 338 among others). The notes
# below mark each gap -- restore the code from the original file.
321 def __init__(self, caller, target, value, num_results, callback, config, action="storeValue"):
322 """Initialize the action with the value to store.
324 @type value: C{string}
325 @param value: the value to store in the nodes
327 ActionBase.__init__(self, caller, target, callback, config, action, num_results)
# NOTE(review): no assignment of self.value is visible in this listing, yet
# generateArgs and generateResult read it -- confirm against the original.
330 def getNodesToProcess(self):
331 """Nodes are never added, always return the same sorted list."""
332 return self.sorted_nodes
334 def generateArgs(self, node):
335 """Args include the value to store and the node's token."""
# NOTE(review): the condition choosing between the return and the raise
# (original lines 336 and 338) is missing; the error message suggests it
# tests whether the node's token is known -- confirm.
# One result is expected from each store request.
337 return (self.target, self.value, node.token), 1
339 raise ValueError, "Don't store at this node since we don't know it's token"
341 def processResponse(self, dict):
342 """Save the response, though it should be nothing but the ID."""
# Responses are keyed by the responder's ID.
343 self.results[dict["id"]] = dict
345 def generateResult(self):
346 """Return all the response IDs received."""
# Despite the summary above, the third element holds the stored response
# dictionaries (the values of `results`), not just their IDs.
347 return (self.target, self.value, self.results.values())