1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
4 from twisted.internet import reactor
5 from twisted.python import log
7 from khash import intify
8 from util import uncompact
11 """ base class for some long running asynchronous proccesses like finding nodes or values """
12 def __init__(self, caller, target, callback, config, action, num_results = None):
13 """Initialize the action."""
18 self.num = intify(target)
22 self.sorted_nodes = []
24 self.desired_results = num_results
25 self.callback = callback
27 self.outstanding_results = 0
30 def sort(a, b, num=self.num):
31 """Sort nodes relative to the ID we are looking for."""
32 x, y = num ^ a.num, num ^ b.num
40 def goWithNodes(self, nodes):
41 """Start the action's process with a list of nodes to contact."""
43 if node.id == self.caller.node.id:
46 self.found[node.id] = node
51 """Schedule requests to be sent to remote nodes."""
52 # Check if we are already done
53 if self.desired_results and ((len(self.results) >= abs(self.desired_results)) or
54 (self.desired_results < 0 and
55 len(self.answered) >= self.config['STORE_REDUNDANCY'])):
57 result = self.generateResult()
58 reactor.callLater(0, self.callback, *result)
60 if self.finished or (self.desired_results and
61 len(self.results) + self.outstanding_results >= abs(self.desired_results)):
64 for node in self.getNodesToProcess():
65 if node.id not in self.queried and node.id != self.caller.node.id:
66 self.queried[node.id] = 1
68 # Get the action to call on the node
70 f = getattr(node, self.action)
71 except AttributeError:
72 log.msg("%s doesn't have a %s method!" % (node, self.action))
74 # Get the arguments to the action's method
76 args, expected_results = self.generateArgs(node)
80 # Call the action on the remote node
82 self.outstanding_results += expected_results
83 df = f(self.caller.node.id, *args)
84 df.addCallbacks(self.gotResponse, self.actionFailed,
85 callbackArgs = (node, expected_results),
86 errbackArgs = (node, expected_results))
88 # We might have to stop for now
89 if (self.outstanding >= self.config['CONCURRENT_REQS'] or
90 (self.desired_results and
91 len(self.results) + self.outstanding_results >= abs(self.desired_results))):
94 assert self.outstanding >= 0
95 assert self.outstanding_results >= 0
97 # If no requests are outstanding, then we are done
98 if self.outstanding == 0:
100 result = self.generateResult()
101 reactor.callLater(0, self.callback, *result)
103 def gotResponse(self, dict, node, expected_results):
104 """Receive a response from a remote node."""
105 self.caller.insertNode(node)
106 if self.finished or self.answered.has_key(node.id):
107 # a day late and a dollar short
109 self.outstanding -= 1
110 self.outstanding_results -= expected_results
111 self.answered[node.id] = 1
112 self.processResponse(dict['rsp'])
115 def actionFailed(self, err, node, expected_results):
116 """Receive an error from a remote node."""
117 log.msg("action %s failed (%s) %s/%s" % (self.action, self.config['PORT'], node.host, node.port))
119 self.caller.table.nodeFailed(node)
120 self.outstanding -= 1
121 self.outstanding_results -= expected_results
124 def handleGotNodes(self, nodes):
125 """Process any received node contact info in the response."""
126 for compact_node in nodes:
127 node_contact = uncompact(compact_node)
128 node = self.caller.Node(node_contact)
129 if not self.found.has_key(node.id):
130 self.found[node.id] = node
133 """Sort the nodes, if necessary.
135 Assumes nodes are never removed from the L{found} dictionary.
137 if len(self.sorted_nodes) != len(self.found):
138 self.sorted_nodes = self.found.values()
139 self.sorted_nodes.sort(self.sort)
141 # The methods below are meant to be subclassed by actions
142 def getNodesToProcess(self):
143 """Generate a list of nodes to process next.
145 This implementation is suitable for a recurring search over all nodes.
148 return self.sorted_nodes[:self.config['K']]
150 def generateArgs(self, node):
151 """Generate the arguments to the node's action.
153 These arguments will be appended to our node ID when calling the action.
154 Also return the number of results expected from this request.
156 @raise ValueError: if the node should not be queried
158 return (self.target, ), 0
160 def processResponse(self, dict):
161 """Process the response dictionary received from the remote node."""
162 self.handleGotNodes(dict['nodes'])
164 def generateResult(self, nodes):
165 """Create the result to return to the callback function."""
169 class FindNode(ActionBase):
170 """Find the closest nodes to the key."""
172 def __init__(self, caller, target, callback, config, action="findNode"):
173 ActionBase.__init__(self, caller, target, callback, config, action)
175 def processResponse(self, dict):
176 """Save the token received from each node."""
177 if dict["id"] in self.found:
178 self.found[dict["id"]].updateToken(dict.get('token', ''))
179 self.handleGotNodes(dict['nodes'])
181 def generateResult(self):
182 """Result is the K closest nodes to the target."""
184 return (self.sorted_nodes[:self.config['K']], )
187 class FindValue(ActionBase):
188 """Find the closest nodes to the key and check their values."""
190 def __init__(self, caller, target, callback, config, action="findValue"):
191 ActionBase.__init__(self, caller, target, callback, config, action)
193 def processResponse(self, dict):
194 """Save the number of values each node has."""
195 if dict["id"] in self.found:
196 self.found[dict["id"]].updateNumValues(dict.get('num', 0))
197 self.handleGotNodes(dict['nodes'])
199 def generateResult(self):
200 """Result is the nodes that have values, sorted by proximity to the key."""
202 return ([node for node in self.sorted_nodes if node.num_values > 0], )
205 class GetValue(ActionBase):
206 def __init__(self, caller, target, local_results, num_results, callback, config, action="getValue"):
207 ActionBase.__init__(self, caller, target, callback, config, action, num_results)
209 for result in local_results:
210 self.results[result] = 1
212 def getNodesToProcess(self):
213 """Nodes are never added, always return the same thing."""
214 return self.sorted_nodes
216 def generateArgs(self, node):
217 """Args include the number of values to request."""
218 if node.num_values > 0:
219 # Request all desired results from each node, just to be sure.
220 num_values = abs(self.desired_results) - len(self.results)
221 assert num_values > 0
222 if num_values > node.num_values:
224 return (self.target, num_values), node.num_values
226 raise ValueError, "Don't try and get values from this node because it doesn't have any"
228 def processResponse(self, dict):
229 """Save the returned values, calling the callback each time there are new ones."""
230 if dict.has_key('values'):
231 def x(y, z=self.results):
237 z = len(dict['values'])
238 v = filter(None, map(x, dict['values']))
240 reactor.callLater(0, self.callback, self.target, v)
242 def generateResult(self):
243 """Results have all been returned, now send the empty list to end it."""
244 return (self.target, [])
247 class StoreValue(ActionBase):
248 def __init__(self, caller, target, value, num_results, callback, config, action="storeValue"):
249 ActionBase.__init__(self, caller, target, callback, config, action, num_results)
252 def getNodesToProcess(self):
253 """Nodes are never added, always return the same thing."""
254 return self.sorted_nodes
256 def generateArgs(self, node):
257 """Args include the value to request and the node's token."""
259 return (self.target, self.value, node.token), 1
261 raise ValueError, "Don't store at this node since we don't know it's token"
263 def processResponse(self, dict):
264 """Save the response, though it should be nothin but the ID."""
265 self.results[dict["id"]] = dict
267 def generateResult(self):
268 """Return all the response IDs received."""
269 return (self.target, self.value, self.results.values())