1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
6 from const import reactor
9 from khash import intify
10 from ktable import KTable, K
13 """ base class for some long running asynchronous processes like finding nodes or values """
# Constructor of the action base class (presumably ActionBase, whose class
# statement is not part of this listing). Several body lines are also missing
# here: the embedded line numbers jump 14 -> 17 -> 21 -> 25.
#   table    -- not used in the visible lines; other methods in this file read self.table
#   target   -- the ID being looked for; converted to a number with intify()
#   callback -- stored on the instance; other methods in this file call it with results
14 def __init__(self, table, target, callback):
# Numeric form of the target; bound below as the default `num` of sort().
17 self.num = intify(target)
21 self.callback = callback
# Two-argument helper comparing nodes a and b. self.num is captured as a
# default argument, so the helper does not need `self` when it is called.
25 def sort(a, b, num=self.num):
26 """ this function is for sorting nodes relative to the ID we are looking for """
# XOR the target number with each node's .num. The comparison and return
# statements that follow are not visible in this listing.
27 x, y = num ^ a.num, num ^ b.num
# Entry point taking one argument `t`. The body is not part of this listing.
# FindNode, GetValue and StoreValue further down each define their own
# goWithNodes; presumably they override this one -- confirm.
35 def goWithNodes(self, t):
# NOTE(review): the only reference to this constant in the visible lines is the
# commented-out `t.timeout = time.time() + FIND_NODE_TIMEOUT` statement in the
# FindNode scheduling code. Since it is added to time.time() there, the unit is
# presumably seconds -- confirm.
40 FIND_NODE_TIMEOUT = 15
42 class FindNode(ActionBase):
43 """ find node action merits its own class as it is a long running stateful process """
# Success callback for a findNode query; it is attached further down with
# df.addCallbacks(self.handleGotNodes, ...).
#   dict -- the reply; the visible lines read the keys '_krpc_sender' and 'id'.
# NOTE(review): the parameter name shadows the builtin `dict`.
# Several lines of this method are missing from the listing (the embedded
# numbers skip 46-47, 56, 59), including the loop header that binds `node`.
44 def handleGotNodes(self, dict):
45 _krpc_sender = dict['_krpc_sender']
# Build a node description from the reply: 'id' comes from the reply itself,
# host and port from _krpc_sender (index 0 is used as host, index 1 as port).
48 sender = {'id' : dict["id"]}
49 sender['port'] = _krpc_sender[1]
50 sender['host'] = _krpc_sender[0]
51 sender = self.table.Node().initWithDict(sender)
52 sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port))
# The responder is inserted into self.table.table before the late/duplicate
# check below, so that insertion happens regardless of the check's outcome.
53 self.table.table.insertNode(sender)
# True when the action has already finished or this sender already answered.
# The statement guarded by this test is not in the listing.
54 if self.finished or self.answered.has_key(sender.id):
55 # a day late and a dollar short
# One fewer request in flight; remember that this sender has answered.
57 self.outstanding = self.outstanding - 1
58 self.answered[sender.id] = 1
# `node` is bound by a loop header that is not visible here.
60 n = self.table.Node().initWithDict(node)
61 n.conn = self.table.udp.connectionForAddr((n.host, n.port))
# Only nodes not yet recorded in self.found pass this test; the guarded
# statement is not in the listing.
62 if not self.found.has_key(n.id):
68 send messages to new peers, if necessary
72 l = self.found.values()
75 if node.id == self.target:
77 return self.callback([node])
78 if (not self.queried.has_key(node.id)) and node.id != self.table.node.id:
79 #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
80 df = node.findNode(self.target, self.table.node.id)
81 df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
82 self.outstanding = self.outstanding + 1
83 self.queried[node.id] = 1
84 if self.outstanding >= const.CONCURRENT_REQS:
86 assert(self.outstanding) >=0
87 if self.outstanding == 0:
90 reactor.callLater(0, self.callback, l[:K])
# Return an error callback for a query that was sent to `node`. In this
# listing the result is passed as the errback to df.addCallbacks(...) and
# df.addErrback(...).
92 def makeMsgFailed(self, node):
# self and node are bound as default arguments so the returned function
# carries them; the caller supplies only `err`.
93 def defaultGotNodes(err, self=self, node=node):
94 print ">>> find failed %s/%s" % (node.host, node.port), err
# Report the failure to self.table.table and count one fewer request in flight.
95 self.table.table.nodeFailed(node)
96 self.outstanding = self.outstanding - 1
# NOTE(review): original line 97 is not part of this listing.
98 return defaultGotNodes
# NOTE(review): the docstring text below describes the argument as a
# transaction `t` with `t.extras`, but the parameter is named `nodes` and no
# `t` appears in the visible lines. The wording looks stale -- confirm against
# the callers before relying on it.
100 def goWithNodes(self, nodes):
102 this starts the process, our argument is a transaction with t.extras being our list of nodes
103 it's a transaction since we got called from the dispatcher
# `node` is bound by a loop header that is not in the listing. The test below
# matches our own node id; the statement it guards is not visible.
106 if node.id == self.table.node.id:
# Record the node in self.found, keyed by its id.
109 self.found[node.id] = node
# NOTE(review): this name is lower-case, unlike FIND_NODE_TIMEOUT. The
# commented-out `t.timeout = time.time() + GET_VALUE_TIMEOUT` line in the
# GetValue scheduling code uses the upper-case spelling, which is not defined
# anywhere in the visible lines. Nothing visible reads get_value_timeout.
114 get_value_timeout = 15
115 class GetValue(FindNode):
# Constructor: delegates table/target/callback to FindNode.__init__ and
# remembers `find`.
#   find -- name of the node method to call for each query; the scheduling
#           code further down looks it up with getattr(node, self.findValue).
116 def __init__(self, table, target, callback, find="findValue"):
117 FindNode.__init__(self, table, target, callback)
118 self.findValue = find
120 """ get value task """
# Success callback for a value query; it is attached further down with
# df.addCallback(self.handleGotNodes).
#   dict -- the reply; the visible lines read '_krpc_sender', 'id' and then
#           either 'nodes' or 'values'.
# NOTE(review): the parameter name shadows the builtin `dict`.
# Several lines are missing from the listing (the embedded numbers skip 123,
# 132, 135, 142, 145-149, 152).
121 def handleGotNodes(self, dict):
122 _krpc_sender = dict['_krpc_sender']
# Build a node description from the reply: 'id' comes from the reply itself,
# host and port from _krpc_sender (index 0 is used as host, index 1 as port).
124 sender = {'id' : dict["id"]}
125 sender['port'] = _krpc_sender[1]
126 sender['host'] = _krpc_sender[0]
127 sender = self.table.Node().initWithDict(sender)
128 sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port))
# Inserted into self.table.table before the late/duplicate check below.
129 self.table.table.insertNode(sender)
# True when the action has already finished or this sender already answered.
# The statement guarded by this test is not in the listing.
130 if self.finished or self.answered.has_key(sender.id):
131 # a day late and a dollar short
# One fewer request in flight; remember that this sender has answered.
133 self.outstanding = self.outstanding - 1
134 self.answered[sender.id] = 1
136 # if we have any closer than what we already got, query them
# Branch 1: the reply carries 'nodes'; each entry is turned into a Node object.
137 if dict.has_key('nodes'):
138 for node in dict['nodes']:
139 n = self.table.Node().initWithDict(node)
140 n.conn = self.table.udp.connectionForAddr((n.host, n.port))
# Only nodes not yet in self.found pass this test; the guarded statement is
# not in the listing.
141 if not self.found.has_key(n.id):
# Branch 2: the reply carries 'values'.
143 elif dict.has_key('values'):
# Helper mapped over the values below; self.results is bound as its default
# `z`. Its body is not in the listing.
144 def x(y, z=self.results):
# NOTE(review): this local `z` is separate from x's parameter `z` and is not
# read in any visible line.
150 z = len(dict['values'])
# filter(None, ...) drops every falsy result returned by x.
151 v = filter(None, map(x, dict['values']))
# The callback is scheduled through the reactor with zero delay rather than
# called directly. The condition guarding this call (original line 152) is
# not visible.
153 reactor.callLater(0, self.callback, v)
160 l = self.found.values()
164 if (not self.queried.has_key(node.id)) and node.id != self.table.node.id:
165 #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
167 f = getattr(node, self.findValue)
168 except AttributeError:
169 print ">>> findValue %s doesn't have a %s method!" % (node, self.findValue)
171 df = f(self.target, self.table.node.id)
172 df.addCallback(self.handleGotNodes)
173 df.addErrback(self.makeMsgFailed(node))
174 self.outstanding = self.outstanding + 1
175 self.queried[node.id] = 1
176 if self.outstanding >= const.CONCURRENT_REQS:
178 assert(self.outstanding) >=0
179 if self.outstanding == 0:
180 ## all done, didn't find it!!
182 reactor.callLater(0, self.callback,[])
# Start the action from `nodes`.
# NOTE(review): the `found` parameter is not read in any visible line, and the
# lines between the signature and the test below are missing from the listing.
185 def goWithNodes(self, nodes, found=None):
# `node` is bound by a loop header that is not visible. The test matches our
# own node id; the statement it guards is not in the listing.
191 if node.id == self.table.node.id:
# Record the node in self.found, keyed by its id.
194 self.found[node.id] = node
199 class StoreValue(ActionBase):
# Constructor: passes table/target/callback on to ActionBase.__init__.
#   value -- presumably kept as self.value, which the query code further down
#            passes to the store method; the assignment is not visible here.
#   store -- presumably kept as self.store, the method name looked up with
#            getattr(node, self.store) further down; assignment not visible.
200 def __init__(self, table, target, value, callback, store="storeValue"):
201 ActionBase.__init__(self, table, target, callback)
# Success callback for one store request; it is attached further down with
# df.addCallback(self.storedValue, node=node).
#   t    -- the callback result; appended to self.stored
#   node -- the node that the request was sent to
206 def storedValue(self, t, node):
207 self.outstanding -= 1
# NOTE(review): this calls self.table.insertNode, whereas FindNode and GetValue
# call self.table.table.insertNode -- confirm the difference is intentional.
208 self.table.insertNode(node)
211 self.stored.append(t)
# Once STORE_REDUNDANCY results have been collected, the callback is invoked
# with the list of results. Lines 213 and 215 are not in the listing.
212 if len(self.stored) >= const.STORE_REDUNDANCY:
214 self.callback(self.stored)
# `not` binds more loosely than `+` and `>=`, so this reads
# not ((len(self.stored) + self.outstanding) >= STORE_REDUNDANCY): true when
# stored plus in-flight requests cannot yet reach the redundancy target.
# The guarded statement is not visible.
216 if not len(self.stored) + self.outstanding >= const.STORE_REDUNDANCY:
# Error callback for one store request; it is attached further down with
# df.addErrback(self.storeFailed, node=node).
#   t    -- the errback argument; not read in the visible lines
#   node -- the node that the request was sent to
220 def storeFailed(self, t, node):
221 print ">>> store failed %s/%s" % (node.host, node.port)
# NOTE(review): this calls self.table.nodeFailed, whereas makeMsgFailed in
# FindNode calls self.table.table.nodeFailed -- confirm this is intentional.
222 self.table.nodeFailed(node)
# One fewer request in flight. Whatever follows (original lines 224 onward) is
# not in the listing.
223 self.outstanding -= 1
232 num = const.CONCURRENT_REQS - self.outstanding
233 if num > const.STORE_REDUNDANCY:
234 num = const.STORE_REDUNDANCY
237 node = self.nodes.pop()
239 if self.outstanding == 0:
241 self.callback(self.stored)
243 if not node.id == self.table.node.id:
244 self.outstanding += 1
246 f = getattr(node, self.store)
247 except AttributeError:
248 print ">>> %s doesn't have a %s method!" % (node, self.store)
250 df = f(self.target, self.value, self.table.node.id)
251 df.addCallback(self.storedValue, node=node)
252 df.addErrback(self.storeFailed, node=node)
# Start the store action from `nodes`. The line that sets self.nodes
# (original line 255) is not in the listing.
254 def goWithNodes(self, nodes):
# self.sort is passed positionally to list.sort, i.e. as a Python 2 comparison
# function. Presumably it is the sort helper defined in the base constructor;
# the assignment to self.sort is not visible.
256 self.nodes.sort(self.sort)
# Constructor of a class whose `class` statement is not part of this listing.
#   store -- presumably kept as self.store, on which a later line calls
#            .cursor(); the assignment is not visible here.
261 def __init__(self, store):
# Schedule the first self.doExpire run after const.KEINITIAL_DELAY. A later
# line in this listing schedules self.doExpire again with const.KE_DELAY.
263 reactor.callLater(const.KEINITIAL_DELAY, self.doExpire)
266 self.cut = "%0.6f" % (time() - const.KE_AGE)
270 c = self.store.cursor()
271 s = "delete from kv where time < '%s';" % self.cut
273 reactor.callLater(const.KE_DELAY, self.doExpire)