c49959ff82741efba5570109209dcb480b74ad0e
[quix0rs-apt-p2p.git] / apt_p2p_Khashmir / actions.py
1
2 """Details of how to perform actions on remote peers."""
3
4 from datetime import datetime
5
6 from twisted.internet import reactor, defer
7 from twisted.python import log
8
9 from khash import intify
10 from ktable import K
11 from util import uncompact
12
13 class ActionBase:
14     """Base class for some long running asynchronous proccesses like finding nodes or values.
15     
16     @type caller: L{khashmir.Khashmir}
17     @ivar caller: the DHT instance that is performing the action
18     @type target: C{string}
19     @ivar target: the target of the action, usually a DHT key
20     @type config: C{dictionary}
21     @ivar config: the configuration variables for the DHT
22     @type action: C{string}
23     @ivar action: the name of the action to call on remote nodes
24     @type stats: L{stats.StatsLogger}
25     @ivar stats: the statistics modules to report to
26     @type num: C{long}
27     @ivar num: the target key in integer form
28     @type queried: C{dictionary}
29     @ivar queried: the nodes that have been queried for this action,
30         keys are node IDs, values are the node itself
31     @type answered: C{dictionary}
32     @ivar answered: the nodes that have answered the queries
33     @type failed: C{dictionary}
34     @ivar failed: the nodes that have failed to answer the queries
35     @type found: C{dictionary}
36     @ivar found: nodes that have been found so far by the action
37     @type sorted_nodes: C{list} of L{node.Node}
38     @ivar sorted_nodes: a sorted list of nodes by there proximity to the key
39     @type results: C{dictionary}
40     @ivar results: keys are the results found so far by the action
41     @type desired_results: C{int}
42     @ivar desired_results: the minimum number of results that are needed
43         before the action should stop
44     @type callback: C{method}
45     @ivar callback: the method to call with the results
46     @type outstanding: C{dictionary}
47     @ivar outstanding: the nodes that have outstanding requests for this action,
48         keys are node IDs, values are the number of outstanding results from the node
49     @type outstanding_results: C{int}
50     @ivar outstanding_results: the total number of results that are expected from
51         the requests that are currently outstanding
52     @type finished: C{boolean}
53     @ivar finished: whether the action is done
54     @type started: C{datetime.datetime}
55     @ivar started: the time the action was started at
56     @type sort: C{method}
57     @ivar sort: used to sort nodes by their proximity to the target
58     """
59     
60     def __init__(self, caller, target, callback, config, stats, action, num_results = None):
61         """Initialize the action.
62         
63         @type caller: L{khashmir.Khashmir}
64         @param caller: the DHT instance that is performing the action
65         @type target: C{string}
66         @param target: the target of the action, usually a DHT key
67         @type callback: C{method}
68         @param callback: the method to call with the results
69         @type config: C{dictionary}
70         @param config: the configuration variables for the DHT
71         @type stats: L{stats.StatsLogger}
72         @param stats: the statistics gatherer
73         @type action: C{string}
74         @param action: the name of the action to call on remote nodes
75         @type num_results: C{int}
76         @param num_results: the minimum number of results that are needed before
77             the action should stop (optional, defaults to getting all the results)
78         
79         """
80         
81         self.caller = caller
82         self.target = target
83         self.config = config
84         self.action = action
85         self.stats = stats
86         self.stats.startedAction(action)
87         self.num = intify(target)
88         self.queried = {}
89         self.answered = {}
90         self.failed = {}
91         self.found = {}
92         self.sorted_nodes = []
93         self.results = {}
94         self.desired_results = num_results
95         self.callback = callback
96         self.outstanding = {}
97         self.outstanding_results = 0
98         self.finished = False
99         self.started = datetime.now()
100     
101         def sort(a, b, num=self.num):
102             """Sort nodes relative to the ID we are looking for."""
103             x, y = num ^ a.num, num ^ b.num
104             if x > y:
105                 return 1
106             elif x < y:
107                 return -1
108             return 0
109         self.sort = sort
110
111     #{ Main operation
112     def goWithNodes(self, nodes):
113         """Start the action's process with a list of nodes to contact."""
114         self.started = datetime.now()
115         for node in nodes:
116             self.found[node.id] = node
117         self.sortNodes()
118         self.schedule()
119     
120     def schedule(self):
121         """Schedule requests to be sent to remote nodes."""
122         if self.finished:
123             return
124         
125         # Get the nodes to be processed
126         nodes = self.getNodesToProcess()
127         
128         # Check if we are already done
129         if nodes is None or (self.desired_results and
130                              ((len(self.results) >= abs(self.desired_results)) or
131                               (self.desired_results < 0 and
132                                len(self.answered) >= self.config['STORE_REDUNDANCY']))):
133             self.finished = True
134             result = self.generateResult()
135             reactor.callLater(0, self.callback, *result)
136             return
137
138         # Check if we have enough outstanding results coming
139         if (self.desired_results and 
140             len(self.results) + self.outstanding_results >= abs(self.desired_results)):
141             return
142         
143         # Loop for each node that should be processed
144         for node in nodes:
145             # Don't send requests twice or to ourself
146             if node.id not in self.queried:
147                 self.queried[node.id] = 1
148                 
149                 # Get the action to call on the node
150                 if node.id == self.caller.node.id:
151                     try:
152                         f = getattr(self.caller, 'krpc_' + self.action)
153                     except AttributeError:
154                         log.msg("%s doesn't have a %s method!" % (node, 'krpc_' + self.action))
155                         continue
156                 else:
157                     try:
158                         f = getattr(node, self.action)
159                     except AttributeError:
160                         log.msg("%s doesn't have a %s method!" % (node, self.action))
161                         continue
162
163                 # Get the arguments to the action's method
164                 try:
165                     args, expected_results = self.generateArgs(node)
166                 except ValueError:
167                     continue
168
169                 # Call the action on the remote node
170                 self.outstanding[node.id] = expected_results
171                 self.outstanding_results += expected_results
172                 df = defer.maybeDeferred(f, *args)
173                 reactor.callLater(0, df.addCallbacks,
174                                   *(self.gotResponse, self.actionFailed),
175                                   **{'callbackArgs': (node, ),
176                                      'errbackArgs': (node, )})
177                         
178             # We might have to stop for now
179             if (len(self.outstanding) >= self.config['CONCURRENT_REQS'] or
180                 (self.desired_results and
181                  len(self.results) + self.outstanding_results >= abs(self.desired_results))):
182                 break
183             
184         assert self.outstanding_results >= 0
185
186         # If no requests are outstanding, then we are done
187         if len(self.outstanding) == 0:
188             self.finished = True
189             result = self.generateResult()
190             reactor.callLater(0, self.callback, *result)
191
192     def gotResponse(self, dict, node):
193         """Receive a response from a remote node."""
194         if node.id != self.caller.node.id:
195             reactor.callLater(0, self.caller.insertNode, node)
196         if self.finished or self.answered.has_key(node.id):
197             # a day late and a dollar short
198             return
199         self.answered[node.id] = 1
200         self.processResponse(dict)
201         if self.outstanding.has_key(node.id):
202             self.outstanding_results -= self.outstanding[node.id]
203             del self.outstanding[node.id]
204         self.schedule()
205
206     def actionFailed(self, err, node):
207         """Receive an error from a remote node."""
208         log.msg("action %s failed on %s/%s: %s" % (self.action, node.host, node.port, err.getErrorMessage()))
209         if node.id != self.caller.node.id:
210             self.caller.nodeFailed(node)
211         self.failed[node.id] = 1
212         if self.outstanding.has_key(node.id):
213             self.outstanding_results -= self.outstanding[node.id]
214             del self.outstanding[node.id]
215         self.schedule()
216     
217     def handleGotNodes(self, nodes):
218         """Process any received node contact info in the response.
219         
220         Not called by default, but suitable for being called by
221         L{processResponse} in a recursive node search.
222         """
223         for compact_node in nodes:
224             node_contact = uncompact(compact_node)
225             node = self.caller.Node(node_contact)
226             if not self.found.has_key(node.id):
227                 self.found[node.id] = node
228
229     def sortNodes(self):
230         """Sort the nodes, if necessary.
231         
232         Assumes nodes are never removed from the L{found} dictionary.
233         """
234         if len(self.sorted_nodes) != len(self.found):
235             self.sorted_nodes = self.found.values()
236             self.sorted_nodes.sort(self.sort)
237                 
238     #{ Subclass for specific actions
239     def getNodesToProcess(self):
240         """Generate a list of nodes to process next.
241         
242         This implementation is suitable for a recurring search over all nodes.
243         It will stop the search when the closest K nodes have been queried.
244         It also prematurely drops requests to nodes that have fallen way behind.
245         
246         @return: sorted list of nodes to query, or None if we are done
247         """
248         # Find the K closest nodes that haven't failed, count how many answered
249         self.sortNodes()
250         closest_K = []
251         ans = 0
252         for node in self.sorted_nodes:
253             if node.id not in self.failed:
254                 closest_K.append(node)
255                 if node.id in self.answered:
256                     ans += 1
257                 if len(closest_K) >= K:
258                     break
259         
260         # If we have responses from the K closest nodes, then we are done
261         if ans >= K:
262             log.msg('Got the answers we need, aborting search')
263             return None
264         
265         # Check the oustanding requests to see if they are still closest
266         for id in self.outstanding.keys():
267             if self.found[id] not in closest_K:
268                 # Request is not important, allow another to go
269                 log.msg("Request to %s/%s is taking too long, moving on" %
270                         (self.found[id].host, self.found[id].port))
271                 self.outstanding_results -= self.outstanding[id]
272                 del self.outstanding[id]
273
274         return closest_K
275     
276     def generateArgs(self, node):
277         """Generate the arguments to the node's action.
278         
279         Also return the number of results expected from this request.
280         
281         @raise ValueError: if the node should not be queried
282         """
283         return (self.caller.node.id, self.target), 0
284     
285     def processResponse(self, dict):
286         """Process the response dictionary received from the remote node."""
287         self.handleGotNodes(dict['nodes'])
288
289     def generateResult(self, nodes):
290         """Create the final result to return to the L{callback} function."""
291         self.stats.completedAction(self.action, self.started)
292         return []
293         
294
295 class FindNode(ActionBase):
296     """Find the closest nodes to the key."""
297
298     def __init__(self, caller, target, callback, config, stats, action="find_node"):
299         ActionBase.__init__(self, caller, target, callback, config, stats, action)
300
301     def processResponse(self, dict):
302         """Save the token received from each node."""
303         if dict["id"] in self.found:
304             self.found[dict["id"]].updateToken(dict.get('token', ''))
305         self.handleGotNodes(dict['nodes'])
306
307     def generateResult(self):
308         """Result is the K closest nodes to the target."""
309         self.sortNodes()
310         self.stats.completedAction(self.action, self.started)
311         closest_K = []
312         for node in self.sorted_nodes:
313             if node.id not in self.failed:
314                 closest_K.append(node)
315                 if len(closest_K) >= K:
316                     break
317         return (closest_K, )
318     
319
320 class FindValue(ActionBase):
321     """Find the closest nodes to the key and check for values."""
322
323     def __init__(self, caller, target, callback, config, stats, action="find_value"):
324         ActionBase.__init__(self, caller, target, callback, config, stats, action)
325
326     def processResponse(self, dict):
327         """Save the number of values each node has."""
328         if dict["id"] in self.found:
329             self.found[dict["id"]].updateNumValues(dict.get('num', 0))
330         self.handleGotNodes(dict['nodes'])
331         
332     def generateResult(self):
333         """Result is the nodes that have values, sorted by proximity to the key."""
334         self.sortNodes()
335         self.stats.completedAction(self.action, self.started)
336         return ([node for node in self.sorted_nodes if node.num_values > 0], )
337     
338
339 class GetValue(ActionBase):
340     """Retrieve values from a list of nodes."""
341     
342     def __init__(self, caller, target, num_results, callback, config, stats, action="get_value"):
343         ActionBase.__init__(self, caller, target, callback, config, stats, action, num_results)
344
345     def getNodesToProcess(self):
346         """Nodes are never added, always return the same sorted node list."""
347         return self.sorted_nodes
348     
349     def generateArgs(self, node):
350         """Arguments include the number of values to request."""
351         if node.num_values > 0:
352             # Request all desired results from each node, just to be sure.
353             num_values = abs(self.desired_results) - len(self.results)
354             assert num_values > 0
355             if num_values > node.num_values:
356                 num_values = 0
357             return (self.caller.node.id, self.target, num_values), node.num_values
358         else:
359             raise ValueError, "Don't try and get values from this node because it doesn't have any"
360
361     def processResponse(self, dict):
362         """Save the returned values, calling the L{callback} each time there are new ones."""
363         if dict.has_key('values'):
364             def x(y, z=self.results):
365                 if not z.has_key(y):
366                     z[y] = 1
367                     return y
368                 else:
369                     return None
370             z = len(dict['values'])
371             v = filter(None, map(x, dict['values']))
372             if len(v):
373                 reactor.callLater(0, self.callback, self.target, v)
374
375     def generateResult(self):
376         """Results have all been returned, now send the empty list to end the action."""
377         self.stats.completedAction(self.action, self.started)
378         return (self.target, [])
379         
380
381 class StoreValue(ActionBase):
382     """Store a value in a list of nodes."""
383
384     def __init__(self, caller, target, value, num_results, callback, config, stats, action="store_value"):
385         """Initialize the action with the value to store.
386         
387         @type value: C{string}
388         @param value: the value to store in the nodes
389         """
390         ActionBase.__init__(self, caller, target, callback, config, stats, action, num_results)
391         self.value = value
392         
393     def getNodesToProcess(self):
394         """Nodes are never added, always return the same sorted list."""
395         return self.sorted_nodes
396
397     def generateArgs(self, node):
398         """Args include the value to store and the node's token."""
399         if node.token:
400             return (self.caller.node.id, self.target, self.value, node.token), 1
401         else:
402             raise ValueError, "Don't store at this node since we don't know it's token"
403
404     def processResponse(self, dict):
405         """Save the response, though it should be nothin but the ID."""
406         self.results[dict["id"]] = dict
407     
408     def generateResult(self):
409         """Return all the response IDs received."""
410         self.stats.completedAction(self.action, self.started)
411         return (self.target, self.value, self.results.values())