1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
6 from twisted.internet import reactor
8 from khash import intify
11 """ base class for some long running asynchronous proccesses like finding nodes or values """
12 def __init__(self, table, target, callback, config):
16 self.num = intify(target)
20 self.callback = callback
24 def sort(a, b, num=self.num):
25 """ this function is for sorting nodes relative to the ID we are looking for """
26 x, y = num ^ a.num, num ^ b.num
34 def goWithNodes(self, t):
39 FIND_NODE_TIMEOUT = 15
41 class FindNode(ActionBase):
42 """ find node action merits it's own class as it is a long running stateful process """
43 def handleGotNodes(self, dict):
44 _krpc_sender = dict['_krpc_sender']
47 sender = {'id' : dict["id"]}
48 sender['port'] = _krpc_sender[1]
49 sender['host'] = _krpc_sender[0]
50 sender = self.table.Node().initWithDict(sender)
51 sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port))
52 self.table.table.insertNode(sender)
53 if self.finished or self.answered.has_key(sender.id):
54 # a day late and a dollar short
56 self.outstanding = self.outstanding - 1
57 self.answered[sender.id] = 1
59 n = self.table.Node().initWithDict(node)
60 n.conn = self.table.udp.connectionForAddr((n.host, n.port))
61 if not self.found.has_key(n.id):
67 send messages to new peers, if necessary
71 l = self.found.values()
73 for node in l[:self.config['K']]:
74 if node.id == self.target:
76 return self.callback([node])
77 if (not self.queried.has_key(node.id)) and node.id != self.table.node.id:
78 #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
79 df = node.findNode(self.target, self.table.node.id)
80 df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
81 self.outstanding = self.outstanding + 1
82 self.queried[node.id] = 1
83 if self.outstanding >= self.config['CONCURRENT_REQS']:
85 assert(self.outstanding) >=0
86 if self.outstanding == 0:
89 reactor.callLater(0, self.callback, l[:self.config['K']])
91 def makeMsgFailed(self, node):
92 def defaultGotNodes(err, self=self, node=node):
93 print ">>> find failed %s/%s" % (node.host, node.port), err
94 self.table.table.nodeFailed(node)
95 self.outstanding = self.outstanding - 1
97 return defaultGotNodes
99 def goWithNodes(self, nodes):
101 this starts the process, our argument is a transaction with t.extras being our list of nodes
102 it's a transaction since we got called from the dispatcher
105 if node.id == self.table.node.id:
108 self.found[node.id] = node
113 get_value_timeout = 15
114 class GetValue(FindNode):
115 def __init__(self, table, target, callback, config, find="findValue"):
116 FindNode.__init__(self, table, target, callback, config)
117 self.findValue = find
119 """ get value task """
120 def handleGotNodes(self, dict):
121 _krpc_sender = dict['_krpc_sender']
123 sender = {'id' : dict["id"]}
124 sender['port'] = _krpc_sender[1]
125 sender['host'] = _krpc_sender[0]
126 sender = self.table.Node().initWithDict(sender)
127 sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port))
128 self.table.table.insertNode(sender)
129 if self.finished or self.answered.has_key(sender.id):
130 # a day late and a dollar short
132 self.outstanding = self.outstanding - 1
133 self.answered[sender.id] = 1
135 # if we have any closer than what we already got, query them
136 if dict.has_key('nodes'):
137 for node in dict['nodes']:
138 n = self.table.Node().initWithDict(node)
139 n.conn = self.table.udp.connectionForAddr((n.host, n.port))
140 if not self.found.has_key(n.id):
142 elif dict.has_key('values'):
143 def x(y, z=self.results):
149 z = len(dict['values'])
150 v = filter(None, map(x, dict['values']))
152 reactor.callLater(0, self.callback, self.target, v)
159 l = self.found.values()
162 for node in l[:self.config['K']]:
163 if (not self.queried.has_key(node.id)) and node.id != self.table.node.id:
164 #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
166 f = getattr(node, self.findValue)
167 except AttributeError:
168 print ">>> findValue %s doesn't have a %s method!" % (node, self.findValue)
170 df = f(self.target, self.table.node.id)
171 df.addCallback(self.handleGotNodes)
172 df.addErrback(self.makeMsgFailed(node))
173 self.outstanding = self.outstanding + 1
174 self.queried[node.id] = 1
175 if self.outstanding >= self.config['CONCURRENT_REQS']:
177 assert(self.outstanding) >=0
178 if self.outstanding == 0:
179 ## all done, didn't find it!!
181 reactor.callLater(0, self.callback, self.target, [])
184 def goWithNodes(self, nodes, found=None):
190 if node.id == self.table.node.id:
193 self.found[node.id] = node
198 class StoreValue(ActionBase):
199 def __init__(self, table, target, value, callback, config, store="storeValue"):
200 ActionBase.__init__(self, table, target, callback, config)
205 def storedValue(self, t, node):
206 self.outstanding -= 1
207 self.table.insertNode(node)
210 self.stored.append(t)
211 if len(self.stored) >= self.config['STORE_REDUNDANCY']:
213 self.callback(self.target, self.value, self.stored)
215 if not len(self.stored) + self.outstanding >= self.config['STORE_REDUNDANCY']:
219 def storeFailed(self, t, node):
220 print ">>> store failed %s/%s" % (node.host, node.port)
221 self.table.nodeFailed(node)
222 self.outstanding -= 1
231 num = self.config['CONCURRENT_REQS'] - self.outstanding
232 if num > self.config['STORE_REDUNDANCY']:
233 num = self.config['STORE_REDUNDANCY']
236 node = self.nodes.pop()
238 if self.outstanding == 0:
240 self.callback(self.target, self.value, self.stored)
242 if not node.id == self.table.node.id:
243 self.outstanding += 1
245 f = getattr(node, self.store)
246 except AttributeError:
247 print ">>> %s doesn't have a %s method!" % (node, self.store)
249 df = f(self.target, self.value, self.table.node.id)
250 df.addCallback(self.storedValue, node=node)
251 df.addErrback(self.storeFailed, node=node)
253 def goWithNodes(self, nodes):
255 self.nodes.sort(self.sort)
260 def __init__(self, store, config):
263 self.next_expire = reactor.callLater(self.config['KEINITIAL_DELAY'], self.doExpire)
266 self.store.expireValues(time() - self.config['KE_AGE'])
267 self.next_expire = reactor.callLater(self.config['KE_DELAY'], self.doExpire)
271 self.next_expire.cancel()