Clean up the DHT config, making K and HASH_LENGTH constants instead.
[quix0rs-apt-p2p.git] / apt_p2p_Khashmir / actions.py
1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
3
4 """Details of how to perform actions on remote peers."""
5
6 from twisted.internet import reactor, defer
7 from twisted.python import log
8
9 from khash import intify
10 from ktable import K
11 from util import uncompact
12
13 class ActionBase:
14     """Base class for some long running asynchronous proccesses like finding nodes or values.
15     
16     @type caller: L{khashmir.Khashmir}
17     @ivar caller: the DHT instance that is performing the action
18     @type target: C{string}
19     @ivar target: the target of the action, usually a DHT key
20     @type config: C{dictionary}
21     @ivar config: the configuration variables for the DHT
22     @type action: C{string}
23     @ivar action: the name of the action to call on remote nodes
24     @type num: C{long}
25     @ivar num: the target key in integer form
26     @type queried: C{dictionary}
27     @ivar queried: the nodes that have been queried for this action,
28         keys are node IDs, values are the node itself
29     @type answered: C{dictionary}
30     @ivar answered: the nodes that have answered the queries
31     @type found: C{dictionary}
32     @ivar found: nodes that have been found so far by the action
33     @type sorted_nodes: C{list} of L{node.Node}
34     @ivar sorted_nodes: a sorted list of nodes by there proximity to the key
35     @type results: C{dictionary}
36     @ivar results: keys are the results found so far by the action
37     @type desired_results: C{int}
38     @ivar desired_results: the minimum number of results that are needed
39         before the action should stop
40     @type callback: C{method}
41     @ivar callback: the method to call with the results
42     @type outstanding: C{int}
43     @ivar outstanding: the number of requests currently outstanding
44     @type outstanding_results: C{int}
45     @ivar outstanding_results: the number of results that are expected from
46         the requests that are currently outstanding
47     @type finished: C{boolean}
48     @ivar finished: whether the action is done
49     @type sort: C{method}
50     @ivar sort: used to sort nodes by their proximity to the target
51     """
52     
53     def __init__(self, caller, target, callback, config, stats, action, num_results = None):
54         """Initialize the action.
55         
56         @type caller: L{khashmir.Khashmir}
57         @param caller: the DHT instance that is performing the action
58         @type target: C{string}
59         @param target: the target of the action, usually a DHT key
60         @type callback: C{method}
61         @param callback: the method to call with the results
62         @type config: C{dictionary}
63         @param config: the configuration variables for the DHT
64         @type stats: L{stats.StatsLogger}
65         @param stats: the statistics gatherer
66         @type action: C{string}
67         @param action: the name of the action to call on remote nodes
68         @type num_results: C{int}
69         @param num_results: the minimum number of results that are needed before
70             the action should stop (optional, defaults to getting all the results)
71         
72         """
73         
74         self.caller = caller
75         self.target = target
76         self.config = config
77         self.action = action
78         stats.startedAction(action)
79         self.num = intify(target)
80         self.queried = {}
81         self.answered = {}
82         self.found = {}
83         self.sorted_nodes = []
84         self.results = {}
85         self.desired_results = num_results
86         self.callback = callback
87         self.outstanding = 0
88         self.outstanding_results = 0
89         self.finished = False
90     
91         def sort(a, b, num=self.num):
92             """Sort nodes relative to the ID we are looking for."""
93             x, y = num ^ a.num, num ^ b.num
94             if x > y:
95                 return 1
96             elif x < y:
97                 return -1
98             return 0
99         self.sort = sort
100
101     #{ Main operation
102     def goWithNodes(self, nodes):
103         """Start the action's process with a list of nodes to contact."""
104         for node in nodes:
105             self.found[node.id] = node
106         self.sortNodes()
107         self.schedule()
108     
109     def schedule(self):
110         """Schedule requests to be sent to remote nodes."""
111         # Check if we are already done
112         if self.desired_results and ((len(self.results) >= abs(self.desired_results)) or
113                                      (self.desired_results < 0 and
114                                       len(self.answered) >= self.config['STORE_REDUNDANCY'])):
115             self.finished = True
116             result = self.generateResult()
117             reactor.callLater(0, self.callback, *result)
118
119         if self.finished or (self.desired_results and 
120                              len(self.results) + self.outstanding_results >= abs(self.desired_results)):
121             return
122         
123         # Loop for each node that should be processed
124         for node in self.getNodesToProcess():
125             # Don't send requests twice or to ourself
126             if node.id not in self.queried:
127                 self.queried[node.id] = 1
128                 
129                 # Get the action to call on the node
130                 if node.id == self.caller.node.id:
131                     try:
132                         f = getattr(self.caller, 'krpc_' + self.action)
133                     except AttributeError:
134                         log.msg("%s doesn't have a %s method!" % (node, 'krpc_' + self.action))
135                         continue
136                 else:
137                     try:
138                         f = getattr(node, self.action)
139                     except AttributeError:
140                         log.msg("%s doesn't have a %s method!" % (node, self.action))
141                         continue
142
143                 # Get the arguments to the action's method
144                 try:
145                     args, expected_results = self.generateArgs(node)
146                 except ValueError:
147                     continue
148
149                 # Call the action on the remote node
150                 self.outstanding += 1
151                 self.outstanding_results += expected_results
152                 df = defer.maybeDeferred(f, *args)
153                 reactor.callLater(0, df.addCallbacks,
154                                   *(self.gotResponse, self.actionFailed),
155                                   **{'callbackArgs': (node, expected_results, df),
156                                      'errbackArgs': (node, expected_results, df)})
157                         
158             # We might have to stop for now
159             if (self.outstanding >= self.config['CONCURRENT_REQS'] or
160                 (self.desired_results and
161                  len(self.results) + self.outstanding_results >= abs(self.desired_results))):
162                 break
163             
164         assert self.outstanding >= 0
165         assert self.outstanding_results >= 0
166
167         # If no requests are outstanding, then we are done
168         if self.outstanding == 0:
169             self.finished = True
170             result = self.generateResult()
171             reactor.callLater(0, self.callback, *result)
172
173     def gotResponse(self, dict, node, expected_results, df):
174         """Receive a response from a remote node."""
175         self.caller.insertNode(node)
176         if self.finished or self.answered.has_key(node.id):
177             # a day late and a dollar short
178             return
179         self.outstanding -= 1
180         self.outstanding_results -= expected_results
181         self.answered[node.id] = 1
182         self.processResponse(dict)
183         self.schedule()
184
185     def actionFailed(self, err, node, expected_results, df):
186         """Receive an error from a remote node."""
187         log.msg("action %s failed (%s) %s/%s" % (self.action, self.config['PORT'], node.host, node.port))
188         log.err(err)
189         self.caller.table.nodeFailed(node)
190         self.outstanding -= 1
191         self.outstanding_results -= expected_results
192         self.schedule()
193     
194     def handleGotNodes(self, nodes):
195         """Process any received node contact info in the response.
196         
197         Not called by default, but suitable for being called by
198         L{processResponse} in a recursive node search.
199         """
200         for compact_node in nodes:
201             node_contact = uncompact(compact_node)
202             node = self.caller.Node(node_contact)
203             if not self.found.has_key(node.id):
204                 self.found[node.id] = node
205
206     def sortNodes(self):
207         """Sort the nodes, if necessary.
208         
209         Assumes nodes are never removed from the L{found} dictionary.
210         """
211         if len(self.sorted_nodes) != len(self.found):
212             self.sorted_nodes = self.found.values()
213             self.sorted_nodes.sort(self.sort)
214                 
215     #{ Subclass for specific actions
216     def getNodesToProcess(self):
217         """Generate a list of nodes to process next.
218         
219         This implementation is suitable for a recurring search over all nodes.
220         """
221         self.sortNodes()
222         return self.sorted_nodes[:K]
223     
224     def generateArgs(self, node):
225         """Generate the arguments to the node's action.
226         
227         Also return the number of results expected from this request.
228         
229         @raise ValueError: if the node should not be queried
230         """
231         return (self.caller.node.id, self.target), 0
232     
233     def processResponse(self, dict):
234         """Process the response dictionary received from the remote node."""
235         self.handleGotNodes(dict['nodes'])
236
237     def generateResult(self, nodes):
238         """Create the final result to return to the L{callback} function."""
239         return []
240         
241
242 class FindNode(ActionBase):
243     """Find the closest nodes to the key."""
244
245     def __init__(self, caller, target, callback, config, stats, action="find_node"):
246         ActionBase.__init__(self, caller, target, callback, config, stats, action)
247
248     def processResponse(self, dict):
249         """Save the token received from each node."""
250         if dict["id"] in self.found:
251             self.found[dict["id"]].updateToken(dict.get('token', ''))
252         self.handleGotNodes(dict['nodes'])
253
254     def generateResult(self):
255         """Result is the K closest nodes to the target."""
256         self.sortNodes()
257         return (self.sorted_nodes[:K], )
258     
259
260 class FindValue(ActionBase):
261     """Find the closest nodes to the key and check for values."""
262
263     def __init__(self, caller, target, callback, config, stats, action="find_value"):
264         ActionBase.__init__(self, caller, target, callback, config, stats, action)
265
266     def processResponse(self, dict):
267         """Save the number of values each node has."""
268         if dict["id"] in self.found:
269             self.found[dict["id"]].updateNumValues(dict.get('num', 0))
270         self.handleGotNodes(dict['nodes'])
271         
272     def generateResult(self):
273         """Result is the nodes that have values, sorted by proximity to the key."""
274         self.sortNodes()
275         return ([node for node in self.sorted_nodes if node.num_values > 0], )
276     
277
278 class GetValue(ActionBase):
279     """Retrieve values from a list of nodes."""
280     
281     def __init__(self, caller, target, num_results, callback, config, stats, action="get_value"):
282         ActionBase.__init__(self, caller, target, callback, config, stats, action, num_results)
283
284     def getNodesToProcess(self):
285         """Nodes are never added, always return the same sorted node list."""
286         return self.sorted_nodes
287     
288     def generateArgs(self, node):
289         """Arguments include the number of values to request."""
290         if node.num_values > 0:
291             # Request all desired results from each node, just to be sure.
292             num_values = abs(self.desired_results) - len(self.results)
293             assert num_values > 0
294             if num_values > node.num_values:
295                 num_values = 0
296             return (self.caller.node.id, self.target, num_values), node.num_values
297         else:
298             raise ValueError, "Don't try and get values from this node because it doesn't have any"
299
300     def processResponse(self, dict):
301         """Save the returned values, calling the L{callback} each time there are new ones."""
302         if dict.has_key('values'):
303             def x(y, z=self.results):
304                 if not z.has_key(y):
305                     z[y] = 1
306                     return y
307                 else:
308                     return None
309             z = len(dict['values'])
310             v = filter(None, map(x, dict['values']))
311             if len(v):
312                 reactor.callLater(0, self.callback, self.target, v)
313
314     def generateResult(self):
315         """Results have all been returned, now send the empty list to end the action."""
316         return (self.target, [])
317         
318
319 class StoreValue(ActionBase):
320     """Store a value in a list of nodes."""
321
322     def __init__(self, caller, target, value, num_results, callback, config, stats, action="store_value"):
323         """Initialize the action with the value to store.
324         
325         @type value: C{string}
326         @param value: the value to store in the nodes
327         """
328         ActionBase.__init__(self, caller, target, callback, config, stats, action, num_results)
329         self.value = value
330         
331     def getNodesToProcess(self):
332         """Nodes are never added, always return the same sorted list."""
333         return self.sorted_nodes
334
335     def generateArgs(self, node):
336         """Args include the value to store and the node's token."""
337         if node.token:
338             return (self.caller.node.id, self.target, self.value, node.token), 1
339         else:
340             raise ValueError, "Don't store at this node since we don't know it's token"
341
342     def processResponse(self, dict):
343         """Save the response, though it should be nothin but the ID."""
344         self.results[dict["id"]] = dict
345     
346     def generateResult(self):
347         """Return all the response IDs received."""
348         return (self.target, self.value, self.results.values())