1 ## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
6 from const import reactor
9 from khash import intify
10 from knode import KNode as Node
11 from ktable import KTable, K
14 """ base class for some long running asynchronous proccesses like finding nodes or values """
15 def __init__(self, table, target, callback):
18 self.num = intify(target)
22 self.callback = callback
26 def sort(a, b, num=self.num):
27 """ this function is for sorting nodes relative to the ID we are looking for """
28 x, y = num ^ a.num, num ^ b.num
36 def goWithNodes(self, t):
41 FIND_NODE_TIMEOUT = 15
43 class FindNode(ActionBase):
44 """ find node action merits it's own class as it is a long running stateful process """
45 def handleGotNodes(self, dict):
46 _krpc_sender = dict['_krpc_sender']
49 sender = {'id' : dict["id"]}
50 sender['port'] = _krpc_sender[1]
51 sender['host'] = _krpc_sender[0]
52 sender = Node().initWithDict(sender)
53 sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port))
54 self.table.table.insertNode(sender)
55 if self.finished or self.answered.has_key(sender.id):
56 # a day late and a dollar short
58 self.outstanding = self.outstanding - 1
59 self.answered[sender.id] = 1
61 n = Node().initWithDict(node)
62 n.conn = self.table.udp.connectionForAddr((n.host, n.port))
63 if not self.found.has_key(n.id):
69 send messages to new peers, if necessary
73 l = self.found.values()
76 if node.id == self.target:
78 return self.callback([node])
79 if (not self.queried.has_key(node.id)) and node.id != self.table.node.id:
80 #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
81 df = node.findNode(self.target, self.table.node.id)
82 df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
83 self.outstanding = self.outstanding + 1
84 self.queried[node.id] = 1
85 if self.outstanding >= const.CONCURRENT_REQS:
87 assert(self.outstanding) >=0
88 if self.outstanding == 0:
91 reactor.callFromThread(self.callback, l[:K])
93 def makeMsgFailed(self, node):
94 def defaultGotNodes(err, self=self, node=node):
95 print ">>> find failed %s/%s" % (node.host, node.port)
96 self.table.table.nodeFailed(node)
97 self.outstanding = self.outstanding - 1
99 return defaultGotNodes
101 def goWithNodes(self, nodes):
103 this starts the process, our argument is a transaction with t.extras being our list of nodes
104 it's a transaction since we got called from the dispatcher
107 if node.id == self.table.node.id:
110 self.found[node.id] = node
115 GET_VALUE_TIMEOUT = 15
116 class GetValue(FindNode):
117 """ get value task """
118 def handleGotNodes(self, dict):
119 _krpc_sender = dict['_krpc_sender']
121 sender = {'id' : dict["id"]}
122 sender['port'] = _krpc_sender[1]
123 sender['host'] = _krpc_sender[0]
124 sender = Node().initWithDict(sender)
125 sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port))
126 self.table.table.insertNode(sender)
127 if self.finished or self.answered.has_key(sender.id):
128 # a day late and a dollar short
130 self.outstanding = self.outstanding - 1
131 self.answered[sender.id] = 1
133 # if we have any closer than what we already got, query them
134 if dict.has_key('nodes'):
135 for node in dict['nodes']:
136 n = Node().initWithDict(node)
137 n.conn = self.table.udp.connectionForAddr((n.host, n.port))
138 if not self.found.has_key(n.id):
140 elif dict.has_key('values'):
141 def x(y, z=self.results):
147 z = len(dict['values'])
148 v = filter(None, map(x, dict['values']))
150 reactor.callFromThread(self.callback, v)
157 l = self.found.values()
161 if (not self.queried.has_key(node.id)) and node.id != self.table.node.id:
162 #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
163 df = node.findValue(self.target, self.table.node.id)
164 df.addCallback(self.handleGotNodes)
165 df.addErrback(self.makeMsgFailed(node))
166 self.outstanding = self.outstanding + 1
167 self.queried[node.id] = 1
168 if self.outstanding >= const.CONCURRENT_REQS:
170 assert(self.outstanding) >=0
171 if self.outstanding == 0:
172 ## all done, didn't find it!!
174 reactor.callFromThread(self.callback,[])
177 def goWithNodes(self, nodes, found=None):
183 if node.id == self.table.node.id:
186 self.found[node.id] = node
191 class StoreValue(ActionBase):
192 def __init__(self, table, target, value, callback):
193 ActionBase.__init__(self, table, target, callback)
197 def storedValue(self, t, node):
198 self.outstanding -= 1
199 self.table.insertNode(node)
202 self.stored.append(t)
203 if len(self.stored) >= const.STORE_REDUNDANCY:
205 self.callback(self.stored)
207 if not len(self.stored) + self.outstanding >= const.STORE_REDUNDANCY:
210 def storeFailed(self, t, node):
211 print ">>> store failed %s/%s" % (node.host, node.port)
212 self.table.nodeFailed(node)
213 self.outstanding -= 1
221 num = const.CONCURRENT_REQS - self.outstanding
222 if num > const.STORE_REDUNDANCY:
223 num = const.STORE_REDUNDANCY
226 node = self.nodes.pop()
228 if self.outstanding == 0:
230 self.callback(self.stored)
232 if not node.id == self.table.node.id:
233 self.outstanding += 1
234 df = node.storeValue(self.target, self.value, self.table.node.id)
235 df.addCallback(self.storedValue, node=node)
236 df.addErrback(self.storeFailed, node=node)
238 def goWithNodes(self, nodes):
240 self.nodes.sort(self.sort)
245 def __init__(self, store):
247 reactor.callLater(const.KEINITIAL_DELAY, self.doExpire)
250 self.cut = "%0.6f" % (time() - const.KE_AGE)
254 c = self.store.cursor()
255 s = "delete from kv where time < '%s';" % self.cut
257 reactor.callLater(const.KE_DELAY, self.doExpire)