1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
4 from twisted.internet import reactor
6 from khash import intify
9 """ base class for some long running asynchronous proccesses like finding nodes or values """
10 def __init__(self, caller, target, callback, config):
14 self.num = intify(target)
18 self.callback = callback
22 def sort(a, b, num=self.num):
23 """ this function is for sorting nodes relative to the ID we are looking for """
24 x, y = num ^ a.num, num ^ b.num
32 def goWithNodes(self, t):
37 FIND_NODE_TIMEOUT = 15
39 class FindNode(ActionBase):
40 """ find node action merits it's own class as it is a long running stateful process """
41 def handleGotNodes(self, dict):
42 _krpc_sender = dict['_krpc_sender']
44 n = self.caller.Node(dict["id"], _krpc_sender[0], _krpc_sender[1])
45 self.caller.insertNode(n)
47 if self.finished or self.answered.has_key(dict["id"]):
48 # a day late and a dollar short
50 self.outstanding = self.outstanding - 1
51 self.answered[dict["id"]] = 1
53 n = self.caller.Node(node)
54 if not self.found.has_key(n.id):
60 send messages to new peers, if necessary
64 l = self.found.values()
66 for node in l[:self.config['K']]:
67 if node.id == self.target:
69 return self.callback([node])
70 if (not self.queried.has_key(node.id)) and node.id != self.caller.node.id:
71 #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
72 df = node.findNode(self.target, self.caller.node.id)
73 df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
74 self.outstanding = self.outstanding + 1
75 self.queried[node.id] = 1
76 if self.outstanding >= self.config['CONCURRENT_REQS']:
78 assert self.outstanding >=0
79 if self.outstanding == 0:
82 reactor.callLater(0, self.callback, l[:self.config['K']])
84 def makeMsgFailed(self, node):
85 def defaultGotNodes(err, self=self, node=node):
86 print ">>> find failed (%s) %s/%s" % (self.config['PORT'], node.host, node.port), err
87 self.caller.table.nodeFailed(node)
88 self.outstanding = self.outstanding - 1
90 return defaultGotNodes
92 def goWithNodes(self, nodes):
94 this starts the process, our argument is a transaction with t.extras being our list of nodes
95 it's a transaction since we got called from the dispatcher
98 if node.id == self.caller.node.id:
101 self.found[node.id] = node
106 get_value_timeout = 15
107 class GetValue(FindNode):
108 def __init__(self, caller, target, callback, config, find="findValue"):
109 FindNode.__init__(self, caller, target, callback, config)
110 self.findValue = find
112 """ get value task """
113 def handleGotNodes(self, dict):
114 _krpc_sender = dict['_krpc_sender']
116 n = self.caller.Node(dict["id"], _krpc_sender[0], _krpc_sender[1])
117 self.caller.insertNode(n)
118 if self.finished or self.answered.has_key(dict["id"]):
119 # a day late and a dollar short
121 self.outstanding = self.outstanding - 1
122 self.answered[dict["id"]] = 1
124 # if we have any closer than what we already got, query them
125 if dict.has_key('nodes'):
126 for node in dict['nodes']:
127 n = self.caller.Node(node)
128 if not self.found.has_key(n.id):
130 elif dict.has_key('values'):
131 def x(y, z=self.results):
137 z = len(dict['values'])
138 v = filter(None, map(x, dict['values']))
140 reactor.callLater(0, self.callback, self.target, v)
147 l = self.found.values()
150 for node in l[:self.config['K']]:
151 if (not self.queried.has_key(node.id)) and node.id != self.caller.node.id:
152 #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
154 f = getattr(node, self.findValue)
155 except AttributeError:
156 print ">>> findValue %s doesn't have a %s method!" % (node, self.findValue)
158 df = f(self.target, self.caller.node.id)
159 df.addCallback(self.handleGotNodes)
160 df.addErrback(self.makeMsgFailed(node))
161 self.outstanding = self.outstanding + 1
162 self.queried[node.id] = 1
163 if self.outstanding >= self.config['CONCURRENT_REQS']:
165 assert self.outstanding >=0
166 if self.outstanding == 0:
167 ## all done, didn't find it!!
169 reactor.callLater(0, self.callback, self.target, [])
172 def goWithNodes(self, nodes, found=None):
178 if node.id == self.caller.node.id:
181 self.found[node.id] = node
186 class StoreValue(ActionBase):
187 def __init__(self, caller, target, value, originated, callback, config, store="storeValue"):
188 ActionBase.__init__(self, caller, target, callback, config)
190 self.originated = originated
194 def storedValue(self, t, node):
195 self.outstanding -= 1
196 self.caller.insertNode(node)
199 self.stored.append(t)
200 if len(self.stored) >= self.config['STORE_REDUNDANCY']:
202 self.callback(self.target, self.value, self.stored)
204 if not len(self.stored) + self.outstanding >= self.config['STORE_REDUNDANCY']:
208 def storeFailed(self, t, node):
209 print ">>> store failed %s/%s" % (node.host, node.port)
210 self.caller.nodeFailed(node)
211 self.outstanding -= 1
220 num = self.config['CONCURRENT_REQS'] - self.outstanding
221 if num > self.config['STORE_REDUNDANCY']:
222 num = self.config['STORE_REDUNDANCY']
225 node = self.nodes.pop()
227 if self.outstanding == 0:
229 self.callback(self.target, self.value, self.stored)
231 if not node.id == self.caller.node.id:
232 self.outstanding += 1
234 f = getattr(node, self.store)
235 except AttributeError:
236 print ">>> %s doesn't have a %s method!" % (node, self.store)
238 df = f(self.target, self.value, self.originated, self.caller.node.id)
239 df.addCallback(self.storedValue, node=node)
240 df.addErrback(self.storeFailed, node=node)
242 def goWithNodes(self, nodes):
244 self.nodes.sort(self.sort)
249 def __init__(self, store, config):
252 self.next_expire = reactor.callLater(self.config['KEINITIAL_DELAY'], self.doExpire)
255 self.store.expireValues(self.config['KE_AGE'])
256 self.next_expire = reactor.callLater(self.config['KE_DELAY'], self.doExpire)
260 self.next_expire.cancel()