1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
4 from twisted.internet import reactor
5 from twisted.python import log
7 from khash import intify
8 from util import uncompact
11 """ base class for some long-running asynchronous processes like finding nodes or values """
# Constructor of the action base class. NOTE(review): the `class` statement
# and several statements of this method are not visible in this excerpt (the
# embedded line numbers jump), so only the visible lines are described.
#
# target      -- the key this action is about; intify() converts it to the
#                number kept in self.num, which is XOR-ed with node numbers
# callback    -- callable stored in self.callback
# num_results -- stored in self.desired_results; a falsy value (the default
#                None) disables the result-count checks made when scheduling
# caller, config, action -- read elsewhere in this file as self.caller,
#                self.config and self.action; presumably assigned by lines
#                not shown here (TODO confirm)
12 def __init__(self, caller, target, callback, config, action, num_results = None):
13 """Initialize the action."""
18 self.num = intify(target)
# Cached, sorted list of the nodes held in self.found; starts out empty.
22 self.sorted_nodes = []
24 self.desired_results = num_results
25 self.callback = callback
# Running total of results still expected from requests that are in flight.
27 self.outstanding_results = 0
# Comparison helper. The default argument captures the target number at
# definition time, so the function needs no reference to self.
30 def sort(a, b, num=self.num):
31 """Sort nodes relative to the ID we are looking for."""
# XOR each node's number with the target number. The lines that compare x
# and y and return the ordering are not visible in this excerpt.
32 x, y = num ^ a.num, num ^ b.num
# Entry point that seeds the action with an initial list of nodes.
# NOTE(review): the loop header and parts of the branch bodies are not
# visible in this excerpt; only the visible statements are annotated.
40 def goWithNodes(self, nodes):
41 """Start the action's process with a list of nodes to contact."""
# Our own node (self.caller.node) is singled out by comparing IDs.
43 if node.id == self.caller.node.id:
# Record the node in self.found, keyed by its ID.
46 self.found[node.id] = node
# Body of the method that sends requests to remote nodes. NOTE(review): its
# `def` line and several other statements (e.g. the `try:` lines and the
# statements executed when the stop conditions hold) are not visible in this
# excerpt, so the notes below cover only the visible lines.
51 """Schedule requests to be sent to remote nodes."""
52 # Check if we are already done
# A falsy desired_results (no limit requested) skips this test entirely.
53 if self.desired_results and len(self.results) >= self.desired_results:
55 result = self.generateResult()
# Hand the result to the callback from the reactor loop instead of calling
# it directly; *result spreads the generated result into positional arguments.
56 reactor.callLater(0, self.callback, *result)
# Also true once the results already received plus those still expected from
# requests in flight would satisfy the desired count.
58 if self.finished or (self.desired_results and
59 len(self.results) + self.outstanding_results >= self.desired_results):
# Walk the candidate nodes, skipping any already queried and our own node,
# and mark each chosen node as queried before contacting it.
62 for node in self.getNodesToProcess():
63 if node.id not in self.queried and node.id != self.caller.node.id:
64 self.queried[node.id] = 1
66 # Get the action to call on the node
# self.action is the name of the method to look up on the node object.
68 f = getattr(node, self.action)
69 except AttributeError:
70 log.msg("%s doesn't have a %s method!" % (node, self.action))
72 # Get the arguments to the action's method
# generateArgs() returns a pair: (argument tuple, number of results expected).
74 args, expected_results = self.generateArgs(node)
78 # Call the action on the remote node
# Count the expected results as outstanding before the request is issued.
80 self.outstanding_results += expected_results
# Our own node ID is always the first argument of the remote call.
81 df = f(self.caller.node.id, *args)
# Success goes to gotResponse, failure to actionFailed; both also receive
# the node and its expected result count as extra arguments.
82 df.addCallbacks(self.gotResponse, self.actionFailed,
83 callbackArgs = (node, expected_results),
84 errbackArgs = (node, expected_results))
86 # We might have to stop for now
# Stop issuing requests at the configured concurrency limit, or once the
# requests in flight could already deliver the desired number of results.
# NOTE(review): the statement that increments self.outstanding is not
# visible in this excerpt.
87 if (self.outstanding >= self.config['CONCURRENT_REQS'] or
88 (self.desired_results and
89 self.outstanding_results >= self.desired_results)):
92 # If no requests are outstanding, then we are done
# Sanity check: the in-flight counter must never go negative.
93 assert self.outstanding >=0
94 if self.outstanding == 0:
96 result = self.generateResult()
97 reactor.callLater(0, self.callback, *result)
# Success callback attached to each request by the scheduling code.
#
# dict             -- the reply; only its 'rsp' entry is used here
#                     (the parameter name shadows the builtin `dict`)
# node             -- the node that was queried
# expected_results -- the count that was added to self.outstanding_results
#                     when the request was sent
#
# NOTE(review): the statement executed for late/duplicate replies and
# whatever follows processResponse() are not visible in this excerpt.
99 def gotResponse(self, dict, node, expected_results):
100 """Receive a response from a remote node."""
# The responding node is passed to the caller's insertNode() before the
# late/duplicate check, so this happens for every reply.
101 self.caller.insertNode(node)
# Replies that arrive after the action finished, or from a node already
# marked as answered, are treated as stale.
102 if self.finished or self.answered.has_key(node.id):
103 # a day late and a dollar short
# Undo the bookkeeping done when the request was issued.
105 self.outstanding -= 1
106 self.outstanding_results -= expected_results
107 self.answered[node.id] = 1
# Subclass hook: receives only the 'rsp' part of the reply.
108 self.processResponse(dict['rsp'])
# Failure callback (errback) attached to each request by the scheduling code.
#
# err              -- the failure delivered by the deferred; not used by
#                     any line visible in this excerpt
# node             -- the node that was queried
# expected_results -- the count that was added to self.outstanding_results
#                     when the request was sent
#
# NOTE(review): the embedded line numbers show gaps, so some statements of
# this method are not visible here.
111 def actionFailed(self, err, node, expected_results):
112 """Receive an error from a remote node."""
# The message names the action, our configured PORT and the remote address.
113 log.msg("action %s failed (%s) %s/%s" % (self.action, self.config['PORT'], node.host, node.port))
# Report the failure to the caller's table.
115 self.caller.table.nodeFailed(node)
# Marking the node as answered makes gotResponse() ignore any reply that
# might still arrive from it.
116 self.answered[node.id] = 1
# Undo the bookkeeping done when the request was issued.
117 self.outstanding -= 1
118 self.outstanding_results -= expected_results
def handleGotNodes(self, nodes):
    """Process any received node contact info in the response.

    Each entry of C{nodes} is decoded with C{uncompact()} and wrapped in a
    node object created by C{self.caller.Node}.  Nodes whose ID is not yet
    known are added to the C{found} dictionary, keyed by ID; nodes that are
    already known are left untouched so existing entries are never replaced.

    @param nodes: iterable of compact node representations, as accepted
        by C{uncompact()}
    """
    for compact_node in nodes:
        node_contact = uncompact(compact_node)
        node = self.caller.Node(node_contact)
        # Membership test instead of the deprecated dict.has_key(); this
        # matches the `in` checks used elsewhere in this file.
        if node.id not in self.found:
            self.found[node.id] = node
