Clean up the copyrights mentioned in the code.
[quix0rs-apt-p2p.git] / apt_p2p_Khashmir / actions.py
1
2 """Details of how to perform actions on remote peers."""
3
4 from datetime import datetime
5
6 from twisted.internet import reactor, defer
7 from twisted.python import log
8
9 from khash import intify
10 from ktable import K
11 from util import uncompact
12
13 class ActionBase:
14     """Base class for some long running asynchronous proccesses like finding nodes or values.
15     
16     @type caller: L{khashmir.Khashmir}
17     @ivar caller: the DHT instance that is performing the action
18     @type target: C{string}
19     @ivar target: the target of the action, usually a DHT key
20     @type config: C{dictionary}
21     @ivar config: the configuration variables for the DHT
22     @type action: C{string}
23     @ivar action: the name of the action to call on remote nodes
24     @type stats: L{stats.StatsLogger}
25     @ivar stats: the statistics modules to report to
26     @type num: C{long}
27     @ivar num: the target key in integer form
28     @type queried: C{dictionary}
29     @ivar queried: the nodes that have been queried for this action,
30         keys are node IDs, values are the node itself
31     @type answered: C{dictionary}
32     @ivar answered: the nodes that have answered the queries
33     @type found: C{dictionary}
34     @ivar found: nodes that have been found so far by the action
35     @type sorted_nodes: C{list} of L{node.Node}
36     @ivar sorted_nodes: a sorted list of nodes by there proximity to the key
37     @type results: C{dictionary}
38     @ivar results: keys are the results found so far by the action
39     @type desired_results: C{int}
40     @ivar desired_results: the minimum number of results that are needed
41         before the action should stop
42     @type callback: C{method}
43     @ivar callback: the method to call with the results
44     @type outstanding: C{int}
45     @ivar outstanding: the number of requests currently outstanding
46     @type outstanding_results: C{int}
47     @ivar outstanding_results: the number of results that are expected from
48         the requests that are currently outstanding
49     @type finished: C{boolean}
50     @ivar finished: whether the action is done
51     @type started: C{datetime.datetime}
52     @ivar started: the time the action was started at
53     @type sort: C{method}
54     @ivar sort: used to sort nodes by their proximity to the target
55     """
56     
57     def __init__(self, caller, target, callback, config, stats, action, num_results = None):
58         """Initialize the action.
59         
60         @type caller: L{khashmir.Khashmir}
61         @param caller: the DHT instance that is performing the action
62         @type target: C{string}
63         @param target: the target of the action, usually a DHT key
64         @type callback: C{method}
65         @param callback: the method to call with the results
66         @type config: C{dictionary}
67         @param config: the configuration variables for the DHT
68         @type stats: L{stats.StatsLogger}
69         @param stats: the statistics gatherer
70         @type action: C{string}
71         @param action: the name of the action to call on remote nodes
72         @type num_results: C{int}
73         @param num_results: the minimum number of results that are needed before
74             the action should stop (optional, defaults to getting all the results)
75         
76         """
77         
78         self.caller = caller
79         self.target = target
80         self.config = config
81         self.action = action
82         self.stats = stats
83         self.stats.startedAction(action)
84         self.num = intify(target)
85         self.queried = {}
86         self.answered = {}
87         self.found = {}
88         self.sorted_nodes = []
89         self.results = {}
90         self.desired_results = num_results
91         self.callback = callback
92         self.outstanding = 0
93         self.outstanding_results = 0
94         self.finished = False
95         self.started = datetime.now()
96     
97         def sort(a, b, num=self.num):
98             """Sort nodes relative to the ID we are looking for."""
99             x, y = num ^ a.num, num ^ b.num
100             if x > y:
101                 return 1
102             elif x < y:
103                 return -1
104             return 0
105         self.sort = sort
106
107     #{ Main operation
108     def goWithNodes(self, nodes):
109         """Start the action's process with a list of nodes to contact."""
110         self.started = datetime.now()
111         for node in nodes:
112             self.found[node.id] = node
113         self.sortNodes()
114         self.schedule()
115     
116     def schedule(self):
117         """Schedule requests to be sent to remote nodes."""
118         if self.finished:
119             return
120         
121         # Check if we are already done
122         if self.desired_results and ((len(self.results) >= abs(self.desired_results)) or
123                                      (self.desired_results < 0 and
124                                       len(self.answered) >= self.config['STORE_REDUNDANCY'])):
125             self.finished = True
126             result = self.generateResult()
127             reactor.callLater(0, self.callback, *result)
128             return
129
130         # Check if we have enough outstanding results coming
131         if (self.desired_results and 
132             len(self.results) + self.outstanding_results >= abs(self.desired_results)):
133             return
134         
135         # Loop for each node that should be processed
136         for node in self.getNodesToProcess():
137             # Don't send requests twice or to ourself
138             if node.id not in self.queried:
139                 self.queried[node.id] = 1
140                 
141                 # Get the action to call on the node
142                 if node.id == self.caller.node.id:
143                     try:
144                         f = getattr(self.caller, 'krpc_' + self.action)
145                     except AttributeError:
146                         log.msg("%s doesn't have a %s method!" % (node, 'krpc_' + self.action))
147                         continue
148                 else:
149                     try:
150                         f = getattr(node, self.action)
151                     except AttributeError:
152                         log.msg("%s doesn't have a %s method!" % (node, self.action))
153                         continue
154
155                 # Get the arguments to the action's method
156                 try:
157                     args, expected_results = self.generateArgs(node)
158                 except ValueError:
159                     continue
160
161                 # Call the action on the remote node
162                 self.outstanding += 1
163                 self.outstanding_results += expected_results
164                 df = defer.maybeDeferred(f, *args)
165                 reactor.callLater(0, df.addCallbacks,
166                                   *(self.gotResponse, self.actionFailed),
167                                   **{'callbackArgs': (node, expected_results, df),
168                                      'errbackArgs': (node, expected_results, df)})
169                         
170             # We might have to stop for now
171             if (self.outstanding >= self.config['CONCURRENT_REQS'] or
172                 (self.desired_results and
173                  len(self.results) + self.outstanding_results >= abs(self.desired_results))):
174                 break
175             
176         assert self.outstanding >= 0
177         assert self.outstanding_results >= 0
178
179         # If no requests are outstanding, then we are done
180         if self.outstanding == 0:
181             self.finished = True
182             result = self.generateResult()
183             reactor.callLater(0, self.callback, *result)
184
185     def gotResponse(self, dict, node, expected_results, df):
186         """Receive a response from a remote node."""
187         reactor.callLater(0, self.caller.insertNode, node)
188         if self.finished or self.answered.has_key(node.id):
189             # a day late and a dollar short
190             return
191         self.outstanding -= 1
192         self.outstanding_results -= expected_results
193         self.answered[node.id] = 1
194         self.processResponse(dict)
195         self.schedule()
196
197     def actionFailed(self, err, node, expected_results, df):
198         """Receive an error from a remote node."""
199         log.msg("action %s failed on %s/%s: %s" % (self.action, node.host, node.port, err.getErrorMessage()))
200         self.caller.table.nodeFailed(node)
201         self.outstanding -= 1
202         self.outstanding_results -= expected_results
203         self.schedule()
204     
205     def handleGotNodes(self, nodes):
206         """Process any received node contact info in the response.
207         
208         Not called by default, but suitable for being called by
209         L{processResponse} in a recursive node search.
210         """
211         for compact_node in nodes:
212             node_contact = uncompact(compact_node)
213             node = self.caller.Node(node_contact)
214             if not self.found.has_key(node.id):
215                 self.found[node.id] = node
216
217     def sortNodes(self):
218         """Sort the nodes, if necessary.
219         
220         Assumes nodes are never removed from the L{found} dictionary.
221         """
222         if len(self.sorted_nodes) != len(self.found):
223             self.sorted_nodes = self.found.values()
224             self.sorted_nodes.sort(self.sort)
225                 
226     #{ Subclass for specific actions
227     def getNodesToProcess(self):
228         """Generate a list of nodes to process next.
229         
230         This implementation is suitable for a recurring search over all nodes.
231         """
232         self.sortNodes()
233         return self.sorted_nodes[:K]
234     
235     def generateArgs(self, node):
236         """Generate the arguments to the node's action.
237         
238         Also return the number of results expected from this request.
239         
240         @raise ValueError: if the node should not be queried
241         """
242         return (self.caller.node.id, self.target), 0
243     
244     def processResponse(self, dict):
245         """Process the response dictionary received from the remote node."""
246         self.handleGotNodes(dict['nodes'])
247
248     def generateResult(self, nodes):
249         """Create the final result to return to the L{callback} function."""
250         self.stats.completedAction(self.action, self.started)
251         return []
252         
253
254 class FindNode(ActionBase):
255     """Find the closest nodes to the key."""
256
257     def __init__(self, caller, target, callback, config, stats, action="find_node"):
258         ActionBase.__init__(self, caller, target, callback, config, stats, action)
259
260     def processResponse(self, dict):
261         """Save the token received from each node."""
262         if dict["id"] in self.found:
263             self.found[dict["id"]].updateToken(dict.get('token', ''))
264         self.handleGotNodes(dict['nodes'])
265
266     def generateResult(self):
267         """Result is the K closest nodes to the target."""
268         self.sortNodes()
269         self.stats.completedAction(self.action, self.started)
270         return (self.sorted_nodes[:K], )
271     
272
273 class FindValue(ActionBase):
274     """Find the closest nodes to the key and check for values."""
275
276     def __init__(self, caller, target, callback, config, stats, action="find_value"):
277         ActionBase.__init__(self, caller, target, callback, config, stats, action)
278
279     def processResponse(self, dict):
280         """Save the number of values each node has."""
281         if dict["id"] in self.found:
282             self.found[dict["id"]].updateNumValues(dict.get('num', 0))
283         self.handleGotNodes(dict['nodes'])
284         
285     def generateResult(self):
286         """Result is the nodes that have values, sorted by proximity to the key."""
287         self.sortNodes()
288         self.stats.completedAction(self.action, self.started)
289         return ([node for node in self.sorted_nodes if node.num_values > 0], )
290     
291
292 class GetValue(ActionBase):
293     """Retrieve values from a list of nodes."""
294     
295     def __init__(self, caller, target, num_results, callback, config, stats, action="get_value"):
296         ActionBase.__init__(self, caller, target, callback, config, stats, action, num_results)
297
298     def getNodesToProcess(self):
299         """Nodes are never added, always return the same sorted node list."""
300         return self.sorted_nodes
301     
302     def generateArgs(self, node):
303         """Arguments include the number of values to request."""
304         if node.num_values > 0:
305             # Request all desired results from each node, just to be sure.
306             num_values = abs(self.desired_results) - len(self.results)
307             assert num_values > 0
308             if num_values > node.num_values:
309                 num_values = 0
310             return (self.caller.node.id, self.target, num_values), node.num_values
311         else:
312             raise ValueError, "Don't try and get values from this node because it doesn't have any"
313
314     def processResponse(self, dict):
315         """Save the returned values, calling the L{callback} each time there are new ones."""
316         if dict.has_key('values'):
317             def x(y, z=self.results):
318                 if not z.has_key(y):
319                     z[y] = 1
320                     return y
321                 else:
322                     return None
323             z = len(dict['values'])
324             v = filter(None, map(x, dict['values']))
325             if len(v):
326                 reactor.callLater(0, self.callback, self.target, v)
327
328     def generateResult(self):
329         """Results have all been returned, now send the empty list to end the action."""
330         self.stats.completedAction(self.action, self.started)
331         return (self.target, [])
332         
333
334 class StoreValue(ActionBase):
335     """Store a value in a list of nodes."""
336
337     def __init__(self, caller, target, value, num_results, callback, config, stats, action="store_value"):
338         """Initialize the action with the value to store.
339         
340         @type value: C{string}
341         @param value: the value to store in the nodes
342         """
343         ActionBase.__init__(self, caller, target, callback, config, stats, action, num_results)
344         self.value = value
345         
346     def getNodesToProcess(self):
347         """Nodes are never added, always return the same sorted list."""
348         return self.sorted_nodes
349
350     def generateArgs(self, node):
351         """Args include the value to store and the node's token."""
352         if node.token:
353             return (self.caller.node.id, self.target, self.value, node.token), 1
354         else:
355             raise ValueError, "Don't store at this node since we don't know it's token"
356
357     def processResponse(self, dict):
358         """Save the response, though it should be nothin but the ID."""
359         self.results[dict["id"]] = dict
360     
361     def generateResult(self):
362         """Return all the response IDs received."""
363         self.stats.completedAction(self.action, self.started)
364         return (self.target, self.value, self.results.values())