Fix an error in the actions that allowed for the result to be sent twice.
[quix0rs-apt-p2p.git] / apt_p2p_Khashmir / actions.py
1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
3
4 """Details of how to perform actions on remote peers."""
5
6 from datetime import datetime
7
8 from twisted.internet import reactor, defer
9 from twisted.python import log
10
11 from khash import intify
12 from ktable import K
13 from util import uncompact
14
15 class ActionBase:
16     """Base class for some long running asynchronous proccesses like finding nodes or values.
17     
18     @type caller: L{khashmir.Khashmir}
19     @ivar caller: the DHT instance that is performing the action
20     @type target: C{string}
21     @ivar target: the target of the action, usually a DHT key
22     @type config: C{dictionary}
23     @ivar config: the configuration variables for the DHT
24     @type action: C{string}
25     @ivar action: the name of the action to call on remote nodes
26     @type stats: L{stats.StatsLogger}
27     @ivar stats: the statistics modules to report to
28     @type num: C{long}
29     @ivar num: the target key in integer form
30     @type queried: C{dictionary}
31     @ivar queried: the nodes that have been queried for this action,
32         keys are node IDs, values are the node itself
33     @type answered: C{dictionary}
34     @ivar answered: the nodes that have answered the queries
35     @type found: C{dictionary}
36     @ivar found: nodes that have been found so far by the action
37     @type sorted_nodes: C{list} of L{node.Node}
38     @ivar sorted_nodes: a sorted list of nodes by there proximity to the key
39     @type results: C{dictionary}
40     @ivar results: keys are the results found so far by the action
41     @type desired_results: C{int}
42     @ivar desired_results: the minimum number of results that are needed
43         before the action should stop
44     @type callback: C{method}
45     @ivar callback: the method to call with the results
46     @type outstanding: C{int}
47     @ivar outstanding: the number of requests currently outstanding
48     @type outstanding_results: C{int}
49     @ivar outstanding_results: the number of results that are expected from
50         the requests that are currently outstanding
51     @type finished: C{boolean}
52     @ivar finished: whether the action is done
53     @type started: C{datetime.datetime}
54     @ivar started: the time the action was started at
55     @type sort: C{method}
56     @ivar sort: used to sort nodes by their proximity to the target
57     """
58     
59     def __init__(self, caller, target, callback, config, stats, action, num_results = None):
60         """Initialize the action.
61         
62         @type caller: L{khashmir.Khashmir}
63         @param caller: the DHT instance that is performing the action
64         @type target: C{string}
65         @param target: the target of the action, usually a DHT key
66         @type callback: C{method}
67         @param callback: the method to call with the results
68         @type config: C{dictionary}
69         @param config: the configuration variables for the DHT
70         @type stats: L{stats.StatsLogger}
71         @param stats: the statistics gatherer
72         @type action: C{string}
73         @param action: the name of the action to call on remote nodes
74         @type num_results: C{int}
75         @param num_results: the minimum number of results that are needed before
76             the action should stop (optional, defaults to getting all the results)
77         
78         """
79         
80         self.caller = caller
81         self.target = target
82         self.config = config
83         self.action = action
84         self.stats = stats
85         self.stats.startedAction(action)
86         self.num = intify(target)
87         self.queried = {}
88         self.answered = {}
89         self.found = {}
90         self.sorted_nodes = []
91         self.results = {}
92         self.desired_results = num_results
93         self.callback = callback
94         self.outstanding = 0
95         self.outstanding_results = 0
96         self.finished = False
97         self.started = datetime.now()
98     
99         def sort(a, b, num=self.num):
100             """Sort nodes relative to the ID we are looking for."""
101             x, y = num ^ a.num, num ^ b.num
102             if x > y:
103                 return 1
104             elif x < y:
105                 return -1
106             return 0
107         self.sort = sort
108
109     #{ Main operation
110     def goWithNodes(self, nodes):
111         """Start the action's process with a list of nodes to contact."""
112         self.started = datetime.now()
113         for node in nodes:
114             self.found[node.id] = node
115         self.sortNodes()
116         self.schedule()
117     
118     def schedule(self):
119         """Schedule requests to be sent to remote nodes."""
120         if self.finished:
121             return
122         
123         # Check if we are already done
124         if self.desired_results and ((len(self.results) >= abs(self.desired_results)) or
125                                      (self.desired_results < 0 and
126                                       len(self.answered) >= self.config['STORE_REDUNDANCY'])):
127             self.finished = True
128             result = self.generateResult()
129             reactor.callLater(0, self.callback, *result)
130             return
131
132         # Check if we have enough outstanding results coming
133         if (self.desired_results and 
134             len(self.results) + self.outstanding_results >= abs(self.desired_results)):
135             return
136         
137         # Loop for each node that should be processed
138         for node in self.getNodesToProcess():
139             # Don't send requests twice or to ourself
140             if node.id not in self.queried:
141                 self.queried[node.id] = 1
142                 
143                 # Get the action to call on the node
144                 if node.id == self.caller.node.id:
145                     try:
146                         f = getattr(self.caller, 'krpc_' + self.action)
147                     except AttributeError:
148                         log.msg("%s doesn't have a %s method!" % (node, 'krpc_' + self.action))
149                         continue
150                 else:
151                     try:
152                         f = getattr(node, self.action)
153                     except AttributeError:
154                         log.msg("%s doesn't have a %s method!" % (node, self.action))
155                         continue
156
157                 # Get the arguments to the action's method
158                 try:
159                     args, expected_results = self.generateArgs(node)
160                 except ValueError:
161                     continue
162
163                 # Call the action on the remote node
164                 self.outstanding += 1
165                 self.outstanding_results += expected_results
166                 df = defer.maybeDeferred(f, *args)
167                 reactor.callLater(0, df.addCallbacks,
168                                   *(self.gotResponse, self.actionFailed),
169                                   **{'callbackArgs': (node, expected_results, df),
170                                      'errbackArgs': (node, expected_results, df)})
171                         
172             # We might have to stop for now
173             if (self.outstanding >= self.config['CONCURRENT_REQS'] or
174                 (self.desired_results and
175                  len(self.results) + self.outstanding_results >= abs(self.desired_results))):
176                 break
177             
178         assert self.outstanding >= 0
179         assert self.outstanding_results >= 0
180
181         # If no requests are outstanding, then we are done
182         if self.outstanding == 0:
183             self.finished = True
184             result = self.generateResult()
185             reactor.callLater(0, self.callback, *result)
186
187     def gotResponse(self, dict, node, expected_results, df):
188         """Receive a response from a remote node."""
189         self.caller.insertNode(node)
190         if self.finished or self.answered.has_key(node.id):
191             # a day late and a dollar short
192             return
193         self.outstanding -= 1
194         self.outstanding_results -= expected_results
195         self.answered[node.id] = 1
196         self.processResponse(dict)
197         self.schedule()
198
199     def actionFailed(self, err, node, expected_results, df):
200         """Receive an error from a remote node."""
201         log.msg("action %s failed (%s) %s/%s" % (self.action, self.config['PORT'], node.host, node.port))
202         log.err(err)
203         self.caller.table.nodeFailed(node)
204         self.outstanding -= 1
205         self.outstanding_results -= expected_results
206         self.schedule()
207     
208     def handleGotNodes(self, nodes):
209         """Process any received node contact info in the response.
210         
211         Not called by default, but suitable for being called by
212         L{processResponse} in a recursive node search.
213         """
214         for compact_node in nodes:
215             node_contact = uncompact(compact_node)
216             node = self.caller.Node(node_contact)
217             if not self.found.has_key(node.id):
218                 self.found[node.id] = node
219
220     def sortNodes(self):
221         """Sort the nodes, if necessary.
222         
223         Assumes nodes are never removed from the L{found} dictionary.
224         """
225         if len(self.sorted_nodes) != len(self.found):
226             self.sorted_nodes = self.found.values()
227             self.sorted_nodes.sort(self.sort)
228                 
229     #{ Subclass for specific actions
230     def getNodesToProcess(self):
231         """Generate a list of nodes to process next.
232         
233         This implementation is suitable for a recurring search over all nodes.
234         """
235         self.sortNodes()
236         return self.sorted_nodes[:K]
237     
238     def generateArgs(self, node):
239         """Generate the arguments to the node's action.
240         
241         Also return the number of results expected from this request.
242         
243         @raise ValueError: if the node should not be queried
244         """
245         return (self.caller.node.id, self.target), 0
246     
247     def processResponse(self, dict):
248         """Process the response dictionary received from the remote node."""
249         self.handleGotNodes(dict['nodes'])
250
251     def generateResult(self, nodes):
252         """Create the final result to return to the L{callback} function."""
253         self.stats.completedAction(self.action, self.started)
254         return []
255         
256
257 class FindNode(ActionBase):
258     """Find the closest nodes to the key."""
259
260     def __init__(self, caller, target, callback, config, stats, action="find_node"):
261         ActionBase.__init__(self, caller, target, callback, config, stats, action)
262
263     def processResponse(self, dict):
264         """Save the token received from each node."""
265         if dict["id"] in self.found:
266             self.found[dict["id"]].updateToken(dict.get('token', ''))
267         self.handleGotNodes(dict['nodes'])
268
269     def generateResult(self):
270         """Result is the K closest nodes to the target."""
271         self.sortNodes()
272         self.stats.completedAction(self.action, self.started)
273         return (self.sorted_nodes[:K], )
274     
275
276 class FindValue(ActionBase):
277     """Find the closest nodes to the key and check for values."""
278
279     def __init__(self, caller, target, callback, config, stats, action="find_value"):
280         ActionBase.__init__(self, caller, target, callback, config, stats, action)
281
282     def processResponse(self, dict):
283         """Save the number of values each node has."""
284         if dict["id"] in self.found:
285             self.found[dict["id"]].updateNumValues(dict.get('num', 0))
286         self.handleGotNodes(dict['nodes'])
287         
288     def generateResult(self):
289         """Result is the nodes that have values, sorted by proximity to the key."""
290         self.sortNodes()
291         self.stats.completedAction(self.action, self.started)
292         return ([node for node in self.sorted_nodes if node.num_values > 0], )
293     
294
295 class GetValue(ActionBase):
296     """Retrieve values from a list of nodes."""
297     
298     def __init__(self, caller, target, num_results, callback, config, stats, action="get_value"):
299         ActionBase.__init__(self, caller, target, callback, config, stats, action, num_results)
300
301     def getNodesToProcess(self):
302         """Nodes are never added, always return the same sorted node list."""
303         return self.sorted_nodes
304     
305     def generateArgs(self, node):
306         """Arguments include the number of values to request."""
307         if node.num_values > 0:
308             # Request all desired results from each node, just to be sure.
309             num_values = abs(self.desired_results) - len(self.results)
310             assert num_values > 0
311             if num_values > node.num_values:
312                 num_values = 0
313             return (self.caller.node.id, self.target, num_values), node.num_values
314         else:
315             raise ValueError, "Don't try and get values from this node because it doesn't have any"
316
317     def processResponse(self, dict):
318         """Save the returned values, calling the L{callback} each time there are new ones."""
319         if dict.has_key('values'):
320             def x(y, z=self.results):
321                 if not z.has_key(y):
322                     z[y] = 1
323                     return y
324                 else:
325                     return None
326             z = len(dict['values'])
327             v = filter(None, map(x, dict['values']))
328             if len(v):
329                 reactor.callLater(0, self.callback, self.target, v)
330
331     def generateResult(self):
332         """Results have all been returned, now send the empty list to end the action."""
333         self.stats.completedAction(self.action, self.started)
334         return (self.target, [])
335         
336
337 class StoreValue(ActionBase):
338     """Store a value in a list of nodes."""
339
340     def __init__(self, caller, target, value, num_results, callback, config, stats, action="store_value"):
341         """Initialize the action with the value to store.
342         
343         @type value: C{string}
344         @param value: the value to store in the nodes
345         """
346         ActionBase.__init__(self, caller, target, callback, config, stats, action, num_results)
347         self.value = value
348         
349     def getNodesToProcess(self):
350         """Nodes are never added, always return the same sorted list."""
351         return self.sorted_nodes
352
353     def generateArgs(self, node):
354         """Args include the value to store and the node's token."""
355         if node.token:
356             return (self.caller.node.id, self.target, self.value, node.token), 1
357         else:
358             raise ValueError, "Don't store at this node since we don't know it's token"
359
360     def processResponse(self, dict):
361         """Save the response, though it should be nothin but the ID."""
362         self.results[dict["id"]] = dict
363     
364     def generateResult(self):
365         """Return all the response IDs received."""
366         self.stats.completedAction(self.action, self.started)
367         return (self.target, self.value, self.results.values())