Various documentation fixes and additions.
[quix0rs-apt-p2p.git] / apt_dht_Khashmir / actions.py
1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
3
4 """Details of how to perform actions on remote peers."""
5
6 from twisted.internet import reactor
7 from twisted.python import log
8
9 from khash import intify
10 from util import uncompact
11
12 class ActionBase:
13     """Base class for some long running asynchronous proccesses like finding nodes or values.
14     
15     @type caller: L{khashmir.Khashmir}
16     @ivar caller: the DHT instance that is performing the action
17     @type target: C{string}
18     @ivar target: the target of the action, usually a DHT key
19     @type config: C{dictionary}
20     @ivar config: the configuration variables for the DHT
21     @type action: C{string}
22     @ivar action: the name of the action to call on remote nodes
23     @type num: C{long}
24     @ivar num: the target key in integer form
25     @type queried: C{dictionary}
26     @ivar queried: the nodes that have been queried for this action,
27         keys are node IDs, values are the node itself
28     @type answered: C{dictionary}
29     @ivar answered: the nodes that have answered the queries
30     @type found: C{dictionary}
31     @ivar found: nodes that have been found so far by the action
32     @type sorted_nodes: C{list} of L{node.Node}
33     @ivar sorted_nodes: a sorted list of nodes by there proximity to the key
34     @type results: C{dictionary}
35     @ivar results: keys are the results found so far by the action
36     @type desired_results: C{int}
37     @ivar desired_results: the minimum number of results that are needed
38         before the action should stop
39     @type callback: C{method}
40     @ivar callback: the method to call with the results
41     @type outstanding: C{int}
42     @ivar outstanding: the number of requests currently outstanding
43     @type outstanding_results: C{int}
44     @ivar outstanding_results: the number of results that are expected from
45         the requests that are currently outstanding
46     @type finished: C{boolean}
47     @ivar finished: whether the action is done
48     @type sort: C{method}
49     @ivar sort: used to sort nodes by their proximity to the target
50     """
51     
52     def __init__(self, caller, target, callback, config, action, num_results = None):
53         """Initialize the action.
54         
55         @type caller: L{khashmir.Khashmir}
56         @param caller: the DHT instance that is performing the action
57         @type target: C{string}
58         @param target: the target of the action, usually a DHT key
59         @type callback: C{method}
60         @param callback: the method to call with the results
61         @type config: C{dictionary}
62         @param config: the configuration variables for the DHT
63         @type action: C{string}
64         @param action: the name of the action to call on remote nodes
65         @type num_results: C{int}
66         @param num_results: the minimum number of results that are needed before
67             the action should stop (optional, defaults to getting all the results)
68         
69         """
70         
71         self.caller = caller
72         self.target = target
73         self.config = config
74         self.action = action
75         self.num = intify(target)
76         self.queried = {}
77         self.answered = {}
78         self.found = {}
79         self.sorted_nodes = []
80         self.results = {}
81         self.desired_results = num_results
82         self.callback = callback
83         self.outstanding = 0
84         self.outstanding_results = 0
85         self.finished = False
86     
87         def sort(a, b, num=self.num):
88             """Sort nodes relative to the ID we are looking for."""
89             x, y = num ^ a.num, num ^ b.num
90             if x > y:
91                 return 1
92             elif x < y:
93                 return -1
94             return 0
95         self.sort = sort
96
97     #{ Main operation
98     def goWithNodes(self, nodes):
99         """Start the action's process with a list of nodes to contact."""
100         for node in nodes:
101             if node.id == self.caller.node.id:
102                 continue
103             else:
104                 self.found[node.id] = node
105         self.sortNodes()
106         self.schedule()
107     
108     def schedule(self):
109         """Schedule requests to be sent to remote nodes."""
110         # Check if we are already done
111         if self.desired_results and ((len(self.results) >= abs(self.desired_results)) or
112                                      (self.desired_results < 0 and
113                                       len(self.answered) >= self.config['STORE_REDUNDANCY'])):
114             self.finished = True
115             result = self.generateResult()
116             reactor.callLater(0, self.callback, *result)
117
118         if self.finished or (self.desired_results and 
119                              len(self.results) + self.outstanding_results >= abs(self.desired_results)):
120             return
121         
122         # Loop for each node that should be processed
123         for node in self.getNodesToProcess():
124             # Don't send requests twice or to ourself
125             if node.id not in self.queried and node.id != self.caller.node.id:
126                 self.queried[node.id] = 1
127                 
128                 # Get the action to call on the node
129                 try:
130                     f = getattr(node, self.action)
131                 except AttributeError:
132                     log.msg("%s doesn't have a %s method!" % (node, self.action))
133                 else:
134                     # Get the arguments to the action's method
135                     try:
136                         args, expected_results = self.generateArgs(node)
137                     except ValueError:
138                         pass
139                     else:
140                         # Call the action on the remote node
141                         self.outstanding += 1
142                         self.outstanding_results += expected_results
143                         df = f(self.caller.node.id, *args)
144                         df.addCallbacks(self.gotResponse, self.actionFailed,
145                                         callbackArgs = (node, expected_results),
146                                         errbackArgs = (node, expected_results))
147                         
148             # We might have to stop for now
149             if (self.outstanding >= self.config['CONCURRENT_REQS'] or
150                 (self.desired_results and
151                  len(self.results) + self.outstanding_results >= abs(self.desired_results))):
152                 break
153             
154         assert self.outstanding >= 0
155         assert self.outstanding_results >= 0
156
157         # If no requests are outstanding, then we are done
158         if self.outstanding == 0:
159             self.finished = True
160             result = self.generateResult()
161             reactor.callLater(0, self.callback, *result)
162
163     def gotResponse(self, dict, node, expected_results):
164         """Receive a response from a remote node."""
165         self.caller.insertNode(node)
166         if self.finished or self.answered.has_key(node.id):
167             # a day late and a dollar short
168             return
169         self.outstanding -= 1
170         self.outstanding_results -= expected_results
171         self.answered[node.id] = 1
172         self.processResponse(dict['rsp'])
173         self.schedule()
174
175     def actionFailed(self, err, node, expected_results):
176         """Receive an error from a remote node."""
177         log.msg("action %s failed (%s) %s/%s" % (self.action, self.config['PORT'], node.host, node.port))
178         log.err(err)
179         self.caller.table.nodeFailed(node)
180         self.outstanding -= 1
181         self.outstanding_results -= expected_results
182         self.schedule()
183     
184     def handleGotNodes(self, nodes):
185         """Process any received node contact info in the response.
186         
187         Not called by default, but suitable for being called by
188         L{processResponse} in a recursive node search.
189         """
190         for compact_node in nodes:
191             node_contact = uncompact(compact_node)
192             node = self.caller.Node(node_contact)
193             if not self.found.has_key(node.id):
194                 self.found[node.id] = node
195
196     def sortNodes(self):
197         """Sort the nodes, if necessary.
198         
199         Assumes nodes are never removed from the L{found} dictionary.
200         """
201         if len(self.sorted_nodes) != len(self.found):
202             self.sorted_nodes = self.found.values()
203             self.sorted_nodes.sort(self.sort)
204                 
205     #{ Subclass for specific actions
206     def getNodesToProcess(self):
207         """Generate a list of nodes to process next.
208         
209         This implementation is suitable for a recurring search over all nodes.
210         """
211         self.sortNodes()
212         return self.sorted_nodes[:self.config['K']]
213     
214     def generateArgs(self, node):
215         """Generate the arguments to the node's action.
216         
217         These arguments will be appended to our node ID when calling the action.
218         Also return the number of results expected from this request.
219         
220         @raise ValueError: if the node should not be queried
221         """
222         return (self.target, ), 0
223     
224     def processResponse(self, dict):
225         """Process the response dictionary received from the remote node."""
226         self.handleGotNodes(dict['nodes'])
227
228     def generateResult(self, nodes):
229         """Create the final result to return to the L{callback} function."""
230         return []
231         
232
233 class FindNode(ActionBase):
234     """Find the closest nodes to the key."""
235
236     def __init__(self, caller, target, callback, config, action="findNode"):
237         ActionBase.__init__(self, caller, target, callback, config, action)
238
239     def processResponse(self, dict):
240         """Save the token received from each node."""
241         if dict["id"] in self.found:
242             self.found[dict["id"]].updateToken(dict.get('token', ''))
243         self.handleGotNodes(dict['nodes'])
244
245     def generateResult(self):
246         """Result is the K closest nodes to the target."""
247         self.sortNodes()
248         return (self.sorted_nodes[:self.config['K']], )
249     
250
251 class FindValue(ActionBase):
252     """Find the closest nodes to the key and check for values."""
253
254     def __init__(self, caller, target, callback, config, action="findValue"):
255         ActionBase.__init__(self, caller, target, callback, config, action)
256
257     def processResponse(self, dict):
258         """Save the number of values each node has."""
259         if dict["id"] in self.found:
260             self.found[dict["id"]].updateNumValues(dict.get('num', 0))
261         self.handleGotNodes(dict['nodes'])
262         
263     def generateResult(self):
264         """Result is the nodes that have values, sorted by proximity to the key."""
265         self.sortNodes()
266         return ([node for node in self.sorted_nodes if node.num_values > 0], )
267     
268
269 class GetValue(ActionBase):
270     """Retrieve values from a list of nodes."""
271     
272     def __init__(self, caller, target, local_results, num_results, callback, config, action="getValue"):
273         """Initialize the action with the locally available results.
274         
275         @type local_results: C{list} of C{string}
276         @param local_results: the values that were available in this node
277         """
278         ActionBase.__init__(self, caller, target, callback, config, action, num_results)
279         if local_results:
280             for result in local_results:
281                 self.results[result] = 1
282
283     def getNodesToProcess(self):
284         """Nodes are never added, always return the same sorted node list."""
285         return self.sorted_nodes
286     
287     def generateArgs(self, node):
288         """Arguments include the number of values to request."""
289         if node.num_values > 0:
290             # Request all desired results from each node, just to be sure.
291             num_values = abs(self.desired_results) - len(self.results)
292             assert num_values > 0
293             if num_values > node.num_values:
294                 num_values = 0
295             return (self.target, num_values), node.num_values
296         else:
297             raise ValueError, "Don't try and get values from this node because it doesn't have any"
298
299     def processResponse(self, dict):
300         """Save the returned values, calling the L{callback} each time there are new ones."""
301         if dict.has_key('values'):
302             def x(y, z=self.results):
303                 if not z.has_key(y):
304                     z[y] = 1
305                     return y
306                 else:
307                     return None
308             z = len(dict['values'])
309             v = filter(None, map(x, dict['values']))
310             if len(v):
311                 reactor.callLater(0, self.callback, self.target, v)
312
313     def generateResult(self):
314         """Results have all been returned, now send the empty list to end the action."""
315         return (self.target, [])
316         
317
318 class StoreValue(ActionBase):
319     """Store a value in a list of nodes."""
320
321     def __init__(self, caller, target, value, num_results, callback, config, action="storeValue"):
322         """Initialize the action with the value to store.
323         
324         @type value: C{string}
325         @param value: the value to store in the nodes
326         """
327         ActionBase.__init__(self, caller, target, callback, config, action, num_results)
328         self.value = value
329         
330     def getNodesToProcess(self):
331         """Nodes are never added, always return the same sorted list."""
332         return self.sorted_nodes
333
334     def generateArgs(self, node):
335         """Args include the value to store and the node's token."""
336         if node.token:
337             return (self.target, self.value, node.token), 1
338         else:
339             raise ValueError, "Don't store at this node since we don't know it's token"
340
341     def processResponse(self, dict):
342         """Save the response, though it should be nothin but the ID."""
343         self.results[dict["id"]] = dict
344     
345     def generateResult(self):
346         """Return all the response IDs received."""
347         return (self.target, self.value, self.results.values())