8542706691fdd8d72394db040c1cb53a17ed226f
[quix0rs-apt-p2p.git] / apt_p2p_Khashmir / actions.py
1
2 """Details of how to perform actions on remote peers."""
3
4 from datetime import datetime
5
6 from twisted.internet import reactor, defer
7 from twisted.python import log
8
9 from khash import intify
10 from ktable import K
11 from util import uncompact
12
13 class ActionBase:
14     """Base class for some long running asynchronous proccesses like finding nodes or values.
15     
16     @type caller: L{khashmir.Khashmir}
17     @ivar caller: the DHT instance that is performing the action
18     @type target: C{string}
19     @ivar target: the target of the action, usually a DHT key
20     @type config: C{dictionary}
21     @ivar config: the configuration variables for the DHT
22     @type action: C{string}
23     @ivar action: the name of the action to call on remote nodes
24     @type stats: L{stats.StatsLogger}
25     @ivar stats: the statistics modules to report to
26     @type num: C{long}
27     @ivar num: the target key in integer form
28     @type queried: C{dictionary}
29     @ivar queried: the nodes that have been queried for this action,
30         keys are node IDs, values are the node itself
31     @type answered: C{dictionary}
32     @ivar answered: the nodes that have answered the queries
33     @type failed: C{dictionary}
34     @ivar failed: the nodes that have failed to answer the queries
35     @type found: C{dictionary}
36     @ivar found: nodes that have been found so far by the action
37     @type sorted_nodes: C{list} of L{node.Node}
38     @ivar sorted_nodes: a sorted list of nodes by there proximity to the key
39     @type results: C{dictionary}
40     @ivar results: keys are the results found so far by the action
41     @type desired_results: C{int}
42     @ivar desired_results: the minimum number of results that are needed
43         before the action should stop
44     @type callback: C{method}
45     @ivar callback: the method to call with the results
46     @type outstanding: C{dictionary}
47     @ivar outstanding: the nodes that have outstanding requests for this action,
48         keys are node IDs, values are the number of outstanding results from the node
49     @type outstanding_results: C{int}
50     @ivar outstanding_results: the total number of results that are expected from
51         the requests that are currently outstanding
52     @type finished: C{boolean}
53     @ivar finished: whether the action is done
54     @type started: C{datetime.datetime}
55     @ivar started: the time the action was started at
56     @type sort: C{method}
57     @ivar sort: used to sort nodes by their proximity to the target
58     """
59     
60     def __init__(self, caller, target, callback, config, stats, action, num_results = None):
61         """Initialize the action.
62         
63         @type caller: L{khashmir.Khashmir}
64         @param caller: the DHT instance that is performing the action
65         @type target: C{string}
66         @param target: the target of the action, usually a DHT key
67         @type callback: C{method}
68         @param callback: the method to call with the results
69         @type config: C{dictionary}
70         @param config: the configuration variables for the DHT
71         @type stats: L{stats.StatsLogger}
72         @param stats: the statistics gatherer
73         @type action: C{string}
74         @param action: the name of the action to call on remote nodes
75         @type num_results: C{int}
76         @param num_results: the minimum number of results that are needed before
77             the action should stop (optional, defaults to getting all the results)
78         
79         """
80         
81         self.caller = caller
82         self.target = target
83         self.config = config
84         self.action = action
85         self.stats = stats
86         self.stats.startedAction(action)
87         self.num = intify(target)
88         self.queried = {}
89         self.answered = {}
90         self.failed = {}
91         self.found = {}
92         self.sorted_nodes = []
93         self.results = {}
94         self.desired_results = num_results
95         self.callback = callback
96         self.outstanding = {}
97         self.outstanding_results = 0
98         self.finished = False
99         self.started = datetime.now()
100     
101         def sort(a, b, num=self.num):
102             """Sort nodes relative to the ID we are looking for."""
103             x, y = num ^ a.num, num ^ b.num
104             if x > y:
105                 return 1
106             elif x < y:
107                 return -1
108             return 0
109         self.sort = sort
110
111     #{ Main operation
112     def goWithNodes(self, nodes):
113         """Start the action's process with a list of nodes to contact."""
114         self.started = datetime.now()
115         for node in nodes:
116             self.found[node.id] = node
117         self.sortNodes()
118         self.schedule()
119     
120     def schedule(self):
121         """Schedule requests to be sent to remote nodes."""
122         if self.finished:
123             return
124         
125         # Get the nodes to be processed
126         nodes = self.getNodesToProcess()
127         
128         # Check if we are already done
129         if nodes is None or (self.desired_results and
130                              ((len(self.results) >= abs(self.desired_results)) or
131                               (self.desired_results < 0 and
132                                len(self.answered) >= self.config['STORE_REDUNDANCY']))):
133             self.finished = True
134             result = self.generateResult()
135             reactor.callLater(0, self.callback, *result)
136             return
137
138         # Check if we have enough outstanding results coming
139         if (self.desired_results and 
140             len(self.results) + self.outstanding_results >= abs(self.desired_results)):
141             return
142         
143         # Loop for each node that should be processed
144         for node in nodes:
145             # Don't send requests twice or to ourself
146             if node.id not in self.queried:
147                 self.queried[node.id] = 1
148                 
149                 # Get the action to call on the node
150                 if node.id == self.caller.node.id:
151                     try:
152                         f = getattr(self.caller, 'krpc_' + self.action)
153                     except AttributeError:
154                         log.msg("%s doesn't have a %s method!" % (node, 'krpc_' + self.action))
155                         continue
156                 else:
157                     try:
158                         f = getattr(node, self.action)
159                     except AttributeError:
160                         log.msg("%s doesn't have a %s method!" % (node, self.action))
161                         continue
162
163                 # Get the arguments to the action's method
164                 try:
165                     args, expected_results = self.generateArgs(node)
166                 except ValueError:
167                     continue
168
169                 # Call the action on the remote node
170                 self.outstanding[node.id] = expected_results
171                 self.outstanding_results += expected_results
172                 df = defer.maybeDeferred(f, *args)
173                 reactor.callLater(0, df.addCallbacks,
174                                   *(self.gotResponse, self.actionFailed),
175                                   **{'callbackArgs': (node, ),
176                                      'errbackArgs': (node, )})
177                         
178             # We might have to stop for now
179             if (len(self.outstanding) >= self.config['CONCURRENT_REQS'] or
180                 (self.desired_results and
181                  len(self.results) + self.outstanding_results >= abs(self.desired_results))):
182                 break
183             
184         assert self.outstanding_results >= 0
185
186         # If no requests are outstanding, then we are done
187         if len(self.outstanding) == 0:
188             self.finished = True
189             result = self.generateResult()
190             reactor.callLater(0, self.callback, *result)
191
192     def gotResponse(self, dict, node):
193         """Receive a response from a remote node."""
194         if node.id != self.caller.node.id:
195             reactor.callLater(0, self.caller.insertNode, node)
196         if self.finished or self.answered.has_key(node.id):
197             # a day late and a dollar short
198             return
199         try:
200             # Process the response
201             self.processResponse(dict)
202             self.answered[node.id] = 1
203         except Exception, e:
204             # Unexpected error with the response
205             log.msg("action %s failed on %s/%s: %r" % (self.action, node.host, node.port, e))
206             if node.id != self.caller.node.id:
207                 self.caller.nodeFailed(node)
208             self.failed[node.id] = 1
209         if self.outstanding.has_key(node.id):
210             self.outstanding_results -= self.outstanding[node.id]
211             del self.outstanding[node.id]
212         self.schedule()
213
214     def actionFailed(self, err, node):
215         """Receive an error from a remote node."""
216         log.msg("action %s failed on %s/%s: %s" % (self.action, node.host, node.port, err.getErrorMessage()))
217         if node.id != self.caller.node.id:
218             self.caller.nodeFailed(node)
219         self.failed[node.id] = 1
220         if self.outstanding.has_key(node.id):
221             self.outstanding_results -= self.outstanding[node.id]
222             del self.outstanding[node.id]
223         self.schedule()
224     
225     def handleGotNodes(self, nodes):
226         """Process any received node contact info in the response.
227         
228         Not called by default, but suitable for being called by
229         L{processResponse} in a recursive node search.
230         """
231         if nodes and type(nodes) != list:
232             raise ValueError, "got a malformed response, from bittorrent perhaps"
233         for compact_node in nodes:
234             node_contact = uncompact(compact_node)
235             node = self.caller.Node(node_contact)
236             if not self.found.has_key(node.id):
237                 self.found[node.id] = node
238
239     def sortNodes(self):
240         """Sort the nodes, if necessary.
241         
242         Assumes nodes are never removed from the L{found} dictionary.
243         """
244         if len(self.sorted_nodes) != len(self.found):
245             self.sorted_nodes = self.found.values()
246             self.sorted_nodes.sort(self.sort)
247                 
248     #{ Subclass for specific actions
249     def getNodesToProcess(self):
250         """Generate a list of nodes to process next.
251         
252         This implementation is suitable for a recurring search over all nodes.
253         It will stop the search when the closest K nodes have been queried.
254         It also prematurely drops requests to nodes that have fallen way behind.
255         
256         @return: sorted list of nodes to query, or None if we are done
257         """
258         # Find the K closest nodes that haven't failed, count how many answered
259         self.sortNodes()
260         closest_K = []
261         ans = 0
262         for node in self.sorted_nodes:
263             if node.id not in self.failed:
264                 closest_K.append(node)
265                 if node.id in self.answered:
266                     ans += 1
267                 if len(closest_K) >= K:
268                     break
269         
270         # If we have responses from the K closest nodes, then we are done
271         if ans >= K:
272             log.msg('Got the answers we need, aborting search')
273             return None
274         
275         # Check the oustanding requests to see if they are still closest
276         for id in self.outstanding.keys():
277             if self.found[id] not in closest_K:
278                 # Request is not important, allow another to go
279                 log.msg("Request to %s/%s is taking too long, moving on" %
280                         (self.found[id].host, self.found[id].port))
281                 self.outstanding_results -= self.outstanding[id]
282                 del self.outstanding[id]
283
284         return closest_K
285     
286     def generateArgs(self, node):
287         """Generate the arguments to the node's action.
288         
289         Also return the number of results expected from this request.
290         
291         @raise ValueError: if the node should not be queried
292         """
293         return (self.caller.node.id, self.target), 0
294     
295     def processResponse(self, dict):
296         """Process the response dictionary received from the remote node."""
297         self.handleGotNodes(dict['nodes'])
298
299     def generateResult(self, nodes):
300         """Create the final result to return to the L{callback} function."""
301         self.stats.completedAction(self.action, self.started)
302         return []
303         
304
305 class FindNode(ActionBase):
306     """Find the closest nodes to the key."""
307
308     def __init__(self, caller, target, callback, config, stats, action="find_node"):
309         ActionBase.__init__(self, caller, target, callback, config, stats, action)
310
311     def processResponse(self, dict):
312         """Save the token received from each node."""
313         if dict["id"] in self.found:
314             self.found[dict["id"]].updateToken(dict.get('token', ''))
315         self.handleGotNodes(dict['nodes'])
316
317     def generateResult(self):
318         """Result is the K closest nodes to the target."""
319         self.sortNodes()
320         self.stats.completedAction(self.action, self.started)
321         closest_K = []
322         for node in self.sorted_nodes:
323             if node.id not in self.failed:
324                 closest_K.append(node)
325                 if len(closest_K) >= K:
326                     break
327         return (closest_K, )
328     
329
330 class FindValue(ActionBase):
331     """Find the closest nodes to the key and check for values."""
332
333     def __init__(self, caller, target, callback, config, stats, action="find_value"):
334         ActionBase.__init__(self, caller, target, callback, config, stats, action)
335
336     def processResponse(self, dict):
337         """Save the number of values each node has."""
338         if dict["id"] in self.found:
339             self.found[dict["id"]].updateNumValues(dict.get('num', 0))
340         self.handleGotNodes(dict['nodes'])
341         
342     def generateResult(self):
343         """Result is the nodes that have values, sorted by proximity to the key."""
344         self.sortNodes()
345         self.stats.completedAction(self.action, self.started)
346         return ([node for node in self.sorted_nodes if node.num_values > 0], )
347     
348
349 class GetValue(ActionBase):
350     """Retrieve values from a list of nodes."""
351     
352     def __init__(self, caller, target, num_results, callback, config, stats, action="get_value"):
353         ActionBase.__init__(self, caller, target, callback, config, stats, action, num_results)
354
355     def getNodesToProcess(self):
356         """Nodes are never added, always return the same sorted node list."""
357         return self.sorted_nodes
358     
359     def generateArgs(self, node):
360         """Arguments include the number of values to request."""
361         if node.num_values > 0:
362             # Request all desired results from each node, just to be sure.
363             num_values = abs(self.desired_results) - len(self.results)
364             assert num_values > 0
365             if num_values > node.num_values:
366                 num_values = 0
367             return (self.caller.node.id, self.target, num_values), node.num_values
368         else:
369             raise ValueError, "Don't try and get values from this node because it doesn't have any"
370
371     def processResponse(self, dict):
372         """Save the returned values, calling the L{callback} each time there are new ones."""
373         if dict.has_key('values'):
374             def x(y, z=self.results):
375                 if not z.has_key(y):
376                     z[y] = 1
377                     return y
378                 else:
379                     return None
380             z = len(dict['values'])
381             v = filter(None, map(x, dict['values']))
382             if len(v):
383                 reactor.callLater(0, self.callback, self.target, v)
384
385     def generateResult(self):
386         """Results have all been returned, now send the empty list to end the action."""
387         self.stats.completedAction(self.action, self.started)
388         return (self.target, [])
389         
390
391 class StoreValue(ActionBase):
392     """Store a value in a list of nodes."""
393
394     def __init__(self, caller, target, value, num_results, callback, config, stats, action="store_value"):
395         """Initialize the action with the value to store.
396         
397         @type value: C{string}
398         @param value: the value to store in the nodes
399         """
400         ActionBase.__init__(self, caller, target, callback, config, stats, action, num_results)
401         self.value = value
402         
403     def getNodesToProcess(self):
404         """Nodes are never added, always return the same sorted list."""
405         return self.sorted_nodes
406
407     def generateArgs(self, node):
408         """Args include the value to store and the node's token."""
409         if node.token:
410             return (self.caller.node.id, self.target, self.value, node.token), 1
411         else:
412             raise ValueError, "Don't store at this node since we don't know it's token"
413
414     def processResponse(self, dict):
415         """Save the response, though it should be nothin but the ID."""
416         self.results[dict["id"]] = dict
417     
418     def generateResult(self):
419         """Return all the response IDs received."""
420         self.stats.completedAction(self.action, self.started)
421         return (self.target, self.value, self.results.values())