2 from pickle import loads, dumps
6 from const import reactor
9 from hash import intify
10 from knode import KNode as Node
11 from ktable import KTable, K
14 """ base class for some long running asynchronous proccesses like finding nodes or values """
15 def __init__(self, table, target, callback):
18 self.int = intify(target)
22 self.callback = callback
26 def sort(a, b, int=self.int):
27 """ this function is for sorting nodes relative to the ID we are looking for """
28 x, y = int ^ a.int, int ^ b.int
36 def goWithNodes(self, t):
41 FIND_NODE_TIMEOUT = 15
43 class FindNode(ActionBase):
44 """ find node action merits it's own class as it is a long running stateful process """
45 def handleGotNodes(self, args):
48 sender['host'] = conn['host']
49 sender = Node().initWithDict(sender)
50 self.table.table.insertNode(sender)
51 if self.finished or self.answered.has_key(sender.id):
52 # a day late and a dollar short
54 self.outstanding = self.outstanding - 1
55 self.answered[sender.id] = 1
57 n = Node().initWithDict(node)
58 if not self.found.has_key(n.id):
64 send messages to new peers, if necessary
68 l = self.found.values()
72 if node.id == self.target:
74 return self.callback([node])
75 if not self.queried.has_key(node.id) and node.id != self.table.node.id:
76 #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
77 df = node.findNode(self.target, self.table.node.senderDict())
78 df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
79 self.outstanding = self.outstanding + 1
80 self.queried[node.id] = 1
81 if self.outstanding >= const.CONCURRENT_REQS:
83 assert(self.outstanding) >=0
84 if self.outstanding == 0:
87 reactor.callFromThread(self.callback, l[:K])
89 def makeMsgFailed(self, node):
90 def defaultGotNodes(err, self=self, node=node):
91 self.table.table.nodeFailed(node)
94 self.outstanding = self.outstanding - 1
96 return defaultGotNodes
98 def goWithNodes(self, nodes):
100 this starts the process, our argument is a transaction with t.extras being our list of nodes
101 it's a transaction since we got called from the dispatcher
104 if node.id == self.table.node.id:
106 self.found[node.id] = node
107 #xxx t.timeout = time.time() + FIND_NODE_TIMEOUT
108 df = node.findNode(self.target, self.table.node.senderDict())
109 df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
110 self.outstanding = self.outstanding + 1
111 self.queried[node.id] = 1
112 if self.outstanding >= const.CONCURRENT_REQS:
115 if self.outstanding == 0:
119 GET_VALUE_TIMEOUT = 15
120 class GetValue(FindNode):
121 """ get value task """
122 def handleGotNodes(self, args):
125 sender['host'] = conn['host']
126 sender = Node().initWithDict(sender)
127 self.table.table.insertNode(sender)
128 if self.finished or self.answered.has_key(sender.id):
129 # a day late and a dollar short
131 self.outstanding = self.outstanding - 1
132 self.answered[sender.id] = 1
134 # if we have any closer than what we already got, query them
135 if l.has_key('nodes'):
136 for node in l['nodes']:
137 n = Node().initWithDict(node)
138 if not self.found.has_key(n.id):
140 elif l.has_key('values'):
141 def x(y, z=self.results):
148 v = filter(None, map(x, l['values']))
150 reactor.callFromThread(self.callback, v)
157 l = self.found.values()
161 if not self.queried.has_key(node.id) and node.id != self.table.node.id:
162 #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
163 df = node.findValue(self.target, self.table.node.senderDict())
164 df.addCallback(self.handleGotNodes)
165 df.addErrback(self.makeMsgFailed(node))
166 self.outstanding = self.outstanding + 1
167 self.queried[node.id] = 1
168 if self.outstanding >= const.CONCURRENT_REQS:
170 assert(self.outstanding) >=0
171 if self.outstanding == 0:
172 ## all done, didn't find it!!
174 reactor.callFromThread(self.callback,[])
177 def goWithNodes(self, nodes, found=None):
183 if node.id == self.table.node.id:
185 self.found[node.id] = node
186 #xxx t.timeout = time.time() + FIND_NODE_TIMEOUT
187 df = node.findValue(self.target, self.table.node.senderDict())
188 df.addCallback(self.handleGotNodes)
189 df.addErrback(self.makeMsgFailed(node))
190 self.outstanding = self.outstanding + 1
191 self.queried[node.id] = 1
192 if self.outstanding >= const.CONCURRENT_REQS:
195 if self.outstanding == 0:
196 reactor.callFromThread(self.callback, [])
201 def __init__(self, store, itime, kw):
205 reactor.callLater(const.KEINITIAL_DELAY, self.doExpire)
208 self.cut = `time() - const.KE_AGE`
212 ic = self.itime.cursor()
213 sc = self.store.cursor()
214 kc = self.kw.cursor()
217 irec = ic.set_range(self.cut)
218 except db.DBNotFoundError:
219 # everything is expired
228 k, v, lt = loads(self.store[h])
235 except db.DBNotFoundError:
236 print "Database inconsistency! No key->value entry when a store entry was found!"
246 reactor.callLater(const.KE_DELAY, self.doExpire)
248 print ">>>KE: done expiring %d" % i