1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
4 from twisted.internet import reactor
5 from twisted.python import log
7 from khash import intify
8 from util import uncompact
11 """ base class for some long running asynchronous processes like finding nodes or values """
12 def __init__(self, caller, target, callback, config):
16 self.num = intify(target)
20 self.callback = callback
24 def sort(a, b, num=self.num):
25 """ this function is for sorting nodes relative to the ID we are looking for """
26 x, y = num ^ a.num, num ^ b.num
34 def goWithNodes(self, t):
39 FIND_NODE_TIMEOUT = 15
41 class FindNode(ActionBase):
42 """ find node action merits its own class as it is a long running stateful process """
43 def handleGotNodes(self, dict):
44 _krpc_sender = dict['_krpc_sender']
46 n = self.caller.Node(dict["id"], _krpc_sender[0], _krpc_sender[1])
47 self.caller.insertNode(n)
48 if dict["id"] in self.found:
49 self.found[dict["id"]].updateToken(dict.get('token', ''))
51 if self.finished or self.answered.has_key(dict["id"]):
52 # a day late and a dollar short
54 self.outstanding = self.outstanding - 1
55 self.answered[dict["id"]] = 1
56 for compact_node in l:
57 node = uncompact(compact_node)
58 n = self.caller.Node(node)
59 if not self.found.has_key(n.id):
65 send messages to new peers, if necessary
69 l = self.found.values()
71 for node in l[:self.config['K']]:
72 if node.id == self.target:
74 return self.callback([node])
75 if (not self.queried.has_key(node.id)) and node.id != self.caller.node.id:
76 #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
77 df = node.findNode(self.target, self.caller.node.id)
78 df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
79 self.outstanding = self.outstanding + 1
80 self.queried[node.id] = 1
81 if self.outstanding >= self.config['CONCURRENT_REQS']:
83 assert self.outstanding >=0
84 if self.outstanding == 0:
87 reactor.callLater(0, self.callback, l[:self.config['K']])
89 def makeMsgFailed(self, node):
90 def defaultGotNodes(err, self=self, node=node):
91 log.msg("find failed (%s) %s/%s" % (self.config['PORT'], node.host, node.port))
93 self.caller.table.nodeFailed(node)
94 self.outstanding = self.outstanding - 1
96 return defaultGotNodes
98 def goWithNodes(self, nodes):
100 this starts the process, our argument is a transaction with t.extras being our list of nodes
101 it's a transaction since we got called from the dispatcher
104 if node.id == self.caller.node.id:
107 self.found[node.id] = node
112 get_value_timeout = 15
113 class GetValue(FindNode):
114 def __init__(self, caller, target, callback, config, find="findValue"):
115 FindNode.__init__(self, caller, target, callback, config)
116 self.findValue = find
118 """ get value task """
119 def handleGotNodes(self, dict):
120 _krpc_sender = dict['_krpc_sender']
122 n = self.caller.Node(dict["id"], _krpc_sender[0], _krpc_sender[1])
123 self.caller.insertNode(n)
124 if self.finished or self.answered.has_key(dict["id"]):
125 # a day late and a dollar short
127 self.outstanding = self.outstanding - 1
128 self.answered[dict["id"]] = 1
130 # if we have any closer than what we already got, query them
131 if dict.has_key('nodes'):
132 for compact_node in dict['nodes']:
133 node = uncompact(compact_node)
134 n = self.caller.Node(node)
135 if not self.found.has_key(n.id):
137 elif dict.has_key('values'):
138 def x(y, z=self.results):
144 z = len(dict['values'])
145 v = filter(None, map(x, dict['values']))
147 reactor.callLater(0, self.callback, self.target, v)
154 l = self.found.values()
157 for node in l[:self.config['K']]:
158 if (not self.queried.has_key(node.id)) and node.id != self.caller.node.id:
159 #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
161 f = getattr(node, self.findValue)
162 except AttributeError:
163 log.msg("findValue %s doesn't have a %s method!" % (node, self.findValue))
165 df = f(self.target, self.caller.node.id)
166 df.addCallback(self.handleGotNodes)
167 df.addErrback(self.makeMsgFailed(node))
168 self.outstanding = self.outstanding + 1
169 self.queried[node.id] = 1
170 if self.outstanding >= self.config['CONCURRENT_REQS']:
172 assert self.outstanding >=0
173 if self.outstanding == 0:
174 ## all done, didn't find it!!
176 reactor.callLater(0, self.callback, self.target, [])
179 def goWithNodes(self, nodes, found=None):
185 if node.id == self.caller.node.id:
188 self.found[node.id] = node
193 class StoreValue(ActionBase):
194 def __init__(self, caller, target, value, callback, config, store="storeValue"):
195 ActionBase.__init__(self, caller, target, callback, config)
200 def storedValue(self, t, node):
201 self.outstanding -= 1
202 self.caller.insertNode(node)
205 self.stored.append(t)
206 if len(self.stored) >= self.config['STORE_REDUNDANCY']:
208 self.callback(self.target, self.value, self.stored)
210 if not len(self.stored) + self.outstanding >= self.config['STORE_REDUNDANCY']:
214 def storeFailed(self, t, node):
215 log.msg("store failed %s/%s" % (node.host, node.port))
216 self.caller.nodeFailed(node)
217 self.outstanding -= 1
226 num = self.config['CONCURRENT_REQS'] - self.outstanding
227 if num > self.config['STORE_REDUNDANCY']:
228 num = self.config['STORE_REDUNDANCY']
231 node = self.nodes.pop()
233 if self.outstanding == 0:
235 self.callback(self.target, self.value, self.stored)
237 if not node.id == self.caller.node.id:
238 self.outstanding += 1
240 f = getattr(node, self.store)
241 except AttributeError:
242 log.msg("%s doesn't have a %s method!" % (node, self.store))
244 df = f(self.target, self.value, node.token, self.caller.node.id)
245 df.addCallback(self.storedValue, node=node)
246 df.addErrback(self.storeFailed, node=node)
248 def goWithNodes(self, nodes):
250 self.nodes.sort(self.sort)
255 def __init__(self, store, config):
258 self.next_expire = reactor.callLater(self.config['KEINITIAL_DELAY'], self.doExpire)
261 self.store.expireValues(self.config['KE_AGE'])
262 self.next_expire = reactor.callLater(self.config['KE_DELAY'], self.doExpire)
266 self.next_expire.cancel()