2 from pickle import loads, dumps
6 from const import reactor
8 from hash import intify
9 from knode import KNode as Node
10 from ktable import KTable, K
12 # concurrent FIND_NODE/VALUE requests!
16 """ base class for some long running asynchronous proccesses like finding nodes or values """
17 def __init__(self, table, target, callback):
20 self.int = intify(target)
24 self.callback = callback
28 def sort(a, b, int=self.int):
29 """ this function is for sorting nodes relative to the ID we are looking for """
30 x, y = int ^ a.int, int ^ b.int
38 def goWithNodes(self, t):
43 FIND_NODE_TIMEOUT = 15
45 class FindNode(ActionBase):
46 """ find node action merits it's own class as it is a long running stateful process """
47 def handleGotNodes(self, args):
49 sender = Node().initWithDict(sender)
50 if self.finished or self.answered.has_key(sender.id):
51 # a day late and a dollar short
53 self.outstanding = self.outstanding - 1
54 self.answered[sender.id] = 1
56 n = Node().initWithDict(node)
57 if not self.found.has_key(n.id):
59 self.table.insertNode(n)
64 send messages to new peers, if necessary
68 l = self.found.values()
72 if node.id == self.target:
74 return self.callback([node])
75 if not self.queried.has_key(node.id) and node.id != self.table.node.id:
76 #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
77 df = node.findNode(self.target, self.table.node.senderDict())
78 df.addCallbacks(self.handleGotNodes, self.defaultGotNodes)
79 self.outstanding = self.outstanding + 1
80 self.queried[node.id] = 1
81 if self.outstanding >= N:
83 assert(self.outstanding) >=0
84 if self.outstanding == 0:
87 reactor.callFromThread(self.callback, l[:K])
89 def defaultGotNodes(self, t):
92 self.outstanding = self.outstanding - 1
96 def goWithNodes(self, nodes):
98 this starts the process, our argument is a transaction with t.extras being our list of nodes
99 it's a transaction since we got called from the dispatcher
102 if node.id == self.table.node.id:
104 self.found[node.id] = node
105 #xxx t.timeout = time.time() + FIND_NODE_TIMEOUT
106 df = node.findNode(self.target, self.table.node.senderDict())
107 df.addCallbacks(self.handleGotNodes, self.defaultGotNodes)
108 self.outstanding = self.outstanding + 1
109 self.queried[node.id] = 1
110 if self.outstanding == 0:
114 GET_VALUE_TIMEOUT = 15
115 class GetValue(FindNode):
116 """ get value task """
117 def handleGotNodes(self, args):
119 sender = Node().initWithDict(sender)
120 if self.finished or self.answered.has_key(sender.id):
121 # a day late and a dollar short
123 self.outstanding = self.outstanding - 1
124 self.answered[sender.id] = 1
126 # if we have any closer than what we already got, query them
127 if l.has_key('nodes'):
128 for node in l['nodes']:
129 n = Node().initWithDict(node)
130 if not self.found.has_key(n.id):
132 self.table.insertNode(n)
133 elif l.has_key('values'):
134 def x(y, z=self.results):
139 v = filter(None, map(x, l['values']))
141 reactor.callFromThread(self.callback, v)
148 l = self.found.values()
152 if not self.queried.has_key(node.id) and node.id != self.table.node.id:
153 #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
154 df = node.findValue(self.target, self.table.node.senderDict())
155 df.addCallback(self.handleGotNodes)
156 df.addErrback(self.defaultGotNodes)
157 self.outstanding = self.outstanding + 1
158 self.queried[node.id] = 1
159 if self.outstanding >= N:
161 assert(self.outstanding) >=0
162 if self.outstanding == 0:
163 ## all done, didn't find it!!
165 reactor.callFromThread(self.callback,[])
168 def goWithNodes(self, nodes, found=None):
174 if node.id == self.table.node.id:
176 self.found[node.id] = node
177 #xxx t.timeout = time.time() + FIND_NODE_TIMEOUT
178 df = node.findValue(self.target, self.table.node.senderDict())
179 df.addCallback(self.handleGotNodes)
180 df.addErrback(self.defaultGotNodes)
181 self.outstanding = self.outstanding + 1
182 self.queried[node.id] = 1
183 if self.outstanding == 0:
184 reactor.callFromThread(self.callback, [])
187 KEINITIAL_DELAY = 60 * 60 * 24 # 24 hours
188 KE_DELAY = 60 * 60 # 1 hour
189 KE_AGE = KEINITIAL_DELAY
192 def __init__(self, store, itime, kw):
196 reactor.callLater(KEINITIAL_DELAY, self.doExpire)
199 self.cut = `time() - KE_AGE`
203 ic = self.itime.cursor()
204 sc = self.store.cursor()
205 kc = self.kw.cursor()
208 irec = ic.set_range(self.cut)
209 except db.DBNotFoundError:
210 # everything is expired
219 k, v, lt = loads(self.store[h])
226 except db.DBNotFoundError:
227 print "Database inconsistency! No key->value entry when a store entry was found!"
237 reactor.callLater(KE_DELAY, self.doExpire)
239 print ">>>KE: done expiring %d" % i