3 from const import reactor
6 from hash import intify
7 from knode import KNode as Node
8 from ktable import KTable, K
11 """ base class for some long running asynchronous processes like finding nodes or values """
# Constructor of the action base class (its `class` line is not visible in this section;
# presumably ActionBase, whose __init__ StoreValue calls with the same three arguments).
# NOTE(review): only part of the body is shown -- the embedded original numbering
# jumps 12 -> 15 -> 19 -> 23, so other attribute assignments are not visible here.
12 def __init__(self, table, target, callback):
# numeric form of the target as returned by intify(); the nested sort() below XORs it
# with node numbers
15 self.num = intify(target)
# kept for later: the action classes further down in this file invoke self.callback
# with their results
19 self.callback = callback
# Comparator over two nodes. `num` is bound as a default argument, so it keeps the value
# self.num has at the moment this def executes.
# Presumably stored as self.sort (StoreValue calls self.nodes.sort(self.sort)) -- TODO confirm.
23 def sort(a, b, num=self.num):
24 """ this function is for sorting nodes relative to the ID we are looking for """
# XOR each node's number with the number we are looking for; the lines that compare
# x and y and produce the return value are not visible here
25 x, y = num ^ a.num, num ^ b.num
# Entry point of an action. FindNode, GetValue and StoreValue later in this file each
# define their own goWithNodes; this version's body is not visible in this section.
33 def goWithNodes(self, t):
# Not used by any live code visible here: the only reference is the commented-out
# "t.timeout = time.time() + FIND_NODE_TIMEOUT" line in FindNode.
# Presumably seconds -- TODO confirm.
38 FIND_NODE_TIMEOUT = 15
40 class FindNode(ActionBase):
41 """ find node action merits it's own class as it is a long running stateful process """
# NOTE(review): several original lines are absent from this section (the embedded
# numbering skips), so loop headers, returns and at least one method `def` line are
# not shown. The code uses dict.has_key and the print statement, i.e. Python 2 syntax.
#
# Handles a reply to the findNode request issued further down (registered there via
# df.addCallbacks). `dict` is the reply; the parameter name shadows the builtin.
42 def handleGotNodes(self, dict):
43 _krpc_sender = dict['_krpc_sender']
46 sender = dict["sender"]
# replace the port in the sender dict with element 1 of _krpc_sender
# (presumably the (host, port) the reply actually came from -- TODO confirm)
47 sender['port'] = _krpc_sender[1]
# wrap the sender in a Node, attach a connection for its (host, port) and hand it
# to self.table.table
48 sender = Node().initWithDict(sender)
49 sender.conn = self.table.airhook.connectionForAddr((sender.host, sender.port))
50 self.table.table.insertNode(sender)
# the action is already finished, or this sender has already answered;
# the statement executed in this branch is not visible here
51 if self.finished or self.answered.has_key(sender.id):
52 # a day late and a dollar short
# one request fewer in flight; remember that this sender has answered
54 self.outstanding = self.outstanding - 1
55 self.answered[sender.id] = 1
# per-node handling of the reply (the enclosing loop header is not visible here):
# build a Node, attach a connection, then test whether its id is already in self.found
57 n = Node().initWithDict(node)
58 n.conn = self.table.airhook.connectionForAddr((n.host, n.port))
59 if not self.found.has_key(n.id):
# From here on the lines apparently belong to the method that issues new requests;
# its def line is not visible in this section. The next line is docstring text.
65 send messages to new peers, if necessary
# the nodes found so far (dict.values() is a list under Python 2)
69 l = self.found.values()
# a node whose id equals the target ends the search: the callback gets just that node
72 if node.id == self.target:
74 return self.callback([node])
# only query nodes that were not queried before, and never ourselves
75 if (not self.queried.has_key(node.id)) and node.id != self.table.node.id:
76 #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
77 df = node.findNode(self.target, self.table.node.senderDict())
# handleGotNodes on success, the closure built by makeMsgFailed(node) on failure
78 df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
79 self.outstanding = self.outstanding + 1
80 self.queried[node.id] = 1
# const.CONCURRENT_REQS limits the requests in flight (what happens once the limit is
# reached is not visible here).
# NOTE(review): only "from const import reactor" is visible at the top of this section;
# confirm that the const module itself is imported.
81 if self.outstanding >= const.CONCURRENT_REQS:
# parses as: assert (self.outstanding >= 0)
83 assert(self.outstanding) >=0
# nothing in flight any more: hand the first K entries of l to the callback,
# scheduled through reactor.callFromThread
84 if self.outstanding == 0:
87 reactor.callFromThread(self.callback, l[:K])
# Builds the failure handler used for a request sent to `node`.
89 def makeMsgFailed(self, node):
# self and node are bound as default arguments of the returned function
90 def defaultGotNodes(err, self=self, node=node):
91 print ">>> find failed"
# report the node as failed to self.table.table and count the request as no longer outstanding
92 self.table.table.nodeFailed(node)
93 self.outstanding = self.outstanding - 1
95 return defaultGotNodes
# Entry point that starts the action from a list of nodes. Only fragments are visible;
# the two text lines directly below are part of its docstring.
97 def goWithNodes(self, nodes):
99 this starts the process, our argument is a transaction with t.extras being our list of nodes
100 it's a transaction since we got called from the dispatcher
# our own node id is singled out (the branch body is not visible here);
# nodes are recorded in self.found keyed by their id
103 if node.id == self.table.node.id:
106 self.found[node.id] = node
# Not used by any live code visible here: the only reference is the commented-out
# "t.timeout = time.time() + GET_VALUE_TIMEOUT" line in GetValue.
# Presumably seconds -- TODO confirm.
111 GET_VALUE_TIMEOUT = 15
112 class GetValue(FindNode):
113 """ get value task """
# NOTE(review): several original lines are absent from this section (the embedded
# numbering skips), so some loop headers, branch bodies and a method `def` line are
# not shown.
#
# Handles a reply to the findValue request issued further down. The reply is checked
# for a 'nodes' key and, failing that, a 'values' key. The preamble up to
# self.answered[...] repeats FindNode.handleGotNodes line for line.
# `dict` shadows the builtin of the same name.
114 def handleGotNodes(self, dict):
115 _krpc_sender = dict['_krpc_sender']
117 sender = dict["sender"]
# replace the port in the sender dict with element 1 of _krpc_sender
# (presumably the (host, port) the reply actually came from -- TODO confirm)
118 sender['port'] = _krpc_sender[1]
119 sender = Node().initWithDict(sender)
120 sender.conn = self.table.airhook.connectionForAddr((sender.host, sender.port))
121 self.table.table.insertNode(sender)
# already finished, or this sender already answered; the statement executed in this
# branch is not visible here
122 if self.finished or self.answered.has_key(sender.id):
123 # a day late and a dollar short
# one request fewer in flight; remember that this sender has answered
125 self.outstanding = self.outstanding - 1
126 self.answered[sender.id] = 1
128 # if we have any closer than what we already got, query them
# 'nodes' reply: build a Node with a connection for each entry and test whether its id
# is already in self.found (what is done with new ones is not visible here)
129 if dict.has_key('nodes'):
130 for node in dict['nodes']:
131 n = Node().initWithDict(node)
132 n.conn = self.table.airhook.connectionForAddr((n.host, n.port))
133 if not self.found.has_key(n.id):
# 'values' reply: each value is passed through the helper x (its body is not visible;
# self.results is bound as its default argument z), filter(None, ...) keeps only the
# truthy results, and those go to the callback via reactor.callFromThread
135 elif dict.has_key('values'):
136 def x(y, z=self.results):
142 z = len(dict['values'])
143 v = filter(None, map(x, dict['values']))
145 reactor.callFromThread(self.callback, v)
# From here on the lines apparently belong to the method that issues new requests;
# its def line is not visible in this section.
152 l = self.found.values()
# only query nodes that were not queried before, and never ourselves
156 if (not self.queried.has_key(node.id)) and node.id != self.table.node.id:
157 #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
158 df = node.findValue(self.target, self.table.node.senderDict())
# Callback and errback are attached with two separate calls here, whereas FindNode uses
# one df.addCallbacks(...). NOTE(review): if df follows Twisted Deferred semantics, this
# form also routes an exception raised inside handleGotNodes to the errback -- confirm
# that the difference is intended.
159 df.addCallback(self.handleGotNodes)
160 df.addErrback(self.makeMsgFailed(node))
161 self.outstanding = self.outstanding + 1
162 self.queried[node.id] = 1
# const.CONCURRENT_REQS limits the requests in flight (what happens once the limit is
# reached is not visible here)
163 if self.outstanding >= const.CONCURRENT_REQS:
# parses as: assert (self.outstanding >= 0)
165 assert(self.outstanding) >=0
# nothing in flight any more: the callback receives an empty list
166 if self.outstanding == 0:
167 ## all done, didn't find it!!
169 reactor.callFromThread(self.callback,[])
# Entry point; compared with FindNode.goWithNodes it takes an extra optional `found`
# argument whose use is not visible in this section.
172 def goWithNodes(self, nodes, found=None):
# our own node id is singled out (the branch body is not visible here);
# nodes are recorded in self.found keyed by their id
178 if node.id == self.table.node.id:
181 self.found[node.id] = node
# Action that sends storeValue requests to nodes and reports the collected successful
# replies (self.stored) through self.callback.
# NOTE(review): several original lines are absent from this section (the embedded
# numbering skips), so some branch bodies and a method `def` line are not shown.
186 class StoreValue(ActionBase):
# The shared setup is delegated to ActionBase.__init__; `value` is not passed on.
# self.value is read when requests are sent, but its assignment is not visible here.
187 def __init__(self, table, target, value, callback):
188 ActionBase.__init__(self, table, target, callback)
# Success handler for node.storeValue (registered below with node=node); t is the reply.
192 def storedValue(self, t, node):
193 self.outstanding -= 1
# NOTE(review): FindNode and GetValue call self.table.table.insertNode(...), while this
# calls self.table.insertNode(...) -- confirm which object `table` refers to here.
194 self.table.insertNode(node)
197 self.stored.append(t)
# enough successful stores collected: hand the list of replies to the callback
198 if len(self.stored) >= const.STORE_REDUNDANCY:
200 self.callback(self.stored)
# reads as: not ((len(self.stored) + self.outstanding) >= const.STORE_REDUNDANCY),
# i.e. successes plus requests in flight are below the redundancy target;
# the branch body is not visible here
202 if not len(self.stored) + self.outstanding >= const.STORE_REDUNDANCY:
# Failure handler for node.storeValue (registered below with node=node).
205 def storeFailed(self, t, node):
206 print ">>> store failed"
# NOTE(review): FindNode uses self.table.table.nodeFailed(...), this uses
# self.table.nodeFailed(...) -- same question as for insertNode in storedValue.
207 self.table.nodeFailed(node)
208 self.outstanding -= 1
# From here on the lines apparently belong to the method that issues new requests;
# its def line is not visible in this section.
# num: free request slots (const.CONCURRENT_REQS minus those in flight), capped at
# const.STORE_REDUNDANCY
216 num = const.CONCURRENT_REQS - self.outstanding
217 if num > const.STORE_REDUNDANCY:
218 num = const.STORE_REDUNDANCY
# take the next candidate from the end of self.nodes; how an empty list is handled
# is not visible here
221 node = self.nodes.pop()
# nothing in flight: report what has been stored so far
223 if self.outstanding == 0:
225 self.callback(self.stored)
# never send a store request to ourselves
227 if not node.id == self.table.node.id:
228 self.outstanding += 1
229 df = node.storeValue(self.target, self.value, self.table.node.senderDict())
# node is forwarded as a keyword argument, matching the (t, node) handler signatures
230 df.addCallback(self.storedValue, node=node)
231 df.addErrback(self.storeFailed, node=node)
# Entry point: orders self.nodes with the comparator self.sort (comparison function passed
# positionally, the Python 2 form of list.sort). The assignment of self.nodes is not
# visible here.
233 def goWithNodes(self, nodes):
235 self.nodes.sort(self.sort)
# Constructor of the expiry helper (its `class` line is not visible in this section).
# self.store is used by the expiry routine, but its assignment is not visible here.
240 def __init__(self, store):
# schedule the first run of self.doExpire after const.KEINITIAL_DELAY
242 reactor.callLater(const.KEINITIAL_DELAY, self.doExpire)
# Fragment of the expiry routine. Its def line is not visible; presumably doExpire,
# the method scheduled by the reactor.callLater call below and by the one in
# __init__ -- TODO confirm.
# Cutoff: current time() minus const.KE_AGE, formatted with six decimals.
# NOTE(review): no import of time() is visible in this section -- confirm it is imported.
245 self.cut = "%0.6f" % (time() - const.KE_AGE)
249 c = self.store.cursor()
# Statement deleting rows of table kv whose time column is below the cutoff. It is
# assembled with % formatting; the interpolated value is the locally computed self.cut,
# not external input. NOTE(review): a parameterized query would be the safer form;
# the execute call is not visible here.
250 s = "delete from kv where time < '%s';" % self.cut
# re-arm: run self.doExpire again after const.KE_DELAY
252 reactor.callLater(const.KE_DELAY, self.doExpire)