c80591a4dcfbb87e1889734955be6c7ba33f7351
[quix0rs-apt-p2p.git] / apt_dht_Khashmir / actions.py
1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
3
4 from twisted.internet import reactor
5 from twisted.python import log
6
7 from khash import intify
8 from util import uncompact
9
10 class ActionBase:
11     """ base class for some long running asynchronous proccesses like finding nodes or values """
12     def __init__(self, caller, target, callback, config, action, num_results = None):
13         """Initialize the action."""
14         self.caller = caller
15         self.target = target
16         self.config = config
17         self.action = action
18         self.num = intify(target)
19         self.queried = {}
20         self.answered = {}
21         self.found = {}
22         self.sorted_nodes = []
23         self.results = {}
24         self.desired_results = num_results
25         self.callback = callback
26         self.outstanding = 0
27         self.outstanding_results = 0
28         self.finished = 0
29     
30         def sort(a, b, num=self.num):
31             """Sort nodes relative to the ID we are looking for."""
32             x, y = num ^ a.num, num ^ b.num
33             if x > y:
34                 return 1
35             elif x < y:
36                 return -1
37             return 0
38         self.sort = sort
39         
40     def goWithNodes(self, nodes):
41         """Start the action's process with a list of nodes to contact."""
42         for node in nodes:
43             if node.id == self.caller.node.id:
44                 continue
45             else:
46                 self.found[node.id] = node
47         self.sortNodes()
48         self.schedule()
49     
50     def schedule(self):
51         """Schedule requests to be sent to remote nodes."""
52         # Check if we are already done
53         if self.desired_results and len(self.results) >= self.desired_results:
54             self.finished=1
55             result = self.generateResult()
56             reactor.callLater(0, self.callback, *result)
57
58         if self.finished or (self.desired_results and 
59                              len(self.results) + self.outstanding_results >= self.desired_results):
60             return
61         
62         for node in self.getNodesToProcess():
63             if node.id not in self.queried and node.id != self.caller.node.id:
64                 self.queried[node.id] = 1
65                 
66                 # Get the action to call on the node
67                 try:
68                     f = getattr(node, self.action)
69                 except AttributeError:
70                     log.msg("%s doesn't have a %s method!" % (node, self.action))
71                 else:
72                     # Get the arguments to the action's method
73                     try:
74                         args, expected_results = self.generateArgs(node)
75                     except ValueError:
76                         pass
77                     else:
78                         # Call the action on the remote node
79                         self.outstanding += 1
80                         self.outstanding_results += expected_results
81                         df = f(self.caller.node.id, *args)
82                         df.addCallbacks(self.gotResponse, self.actionFailed,
83                                         callbackArgs = (node, expected_results),
84                                         errbackArgs = (node, expected_results))
85                         
86             # We might have to stop for now
87             if (self.outstanding >= self.config['CONCURRENT_REQS'] or
88                 (self.desired_results and
89                  self.outstanding_results >= self.desired_results)):
90                 break
91             
92         # If no requests are outstanding, then we are done
93         assert self.outstanding >=0
94         if self.outstanding == 0:
95             self.finished = 1
96             result = self.generateResult()
97             reactor.callLater(0, self.callback, *result)
98
99     def gotResponse(self, dict, node, expected_results):
100         """Receive a response from a remote node."""
101         self.caller.insertNode(node)
102         if self.finished or self.answered.has_key(node.id):
103             # a day late and a dollar short
104             return
105         self.outstanding -= 1
106         self.outstanding_results -= expected_results
107         self.answered[node.id] = 1
108         self.processResponse(dict['rsp'])
109         self.schedule()
110
111     def actionFailed(self, err, node, expected_results):
112         """Receive an error from a remote node."""
113         log.msg("action %s failed (%s) %s/%s" % (self.action, self.config['PORT'], node.host, node.port))
114         log.err(err)
115         self.caller.table.nodeFailed(node)
116         self.answered[node.id] = 1
117         self.outstanding -= 1
118         self.outstanding_results -= expected_results
119         self.schedule()
120     
121     def handleGotNodes(self, nodes):
122         """Process any received node contact info in the response."""
123         for compact_node in nodes:
124             node_contact = uncompact(compact_node)
125             node = self.caller.Node(node_contact)
126             if not self.found.has_key(node.id):
127                 self.found[node.id] = node
128
129     def sortNodes(self):
130         """Sort the nodes, if necessary.
131         
132         Assumes nodes are never removed from the L{found} dictionary.
133         """
134         if len(self.sorted_nodes) != len(self.found):
135             self.sorted_nodes = self.found.values()
136             self.sorted_nodes.sort(self.sort)
137                 
138     # The methods below are meant to be subclassed by actions
139     def getNodesToProcess(self):
140         """Generate a list of nodes to process next.
141         
142         This implementation is suitable for a recurring search over all nodes.
143         """
144         self.sortNodes()
145         return self.sorted_nodes[:self.config['K']]
146     
147     def generateArgs(self, node):
148         """Generate the arguments to the node's action.
149         
150         These arguments will be appended to our node ID when calling the action.
151         Also return the number of results expected from this request.
152         
153         @raise ValueError: if the node should not be queried
154         """
155         return (self.target, ), 0
156     
157     def processResponse(self, dict):
158         """Process the response dictionary received from the remote node."""
159         pass
160
161     def generateResult(self, nodes):
162         """Create the result to return to the callback function."""
163         return []
164         
165
166 class FindNode(ActionBase):
167     """Find the closest nodes to the key."""
168
169     def __init__(self, caller, target, callback, config, action="findNode"):
170         ActionBase.__init__(self, caller, target, callback, config, action)
171
172     def processResponse(self, dict):
173         """Save the token received from each node."""
174         if dict["id"] in self.found:
175             self.found[dict["id"]].updateToken(dict.get('token', ''))
176         self.handleGotNodes(dict['nodes'])
177
178     def generateResult(self):
179         """Result is the K closest nodes to the target."""
180         self.sortNodes()
181         return (self.sorted_nodes[:self.config['K']], )
182     
183
184 class FindValue(ActionBase):
185     """Find the closest nodes to the key and check their values."""
186
187     def __init__(self, caller, target, callback, config, action="findValue"):
188         ActionBase.__init__(self, caller, target, callback, config, action)
189
190     def processResponse(self, dict):
191         """Save the number of values each node has."""
192         if dict["id"] in self.found:
193             self.found[dict["id"]].updateNumValues(dict.get('num', 0))
194         self.handleGotNodes(dict['nodes'])
195         
196     def generateResult(self):
197         """Result is the nodes that have values, sorted by proximity to the key."""
198         self.sortNodes()
199         return ([node for node in self.sorted_nodes if node.num_values > 0], )
200     
201
202 class GetValue(ActionBase):
203     def __init__(self, caller, target, local_results, num_results, callback, config, action="getValue"):
204         ActionBase.__init__(self, caller, target, callback, config, action, num_results)
205         if local_results:
206             for result in local_results:
207                 self.results[result] = 1
208
209     def getNodesToProcess(self):
210         """Nodes are never added, always return the same thing."""
211         return self.sorted_nodes
212     
213     def generateArgs(self, node):
214         """Args include the number of values to request."""
215         if node.num_values > 0:
216             return (self.target, 0), node.num_values
217         else:
218             raise ValueError, "Don't try and get values from this node because it doesn't have any"
219
220     def processResponse(self, dict):
221         """Save the returned values, calling the callback each time there are new ones."""
222         if dict.has_key('values'):
223             def x(y, z=self.results):
224                 if not z.has_key(y):
225                     z[y] = 1
226                     return y
227                 else:
228                     return None
229             z = len(dict['values'])
230             v = filter(None, map(x, dict['values']))
231             if len(v):
232                 reactor.callLater(0, self.callback, self.target, v)
233
234     def generateResult(self):
235         """Results have all been returned, now send the empty list to end it."""
236         return (self.target, [])
237         
238
239 class StoreValue(ActionBase):
240     def __init__(self, caller, target, value, num_results, callback, config, action="storeValue"):
241         ActionBase.__init__(self, caller, target, callback, config, action, num_results)
242         self.value = value
243         
244     def getNodesToProcess(self):
245         """Nodes are never added, always return the same thing."""
246         return self.sorted_nodes
247
248     def generateArgs(self, node):
249         """Args include the value to request and the node's token."""
250         if node.token:
251             return (self.target, self.value, node.token), 1
252         else:
253             raise ValueError, "Don't store at this node since we don't know it's token"
254
255     def processResponse(self, dict):
256         """Save the response, though it should be nothin but the ID."""
257         self.results[dict["id"]] = dict
258     
259     def generateResult(self):
260         """Return all the response IDs received."""
261         return (self.target, self.value, self.results.values())