Standardize the number of values retrieved from the DHT.
[quix0rs-apt-p2p.git] / apt_dht_Khashmir / actions.py
1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
3
4 from twisted.internet import reactor
5 from twisted.python import log
6
7 from khash import intify
8 from util import uncompact
9
10 class ActionBase:
11     """ base class for some long running asynchronous proccesses like finding nodes or values """
12     def __init__(self, caller, target, callback, config, action, num_results = None):
13         """Initialize the action."""
14         self.caller = caller
15         self.target = target
16         self.config = config
17         self.action = action
18         self.num = intify(target)
19         self.queried = {}
20         self.answered = {}
21         self.found = {}
22         self.sorted_nodes = []
23         self.results = {}
24         self.desired_results = num_results
25         self.callback = callback
26         self.outstanding = 0
27         self.outstanding_results = 0
28         self.finished = 0
29     
30         def sort(a, b, num=self.num):
31             """Sort nodes relative to the ID we are looking for."""
32             x, y = num ^ a.num, num ^ b.num
33             if x > y:
34                 return 1
35             elif x < y:
36                 return -1
37             return 0
38         self.sort = sort
39         
40     def goWithNodes(self, nodes):
41         """Start the action's process with a list of nodes to contact."""
42         for node in nodes:
43             if node.id == self.caller.node.id:
44                 continue
45             else:
46                 self.found[node.id] = node
47         self.sortNodes()
48         self.schedule()
49     
50     def schedule(self):
51         """Schedule requests to be sent to remote nodes."""
52         # Check if we are already done
53         if self.desired_results and ((len(self.results) >= abs(self.desired_results)) or
54                                      (self.desired_results < 0 and
55                                       len(self.answered) >= self.config['STORE_REDUNDANCY'])):
56             self.finished=1
57             result = self.generateResult()
58             reactor.callLater(0, self.callback, *result)
59
60         if self.finished or (self.desired_results and 
61                              len(self.results) + self.outstanding_results >= abs(self.desired_results)):
62             return
63         
64         for node in self.getNodesToProcess():
65             if node.id not in self.queried and node.id != self.caller.node.id:
66                 self.queried[node.id] = 1
67                 
68                 # Get the action to call on the node
69                 try:
70                     f = getattr(node, self.action)
71                 except AttributeError:
72                     log.msg("%s doesn't have a %s method!" % (node, self.action))
73                 else:
74                     # Get the arguments to the action's method
75                     try:
76                         args, expected_results = self.generateArgs(node)
77                     except ValueError:
78                         pass
79                     else:
80                         # Call the action on the remote node
81                         self.outstanding += 1
82                         self.outstanding_results += expected_results
83                         df = f(self.caller.node.id, *args)
84                         df.addCallbacks(self.gotResponse, self.actionFailed,
85                                         callbackArgs = (node, expected_results),
86                                         errbackArgs = (node, expected_results))
87                         
88             # We might have to stop for now
89             if (self.outstanding >= self.config['CONCURRENT_REQS'] or
90                 (self.desired_results and
91                  len(self.results) + self.outstanding_results >= abs(self.desired_results))):
92                 break
93             
94         assert self.outstanding >= 0
95         assert self.outstanding_results >= 0
96
97         # If no requests are outstanding, then we are done
98         if self.outstanding == 0:
99             self.finished = 1
100             result = self.generateResult()
101             reactor.callLater(0, self.callback, *result)
102
103     def gotResponse(self, dict, node, expected_results):
104         """Receive a response from a remote node."""
105         self.caller.insertNode(node)
106         if self.finished or self.answered.has_key(node.id):
107             # a day late and a dollar short
108             return
109         self.outstanding -= 1
110         self.outstanding_results -= expected_results
111         self.answered[node.id] = 1
112         self.processResponse(dict['rsp'])
113         self.schedule()
114
115     def actionFailed(self, err, node, expected_results):
116         """Receive an error from a remote node."""
117         log.msg("action %s failed (%s) %s/%s" % (self.action, self.config['PORT'], node.host, node.port))
118         log.err(err)
119         self.caller.table.nodeFailed(node)
120         self.outstanding -= 1
121         self.outstanding_results -= expected_results
122         self.schedule()
123     
124     def handleGotNodes(self, nodes):
125         """Process any received node contact info in the response."""
126         for compact_node in nodes:
127             node_contact = uncompact(compact_node)
128             node = self.caller.Node(node_contact)
129             if not self.found.has_key(node.id):
130                 self.found[node.id] = node
131
132     def sortNodes(self):
133         """Sort the nodes, if necessary.
134         
135         Assumes nodes are never removed from the L{found} dictionary.
136         """
137         if len(self.sorted_nodes) != len(self.found):
138             self.sorted_nodes = self.found.values()
139             self.sorted_nodes.sort(self.sort)
140                 
141     # The methods below are meant to be subclassed by actions
142     def getNodesToProcess(self):
143         """Generate a list of nodes to process next.
144         
145         This implementation is suitable for a recurring search over all nodes.
146         """
147         self.sortNodes()
148         return self.sorted_nodes[:self.config['K']]
149     
150     def generateArgs(self, node):
151         """Generate the arguments to the node's action.
152         
153         These arguments will be appended to our node ID when calling the action.
154         Also return the number of results expected from this request.
155         
156         @raise ValueError: if the node should not be queried
157         """
158         return (self.target, ), 0
159     
160     def processResponse(self, dict):
161         """Process the response dictionary received from the remote node."""
162         self.handleGotNodes(dict['nodes'])
163
164     def generateResult(self, nodes):
165         """Create the result to return to the callback function."""
166         return []
167         
168
169 class FindNode(ActionBase):
170     """Find the closest nodes to the key."""
171
172     def __init__(self, caller, target, callback, config, action="findNode"):
173         ActionBase.__init__(self, caller, target, callback, config, action)
174
175     def processResponse(self, dict):
176         """Save the token received from each node."""
177         if dict["id"] in self.found:
178             self.found[dict["id"]].updateToken(dict.get('token', ''))
179         self.handleGotNodes(dict['nodes'])
180
181     def generateResult(self):
182         """Result is the K closest nodes to the target."""
183         self.sortNodes()
184         return (self.sorted_nodes[:self.config['K']], )
185     
186
187 class FindValue(ActionBase):
188     """Find the closest nodes to the key and check their values."""
189
190     def __init__(self, caller, target, callback, config, action="findValue"):
191         ActionBase.__init__(self, caller, target, callback, config, action)
192
193     def processResponse(self, dict):
194         """Save the number of values each node has."""
195         if dict["id"] in self.found:
196             self.found[dict["id"]].updateNumValues(dict.get('num', 0))
197         self.handleGotNodes(dict['nodes'])
198         
199     def generateResult(self):
200         """Result is the nodes that have values, sorted by proximity to the key."""
201         self.sortNodes()
202         return ([node for node in self.sorted_nodes if node.num_values > 0], )
203     
204
205 class GetValue(ActionBase):
206     def __init__(self, caller, target, local_results, num_results, callback, config, action="getValue"):
207         ActionBase.__init__(self, caller, target, callback, config, action, num_results)
208         if local_results:
209             for result in local_results:
210                 self.results[result] = 1
211
212     def getNodesToProcess(self):
213         """Nodes are never added, always return the same thing."""
214         return self.sorted_nodes
215     
216     def generateArgs(self, node):
217         """Args include the number of values to request."""
218         if node.num_values > 0:
219             # Request all desired results from each node, just to be sure.
220             num_values = abs(self.desired_results) - len(self.results)
221             assert num_values > 0
222             if num_values > node.num_values:
223                 num_values = 0
224             return (self.target, num_values), node.num_values
225         else:
226             raise ValueError, "Don't try and get values from this node because it doesn't have any"
227
228     def processResponse(self, dict):
229         """Save the returned values, calling the callback each time there are new ones."""
230         if dict.has_key('values'):
231             def x(y, z=self.results):
232                 if not z.has_key(y):
233                     z[y] = 1
234                     return y
235                 else:
236                     return None
237             z = len(dict['values'])
238             v = filter(None, map(x, dict['values']))
239             if len(v):
240                 reactor.callLater(0, self.callback, self.target, v)
241
242     def generateResult(self):
243         """Results have all been returned, now send the empty list to end it."""
244         return (self.target, [])
245         
246
247 class StoreValue(ActionBase):
248     def __init__(self, caller, target, value, num_results, callback, config, action="storeValue"):
249         ActionBase.__init__(self, caller, target, callback, config, action, num_results)
250         self.value = value
251         
252     def getNodesToProcess(self):
253         """Nodes are never added, always return the same thing."""
254         return self.sorted_nodes
255
256     def generateArgs(self, node):
257         """Args include the value to request and the node's token."""
258         if node.token:
259             return (self.target, self.value, node.token), 1
260         else:
261             raise ValueError, "Don't store at this node since we don't know it's token"
262
263     def processResponse(self, dict):
264         """Save the response, though it should be nothin but the ID."""
265         self.results[dict["id"]] = dict
266     
267     def generateResult(self):
268         """Return all the response IDs received."""
269         return (self.target, self.value, self.results.values())