# Body of the node-sorting helper. NOTE(review): its `def` line and the end
# of its docstring are not visible in this excerpt.
#
# The sorted list is rebuilt only when its length differs from the number of
# entries in self.found. That length comparison is a valid staleness test
# only because entries are never removed from self.found, as the docstring
# states.
#
# The rebuild takes self.found.values() and sorts it in place with self.sort
# as a comparison function. This relies on Python 2 behaviour: values()
# returning a list, and list.sort() accepting a comparison function as its
# first positional argument.
130 """Sort the nodes, if necessary.
132 Assumes nodes are never removed from the L{found} dictionary.
134 if len(self.sorted_nodes) != len(self.found):
135 self.sorted_nodes = self.found.values()
136 self.sorted_nodes.sort(self.sort)
138 # The methods below are meant to be overridden by action subclasses
# Overridable hook that tells the scheduling code which nodes to consider.
# The base version returns the first config['K'] entries of
# self.sorted_nodes. GetValue and StoreValue override it to return the whole
# list.
# NOTE(review): the end of the docstring and at least one line before the
# `return` are not visible in this excerpt (embedded numbers jump 142 -> 145).
139 def getNodesToProcess(self):
140 """Generate a list of nodes to process next.
142 This implementation is suitable for a recurring search over all nodes.
145 return self.sorted_nodes[:self.config['K']]
# Overridable hook that builds the arguments for one remote call.
# Returns a pair: (tuple of arguments, number of results expected).
# The base version sends only the target and expects 0 results.
# The scheduling code prepends our own node ID to the argument tuple and adds
# the expected count to self.outstanding_results.
# `node` is not used by the base version; the GetValue and StoreValue
# overrides read attributes of it.
# NOTE(review): the end of the docstring is not visible in this excerpt.
147 def generateArgs(self, node):
148 """Generate the arguments to the node's action.
150 These arguments will be appended to our node ID when calling the action.
151 Also return the number of results expected from this request.
153 @raise ValueError: if the node should not be queried
155 return (self.target, ), 0
# Overridable hook called by gotResponse() with the 'rsp' entry of a reply.
# The parameter name shadows the builtin `dict`.
# NOTE(review): the body of the base implementation is not visible in this
# excerpt.
157 def processResponse(self, dict):
158 """Process the response dictionary received from the remote node."""
# Overridable hook that builds the value handed to the callback; the
# scheduling code unpacks it with * into the callback's positional arguments.
# NOTE(review): this base signature requires a `nodes` argument, but the
# visible callers invoke self.generateResult() with no arguments and every
# subclass in this file overrides it as generateResult(self). Calling the
# base version that way would raise TypeError -- confirm whether `nodes`
# should be dropped or given a default.
# NOTE(review): the body of the base implementation is not visible in this
# excerpt.
161 def generateResult(self, nodes):
162 """Create the result to return to the callback function."""
# Action whose default remote method name is "findNode". Its result is the
# first config['K'] entries of the sorted node list.
166 class FindNode(ActionBase):
167 """Find the closest nodes to the key."""
# Same parameters as the base class, minus num_results (left at its default).
169 def __init__(self, caller, target, callback, config, action="findNode"):
170 ActionBase.__init__(self, caller, target, callback, config, action)
# `dict` is the 'rsp' part of a reply, as passed on by gotResponse().
172 def processResponse(self, dict):
173 """Save the token received from each node."""
# Only nodes already present in self.found get their token updated; a reply
# without a 'token' entry stores the empty string.
174 if dict["id"] in self.found:
175 self.found[dict["id"]].updateToken(dict.get('token', ''))
# The reply must contain 'nodes' (KeyError otherwise); newly learned nodes
# are merged into self.found.
176 self.handleGotNodes(dict['nodes'])
178 def generateResult(self):
179 """Result is the K closest nodes to the target."""
# NOTE(review): one line before this return is not visible in this excerpt
# (embedded numbers jump 179 -> 181).
# Returned as a 1-tuple because the scheduling code unpacks the result with *.
181 return (self.sorted_nodes[:self.config['K']], )
# Action whose default remote method name is "findValue". Its result is the
# sorted node list restricted to nodes that reported holding values.
184 class FindValue(ActionBase):
185 """Find the closest nodes to the key and check their values."""
# Same parameters as the base class, minus num_results (left at its default).
187 def __init__(self, caller, target, callback, config, action="findValue"):
188 ActionBase.__init__(self, caller, target, callback, config, action)
# `dict` is the 'rsp' part of a reply, as passed on by gotResponse().
190 def processResponse(self, dict):
191 """Save the number of values each node has."""
# Only nodes already present in self.found are updated; a reply without a
# 'num' entry counts as 0 values.
192 if dict["id"] in self.found:
193 self.found[dict["id"]].updateNumValues(dict.get('num', 0))
# The reply must contain 'nodes' (KeyError otherwise); newly learned nodes
# are merged into self.found.
194 self.handleGotNodes(dict['nodes'])
196 def generateResult(self):
197 """Result is the nodes that have values, sorted by proximity to the key."""
# NOTE(review): one line before this return is not visible in this excerpt
# (embedded numbers jump 197 -> 199).
# Keeps the order of self.sorted_nodes; returned as a 1-tuple because the
# scheduling code unpacks the result with *.
199 return ([node for node in self.sorted_nodes if node.num_values > 0], )
# Action whose default remote method name is "getValue". It passes each batch
# of returned values to the callback as it arrives and finishes with an empty
# list.
# NOTE(review): several lines of this class are not visible in this excerpt
# (the embedded line numbers jump); only the visible lines are annotated.
202 class GetValue(ActionBase):
# local_results -- iterable of values already known; each becomes a key of
#                  self.results
# num_results   -- forwarded to the base class as the desired result count
203 def __init__(self, caller, target, local_results, num_results, callback, config, action="getValue"):
204 ActionBase.__init__(self, caller, target, callback, config, action, num_results)
# Seed self.results with the local values as keys; the value 1 is presumably
# just a placeholder (dict used like a set).
206 for result in local_results:
207 self.results[result] = 1
209 def getNodesToProcess(self):
210 """Nodes are never added, always return the same thing."""
# Unlike the base implementation, the list is not truncated to config['K'].
211 return self.sorted_nodes
213 def generateArgs(self, node):
214 """Args include the number of values to request."""
215 if node.num_values > 0:
# Expect as many results as the node reported holding. The second argument
# (0) is presumably the "number of values to request" mentioned in the
# docstring -- TODO confirm its meaning against the remote method.
216 return (self.target, 0), node.num_values
# Python 2-only raise syntax. ValueError is the documented signal (see the
# base generateArgs docstring) that a node should not be queried. The
# `else:` line is not visible in this excerpt.
218 raise ValueError, "Don't try and get values from this node because it doesn't have any"
# `dict` is the 'rsp' part of a reply, as passed on by gotResponse().
220 def processResponse(self, dict):
221 """Save the returned values, calling the callback each time there are new ones."""
222 if dict.has_key('values'):
# NOTE(review): the body of x is not visible in this excerpt; its z parameter
# defaults to the shared self.results dictionary.
223 def x(y, z=self.results):
229 z = len(dict['values'])
# filter(None, ...) drops every falsy value that x returned.
230 v = filter(None, map(x, dict['values']))
# Deliver this batch to the callback from the reactor loop.
232 reactor.callLater(0, self.callback, self.target, v)
234 def generateResult(self):
235 """Results have all been returned, now send the empty list to end it."""
236 return (self.target, [])
# Action whose default remote method name is "storeValue". It collects the
# replies of the nodes that answered, keyed by their ID.
# NOTE(review): self.value is read below, but its assignment is not visible
# in this excerpt (the embedded line numbers jump 241 -> 244).
239 class StoreValue(ActionBase):
# value       -- the value to store (presumably kept as self.value by a line
#                not shown here)
# num_results -- forwarded to the base class as the desired result count
240 def __init__(self, caller, target, value, num_results, callback, config, action="storeValue"):
241 ActionBase.__init__(self, caller, target, callback, config, action, num_results)
244 def getNodesToProcess(self):
245 """Nodes are never added, always return the same thing."""
# Unlike the base implementation, the list is not truncated to config['K'].
246 return self.sorted_nodes
248 def generateArgs(self, node):
249 """Args include the value to request and the node's token."""
# NOTE(review): the condition choosing between the return and the raise is
# not visible in this excerpt.
# Each store request is expected to produce exactly one result.
251 return (self.target, self.value, node.token), 1
# Python 2-only raise syntax. ValueError is the documented signal (see the
# base generateArgs docstring) that a node should not be queried.
253 raise ValueError, "Don't store at this node since we don't know it's token"
# `dict` is the 'rsp' part of a reply, as passed on by gotResponse().
255 def processResponse(self, dict):
256 """Save the response, though it should be nothin but the ID."""
# Keyed by the responder's ID, so a repeated reply from the same node
# overwrites its earlier entry instead of counting twice.
257 self.results[dict["id"]] = dict
259 def generateResult(self):
260 """Return all the response IDs received."""
# The third element holds the stored reply dictionaries themselves.
261 return (self.target, self.value, self.results.values())