]> git.mxchange.org Git - quix0rs-apt-p2p.git/blob - apt_p2p_Khashmir/actions.py
b54632065002dbd2a0dc6c4439451ffa3c2d5ab0
[quix0rs-apt-p2p.git] / apt_p2p_Khashmir / actions.py
1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
3
4 """Details of how to perform actions on remote peers."""
5
6 from twisted.internet import reactor, defer
7 from twisted.python import log
8
9 from khash import intify
10 from util import uncompact
11
12 class ActionBase:
13     """Base class for some long running asynchronous proccesses like finding nodes or values.
14     
15     @type caller: L{khashmir.Khashmir}
16     @ivar caller: the DHT instance that is performing the action
17     @type target: C{string}
18     @ivar target: the target of the action, usually a DHT key
19     @type config: C{dictionary}
20     @ivar config: the configuration variables for the DHT
21     @type action: C{string}
22     @ivar action: the name of the action to call on remote nodes
23     @type num: C{long}
24     @ivar num: the target key in integer form
25     @type queried: C{dictionary}
26     @ivar queried: the nodes that have been queried for this action,
27         keys are node IDs, values are the node itself
28     @type answered: C{dictionary}
29     @ivar answered: the nodes that have answered the queries
30     @type found: C{dictionary}
31     @ivar found: nodes that have been found so far by the action
32     @type sorted_nodes: C{list} of L{node.Node}
33     @ivar sorted_nodes: a sorted list of nodes by there proximity to the key
34     @type results: C{dictionary}
35     @ivar results: keys are the results found so far by the action
36     @type desired_results: C{int}
37     @ivar desired_results: the minimum number of results that are needed
38         before the action should stop
39     @type callback: C{method}
40     @ivar callback: the method to call with the results
41     @type outstanding: C{int}
42     @ivar outstanding: the number of requests currently outstanding
43     @type outstanding_results: C{int}
44     @ivar outstanding_results: the number of results that are expected from
45         the requests that are currently outstanding
46     @type finished: C{boolean}
47     @ivar finished: whether the action is done
48     @type sort: C{method}
49     @ivar sort: used to sort nodes by their proximity to the target
50     """
51     
52     def __init__(self, caller, target, callback, config, stats, action, num_results = None):
53         """Initialize the action.
54         
55         @type caller: L{khashmir.Khashmir}
56         @param caller: the DHT instance that is performing the action
57         @type target: C{string}
58         @param target: the target of the action, usually a DHT key
59         @type callback: C{method}
60         @param callback: the method to call with the results
61         @type config: C{dictionary}
62         @param config: the configuration variables for the DHT
63         @type stats: L{stats.StatsLogger}
64         @param stats: the statistics gatherer
65         @type action: C{string}
66         @param action: the name of the action to call on remote nodes
67         @type num_results: C{int}
68         @param num_results: the minimum number of results that are needed before
69             the action should stop (optional, defaults to getting all the results)
70         
71         """
72         
73         self.caller = caller
74         self.target = target
75         self.config = config
76         self.action = action
77         stats.startedAction(action)
78         self.num = intify(target)
79         self.queried = {}
80         self.answered = {}
81         self.found = {}
82         self.sorted_nodes = []
83         self.results = {}
84         self.desired_results = num_results
85         self.callback = callback
86         self.outstanding = 0
87         self.outstanding_results = 0
88         self.finished = False
89     
90         def sort(a, b, num=self.num):
91             """Sort nodes relative to the ID we are looking for."""
92             x, y = num ^ a.num, num ^ b.num
93             if x > y:
94                 return 1
95             elif x < y:
96                 return -1
97             return 0
98         self.sort = sort
99
100     #{ Main operation
101     def goWithNodes(self, nodes):
102         """Start the action's process with a list of nodes to contact."""
103         for node in nodes:
104             self.found[node.id] = node
105         self.sortNodes()
106         self.schedule()
107     
108     def schedule(self):
109         """Schedule requests to be sent to remote nodes."""
110         # Check if we are already done
111         if self.desired_results and ((len(self.results) >= abs(self.desired_results)) or
112                                      (self.desired_results < 0 and
113                                       len(self.answered) >= self.config['STORE_REDUNDANCY'])):
114             self.finished = True
115             result = self.generateResult()
116             reactor.callLater(0, self.callback, *result)
117
118         if self.finished or (self.desired_results and 
119                              len(self.results) + self.outstanding_results >= abs(self.desired_results)):
120             return
121         
122         # Loop for each node that should be processed
123         for node in self.getNodesToProcess():
124             # Don't send requests twice or to ourself
125             if node.id not in self.queried:
126                 self.queried[node.id] = 1
127                 
128                 # Get the action to call on the node
129                 if node.id == self.caller.node.id:
130                     try:
131                         f = getattr(self.caller, 'krpc_' + self.action)
132                     except AttributeError:
133                         log.msg("%s doesn't have a %s method!" % (node, 'krpc_' + self.action))
134                         continue
135                 else:
136                     try:
137                         f = getattr(node, self.action)
138                     except AttributeError:
139                         log.msg("%s doesn't have a %s method!" % (node, self.action))
140                         continue
141
142                 # Get the arguments to the action's method
143                 try:
144                     args, expected_results = self.generateArgs(node)
145                 except ValueError:
146                     continue
147
148                 # Call the action on the remote node
149                 self.outstanding += 1
150                 self.outstanding_results += expected_results
151                 df = defer.maybeDeferred(f, *args)
152                 reactor.callLater(0, df.addCallbacks,
153                                   *(self.gotResponse, self.actionFailed),
154                                   **{'callbackArgs': (node, expected_results, df),
155                                      'errbackArgs': (node, expected_results, df)})
156                         
157             # We might have to stop for now
158             if (self.outstanding >= self.config['CONCURRENT_REQS'] or
159                 (self.desired_results and
160                  len(self.results) + self.outstanding_results >= abs(self.desired_results))):
161                 break
162             
163         assert self.outstanding >= 0
164         assert self.outstanding_results >= 0
165
166         # If no requests are outstanding, then we are done
167         if self.outstanding == 0:
168             self.finished = True
169             result = self.generateResult()
170             reactor.callLater(0, self.callback, *result)
171
172     def gotResponse(self, dict, node, expected_results, df):
173         """Receive a response from a remote node."""
174         self.caller.insertNode(node)
175         if self.finished or self.answered.has_key(node.id):
176             # a day late and a dollar short
177             return
178         self.outstanding -= 1
179         self.outstanding_results -= expected_results
180         self.answered[node.id] = 1
181         self.processResponse(dict)
182         self.schedule()
183
184     def actionFailed(self, err, node, expected_results, df):
185         """Receive an error from a remote node."""
186         log.msg("action %s failed (%s) %s/%s" % (self.action, self.config['PORT'], node.host, node.port))
187         log.err(err)
188         self.caller.table.nodeFailed(node)
189         self.outstanding -= 1
190         self.outstanding_results -= expected_results
191         self.schedule()
192     
193     def handleGotNodes(self, nodes):
194         """Process any received node contact info in the response.
195         
196         Not called by default, but suitable for being called by
197         L{processResponse} in a recursive node search.
198         """
199         for compact_node in nodes:
200             node_contact = uncompact(compact_node)
201             node = self.caller.Node(node_contact)
202             if not self.found.has_key(node.id):
203                 self.found[node.id] = node
204
205     def sortNodes(self):
206         """Sort the nodes, if necessary.
207         
208         Assumes nodes are never removed from the L{found} dictionary.
209         """
210         if len(self.sorted_nodes) != len(self.found):
211             self.sorted_nodes = self.found.values()
212             self.sorted_nodes.sort(self.sort)
213                 
214     #{ Subclass for specific actions
215     def getNodesToProcess(self):
216         """Generate a list of nodes to process next.
217         
218         This implementation is suitable for a recurring search over all nodes.
219         """
220         self.sortNodes()
221         return self.sorted_nodes[:self.config['K']]
222     
223     def generateArgs(self, node):
224         """Generate the arguments to the node's action.
225         
226         Also return the number of results expected from this request.
227         
228         @raise ValueError: if the node should not be queried
229         """
230         return (self.caller.node.id, self.target), 0
231     
232     def processResponse(self, dict):
233         """Process the response dictionary received from the remote node."""
234         self.handleGotNodes(dict['nodes'])
235
236     def generateResult(self, nodes):
237         """Create the final result to return to the L{callback} function."""
238         return []
239         
240
241 class FindNode(ActionBase):
242     """Find the closest nodes to the key."""
243
244     def __init__(self, caller, target, callback, config, stats, action="find_node"):
245         ActionBase.__init__(self, caller, target, callback, config, stats, action)
246
247     def processResponse(self, dict):
248         """Save the token received from each node."""
249         if dict["id"] in self.found:
250             self.found[dict["id"]].updateToken(dict.get('token', ''))
251         self.handleGotNodes(dict['nodes'])
252
253     def generateResult(self):
254         """Result is the K closest nodes to the target."""
255         self.sortNodes()
256         return (self.sorted_nodes[:self.config['K']], )
257     
258
259 class FindValue(ActionBase):
260     """Find the closest nodes to the key and check for values."""
261
262     def __init__(self, caller, target, callback, config, stats, action="find_value"):
263         ActionBase.__init__(self, caller, target, callback, config, stats, action)
264
265     def processResponse(self, dict):
266         """Save the number of values each node has."""
267         if dict["id"] in self.found:
268             self.found[dict["id"]].updateNumValues(dict.get('num', 0))
269         self.handleGotNodes(dict['nodes'])
270         
271     def generateResult(self):
272         """Result is the nodes that have values, sorted by proximity to the key."""
273         self.sortNodes()
274         return ([node for node in self.sorted_nodes if node.num_values > 0], )
275     
276
277 class GetValue(ActionBase):
278     """Retrieve values from a list of nodes."""
279     
280     def __init__(self, caller, target, num_results, callback, config, stats, action="get_value"):
281         ActionBase.__init__(self, caller, target, callback, config, stats, action, num_results)
282
283     def getNodesToProcess(self):
284         """Nodes are never added, always return the same sorted node list."""
285         return self.sorted_nodes
286     
287     def generateArgs(self, node):
288         """Arguments include the number of values to request."""
289         if node.num_values > 0:
290             # Request all desired results from each node, just to be sure.
291             num_values = abs(self.desired_results) - len(self.results)
292             assert num_values > 0
293             if num_values > node.num_values:
294                 num_values = 0
295             return (self.caller.node.id, self.target, num_values), node.num_values
296         else:
297             raise ValueError, "Don't try and get values from this node because it doesn't have any"
298
299     def processResponse(self, dict):
300         """Save the returned values, calling the L{callback} each time there are new ones."""
301         if dict.has_key('values'):
302             def x(y, z=self.results):
303                 if not z.has_key(y):
304                     z[y] = 1
305                     return y
306                 else:
307                     return None
308             z = len(dict['values'])
309             v = filter(None, map(x, dict['values']))
310             if len(v):
311                 reactor.callLater(0, self.callback, self.target, v)
312
313     def generateResult(self):
314         """Results have all been returned, now send the empty list to end the action."""
315         return (self.target, [])
316         
317
318 class StoreValue(ActionBase):
319     """Store a value in a list of nodes."""
320
321     def __init__(self, caller, target, value, num_results, callback, config, stats, action="store_value"):
322         """Initialize the action with the value to store.
323         
324         @type value: C{string}
325         @param value: the value to store in the nodes
326         """
327         ActionBase.__init__(self, caller, target, callback, config, stats, action, num_results)
328         self.value = value
329         
330     def getNodesToProcess(self):
331         """Nodes are never added, always return the same sorted list."""
332         return self.sorted_nodes
333
334     def generateArgs(self, node):
335         """Args include the value to store and the node's token."""
336         if node.token:
337             return (self.caller.node.id, self.target, self.value, node.token), 1
338         else:
339             raise ValueError, "Don't store at this node since we don't know it's token"
340
341     def processResponse(self, dict):
342         """Save the response, though it should be nothin but the ID."""
343         self.results[dict["id"]] = dict
344     
345     def generateResult(self):
346         """Return all the response IDs received."""
347         return (self.target, self.value, self.results.values())