1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
4 from twisted.internet import reactor
5 from twisted.python import log
7 from khash import intify
10 """ base class for some long running asynchronous proccesses like finding nodes or values """
11 def __init__(self, caller, target, callback, config):
15 self.num = intify(target)
19 self.callback = callback
23 def sort(a, b, num=self.num):
24 """ this function is for sorting nodes relative to the ID we are looking for """
25 x, y = num ^ a.num, num ^ b.num
33 def goWithNodes(self, t):
38 FIND_NODE_TIMEOUT = 15
40 class FindNode(ActionBase):
41 """ find node action merits it's own class as it is a long running stateful process """
42 def handleGotNodes(self, dict):
43 _krpc_sender = dict['_krpc_sender']
45 n = self.caller.Node(dict["id"], _krpc_sender[0], _krpc_sender[1])
46 self.caller.insertNode(n)
48 if self.finished or self.answered.has_key(dict["id"]):
49 # a day late and a dollar short
51 self.outstanding = self.outstanding - 1
52 self.answered[dict["id"]] = 1
54 n = self.caller.Node(node)
55 if not self.found.has_key(n.id):
61 send messages to new peers, if necessary
65 l = self.found.values()
67 for node in l[:self.config['K']]:
68 if node.id == self.target:
70 return self.callback([node])
71 if (not self.queried.has_key(node.id)) and node.id != self.caller.node.id:
72 #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
73 df = node.findNode(self.target, self.caller.node.id)
74 df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
75 self.outstanding = self.outstanding + 1
76 self.queried[node.id] = 1
77 if self.outstanding >= self.config['CONCURRENT_REQS']:
79 assert self.outstanding >=0
80 if self.outstanding == 0:
83 reactor.callLater(0, self.callback, l[:self.config['K']])
85 def makeMsgFailed(self, node):
86 def defaultGotNodes(err, self=self, node=node):
87 log.msg("find failed (%s) %s/%s" % (self.config['PORT'], node.host, node.port))
89 self.caller.table.nodeFailed(node)
90 self.outstanding = self.outstanding - 1
92 return defaultGotNodes
94 def goWithNodes(self, nodes):
96 this starts the process, our argument is a transaction with t.extras being our list of nodes
97 it's a transaction since we got called from the dispatcher
100 if node.id == self.caller.node.id:
103 self.found[node.id] = node
108 get_value_timeout = 15
109 class GetValue(FindNode):
110 def __init__(self, caller, target, callback, config, find="findValue"):
111 FindNode.__init__(self, caller, target, callback, config)
112 self.findValue = find
114 """ get value task """
115 def handleGotNodes(self, dict):
116 _krpc_sender = dict['_krpc_sender']
118 n = self.caller.Node(dict["id"], _krpc_sender[0], _krpc_sender[1])
119 self.caller.insertNode(n)
120 if self.finished or self.answered.has_key(dict["id"]):
121 # a day late and a dollar short
123 self.outstanding = self.outstanding - 1
124 self.answered[dict["id"]] = 1
126 # if we have any closer than what we already got, query them
127 if dict.has_key('nodes'):
128 for node in dict['nodes']:
129 n = self.caller.Node(node)
130 if not self.found.has_key(n.id):
132 elif dict.has_key('values'):
133 def x(y, z=self.results):
139 z = len(dict['values'])
140 v = filter(None, map(x, dict['values']))
142 reactor.callLater(0, self.callback, self.target, v)
149 l = self.found.values()
152 for node in l[:self.config['K']]:
153 if (not self.queried.has_key(node.id)) and node.id != self.caller.node.id:
154 #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
156 f = getattr(node, self.findValue)
157 except AttributeError:
158 log.msg("findValue %s doesn't have a %s method!" % (node, self.findValue))
160 df = f(self.target, self.caller.node.id)
161 df.addCallback(self.handleGotNodes)
162 df.addErrback(self.makeMsgFailed(node))
163 self.outstanding = self.outstanding + 1
164 self.queried[node.id] = 1
165 if self.outstanding >= self.config['CONCURRENT_REQS']:
167 assert self.outstanding >=0
168 if self.outstanding == 0:
169 ## all done, didn't find it!!
171 reactor.callLater(0, self.callback, self.target, [])
174 def goWithNodes(self, nodes, found=None):
180 if node.id == self.caller.node.id:
183 self.found[node.id] = node
188 class StoreValue(ActionBase):
189 def __init__(self, caller, target, value, originated, callback, config, store="storeValue"):
190 ActionBase.__init__(self, caller, target, callback, config)
192 self.originated = originated
196 def storedValue(self, t, node):
197 self.outstanding -= 1
198 self.caller.insertNode(node)
201 self.stored.append(t)
202 if len(self.stored) >= self.config['STORE_REDUNDANCY']:
204 self.callback(self.target, self.value, self.stored)
206 if not len(self.stored) + self.outstanding >= self.config['STORE_REDUNDANCY']:
210 def storeFailed(self, t, node):
211 log.msg("store failed %s/%s" % (node.host, node.port))
212 self.caller.nodeFailed(node)
213 self.outstanding -= 1
222 num = self.config['CONCURRENT_REQS'] - self.outstanding
223 if num > self.config['STORE_REDUNDANCY']:
224 num = self.config['STORE_REDUNDANCY']
227 node = self.nodes.pop()
229 if self.outstanding == 0:
231 self.callback(self.target, self.value, self.stored)
233 if not node.id == self.caller.node.id:
234 self.outstanding += 1
236 f = getattr(node, self.store)
237 except AttributeError:
238 log.msg("%s doesn't have a %s method!" % (node, self.store))
240 df = f(self.target, self.value, self.originated, self.caller.node.id)
241 df.addCallback(self.storedValue, node=node)
242 df.addErrback(self.storeFailed, node=node)
244 def goWithNodes(self, nodes):
246 self.nodes.sort(self.sort)
251 def __init__(self, store, config):
254 self.next_expire = reactor.callLater(self.config['KEINITIAL_DELAY'], self.doExpire)
257 self.store.expireValues(self.config['KE_AGE'])
258 self.next_expire = reactor.callLater(self.config['KE_DELAY'], self.doExpire)
262 self.next_expire.cancel()