2 from bencode import bdecode as loads
3 from bencode import bencode as dumps
7 from const import reactor
10 from hash import intify
11 from knode import KNode as Node
12 from ktable import KTable, K
15 """ base class for some long running asynchronous processes like finding nodes or values """
16 def __init__(self, table, target, callback):
19 self.int = intify(target)
23 self.callback = callback
27 def sort(a, b, int=self.int):
28 """ this function is for sorting nodes relative to the ID we are looking for """
29 x, y = int ^ a.int, int ^ b.int
37 def goWithNodes(self, t):
42 FIND_NODE_TIMEOUT = 15
44 class FindNode(ActionBase):
45 """ find node action merits its own class as it is a long running stateful process """
46 def handleGotNodes(self, args):
48 sender = Node().initWithDict(sender)
49 self.table.table.insertNode(sender)
50 if self.finished or self.answered.has_key(sender.id):
51 # a day late and a dollar short
53 self.outstanding = self.outstanding - 1
54 self.answered[sender.id] = 1
56 n = Node().initWithDict(node)
57 if not self.found.has_key(n.id):
63 send messages to new peers, if necessary
67 l = self.found.values()
71 if node.id == self.target:
73 return self.callback([node])
74 if (not self.queried.has_key(node.id)) and node.id != self.table.node.id:
75 #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
76 df = node.findNode(self.target, self.table.node.senderDict())
77 df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
78 self.outstanding = self.outstanding + 1
79 self.queried[node.id] = 1
80 if self.outstanding >= const.CONCURRENT_REQS:
82 assert(self.outstanding) >=0
83 if self.outstanding == 0:
86 reactor.callFromThread(self.callback, l[:K])
88 def makeMsgFailed(self, node):
89 def defaultGotNodes(err, self=self, node=node):
90 self.table.table.nodeFailed(node)
91 self.outstanding = self.outstanding - 1
93 return defaultGotNodes
95 def goWithNodes(self, nodes):
97 this starts the process, our argument is a transaction with t.extras being our list of nodes
98 it's a transaction since we got called from the dispatcher
101 if node.id == self.table.node.id:
104 self.found[node.id] = node
109 GET_VALUE_TIMEOUT = 15
110 class GetValue(FindNode):
111 """ get value task """
112 def handleGotNodes(self, args):
114 sender = Node().initWithDict(sender)
115 self.table.table.insertNode(sender)
116 if self.finished or self.answered.has_key(sender.id):
117 # a day late and a dollar short
119 self.outstanding = self.outstanding - 1
120 self.answered[sender.id] = 1
122 # if we have any closer than what we already got, query them
123 if l.has_key('nodes'):
124 for node in l['nodes']:
125 n = Node().initWithDict(node)
126 if not self.found.has_key(n.id):
128 elif l.has_key('values'):
129 def x(y, z=self.results):
130 y = y.decode('base64')
136 v = filter(None, map(x, l['values']))
138 reactor.callFromThread(self.callback, v)
145 l = self.found.values()
149 if (not self.queried.has_key(node.id)) and node.id != self.table.node.id:
150 #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
151 df = node.findValue(self.target, self.table.node.senderDict())
152 df.addCallback(self.handleGotNodes)
153 df.addErrback(self.makeMsgFailed(node))
154 self.outstanding = self.outstanding + 1
155 self.queried[node.id] = 1
156 if self.outstanding >= const.CONCURRENT_REQS:
158 assert(self.outstanding) >=0
159 if self.outstanding == 0:
160 ## all done, didn't find it!!
162 reactor.callFromThread(self.callback,[])
165 def goWithNodes(self, nodes, found=None):
171 if node.id == self.table.node.id:
174 self.found[node.id] = node
181 def __init__(self, store):
183 reactor.callLater(const.KEINITIAL_DELAY, self.doExpire)
186 self.cut = `time() - const.KE_AGE`
190 c = self.store.cursor()
191 s = "delete from kv where time < '%s';" % self.cut
194 reactor.callLater(const.KE_DELAY, self.doExpire)