2 from pickle import loads, dumps
6 from const import reactor
9 from hash import intify
10 from knode import KNode as Node
11 from ktable import KTable, K
13 # concurrent FIND_NODE/VALUE requests!
17 """ base class for some long-running asynchronous processes like finding nodes or values """
18 def __init__(self, table, target, callback):
21 self.int = intify(target)
25 self.callback = callback
29 def sort(a, b, int=self.int):
30 """ this function is for sorting nodes relative to the ID we are looking for """
31 x, y = int ^ a.int, int ^ b.int
39 def goWithNodes(self, t):
44 FIND_NODE_TIMEOUT = 15
46 class FindNode(ActionBase):
47 """ find node action merits its own class as it is a long-running stateful process """
48 def handleGotNodes(self, args):
50 sender = Node().initWithDict(sender)
51 self.table.table.insertNode(sender)
52 if self.finished or self.answered.has_key(sender.id):
53 # a day late and a dollar short
55 self.outstanding = self.outstanding - 1
56 self.answered[sender.id] = 1
58 n = Node().initWithDict(node)
59 if not self.found.has_key(n.id):
61 self.table.insertNode(n)
66 send messages to new peers, if necessary
70 l = self.found.values()
74 if node.id == self.target:
76 return self.callback([node])
77 if not self.queried.has_key(node.id) and node.id != self.table.node.id:
78 #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
79 df = node.findNode(self.target, self.table.node.senderDict())
80 df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
81 self.outstanding = self.outstanding + 1
82 self.queried[node.id] = 1
83 if self.outstanding >= N:
85 assert(self.outstanding) >=0
86 if self.outstanding == 0:
89 reactor.callFromThread(self.callback, l[:K])
91 def makeMsgFailed(self, node):
92 def defaultGotNodes(err, self=self, node=node):
93 self.table.table.nodeFailed(node)
96 self.outstanding = self.outstanding - 1
98 return defaultGotNodes
100 def goWithNodes(self, nodes):
102 this starts the process, our argument is a transaction with t.extras being our list of nodes
103 it's a transaction since we got called from the dispatcher
106 if node.id == self.table.node.id:
108 self.found[node.id] = node
109 #xxx t.timeout = time.time() + FIND_NODE_TIMEOUT
110 df = node.findNode(self.target, self.table.node.senderDict())
111 df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
112 self.outstanding = self.outstanding + 1
113 self.queried[node.id] = 1
114 if self.outstanding == 0:
118 GET_VALUE_TIMEOUT = 15
119 class GetValue(FindNode):
120 """ get value task """
121 def handleGotNodes(self, args):
123 sender = Node().initWithDict(sender)
124 self.table.table.insertNode(sender)
125 if self.finished or self.answered.has_key(sender.id):
126 # a day late and a dollar short
128 self.outstanding = self.outstanding - 1
129 self.answered[sender.id] = 1
131 # if we have any closer than what we already got, query them
132 if l.has_key('nodes'):
133 for node in l['nodes']:
134 n = Node().initWithDict(node)
135 if not self.found.has_key(n.id):
137 self.table.insertNode(n)
138 elif l.has_key('values'):
139 def x(y, z=self.results):
146 v = filter(None, map(x, l['values']))
148 reactor.callFromThread(self.callback, v)
155 l = self.found.values()
159 if not self.queried.has_key(node.id) and node.id != self.table.node.id:
160 #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
161 df = node.findValue(self.target, self.table.node.senderDict())
162 df.addCallback(self.handleGotNodes)
163 df.addErrback(self.makeMsgFailed(node))
164 self.outstanding = self.outstanding + 1
165 self.queried[node.id] = 1
166 if self.outstanding >= N:
168 assert(self.outstanding) >=0
169 if self.outstanding == 0:
170 ## all done, didn't find it!!
172 reactor.callFromThread(self.callback,[])
175 def goWithNodes(self, nodes, found=None):
181 if node.id == self.table.node.id:
183 self.found[node.id] = node
184 #xxx t.timeout = time.time() + FIND_NODE_TIMEOUT
185 df = node.findValue(self.target, self.table.node.senderDict())
186 df.addCallback(self.handleGotNodes)
187 df.addErrback(self.makeMsgFailed(node))
188 self.outstanding = self.outstanding + 1
189 self.queried[node.id] = 1
190 if self.outstanding == 0:
191 reactor.callFromThread(self.callback, [])
196 def __init__(self, store, itime, kw):
200 reactor.callLater(const.KEINITIAL_DELAY, self.doExpire)
203 self.cut = `time() - const.KE_AGE`
207 ic = self.itime.cursor()
208 sc = self.store.cursor()
209 kc = self.kw.cursor()
212 irec = ic.set_range(self.cut)
213 except db.DBNotFoundError:
214 # everything is expired
223 k, v, lt = loads(self.store[h])
230 except db.DBNotFoundError:
231 print "Database inconsistency! No key->value entry when a store entry was found!"
241 reactor.callLater(const.KE_DELAY, self.doExpire)
243 print ">>>KE: done expiring %d" % i