1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
4 from twisted.internet import reactor
5 from twisted.python import log
7 from khash import intify
8 from util import uncompact
11 """ base class for some long running asynchronous proccesses like finding nodes or values """
12 def __init__(self, caller, target, callback, config):
16 self.num = intify(target)
20 self.callback = callback
24 def sort(a, b, num=self.num):
25 """ this function is for sorting nodes relative to the ID we are looking for """
26 x, y = num ^ a.num, num ^ b.num
34 def actionFailed(self, err, node):
35 log.msg("action %s failed (%s) %s/%s" % (self.__class__.__name__, self.config['PORT'], node.host, node.port))
37 self.caller.table.nodeFailed(node)
38 self.outstanding = self.outstanding - 1
41 def goWithNodes(self, t):
45 class FindNode(ActionBase):
46 """ find node action merits it's own class as it is a long running stateful process """
47 def handleGotNodes(self, dict):
48 _krpc_sender = dict['_krpc_sender']
50 n = self.caller.Node(dict["id"], _krpc_sender[0], _krpc_sender[1])
51 self.caller.insertNode(n)
52 if dict["id"] in self.found:
53 self.found[dict["id"]].updateToken(dict.get('token', ''))
55 if self.finished or self.answered.has_key(dict["id"]):
56 # a day late and a dollar short
58 self.outstanding = self.outstanding - 1
59 self.answered[dict["id"]] = 1
60 for compact_node in l:
61 node = uncompact(compact_node)
62 n = self.caller.Node(node)
63 if not self.found.has_key(n.id):
69 send messages to new peers, if necessary
73 l = self.found.values()
75 for node in l[:self.config['K']]:
76 if node.id == self.target:
78 return self.callback([node])
79 if (not self.queried.has_key(node.id)) and node.id != self.caller.node.id:
80 #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
81 df = node.findNode(self.target, self.caller.node.id)
82 df.addCallbacks(self.handleGotNodes, self.actionFailed, errbackArgs = (node, ))
83 self.outstanding = self.outstanding + 1
84 self.queried[node.id] = 1
85 if self.outstanding >= self.config['CONCURRENT_REQS']:
87 assert self.outstanding >=0
88 if self.outstanding == 0:
91 reactor.callLater(0, self.callback, l[:self.config['K']])
93 def goWithNodes(self, nodes):
95 this starts the process, our argument is a transaction with t.extras being our list of nodes
96 it's a transaction since we got called from the dispatcher
99 if node.id == self.caller.node.id:
102 self.found[node.id] = node
107 class FindValue(ActionBase):
108 def handleGotNodes(self, dict):
109 _krpc_sender = dict['_krpc_sender']
111 n = self.caller.Node(dict["id"], _krpc_sender[0], _krpc_sender[1])
112 self.caller.insertNode(n)
113 if dict["id"] in self.found:
114 self.found[dict["id"]].updateNumValues(dict.get('num', 0))
116 if self.finished or self.answered.has_key(dict["id"]):
117 # a day late and a dollar short
119 self.outstanding = self.outstanding - 1
120 self.answered[dict["id"]] = 1
121 for compact_node in l:
122 node = uncompact(compact_node)
123 n = self.caller.Node(node)
124 if not self.found.has_key(n.id):
130 send messages to new peers, if necessary
134 l = self.found.values()
136 for node in l[:self.config['K']]:
137 if (not self.queried.has_key(node.id)) and node.id != self.caller.node.id:
138 df = node.findValue(self.target, self.caller.node.id)
139 df.addCallbacks(self.handleGotNodes, self.actionFailed, errbackArgs = (node, ))
140 self.outstanding = self.outstanding + 1
141 self.queried[node.id] = 1
142 if self.outstanding >= self.config['CONCURRENT_REQS']:
144 assert self.outstanding >=0
145 if self.outstanding == 0:
148 l = [node for node in self.found.values() if node.num_values > 0]
149 reactor.callLater(0, self.callback, l)
151 def goWithNodes(self, nodes):
153 this starts the process, our argument is a transaction with t.extras being our list of nodes
154 it's a transaction since we got called from the dispatcher
157 if node.id == self.caller.node.id:
160 self.found[node.id] = node
165 class GetValue(ActionBase):
166 def __init__(self, caller, target, num, callback, config, action="getValue"):
167 ActionBase.__init__(self, caller, target, callback, config)
168 self.num_values = num
169 self.outstanding_gets = 0
172 def gotValues(self, dict, node):
174 self.outstanding -= 1
175 self.caller.insertNode(node)
178 if dict.has_key('values'):
179 def x(y, z=self.results):
185 z = len(dict['values'])
186 v = filter(None, map(x, dict['values']))
188 reactor.callLater(0, self.callback, self.target, v)
189 if len(self.results) >= self.num_values:
191 reactor.callLater(0, self.callback, self.target, [])
193 if not len(self.results) + self.outstanding_gets >= self.num_values:
200 for node in self.nodes:
201 if node.id not in self.queried and node.id != self.caller.node.id and node.num_values > 0:
203 f = getattr(node, self.action)
204 except AttributeError:
205 log.msg("%s doesn't have a %s method!" % (node, self.action))
207 self.outstanding += 1
208 self.outstanding_gets += node.num_values
209 df = f(self.target, 0, self.caller.node.id)
210 df.addCallbacks(self.gotValues, self.actionFailed, callbackArgs = (node, ), errbackArgs = (node, ))
211 self.queried[node.id] = 1
212 if len(self.results) + self.outstanding_gets >= self.num_values or \
213 self.outstanding >= self.config['CONCURRENT_REQS']:
215 assert self.outstanding >=0
216 if self.outstanding == 0:
218 reactor.callLater(0, self.callback, self.target, [])
220 def goWithNodes(self, nodes, found = None):
226 self.nodes.sort(self.sort)
230 class StoreValue(ActionBase):
231 def __init__(self, caller, target, value, callback, config, action="storeValue"):
232 ActionBase.__init__(self, caller, target, callback, config)
237 def storedValue(self, t, node):
238 self.outstanding -= 1
239 self.caller.insertNode(node)
242 self.stored.append(t)
243 if len(self.stored) >= self.config['STORE_REDUNDANCY']:
245 self.callback(self.target, self.value, self.stored)
247 if not len(self.stored) + self.outstanding >= self.config['STORE_REDUNDANCY']:
254 num = self.config['CONCURRENT_REQS'] - self.outstanding
255 if num > self.config['STORE_REDUNDANCY']:
256 num = self.config['STORE_REDUNDANCY']
259 node = self.nodes.pop()
261 if self.outstanding == 0:
263 self.callback(self.target, self.value, self.stored)
265 if not node.id == self.caller.node.id:
266 self.outstanding += 1
268 f = getattr(node, self.action)
269 except AttributeError:
270 log.msg("%s doesn't have a %s method!" % (node, self.action))
272 df = f(self.target, self.value, node.token, self.caller.node.id)
273 df.addCallbacks(self.storedValue, self.actionFailed, callbackArgs = (node, ), errbackArgs = (node, ))
275 def goWithNodes(self, nodes):
277 self.nodes.sort(self.sort)