Rename the knode functions to match the krpc method names.
[quix0rs-apt-p2p.git] / apt_p2p_Khashmir / actions.py
1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
3
4 """Details of how to perform actions on remote peers."""
5
6 from twisted.internet import reactor
7 from twisted.python import log
8
9 from khash import intify
10 from util import uncompact
11
12 class ActionBase:
13     """Base class for some long running asynchronous proccesses like finding nodes or values.
14     
15     @type caller: L{khashmir.Khashmir}
16     @ivar caller: the DHT instance that is performing the action
17     @type target: C{string}
18     @ivar target: the target of the action, usually a DHT key
19     @type config: C{dictionary}
20     @ivar config: the configuration variables for the DHT
21     @type action: C{string}
22     @ivar action: the name of the action to call on remote nodes
23     @type num: C{long}
24     @ivar num: the target key in integer form
25     @type queried: C{dictionary}
26     @ivar queried: the nodes that have been queried for this action,
27         keys are node IDs, values are the node itself
28     @type answered: C{dictionary}
29     @ivar answered: the nodes that have answered the queries
30     @type found: C{dictionary}
31     @ivar found: nodes that have been found so far by the action
32     @type sorted_nodes: C{list} of L{node.Node}
33     @ivar sorted_nodes: a sorted list of nodes by there proximity to the key
34     @type results: C{dictionary}
35     @ivar results: keys are the results found so far by the action
36     @type desired_results: C{int}
37     @ivar desired_results: the minimum number of results that are needed
38         before the action should stop
39     @type callback: C{method}
40     @ivar callback: the method to call with the results
41     @type outstanding: C{int}
42     @ivar outstanding: the number of requests currently outstanding
43     @type outstanding_results: C{int}
44     @ivar outstanding_results: the number of results that are expected from
45         the requests that are currently outstanding
46     @type finished: C{boolean}
47     @ivar finished: whether the action is done
48     @type sort: C{method}
49     @ivar sort: used to sort nodes by their proximity to the target
50     """
51     
52     def __init__(self, caller, target, callback, config, stats, action, num_results = None):
53         """Initialize the action.
54         
55         @type caller: L{khashmir.Khashmir}
56         @param caller: the DHT instance that is performing the action
57         @type target: C{string}
58         @param target: the target of the action, usually a DHT key
59         @type callback: C{method}
60         @param callback: the method to call with the results
61         @type config: C{dictionary}
62         @param config: the configuration variables for the DHT
63         @type stats: L{stats.StatsLogger}
64         @param stats: the statistics gatherer
65         @type action: C{string}
66         @param action: the name of the action to call on remote nodes
67         @type num_results: C{int}
68         @param num_results: the minimum number of results that are needed before
69             the action should stop (optional, defaults to getting all the results)
70         
71         """
72         
73         self.caller = caller
74         self.target = target
75         self.config = config
76         self.action = action
77         stats.startedAction(action)
78         self.num = intify(target)
79         self.queried = {}
80         self.answered = {}
81         self.found = {}
82         self.sorted_nodes = []
83         self.results = {}
84         self.desired_results = num_results
85         self.callback = callback
86         self.outstanding = 0
87         self.outstanding_results = 0
88         self.finished = False
89     
90         def sort(a, b, num=self.num):
91             """Sort nodes relative to the ID we are looking for."""
92             x, y = num ^ a.num, num ^ b.num
93             if x > y:
94                 return 1
95             elif x < y:
96                 return -1
97             return 0
98         self.sort = sort
99
100     #{ Main operation
101     def goWithNodes(self, nodes):
102         """Start the action's process with a list of nodes to contact."""
103         for node in nodes:
104             if node.id == self.caller.node.id:
105                 continue
106             else:
107                 self.found[node.id] = node
108         self.sortNodes()
109         self.schedule()
110     
111     def schedule(self):
112         """Schedule requests to be sent to remote nodes."""
113         # Check if we are already done
114         if self.desired_results and ((len(self.results) >= abs(self.desired_results)) or
115                                      (self.desired_results < 0 and
116                                       len(self.answered) >= self.config['STORE_REDUNDANCY'])):
117             self.finished = True
118             result = self.generateResult()
119             reactor.callLater(0, self.callback, *result)
120
121         if self.finished or (self.desired_results and 
122                              len(self.results) + self.outstanding_results >= abs(self.desired_results)):
123             return
124         
125         # Loop for each node that should be processed
126         for node in self.getNodesToProcess():
127             # Don't send requests twice or to ourself
128             if node.id not in self.queried and node.id != self.caller.node.id:
129                 self.queried[node.id] = 1
130                 
131                 # Get the action to call on the node
132                 try:
133                     f = getattr(node, self.action)
134                 except AttributeError:
135                     log.msg("%s doesn't have a %s method!" % (node, self.action))
136                 else:
137                     # Get the arguments to the action's method
138                     try:
139                         args, expected_results = self.generateArgs(node)
140                     except ValueError:
141                         pass
142                     else:
143                         # Call the action on the remote node
144                         self.outstanding += 1
145                         self.outstanding_results += expected_results
146                         df = f(self.caller.node.id, *args)
147                         df.addCallbacks(self.gotResponse, self.actionFailed,
148                                         callbackArgs = (node, expected_results),
149                                         errbackArgs = (node, expected_results))
150                         
151             # We might have to stop for now
152             if (self.outstanding >= self.config['CONCURRENT_REQS'] or
153                 (self.desired_results and
154                  len(self.results) + self.outstanding_results >= abs(self.desired_results))):
155                 break
156             
157         assert self.outstanding >= 0
158         assert self.outstanding_results >= 0
159
160         # If no requests are outstanding, then we are done
161         if self.outstanding == 0:
162             self.finished = True
163             result = self.generateResult()
164             reactor.callLater(0, self.callback, *result)
165
166     def gotResponse(self, dict, node, expected_results):
167         """Receive a response from a remote node."""
168         self.caller.insertNode(node)
169         if self.finished or self.answered.has_key(node.id):
170             # a day late and a dollar short
171             return
172         self.outstanding -= 1
173         self.outstanding_results -= expected_results
174         self.answered[node.id] = 1
175         self.processResponse(dict['rsp'])
176         self.schedule()
177
178     def actionFailed(self, err, node, expected_results):
179         """Receive an error from a remote node."""
180         log.msg("action %s failed (%s) %s/%s" % (self.action, self.config['PORT'], node.host, node.port))
181         log.err(err)
182         self.caller.table.nodeFailed(node)
183         self.outstanding -= 1
184         self.outstanding_results -= expected_results
185         self.schedule()
186     
187     def handleGotNodes(self, nodes):
188         """Process any received node contact info in the response.
189         
190         Not called by default, but suitable for being called by
191         L{processResponse} in a recursive node search.
192         """
193         for compact_node in nodes:
194             node_contact = uncompact(compact_node)
195             node = self.caller.Node(node_contact)
196             if not self.found.has_key(node.id):
197                 self.found[node.id] = node
198
199     def sortNodes(self):
200         """Sort the nodes, if necessary.
201         
202         Assumes nodes are never removed from the L{found} dictionary.
203         """
204         if len(self.sorted_nodes) != len(self.found):
205             self.sorted_nodes = self.found.values()
206             self.sorted_nodes.sort(self.sort)
207                 
208     #{ Subclass for specific actions
209     def getNodesToProcess(self):
210         """Generate a list of nodes to process next.
211         
212         This implementation is suitable for a recurring search over all nodes.
213         """
214         self.sortNodes()
215         return self.sorted_nodes[:self.config['K']]
216     
217     def generateArgs(self, node):
218         """Generate the arguments to the node's action.
219         
220         These arguments will be appended to our node ID when calling the action.
221         Also return the number of results expected from this request.
222         
223         @raise ValueError: if the node should not be queried
224         """
225         return (self.target, ), 0
226     
227     def processResponse(self, dict):
228         """Process the response dictionary received from the remote node."""
229         self.handleGotNodes(dict['nodes'])
230
231     def generateResult(self, nodes):
232         """Create the final result to return to the L{callback} function."""
233         return []
234         
235
236 class FindNode(ActionBase):
237     """Find the closest nodes to the key."""
238
239     def __init__(self, caller, target, callback, config, stats, action="find_node"):
240         ActionBase.__init__(self, caller, target, callback, config, stats, action)
241
242     def processResponse(self, dict):
243         """Save the token received from each node."""
244         if dict["id"] in self.found:
245             self.found[dict["id"]].updateToken(dict.get('token', ''))
246         self.handleGotNodes(dict['nodes'])
247
248     def generateResult(self):
249         """Result is the K closest nodes to the target."""
250         self.sortNodes()
251         return (self.sorted_nodes[:self.config['K']], )
252     
253
254 class FindValue(ActionBase):
255     """Find the closest nodes to the key and check for values."""
256
257     def __init__(self, caller, target, callback, config, stats, action="find_value"):
258         ActionBase.__init__(self, caller, target, callback, config, stats, action)
259
260     def processResponse(self, dict):
261         """Save the number of values each node has."""
262         if dict["id"] in self.found:
263             self.found[dict["id"]].updateNumValues(dict.get('num', 0))
264         self.handleGotNodes(dict['nodes'])
265         
266     def generateResult(self):
267         """Result is the nodes that have values, sorted by proximity to the key."""
268         self.sortNodes()
269         return ([node for node in self.sorted_nodes if node.num_values > 0], )
270     
271
272 class GetValue(ActionBase):
273     """Retrieve values from a list of nodes."""
274     
275     def __init__(self, caller, target, local_results, num_results, callback, config, stats, action="get_value"):
276         """Initialize the action with the locally available results.
277         
278         @type local_results: C{list} of C{string}
279         @param local_results: the values that were available in this node
280         """
281         ActionBase.__init__(self, caller, target, callback, config, stats, action, num_results)
282         if local_results:
283             for result in local_results:
284                 self.results[result] = 1
285
286     def getNodesToProcess(self):
287         """Nodes are never added, always return the same sorted node list."""
288         return self.sorted_nodes
289     
290     def generateArgs(self, node):
291         """Arguments include the number of values to request."""
292         if node.num_values > 0:
293             # Request all desired results from each node, just to be sure.
294             num_values = abs(self.desired_results) - len(self.results)
295             assert num_values > 0
296             if num_values > node.num_values:
297                 num_values = 0
298             return (self.target, num_values), node.num_values
299         else:
300             raise ValueError, "Don't try and get values from this node because it doesn't have any"
301
302     def processResponse(self, dict):
303         """Save the returned values, calling the L{callback} each time there are new ones."""
304         if dict.has_key('values'):
305             def x(y, z=self.results):
306                 if not z.has_key(y):
307                     z[y] = 1
308                     return y
309                 else:
310                     return None
311             z = len(dict['values'])
312             v = filter(None, map(x, dict['values']))
313             if len(v):
314                 reactor.callLater(0, self.callback, self.target, v)
315
316     def generateResult(self):
317         """Results have all been returned, now send the empty list to end the action."""
318         return (self.target, [])
319         
320
321 class StoreValue(ActionBase):
322     """Store a value in a list of nodes."""
323
324     def __init__(self, caller, target, value, num_results, callback, config, stats, action="store_value"):
325         """Initialize the action with the value to store.
326         
327         @type value: C{string}
328         @param value: the value to store in the nodes
329         """
330         ActionBase.__init__(self, caller, target, callback, config, stats, action, num_results)
331         self.value = value
332         
333     def getNodesToProcess(self):
334         """Nodes are never added, always return the same sorted list."""
335         return self.sorted_nodes
336
337     def generateArgs(self, node):
338         """Args include the value to store and the node's token."""
339         if node.token:
340             return (self.target, self.value, node.token), 1
341         else:
342             raise ValueError, "Don't store at this node since we don't know it's token"
343
344     def processResponse(self, dict):
345         """Save the response, though it should be nothin but the ID."""
346         self.results[dict["id"]] = dict
347     
348     def generateResult(self):
349         """Return all the response IDs received."""
350         return (self.target, self.value, self.results.values())