3 from const import reactor
6 from hash import intify
7 from knode import KNode as Node
8 from ktable import KTable, K
11 """ base class for some long running asynchronous processes like finding nodes or values """
12 def __init__(self, table, target, callback):
15 self.num = intify(target)
19 self.callback = callback
23 def sort(a, b, num=self.num):
24 """ this function is for sorting nodes relative to the ID we are looking for """
25 x, y = num ^ a.num, num ^ b.num
33 def goWithNodes(self, t):
38 FIND_NODE_TIMEOUT = 15
40 class FindNode(ActionBase):
41 """ find node action merits its own class as it is a long running stateful process """
42 def handleGotNodes(self, args):
44 sender = Node().initWithDict(sender)
45 self.table.table.insertNode(sender)
46 if self.finished or self.answered.has_key(sender.id):
47 # a day late and a dollar short
49 self.outstanding = self.outstanding - 1
50 self.answered[sender.id] = 1
52 n = Node().initWithDict(node)
53 if not self.found.has_key(n.id):
59 send messages to new peers, if necessary
63 l = self.found.values()
67 if node.id == self.target:
69 return self.callback([node])
70 if (not self.queried.has_key(node.id)) and node.id != self.table.node.id:
71 #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
72 df = node.findNode(self.target, self.table.node.senderDict())
73 df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
74 self.outstanding = self.outstanding + 1
75 self.queried[node.id] = 1
76 if self.outstanding >= const.CONCURRENT_REQS:
78 assert(self.outstanding) >=0
79 if self.outstanding == 0:
82 reactor.callFromThread(self.callback, l[:K])
84 def makeMsgFailed(self, node):
85 def defaultGotNodes(err, self=self, node=node):
86 self.table.table.nodeFailed(node)
87 self.outstanding = self.outstanding - 1
89 return defaultGotNodes
91 def goWithNodes(self, nodes):
93 this starts the process, our argument is a transaction with t.extras being our list of nodes
94 it's a transaction since we got called from the dispatcher
97 if node.id == self.table.node.id:
100 self.found[node.id] = node
105 GET_VALUE_TIMEOUT = 15
106 class GetValue(FindNode):
107 """ get value task """
108 def handleGotNodes(self, args):
110 sender = Node().initWithDict(sender)
111 self.table.table.insertNode(sender)
112 if self.finished or self.answered.has_key(sender.id):
113 # a day late and a dollar short
115 self.outstanding = self.outstanding - 1
116 self.answered[sender.id] = 1
118 # if we have any closer than what we already got, query them
119 if l.has_key('nodes'):
120 for node in l['nodes']:
121 n = Node().initWithDict(node)
122 if not self.found.has_key(n.id):
124 elif l.has_key('values'):
125 def x(y, z=self.results):
126 y = y.decode('base64')
132 v = filter(None, map(x, l['values']))
134 reactor.callFromThread(self.callback, v)
141 l = self.found.values()
145 if (not self.queried.has_key(node.id)) and node.id != self.table.node.id:
146 #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
147 df = node.findValue(self.target, self.table.node.senderDict())
148 df.addCallback(self.handleGotNodes)
149 df.addErrback(self.makeMsgFailed(node))
150 self.outstanding = self.outstanding + 1
151 self.queried[node.id] = 1
152 if self.outstanding >= const.CONCURRENT_REQS:
154 assert(self.outstanding) >=0
155 if self.outstanding == 0:
156 ## all done, didn't find it!!
158 reactor.callFromThread(self.callback,[])
161 def goWithNodes(self, nodes, found=None):
167 if node.id == self.table.node.id:
170 self.found[node.id] = node
177 def __init__(self, store):
179 reactor.callLater(const.KEINITIAL_DELAY, self.doExpire)
182 self.cut = "%0.6f" % (time() - const.KE_AGE)
186 c = self.store.cursor()
187 s = "delete from kv where time < '%s';" % self.cut
189 reactor.callLater(const.KE_DELAY, self.doExpire)