1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
4 from twisted.internet import reactor
6 from khash import intify
9 """ base class for some long running asynchronous processes like finding nodes or values """
# Initialise the state shared by the actions in this module.
# NOTE(review): the embedded line numbers skip values, so further attribute
# assignments presumably exist between the visible lines -- confirm against
# the complete file.
10 def __init__(self, table, target, callback, config):
# Integer form of the target as produced by khash.intify; used by sort() below.
14 self.num = intify(target)
# Callable kept on the instance; subclasses invoke self.callback with results.
18 self.callback = callback
# Nested comparison helper; the target number is bound through a default
# argument at definition time.
22 def sort(a, b, num=self.num):
23 """ this function is for sorting nodes relative to the ID we are looking for """
# XOR each node's number with the target number; the lines that compare x and
# y and return a result are not visible here.
24 x, y = num ^ a.num, num ^ b.num
32 def goWithNodes(self, t):
# Timeout added to time.time() in a commented-out line of FindNode, so the
# unit is presumably seconds. No visible live code reads it -- TODO confirm.
37 FIND_NODE_TIMEOUT = 15
# Action that repeatedly queries nodes with findNode and reports the result
# through self.callback.
# NOTE(review): the embedded line numbers skip values, so several statements
# (loop headers, returns, at least one method header) are not visible; the
# comments below describe only the visible lines.
39 class FindNode(ActionBase):
40 """ find node action merits it's own class as it is a long running stateful process """
# Success callback attached to the deferred returned by node.findNode() below;
# receives the reply dict.
41 def handleGotNodes(self, dict):
# _krpc_sender is indexed below as [0] = host and [1] = port.
42 _krpc_sender = dict['_krpc_sender']
# Rebuild the replying peer as a Node, attach its UDP connection, and insert
# it into the routing table.
45 sender = {'id' : dict["id"]}
46 sender['port'] = _krpc_sender[1]
47 sender['host'] = _krpc_sender[0]
48 sender = self.table.Node().initWithDict(sender)
49 sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port))
50 self.table.table.insertNode(sender)
# Replies arriving after completion, or duplicates from an already-answered
# sender, are special-cased; the body of this branch is not visible.
51 if self.finished or self.answered.has_key(sender.id):
52 # a day late and a dollar short
# One request fewer in flight; remember that this sender has answered.
54 self.outstanding = self.outstanding - 1
55 self.answered[sender.id] = 1
# `node` presumably iterates over the nodes contained in the reply; the loop
# header is not visible -- confirm.
57 n = self.table.Node().initWithDict(node)
58 n.conn = self.table.udp.connectionForAddr((n.host, n.port))
59 if not self.found.has_key(n.id):
# NOTE(review): from here to the reactor.callLater line the statements appear
# to belong to a separate method whose def line is not visible; the next line
# is a fragment of its docstring.
65 send messages to new peers, if necessary
# Candidate nodes collected so far.
69 l = self.found.values()
# Only the first config['K'] entries of the list are considered.
71 for node in l[:self.config['K']]:
# A candidate whose id equals the target is handed straight to the callback.
72 if node.id == self.target:
74 return self.callback([node])
# Query each candidate at most once and never query our own node.
75 if (not self.queried.has_key(node.id)) and node.id != self.table.node.id:
76 #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
77 df = node.findNode(self.target, self.table.node.id)
# Success goes to handleGotNodes; failure goes to a per-node errback.
78 df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
79 self.outstanding = self.outstanding + 1
80 self.queried[node.id] = 1
# Concurrency cap; the body of this branch is not visible.
81 if self.outstanding >= self.config['CONCURRENT_REQS']:
83 assert(self.outstanding) >=0
# Nothing left in flight: deliver the first K candidates on a later reactor
# iteration.
84 if self.outstanding == 0:
87 reactor.callLater(0, self.callback, l[:self.config['K']])
# Build the errback for a request sent to `node`.
89 def makeMsgFailed(self, node):
# self and node are bound through default arguments.
90 def defaultGotNodes(err, self=self, node=node):
91 print ">>> find failed %s/%s" % (node.host, node.port), err
# Report the failure to the routing table and release the request slot.
92 self.table.table.nodeFailed(node)
93 self.outstanding = self.outstanding - 1
95 return defaultGotNodes
# Entry point that seeds self.found with the given nodes. The next two lines
# are fragments of its docstring; the loop header is not visible.
97 def goWithNodes(self, nodes):
99 this starts the process, our argument is a transaction with t.extras being our list of nodes
100 it's a transaction since we got called from the dispatcher
# Our own node is special-cased; the body of this branch is not visible.
103 if node.id == self.table.node.id:
106 self.found[node.id] = node
# NOTE(review): no visible code reads this constant; a commented-out line in
# GetValue refers to GET_VALUE_TIMEOUT (upper case) instead -- confirm which
# spelling is intended before relying on it.
111 get_value_timeout = 15
# FindNode variant that calls a configurable lookup method on each node and
# reports values through self.callback.
# NOTE(review): the embedded line numbers skip values, so several statements
# (try: lines, returns, at least one method header) are not visible; the
# comments below describe only the visible lines.
112 class GetValue(FindNode):
# `find` is the name of the node method to call; it is resolved with getattr
# further down.
113 def __init__(self, table, target, callback, config, find="findValue"):
114 FindNode.__init__(self, table, target, callback, config)
115 self.findValue = find
117 """ get value task """
# Success callback for the lookup deferreds; receives the reply dict.
118 def handleGotNodes(self, dict):
# _krpc_sender is indexed below as [0] = host and [1] = port.
119 _krpc_sender = dict['_krpc_sender']
# Rebuild the replying peer as a Node, attach its UDP connection, and insert
# it into the routing table.
121 sender = {'id' : dict["id"]}
122 sender['port'] = _krpc_sender[1]
123 sender['host'] = _krpc_sender[0]
124 sender = self.table.Node().initWithDict(sender)
125 sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port))
126 self.table.table.insertNode(sender)
# Late or duplicate replies are special-cased; the body of this branch is not
# visible.
127 if self.finished or self.answered.has_key(sender.id):
128 # a day late and a dollar short
# One request fewer in flight; remember that this sender has answered.
130 self.outstanding = self.outstanding - 1
131 self.answered[sender.id] = 1
133 # if we have any closer than what we already got, query them
# The reply carries either 'nodes' or 'values'.
134 if dict.has_key('nodes'):
135 for node in dict['nodes']:
136 n = self.table.Node().initWithDict(node)
137 n.conn = self.table.udp.connectionForAddr((n.host, n.port))
# The body of this branch is not visible.
138 if not self.found.has_key(n.id):
140 elif dict.has_key('values'):
# Helper mapped over the returned values; self.results is bound as the default
# for z. Its body is not visible.
141 def x(y, z=self.results):
147 z = len(dict['values'])
# filter(None, ...) drops the falsy results of x.
148 v = filter(None, map(x, dict['values']))
# Deliver the surviving values on a later reactor iteration.
150 reactor.callLater(0, self.callback, self.target, v)
# NOTE(review): from here to the second reactor.callLater line the statements
# appear to belong to a separate method whose def line is not visible.
157 l = self.found.values()
# Only the first config['K'] entries of the list are considered.
160 for node in l[:self.config['K']]:
# Query each candidate at most once and never query our own node.
161 if (not self.queried.has_key(node.id)) and node.id != self.table.node.id:
162 #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
# Resolve the lookup method by name; the matching try: line is not visible.
164 f = getattr(node, self.findValue)
165 except AttributeError:
166 print ">>> findValue %s doesn't have a %s method!" % (node, self.findValue)
168 df = f(self.target, self.table.node.id)
# Success goes to handleGotNodes; failure goes to a per-node errback.
169 df.addCallback(self.handleGotNodes)
170 df.addErrback(self.makeMsgFailed(node))
171 self.outstanding = self.outstanding + 1
172 self.queried[node.id] = 1
# Concurrency cap; the body of this branch is not visible.
173 if self.outstanding >= self.config['CONCURRENT_REQS']:
175 assert(self.outstanding) >=0
176 if self.outstanding == 0:
177 ## all done, didn't find it!!
# Report an empty result list for the target on a later reactor iteration.
179 reactor.callLater(0, self.callback, self.target, [])
# Entry point that seeds self.found with the given nodes. The loop header and
# any use of `found` are not visible.
182 def goWithNodes(self, nodes, found=None):
# Our own node is special-cased; the body of this branch is not visible.
188 if node.id == self.table.node.id:
191 self.found[node.id] = node
# Action that sends a value to nodes through a configurable store method and
# reports the outcome through self.callback.
# NOTE(review): the embedded line numbers skip values, so several statements
# (attribute setup, loop and try: lines, one method header) are not visible;
# the comments below describe only the visible lines.
# NOTE(review): this class calls self.table.insertNode / self.table.nodeFailed,
# whereas FindNode calls self.table.table.insertNode / nodeFailed -- confirm
# that both spellings are valid for the object passed as `table`.
196 class StoreValue(ActionBase):
# `store` is the name of the node method to call; the assignments of
# self.value, self.store, self.stored and self.outstanding are not visible.
197 def __init__(self, table, target, value, callback, config, store="storeValue"):
198 ActionBase.__init__(self, table, target, callback, config)
# Success callback for one store request; `node` is supplied as a keyword
# argument when the callback is registered below.
203 def storedValue(self, t, node):
204 self.outstanding -= 1
205 self.table.insertNode(node)
# Record the reply; once config['STORE_REDUNDANCY'] replies are recorded the
# callback receives target, value and the recorded replies.
208 self.stored.append(t)
209 if len(self.stored) >= self.config['STORE_REDUNDANCY']:
211 self.callback(self.target, self.value, self.stored)
# Recorded replies plus in-flight requests fall short of the redundancy
# target; the body of this branch is not visible.
213 if not len(self.stored) + self.outstanding >= self.config['STORE_REDUNDANCY']:
# Errback for one store request: log, report the node as failed, release the
# request slot.
217 def storeFailed(self, t, node):
218 print ">>> store failed %s/%s" % (node.host, node.port)
219 self.table.nodeFailed(node)
220 self.outstanding -= 1
# NOTE(review): from here to the addErrback line the statements appear to
# belong to a separate method whose def line is not visible.
# Free request slots, capped at config['STORE_REDUNDANCY'].
229 num = self.config['CONCURRENT_REQS'] - self.outstanding
230 if num > self.config['STORE_REDUNDANCY']:
231 num = self.config['STORE_REDUNDANCY']
# Take the next candidate from the end of self.nodes; the surrounding loop and
# error handling are not visible.
234 node = self.nodes.pop()
# With nothing in flight, report whatever has been stored so far.
236 if self.outstanding == 0:
238 self.callback(self.target, self.value, self.stored)
# Never send the store request to our own node.
240 if not node.id == self.table.node.id:
241 self.outstanding += 1
# Resolve the store method by name; the matching try: line is not visible.
243 f = getattr(node, self.store)
244 except AttributeError:
245 print ">>> %s doesn't have a %s method!" % (node, self.store)
247 df = f(self.target, self.value, self.table.node.id)
# Pass the node to both handlers as a keyword argument.
248 df.addCallback(self.storedValue, node=node)
249 df.addErrback(self.storeFailed, node=node)
# Entry point; the assignment of self.nodes is not visible. The list is
# ordered with self.sort used as a Python 2 cmp function.
251 def goWithNodes(self, nodes):
253 self.nodes.sort(self.sort)
# Constructor of the key-expiry helper; the enclosing class header and the
# assignments of self.store / self.config are not visible here.
258 def __init__(self, store, config):
# Ask the reactor to call self.doExpire after config['KEINITIAL_DELAY'] and
# keep the returned delayed call in self.next_expire.
261 self.next_expire = reactor.callLater(self.config['KEINITIAL_DELAY'], self.doExpire)
264 self.store.expireValues(self.config['KE_AGE'])
265 self.next_expire = reactor.callLater(self.config['KE_DELAY'], self.doExpire)
269 self.next_expire.cancel()