1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
4 from twisted.internet import reactor
5 from twisted.python import log
7 from khash import intify
8 from util import uncompact
11 """ base class for some long running asynchronous processes like finding nodes or values """
# Constructor taking (caller, target, callback, config).
# NOTE(review): only part of the body is visible in this extract (the embedded
# line numbers skip 13-15, 17-19, 21-23 and 27+), so other attribute
# assignments presumably exist -- confirm against the full file.
12 def __init__(self, caller, target, callback, config):
# Result of khash.intify(target); it is XOR-ed with node.num values in the
# comparison helper below, so it is presumably an integer form of the target ID.
16 self.num = intify(target)
# Stored for later use; FindNode, GetValue and StoreValue in this file invoke
# self.callback when they complete.
20 self.callback = callback
# Two-argument (cmp-style) comparison helper. `num` is bound as a default
# argument, so it is captured once, at definition time.
24 def sort(a, b, num=self.num):
25 """ this function is for sorting nodes relative to the ID we are looking for """
# XOR each node's num with the reference value; the comparison of x and y
# happens in lines that are not visible here -- TODO confirm.
26 x, y = num ^ a.num, num ^ b.num
# Errback for a failed request to `node`; elsewhere in this file it is
# registered through addCallbacks(..., errbackArgs=(node, )).
34 def actionFailed(self, err, node):
# Log the action's class name, the configured PORT and the node's host/port.
35 log.msg("action %s failed (%s) %s/%s" % (self.__class__.__name__, self.config['PORT'], node.host, node.port))
# Report the failure to the caller's table and release one outstanding slot.
37 self.caller.table.nodeFailed(node)
38 self.outstanding = self.outstanding - 1
# NOTE(review): embedded lines 36 and 39-40 are missing from this extract, so
# what is done with `err` and what follows the decrement cannot be seen here.
# Same-named methods are defined by FindNode, GetValue and StoreValue later in
# this file. The body of this version is not visible in this extract.
41 def goWithNodes(self, t):
# The only visible reference to this constant is a commented-out timeout
# assignment in FindNode (added to time.time(), so presumably seconds -- confirm).
46 FIND_NODE_TIMEOUT = 15
48 class FindNode(ActionBase):
49 """ find node action merits its own class as it is a long running stateful process """
# Callback for a findNode reply (wired up via df.addCallbacks in this class).
# `dict` is the reply mapping; note that the parameter name shadows the builtin.
50 def handleGotNodes(self, dict):
# Sender information from the reply: items [0] and [1] are passed to Node
# below (presumably host and port -- confirm).
51 _krpc_sender = dict['_krpc_sender']
# Build a Node for the responder and hand it to the caller for insertion.
53 n = self.caller.Node(dict["id"], _krpc_sender[0], _krpc_sender[1])
54 self.caller.insertNode(n)
# If the responder is already in self.found, refresh its token, using '' when
# the reply carries none.
55 if dict["id"] in self.found:
56 self.found[dict["id"]].updateToken(dict.get('token', ''))
# Check for a reply that arrives after completion or from a node that has
# already answered; the action taken (embedded line 60) is not visible here.
# NOTE(review): has_key() exists only on Python 2 dicts.
58 if self.finished or self.answered.has_key(dict["id"]):
59 # a day late and a dollar short
# One fewer request in flight; remember that this node has answered.
61 self.outstanding = self.outstanding - 1
62 self.answered[dict["id"]] = 1
# NOTE(review): `l` is not assigned in any visible line (embedded line 57 is
# missing) -- confirm where it comes from.
63 for compact_node in l:
# Decode each compact entry with util.uncompact and wrap it in a Node.
64 node = uncompact(compact_node)
65 n = self.caller.Node(node)
# Test for nodes not yet in self.found; the body of this branch (embedded
# line 67) is not visible in this extract.
66 if not self.found.has_key(n.id):
72 send messages to new peers, if necessary
# Body of the method that issues the next findNode queries; its `def` line is
# not visible in this extract. Start from the nodes found so far
# (embedded line 77, between this line and the loop, is missing).
76 l = self.found.values()
# Consider only the first config['K'] entries.
78 for node in l[:self.config['K']]:
# A node whose id equals the target: return the callback's result for just
# that node (embedded line 80 is not visible).
79 if node.id == self.target:
81 return self.callback([node])
# Query only nodes that have not been queried yet and that are not ourselves.
82 if (not self.queried.has_key(node.id)) and node.id != self.caller.node.id:
83 #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
# Send the request; replies go to handleGotNodes, failures to actionFailed
# with the node passed as an extra argument.
84 df = node.findNode(self.target, self.caller.node.id)
85 df.addCallbacks(self.handleGotNodes, self.actionFailed, errbackArgs = (node, ))
# Track the in-flight request and mark the node as queried.
86 self.outstanding = self.outstanding + 1
87 self.queried[node.id] = 1
# Concurrency cap check; the action taken when it is reached (embedded
# line 89) is not visible here.
88 if self.outstanding >= self.config['CONCURRENT_REQS']:
# NOTE(review): assert statements are stripped when Python runs with -O.
90 assert self.outstanding >=0
# Nothing in flight: schedule the callback on the reactor (delay 0) with the
# first K entries (embedded lines 92-93 are not visible).
91 if self.outstanding == 0:
94 reactor.callLater(0, self.callback, l[:self.config['K']])
# Starts the action from an initial collection of nodes.
# NOTE(review): the docstring text below describes a transaction `t` with
# t.extras, but the parameter is named `nodes` -- the docstring looks stale;
# confirm against callers.
96 def goWithNodes(self, nodes):
98 this starts the process, our argument is a transaction with t.extras being our list of nodes
99 it's a transaction since we got called from the dispatcher
102 if node.id == self.caller.node.id:
# Record the node in self.found keyed by its id. The loop header and the body
# of the self-id check above are not visible in this extract.
105 self.found[node.id] = node
# NOTE(review): the only visible mention of a get-value timeout is a
# commented-out line in GetValue that spells it GET_VALUE_TIMEOUT (upper case),
# which does not match this lower-case name.
110 get_value_timeout = 15
111 class GetValue(FindNode):
# Same arguments as FindNode.__init__, plus `find`: the name of the node method
# to call when querying (looked up with getattr later in this class).
112 def __init__(self, caller, target, callback, config, find="findValue"):
113 FindNode.__init__(self, caller, target, callback, config)
# Despite the attribute name, this stores the method *name*, not a callable.
114 self.findValue = find
116 """ get value task """
# Callback for a reply to the query method named by self.findValue. `dict` is
# the reply mapping; the parameter name shadows the builtin.
117 def handleGotNodes(self, dict):
118 _krpc_sender = dict['_krpc_sender']
# Build a Node for the responder and hand it to the caller for insertion.
120 n = self.caller.Node(dict["id"], _krpc_sender[0], _krpc_sender[1])
121 self.caller.insertNode(n)
# Check for a late or duplicate reply; the action taken (embedded line 124)
# is not visible in this extract.
122 if self.finished or self.answered.has_key(dict["id"]):
123 # a day late and a dollar short
# One fewer request in flight; remember that this node has answered.
125 self.outstanding = self.outstanding - 1
126 self.answered[dict["id"]] = 1
128 # if we have any closer than what we already got, query them
# A reply carries either 'nodes' (more nodes to consider) or 'values'.
129 if dict.has_key('nodes'):
130 for compact_node in dict['nodes']:
# Decode each compact entry with util.uncompact and wrap it in a Node.
131 node = uncompact(compact_node)
132 n = self.caller.Node(node)
# Test for nodes not yet in self.found; the branch body (embedded line 134)
# is not visible here.
133 if not self.found.has_key(n.id):
135 elif dict.has_key('values'):
# Per-value helper with self.results bound as the default for `z`; its body
# (embedded lines 137-141) is not visible in this extract.
136 def x(y, z=self.results):
# NOTE(review): `z` is assigned here but not read in any visible line.
142 z = len(dict['values'])
# Apply x to every value and keep only the truthy results.
143 v = filter(None, map(x, dict['values']))
# Deliver (target, v) to the callback on the reactor with delay 0; any guard
# around this call (embedded line 144) is not visible.
145 reactor.callLater(0, self.callback, self.target, v)
# Body of the method that issues the next value queries; its `def` line is not
# visible in this extract. Start from the nodes found so far
# (embedded lines 153-154 are missing).
152 l = self.found.values()
# Consider only the first config['K'] entries.
155 for node in l[:self.config['K']]:
# Query only nodes that have not been queried yet and that are not ourselves.
156 if (not self.queried.has_key(node.id)) and node.id != self.caller.node.id:
157 #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
# Resolve the query method by name on the node; the `try:` that pairs with the
# except clause below (embedded line 158) is not visible.
159 f = getattr(node, self.findValue)
160 except AttributeError:
161 log.msg("findValue %s doesn't have a %s method!" % (node, self.findValue))
# Send the request; replies go to handleGotNodes, failures to actionFailed
# with the node passed as an extra argument.
163 df = f(self.target, self.caller.node.id)
164 df.addCallbacks(self.handleGotNodes, self.actionFailed, errbackArgs = (node, ))
# Track the in-flight request and mark the node as queried.
165 self.outstanding = self.outstanding + 1
166 self.queried[node.id] = 1
# Concurrency cap check; the action taken when it is reached (embedded
# line 168) is not visible here.
167 if self.outstanding >= self.config['CONCURRENT_REQS']:
# NOTE(review): assert statements are stripped when Python runs with -O.
169 assert self.outstanding >=0
170 if self.outstanding == 0:
171 ## all done, didn't find it!!
# Report an empty result list for the target on the reactor with delay 0
# (embedded line 172 is not visible).
173 reactor.callLater(0, self.callback, self.target, [])
# Starts the action from an initial collection of nodes. How `found` is used is
# not visible in this extract (embedded lines 177-181 are missing).
176 def goWithNodes(self, nodes, found=None):
182 if node.id == self.caller.node.id:
# Record the node in self.found keyed by its id; the loop header and the body
# of the self-id check above are not visible here.
185 self.found[node.id] = node
190 class StoreValue(ActionBase):
# Takes the ActionBase arguments plus `value` and `store`; `store` defaults to
# the method name "storeValue".
# NOTE(review): self.value, self.store, self.stored and self.nodes are used
# later in this class, but their assignments (embedded lines 193-195) are not
# visible in this extract.
191 def __init__(self, caller, target, value, callback, config, store="storeValue"):
192 ActionBase.__init__(self, caller, target, callback, config)
# Callback for a successful store on `node`; `t` is the callback result and is
# appended to self.stored below.
197 def storedValue(self, t, node):
# One fewer request in flight; hand the node to the caller for insertion.
198 self.outstanding -= 1
199 self.caller.insertNode(node)
# Embedded lines 200-201 are not visible (possibly an early exit -- confirm).
202 self.stored.append(t)
# Enough successful stores: invoke the callback with (target, value, stored).
# Embedded line 204 is not visible.
203 if len(self.stored) >= self.config['STORE_REDUNDANCY']:
205 self.callback(self.target, self.value, self.stored)
# True when completed plus in-flight stores cannot reach STORE_REDUNDANCY; the
# action taken (embedded line 208) is not visible here.
207 if not len(self.stored) + self.outstanding >= self.config['STORE_REDUNDANCY']:
# NOTE(review): the embedded numbering jumps from 207 to 214 just before this
# line, so the `def` header of this code is presumably among the dropped lines.
# Number of requests to start now: the free concurrency slots, capped at
# STORE_REDUNDANCY.
214 num = self.config['CONCURRENT_REQS'] - self.outstanding
215 if num > self.config['STORE_REDUNDANCY']:
216 num = self.config['STORE_REDUNDANCY']
# Take the next candidate from the end of self.nodes; the enclosing loop and
# error handling (embedded lines 217-218, 220) are not visible.
219 node = self.nodes.pop()
# Nothing in flight: invoke the callback with whatever has been stored so far
# (embedded line 222 is not visible).
221 if self.outstanding == 0:
223 self.callback(self.target, self.value, self.stored)
# Skip ourselves.
225 if not node.id == self.caller.node.id:
# NOTE(review): outstanding is incremented before the getattr lookup; in the
# visible lines the AttributeError path only logs, so the counter may stay
# inflated if the method is missing -- confirm against the full file.
226 self.outstanding += 1
228 f = getattr(node, self.store)
229 except AttributeError:
230 log.msg("%s doesn't have a %s method!" % (node, self.store))
# Issue the store with the node's token; both the callback and the errback
# receive the node as an extra argument.
232 df = f(self.target, self.value, node.token, self.caller.node.id)
233 df.addCallbacks(self.storedValue, self.actionFailed, callbackArgs = (node, ), errbackArgs = (node, ))
# Starts the store from a collection of nodes; the assignment of self.nodes
# (embedded line 236) is not visible in this extract.
235 def goWithNodes(self, nodes):
# Sort in place with the comparison function self.sort.
# NOTE(review): passing a comparison function positionally to list.sort works
# only on Python 2; Python 3 requires key=functools.cmp_to_key(...).
237 self.nodes.sort(self.sort)
# Constructor of a class whose header is not visible in this extract. The
# assignments of self.store and self.config (embedded lines 243-244) are
# presumably among the dropped lines.
242 def __init__(self, store, config):
# Schedule the first self.doExpire call after config['KEINITIAL_DELAY'] and
# keep the returned delayed-call handle so it can be cancelled later.
245 self.next_expire = reactor.callLater(self.config['KEINITIAL_DELAY'], self.doExpire)
# Presumably the body of doExpire (its `def` line is not visible here): ask the
# store to expire values using config['KE_AGE'], then re-arm the timer so that
# doExpire runs again after config['KE_DELAY'].
248 self.store.expireValues(self.config['KE_AGE'])
249 self.next_expire = reactor.callLater(self.config['KE_DELAY'], self.doExpire)
# Cancel the pending delayed call; the enclosing `def` line and any error
# handling around this call are not visible in this extract.
253 self.next_expire.cancel()